diff --git a/Cargo.lock b/Cargo.lock index 066c86ea..1de633cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1719,6 +1719,7 @@ dependencies = [ "thiserror 2.0.12", "tokio", "utils", + "xet_config", ] [[package]] @@ -5332,8 +5333,10 @@ dependencies = [ "pin-project", "rand 0.9.1", "serde", + "serde_json", "serial_test", "shellexpand", + "sysinfo", "tempfile", "thiserror 2.0.12", "tokio", diff --git a/git_xet/Cargo.toml b/git_xet/Cargo.toml index 309fc54a..c2a833fa 100644 --- a/git_xet/Cargo.toml +++ b/git_xet/Cargo.toml @@ -13,6 +13,7 @@ data = { path = "../data" } progress_tracking = { path = "../progress_tracking" } utils = { path = "../utils" } hub_client = { path = "../hub_client" } +xet_config = { path = "../xet_config" } anyhow = { workspace = true } async-trait = { workspace = true } diff --git a/git_xet/src/lfs_agent_protocol.rs b/git_xet/src/lfs_agent_protocol.rs index 317ee15b..10a00f4e 100644 --- a/git_xet/src/lfs_agent_protocol.rs +++ b/git_xet/src/lfs_agent_protocol.rs @@ -13,6 +13,8 @@ use agent_state::LFSAgentState; pub use errors::GitLFSProtocolError; pub use progress_updater::ProgressUpdater; pub use protocol_spec::*; +use utils::SystemMonitor; +use xet_config::XetConfig; // Any Git LFS custom transfer agent should implement this trait to be plugged // into the driver function `lfs_protocol_loop`. @@ -64,6 +66,21 @@ where W: Write + Send + Sync + 'static, A: TransferAgent, { + let xet_config = XetConfig::new(); + let sys_monitor = xet_config + .system_monitor + .enabled + .then(|| { + { + SystemMonitor::follow_process( + xet_config.system_monitor.sample_interval, + xet_config.system_monitor.log_path.clone(), + ) + } + .ok() // ignore error + }) + .flatten(); + let mut stdin = input_channel; let stdout = Arc::new(Mutex::new(output_channel)); let mut state = LFSAgentState::PendingInit; @@ -119,6 +136,10 @@ where stdout.lock().map_err(GitXetError::internal)?.write_all(response.as_bytes())?; } + if let Some(monitor) = sys_monitor { + let _ = monitor.stop(); // ignore error + } + Ok(()) } diff --git a/hf_xet/Cargo.lock b/hf_xet/Cargo.lock index 27296e7d..693c7e1c 100644 --- a/hf_xet/Cargo.lock +++ b/hf_xet/Cargo.lock @@ -4110,7 +4110,9 @@ dependencies = [ "pin-project", "rand 0.9.2", "serde", + "serde_json", "shellexpand", + "sysinfo", "thiserror 2.0.18", "tokio", "tokio-util", diff --git a/hf_xet_thin_wasm/Cargo.lock b/hf_xet_thin_wasm/Cargo.lock index e3f2c6f2..59108b05 100644 --- a/hf_xet_thin_wasm/Cargo.lock +++ b/hf_xet_thin_wasm/Cargo.lock @@ -1126,6 +1126,15 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fafa6961cabd9c63bcd77a45d7e3b7f3b552b70417831fb0f56db717e72407e" +[[package]] +name = "ntapi" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3b335231dfd352ffb0f8017f3b6027a4917f7df785ea2143d8af2adc66980ae" +dependencies = [ + "winapi", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -1141,6 +1150,25 @@ dependencies = [ "autocfg", ] +[[package]] +name = "objc2-core-foundation" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c10c2894a6fed806ade6027bcd50662746363a9589d3ec9d9bef30a4e4bc166" +dependencies = [ + "bitflags", +] + +[[package]] +name = "objc2-io-kit" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71c1c64d6120e51cd86033f67176b1cb66780c2efe34dec55176f77befd93c0a" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1667,6 +1695,20 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "sysinfo" +version = "0.38.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efc19935b4b66baa6f654ac7924c192f55b175c00a7ab72410fc24284dacda8" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -1930,7 +1972,9 @@ dependencies = [ "pin-project", "rand 0.9.1", "serde", + "serde_json", "shellexpand", + "sysinfo", "thiserror", "tokio", "tokio-util", @@ -2142,6 +2186,27 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -2155,6 +2220,17 @@ dependencies = [ "windows-strings", ] +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -2183,6 +2259,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core", + "windows-link", +] + [[package]] name = "windows-registry" version = "0.6.1" @@ -2272,6 +2358,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" diff --git a/hf_xet_wasm/Cargo.lock b/hf_xet_wasm/Cargo.lock index cfa3c734..5ff132a7 100644 --- a/hf_xet_wasm/Cargo.lock +++ b/hf_xet_wasm/Cargo.lock @@ -404,7 +404,7 @@ dependencies = [ "js-sys", "num-traits", "wasm-bindgen", - "windows-link 0.2.0", + "windows-link 0.2.1", ] [[package]] @@ -1766,6 +1766,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "ntapi" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3b335231dfd352ffb0f8017f3b6027a4917f7df785ea2143d8af2adc66980ae" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1830,6 +1839,25 @@ dependencies = [ "libm", ] +[[package]] +name = "objc2-core-foundation" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c10c2894a6fed806ade6027bcd50662746363a9589d3ec9d9bef30a4e4bc166" +dependencies = [ + "bitflags", +] + +[[package]] +name = "objc2-io-kit" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71c1c64d6120e51cd86033f67176b1cb66780c2efe34dec55176f77befd93c0a" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1899,7 +1927,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-link 0.2.0", + "windows-link 0.2.1", ] [[package]] @@ -2813,6 +2841,20 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "sysinfo" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5792d209c2eac902426c0c4a166c9f72147db453af548cf9bf3242644c4d4fe3" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -3235,7 +3277,9 @@ dependencies = [ "pin-project", "rand 0.9.2", "serde", + "serde_json", "shellexpand", + "sysinfo", "thiserror 2.0.16", "tokio", "tokio-util", @@ -3606,16 +3650,48 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "windows-core" -version = "0.62.1" +name = "windows" +version = "0.62.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6844ee5416b285084d3d3fffd743b925a6c9385455f64f6d4fa3031c4c2749a9" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core", +] + +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ "windows-implement", "windows-interface", - "windows-link 0.2.0", - "windows-result 0.4.0", - "windows-strings 0.5.0", + "windows-link 0.2.1", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core", + "windows-link 0.2.1", + "windows-threading", ] [[package]] @@ -3648,9 +3724,19 @@ checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" [[package]] name = "windows-link" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core", + "windows-link 0.2.1", +] [[package]] name = "windows-registry" @@ -3674,11 +3760,11 @@ dependencies = [ [[package]] name = "windows-result" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7084dcc306f89883455a206237404d3eaf961e5bd7e0f312f7c91f57eb44167f" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" dependencies = [ - "windows-link 0.2.0", + "windows-link 0.2.1", ] [[package]] @@ -3692,11 +3778,11 @@ dependencies = [ [[package]] name = "windows-strings" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7218c655a553b0bed4426cf54b20d7ba363ef543b52d515b3e48d7fd55318dda" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" dependencies = [ - "windows-link 0.2.0", + "windows-link 0.2.1", ] [[package]] @@ -3741,7 +3827,7 @@ version = "0.61.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e201184e40b2ede64bc2ea34968b28e33622acdbbf37104f0e4a33f7abe657aa" dependencies = [ - "windows-link 0.2.0", + "windows-link 0.2.1", ] [[package]] @@ -3792,6 +3878,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.0", ] +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 435d88e7..553fb9a1 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -25,6 +25,7 @@ futures = { workspace = true } lazy_static = { workspace = true } pin-project = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = [ "time", @@ -37,6 +38,7 @@ tracing = { workspace = true } more-asserts = { workspace = true } [target.'cfg(not(target_family = "wasm"))'.dependencies] +sysinfo = { workspace = true } bincode = { workspace = true } rand = { workspace = true } shellexpand = { workspace = true, features = ["path"] } diff --git a/utils/src/lib.rs b/utils/src/lib.rs index e33aa9d9..d7347cca 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -9,11 +9,16 @@ pub mod errors; #[cfg(not(target_family = "wasm"))] pub mod limited_joinset; mod output_bytes; +pub use output_bytes::output_bytes; pub mod serialization_utils; #[cfg(not(target_family = "wasm"))] pub mod singleflight; -pub use output_bytes::output_bytes; +#[cfg(not(target_family = "wasm"))] +pub mod system_monitor; + +#[cfg(not(target_family = "wasm"))] +pub use system_monitor::SystemMonitor; pub mod rw_task_lock; pub use rw_task_lock::{RwTaskLock, RwTaskLockError, RwTaskLockReadGuard}; diff --git a/utils/src/system_monitor.rs b/utils/src/system_monitor.rs new file mode 100644 index 00000000..4561f9d0 --- /dev/null +++ b/utils/src/system_monitor.rs @@ -0,0 +1,653 @@ +use std::fs::OpenOptions; +use std::io::Write; +use std::result::Result as stdResult; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; + +use serde::{Deserialize, Serialize}; +use sysinfo::{Networks, Pid, Process, ProcessRefreshKind, RefreshKind, System}; +use thiserror::Error; +use tracing::info; + +use crate::TemplatedPathBuf; + +/// A utility for monitoring system resource usage of a process. +/// +/// `SystemMonitor` can be configured to track a specific process ID or the current process. +/// It periodically samples CPU usage, memory usage, disk I/O, and network I/O, +/// and writes the metrics to a specified output file or to the tracing log. +/// +/// # Example +/// +/// ```no_run +/// use std::time::Duration; +/// +/// use utils::{SystemMonitor, TemplatedPathBuf}; +/// +/// # fn main() -> Result<(), Box> { +/// let monitor = SystemMonitor::follow_process( +/// Duration::from_secs(5), +/// Some(TemplatedPathBuf::from("monitor_{PID}_{TIMESTAMP}.log")), +/// )?; +/// +/// // ... application logic ... +/// +/// monitor.stop()?; +/// # Ok(()) +/// # } +/// ``` +#[derive(Debug)] +pub struct SystemMonitor { + pid: Option, + sample_interval: Duration, + log_path: Arc>, + monitor_loop: Mutex>>>, + stop: Arc, +} + +/// Internal state for sampling system metrics. +/// +/// This struct holds the `sysinfo` `System` and `Networks` objects, which are refreshed +/// at each sampling interval. It also tracks the process ID being monitored and timing +/// information to calculate rates and averages. +#[derive(Debug)] +struct SystemSampler { + system: System, + network: Networks, + pid: Option, + start_measurement_time: Instant, + last_measurement_time: Instant, + last_sample: Option, + baseline_sample: Metrics, +} + +/// A snapshot of system metrics at a specific point in time. +/// +/// This struct contains detailed information about CPU, memory, disk, and network usage +/// for the monitored process. +#[derive(Debug, Serialize, Deserialize, Clone)] +struct Metrics { + /// Process ID of the monitored process + pid: u32, + /// Name of the process + name: String, + /// Total run time of the process in seconds + run_time: u64, + /// CPU usage metrics + cpu: CpuUsage, + /// Memory usage metrics + memory: MemoryUsage, + /// Disk I/O numbers and speed + disk: DiskUsage, + /// Network I/O numbers and speed + network: NetworkUsage, +} + +impl Metrics { + pub fn create( + system: &System, + network: &Networks, + pid: Pid, + sample_interval: Duration, + total_duration: Duration, + last_sample: Option, + baseline: &Metrics, + ) -> Result { + let Some(process) = system.process(pid) else { + return Err(SystemMonitorError::NoProcess(pid.as_u32())); + }; + + Ok(Self { + pid: pid.as_u32(), + name: process.name().to_string_lossy().into(), + run_time: process.run_time(), + cpu: CpuUsage::from(process, system), + memory: MemoryUsage::from(process, system, last_sample.map(|s| s.memory)), + disk: DiskUsage::from(process, sample_interval, total_duration, &baseline.disk), + network: NetworkUsage::from(network, sample_interval, total_duration, &baseline.network), + }) + } + + /// Creates a baseline `Metrics` snapshot at the start of monitoring. + /// + /// This captures the initial state of disk and network I/O, which are reported + /// as cumulative values by the underlying system library. This baseline allows + /// for calculating the delta of resource usage during the monitoring session. + /// + /// This helps provide useful information when used by hf_xet in a long running + /// Python process, e.g. a iPython notebook + pub fn baseline(system: &System, network: &Networks, pid: Pid) -> Result { + let Some(process) = system.process(pid) else { + return Err(SystemMonitorError::NoProcess(pid.as_u32())); + }; + + Ok(Self { + pid: pid.as_u32(), + name: process.name().to_string_lossy().into(), + run_time: process.run_time(), + cpu: CpuUsage::from(process, system), + memory: MemoryUsage::from(process, system, None), + disk: DiskUsage::baseline(process), + network: NetworkUsage::baseline(network), + }) + } + + pub fn to_json(&self) -> Result { + Ok(serde_json::to_string(&self)?) + } +} + +/// Represents CPU usage metrics. +#[derive(Debug, Serialize, Deserialize, Clone)] +struct CpuUsage { + /// CPU usage of the monitored process as a percentage. + process_usage: f32, + /// Total number of CPUs in the system. + ncpus: u32, + /// Usage of individual CPUs as a percentage. + global_usage: Vec, +} + +impl CpuUsage { + pub fn from(process: &Process, system: &System) -> Self { + Self { + process_usage: process.cpu_usage(), + ncpus: system.cpus().len() as u32, + global_usage: system.cpus().iter().map(|c| c.cpu_usage()).collect(), + } + } +} + +/// Represents memory usage metrics. +#[derive(Debug, Serialize, Deserialize, Clone)] +struct MemoryUsage { + /// Current memory usage in bytes of the monitored process. + used_bytes: u64, + /// Peak memory usage in bytes observed for the monitored process during the session. + peak_used_bytes: u64, + /// Memory usage of the monitored process as a percentage of total system RAM. + percentage: f64, + /// Total system RAM size in bytes. + total_bytes: u64, +} + +impl MemoryUsage { + pub fn from(process: &Process, system: &System, last_sample: Option) -> Self { + Self { + used_bytes: process.memory(), + peak_used_bytes: process.memory().max(last_sample.map(|s| s.peak_used_bytes).unwrap_or_default()), + percentage: process.memory() as f64 / system.total_memory() as f64, + total_bytes: system.total_memory(), + } + } +} + +/// Represents disk I/O metrics. +#[derive(Debug, Serialize, Deserialize, Clone)] +struct DiskUsage { + /// Total number of bytes written by the process since the monitor started. + total_written_bytes: u64, + /// Number of bytes written by the process since the last sample. + written_bytes: u64, + /// Total number of bytes read by the process since the monitor started. + total_read_bytes: u64, + /// Number of bytes read by the process since the last sample. + read_bytes: u64, + + /// Average write speed in bytes per second over the entire monitoring duration. + average_write_speed: f64, + /// Instantaneous write speed in bytes per second over the last sample interval. + instant_write_speed: f64, + /// Average read speed in bytes per second over the entire monitoring duration. + average_read_speed: f64, + /// Instantaneous read speed in bytes per second over the last sample interval. + instant_read_speed: f64, +} + +impl DiskUsage { + /// Creates a baseline for disk usage at the start of monitoring. + /// + /// This is necessary because `sysinfo` provides cumulative disk I/O statistics + /// since the process started. To measure usage only during the monitoring period, + /// we capture this initial state and subtract it from later samples. + pub fn baseline(process: &Process) -> Self { + let usage = process.disk_usage(); + Self { + total_written_bytes: usage.total_written_bytes, + written_bytes: 0, + total_read_bytes: usage.total_read_bytes, + read_bytes: 0, + average_write_speed: 0., + instant_write_speed: 0., + average_read_speed: 0., + instant_read_speed: 0., + } + } + + pub fn from(process: &Process, sample_interval: Duration, total_duration: Duration, baseline: &DiskUsage) -> Self { + let usage = process.disk_usage(); + + // Subtract stats before the monitor + let total_written_bytes = usage.total_written_bytes - baseline.total_written_bytes; + let total_read_bytes = usage.total_read_bytes - baseline.total_read_bytes; + + Self { + total_written_bytes, + written_bytes: usage.written_bytes, + total_read_bytes, + read_bytes: usage.read_bytes, + average_write_speed: total_written_bytes as f64 / total_duration.as_secs_f64(), + instant_write_speed: usage.written_bytes as f64 / sample_interval.as_secs_f64(), + average_read_speed: total_read_bytes as f64 / total_duration.as_secs_f64(), + instant_read_speed: usage.read_bytes as f64 / sample_interval.as_secs_f64(), + } + } +} + +/// Represents network I/O metrics for all interfaces combined. +#[derive(Debug, Serialize, Deserialize, Clone)] +struct NetworkUsage { + /// Total number of bytes transmitted across all network interfaces since the monitor started. + total_tx_bytes: u64, + /// Number of bytes transmitted across all network interfaces since the last sample. + tx_bytes: u64, + /// Total number of bytes received across all network interfaces since the monitor started. + total_rx_bytes: u64, + /// Number of bytes received across all network interfaces since the last sample. + rx_bytes: u64, + + /// Average transmit speed in bytes per second over the entire monitoring duration. + average_tx_speed: f64, + /// Instantaneous transmit speed in bytes per second over the last sample interval. + instant_tx_speed: f64, + /// Average receive speed in bytes per second over the entire monitoring duration. + average_rx_speed: f64, + /// Instantaneous receive speed in bytes per second over the last sample interval. + instant_rx_speed: f64, +} + +impl NetworkUsage { + /// Creates a baseline for network usage at the start of monitoring. + /// + /// This is necessary because `sysinfo` provides cumulative network I/O statistics + /// since the system booted. To measure usage only during the monitoring period, + /// we capture this initial state and subtract it from later samples. + pub fn baseline(network: &Networks) -> Self { + let total_tx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_transmitted()); + let total_rx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_received()); + + Self { + total_tx_bytes, + tx_bytes: 0, + total_rx_bytes, + rx_bytes: 0, + average_tx_speed: 0., + instant_tx_speed: 0., + average_rx_speed: 0., + instant_rx_speed: 0., + } + } + + pub fn from( + network: &Networks, + sample_interval: Duration, + total_duration: Duration, + baseline: &NetworkUsage, + ) -> Self { + let total_tx_bytes = + network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_transmitted()) - baseline.total_tx_bytes; + let tx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.transmitted()); + let total_rx_bytes = + network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_received()) - baseline.total_rx_bytes; + let rx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.received()); + + Self { + total_tx_bytes, + tx_bytes, + total_rx_bytes, + rx_bytes, + average_tx_speed: total_tx_bytes as f64 / total_duration.as_secs_f64(), + instant_tx_speed: tx_bytes as f64 / sample_interval.as_secs_f64(), + average_rx_speed: total_rx_bytes as f64 / total_duration.as_secs_f64(), + instant_rx_speed: rx_bytes as f64 / sample_interval.as_secs_f64(), + } + } +} + +impl SystemSampler { + pub fn new(pid: Option) -> Result { + let Some(pid) = pid.or_else(|| sysinfo::get_current_pid().ok()) else { + return Err(SystemMonitorError::NoPid); + }; + + let system = System::new_all(); + let network = Networks::new_with_refreshed_list(); + + let baseline = Metrics::baseline(&system, &network, pid)?; + + let now = Instant::now(); + + Ok(Self { + system, + network, + pid: Some(pid), + start_measurement_time: now, + last_measurement_time: now, + last_sample: None, + baseline_sample: baseline, + }) + } + + pub fn sample(&mut self) -> Result<()> { + // refresh process, cpu, memory and disk usage + self.system.refresh_all(); + // refresh network interface usage + self.network.refresh(true); + + let Some(pid) = self.pid.or_else(|| sysinfo::get_current_pid().ok()) else { + return Err(SystemMonitorError::NoPid); + }; + + let sample_interval = self.last_measurement_time.elapsed(); + self.last_measurement_time = Instant::now(); + let total_duration = self.start_measurement_time.elapsed(); + + self.last_sample = Some(Metrics::create( + &self.system, + &self.network, + pid, + sample_interval, + total_duration, + self.last_sample.take(), + &self.baseline_sample, + )?); + + Ok(()) + } +} + +/// Errors that can occur during system monitoring. +#[derive(Error, Debug)] +pub enum SystemMonitorError { + #[error("Failed to get pid")] + NoPid, + + #[error("Failed to get process from pid {0}")] + NoProcess(u32), + + #[error("IO Error: {0}")] + IOError(#[from] std::io::Error), + + #[error("Serde Json error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("Internal error: {0}")] + Internal(String), +} + +type Result = std::result::Result; + +impl SystemMonitor { + /// Creates a new SystemMonitor that follows the current process. + /// + /// Monitoring starts immediately upon creation. The background thread begins + /// sampling system metrics at the specified interval. + /// + /// # Arguments + /// * `sample_interval` - The interval at which to sample system metrics. + /// * `log_path` - Optional path template for the output log file. If None, logs to tracing at INFO level. + pub fn follow_process(sample_interval: Duration, log_path: Option) -> Result { + sysinfo::get_current_pid().map_err(|_| SystemMonitorError::NoPid)?; + Self::new_impl(None, sample_interval, log_path) + } + + /// Creates a new SystemMonitor that follows a specific process ID. + /// + /// Monitoring starts immediately upon creation. The background thread begins + /// sampling system metrics at the specified interval. + /// + /// # Arguments + /// * `pid` - The process ID to monitor. + /// * `sample_interval` - The interval at which to sample system metrics. + /// * `log_path` - Optional path template for the output log file. If None, logs to tracing at INFO level. + pub fn with_pid(pid: Pid, sample_interval: Duration, log_path: Option) -> Result { + let system = + System::new_with_specifics(RefreshKind::nothing().with_processes(ProcessRefreshKind::everything())); + if system.process(pid).is_none() { + return Err(SystemMonitorError::NoProcess(pid.as_u32())); + }; + + Self::new_impl(Some(pid), sample_interval, log_path) + } + + fn new_impl(pid: Option, sample_interval: Duration, log_path: Option) -> Result { + let ret = Self { + pid, + sample_interval, + log_path: log_path.into(), + monitor_loop: Mutex::new(None), + stop: Arc::new(AtomicBool::new(false)), + }; + + ret.start()?; + + Ok(ret) + } + + /// Starts the monitoring thread. + /// + /// This function is called automatically by `follow_process()` and `with_pid()`, + /// so it typically doesn't need to be called manually. If the monitor is already + /// running, this is a no-op. + /// + /// # Errors + /// Returns an error if: + /// - The log path is invalid or cannot be written to + /// - The monitored process no longer exists + /// - Internal synchronization fails + pub fn start(&self) -> Result<()> { + if self.is_running()? { + return Ok(()); + } + + let mut sampler = SystemSampler::new(self.pid)?; + + // Take a sample before the thread starts so that errors like a bad log_path + // show up immediately in the caller. + sampler.sample()?; + + let mut inner_runner = self + .monitor_loop + .lock() + .map_err(|e| SystemMonitorError::Internal(e.to_string()))?; + self.stop.store(false, Ordering::Relaxed); + + let sample_interval = self.sample_interval; + let log_path = self.log_path.clone(); + let stop_clone = self.stop.clone(); + + *inner_runner = Some(std::thread::spawn(move || { + loop { + if stop_clone.load(Ordering::Relaxed) { + break; + } + std::thread::sleep(sample_interval); + sampler.sample()?; + + if let Some(sample) = &sampler.last_sample { + Self::output_report(sample, &log_path)?; + } + } + Ok(()) + })); + + Ok(()) + } + + fn output_report(sample: &Metrics, log_path: &Option) -> Result<()> { + let json_report = sample.to_json()?; + + if let Some(path) = log_path { + let path = path.as_path(); + let mut file = OpenOptions::new().create(true).append(true).open(path)?; + writeln!(file, "{json_report}")?; + } else { + info!(system_usage = json_report); + } + + Ok(()) + } + + fn is_running(&self) -> Result { + let inner_runner = self + .monitor_loop + .lock() + .map_err(|e| SystemMonitorError::Internal(e.to_string()))?; + Ok(inner_runner.is_some()) + } + + /// Stops the monitoring thread. + /// + /// Signals the background thread to stop and waits for it to join. + /// + /// # Errors + /// Returns an error if there is an issue stopping the thread, such as if the thread + /// panicked or if there are internal synchronization issues. + pub fn stop(&self) -> Result<()> { + self.stop.store(true, Ordering::Relaxed); + + if let Some(inner_runner) = self + .monitor_loop + .lock() + .map_err(|e| SystemMonitorError::Internal(e.to_string()))? + .take() + { + match inner_runner + .join() + .map_err(|_| SystemMonitorError::Internal("join error".to_owned()))? + { + Ok(_) => (), + Err(SystemMonitorError::NoProcess(_)) => (), // monitored process naturally died + e => e?, + } + } + + Ok(()) + } +} + +impl Drop for SystemMonitor { + fn drop(&mut self) { + let _ = self.stop(); + } +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::io::{BufRead, BufReader}; + use std::time::Duration; + + use serial_test::serial; + use tempfile::tempdir; + + use super::*; + + #[test] + #[serial(monitor_process)] + fn test_monitor_self_disk_usage() -> Result<()> { + // Verifies that the system monitor correctly tracks and reports disk usage of this process + + let tempdir = tempdir()?; + let tempdir_path = tempdir.path(); + let log_path = TemplatedPathBuf::from(tempdir_path.join("system_monitor_{pid}.txt")); + let sample_interval = Duration::from_millis(500); + let monitor = SystemMonitor::follow_process(sample_interval, Some(log_path.clone()))?; + + // produce some disk usage + let data_file = tempdir_path.join("data"); + let total_written_bytes = { + let buffer = vec![0; 1024 * 1024]; // 1MiB + let mut fd = std::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&data_file)?; + + for _ in 0..10 { + fd.write_all(&buffer)?; + } + fd.flush()?; + + 10 * 1024 * 1024 // 10MiB + }; + + // wait for the last sample and abort monitor + std::thread::sleep(Duration::from_secs(2)); + monitor.stop()?; + + // check monitor logs + let filesize = std::fs::metadata(data_file)?.len(); + assert_eq!(filesize, total_written_bytes); + + let log_reader = BufReader::new(File::open(log_path.as_path())?); + let last_message = log_reader.lines().last().unwrap()?; + let metrics: Metrics = serde_json::from_str(&last_message)?; + + // The total_written_bytes should be at least the size of the file created by this process. + assert!(metrics.disk.total_written_bytes >= total_written_bytes); + + Ok(()) + } + + #[test] + #[serial(monitor_process)] + fn test_monitor_self_memory_usage() -> Result<()> { + // Verifies that the system monitor correctly tracks and reports peak memory usage. + let tempdir = tempdir()?; + let tempdir_path = tempdir.path(); + let log_path = TemplatedPathBuf::from(tempdir_path.join("system_monitor_{pid}.txt")); + let sample_interval = Duration::from_millis(500); + let monitor = SystemMonitor::follow_process(sample_interval, Some(log_path.clone()))?; + + let peak_allocation_size = 512 * 1024 * 1024; // 512 MiB + + // Allocate a large chunk of memory. + let mut large_vec = vec![0u8; peak_allocation_size]; + // Touch each Page to commit usage. + for i in 0..peak_allocation_size / (4 * 1024) { + large_vec[i * 4 * 1024] = 1; + } + + // Wait for a sample to be taken while memory usage is high. + std::thread::sleep(Duration::from_secs(2)); + + // Drop the large allocation. + drop(large_vec); + + monitor.stop()?; + + // Check monitor logs. + let log_reader = BufReader::new(File::open(log_path.as_path())?); + let last_message = log_reader.lines().last().unwrap()?; + let metrics: Metrics = serde_json::from_str(&last_message)?; + + // The peak memory usage should be at least the size of our large allocation. + assert!(metrics.memory.peak_used_bytes >= peak_allocation_size as u64); + + Ok(()) + } + + #[test] + #[serial(monitor_process)] + fn test_monitor_nonexist_process() -> Result<()> { + // Verifies that the system monitor fails to initiate if targeted at an invalid pid + + let maybe_monitor = SystemMonitor::with_pid(Pid::from_u32(u32::MAX), Duration::from_secs(5), None); + assert!(maybe_monitor.is_err()); + + Ok(()) + } +} diff --git a/xet_config/src/groups/mod.rs b/xet_config/src/groups/mod.rs index a63793b6..2f68b0bc 100644 --- a/xet_config/src/groups/mod.rs +++ b/xet_config/src/groups/mod.rs @@ -5,4 +5,6 @@ pub mod deduplication; pub mod log; pub mod mdb_shard; pub mod reconstruction; +#[cfg(not(target_family = "wasm"))] +pub mod system_monitor; pub mod xorb; diff --git a/xet_config/src/groups/system_monitor.rs b/xet_config/src/groups/system_monitor.rs new file mode 100644 index 00000000..78e33bbe --- /dev/null +++ b/xet_config/src/groups/system_monitor.rs @@ -0,0 +1,38 @@ +use std::time::Duration; + +use utils::TemplatedPathBuf; + +crate::config_group!({ + /// Whether to enable system resource monitoring. + /// + /// When enabled, the system monitor will periodically sample and log system statistics + /// such as CPU usage, memory usage, and other resource metrics. + /// + /// The default value is false. + /// + /// Use the environment variable `HF_XET_SYSTEM_MONITOR_ENABLED` to set this value. + ref enabled: bool = false; + + /// The interval at which to sample system statistics. + /// + /// The default value is 5 seconds. + /// + /// Use the environment variable `HF_XET_SYSTEM_MONITOR_SAMPLE_INTERVAL` to set this value. + ref sample_interval: Duration = Duration::from_secs(5); + + /// The path to write the system monitor output to. + /// + /// If not set, the output will be written to tracing log at "INFO" level. + /// + /// Supports template variables (case-insensitive): + /// - `{PID}` - Replaced with the current process ID + /// - `{TIMESTAMP}` - Replaced with ISO 8601 timestamp in local timezone with offset + /// (e.g., `2024-02-05T14-30-45-0500`) + /// + /// Example: `~/logs/monitor_{PID}_{TIMESTAMP}.log` + /// + /// The default value is None. + /// + /// Use the environment variable `HF_XET_SYSTEM_MONITOR_LOG_PATH` to set this value. + ref log_path: Option = None; +}); diff --git a/xet_config/src/xet_config.rs b/xet_config/src/xet_config.rs index 3fe2467e..435bd274 100644 --- a/xet_config/src/xet_config.rs +++ b/xet_config/src/xet_config.rs @@ -13,6 +13,8 @@ pub struct XetConfig { pub log: groups::log::ConfigValues, pub reconstruction: groups::reconstruction::ConfigValues, pub xorb: groups::xorb::ConfigValues, + #[cfg(not(target_family = "wasm"))] + pub system_monitor: groups::system_monitor::ConfigValues, } impl XetConfig { @@ -42,6 +44,8 @@ impl XetConfig { self.log.apply_env_overrides(); self.reconstruction.apply_env_overrides(); self.xorb.apply_env_overrides(); + #[cfg(not(target_family = "wasm"))] + self.system_monitor.apply_env_overrides(); self } diff --git a/xet_runtime/src/runtime.rs b/xet_runtime/src/runtime.rs index d95fa21f..1cbe3f8e 100644 --- a/xet_runtime/src/runtime.rs +++ b/xet_runtime/src/runtime.rs @@ -8,6 +8,8 @@ use reqwest::Client; use tokio::runtime::{Builder as TokioRuntimeBuilder, Handle as TokioRuntimeHandle, Runtime as TokioRuntime}; use tokio::task::JoinHandle; use tracing::{debug, info}; +#[cfg(not(target_family = "wasm"))] +use utils::SystemMonitor; use xet_config::XetConfig; use crate::common::XetCommon; @@ -133,6 +135,10 @@ pub struct XetRuntime { // Primary configuration struct config: Arc, + + // System monitor instance if enabled, monitor starts on initiation + #[cfg(not(target_family = "wasm"))] + system_monitor: Option, } // Use thread-local references to the runtime that are set on initialization among all @@ -190,6 +196,18 @@ impl XetRuntime { external_executor_count: 0.into(), sigint_shutdown: false.into(), common: XetCommon::new(&config), + #[cfg(not(target_family = "wasm"))] + system_monitor: config + .system_monitor + .enabled + .then(|| { + SystemMonitor::follow_process( + config.system_monitor.sample_interval, + config.system_monitor.log_path.clone(), + ) + .ok() + }) + .flatten(), config: Arc::new(config), }); @@ -253,6 +271,18 @@ impl XetRuntime { external_executor_count: 0.into(), sigint_shutdown: false.into(), common: XetCommon::new(&config), + #[cfg(not(target_family = "wasm"))] + system_monitor: config + .system_monitor + .enabled + .then(|| { + SystemMonitor::follow_process( + config.system_monitor.sample_interval, + config.system_monitor.log_path.clone(), + ) + .ok() + }) + .flatten(), config: Arc::new(config), }) } @@ -324,6 +354,12 @@ impl XetRuntime { // Dropping the runtime will cancel all the tasks; shutdown occurs when the next async call // is encountered. Ideally, all async code should be cancellation safe. drop(runtime); + + // Stops the system monitor loop if there is one running. + #[cfg(not(target_family = "wasm"))] + if let Some(monitor) = &self.system_monitor { + let _ = monitor.stop(); + } } /// Discards the runtime without shutdown; to be used after fork-exec or spawn. @@ -371,6 +407,7 @@ impl XetRuntime { }); self.external_executor_count.fetch_sub(1, Ordering::SeqCst); + ret }