mirror of
https://github.com/huggingface/xet-core.git
synced 2026-06-04 13:30:29 +08:00
Test suite for directory logging functionality (#536)
This commit is contained in:
5
.vscode/settings.json
vendored
5
.vscode/settings.json
vendored
@@ -1,5 +1,6 @@
|
||||
{
|
||||
"[rust]": {
|
||||
"editor.defaultFormatter": "rust-lang.rust-analyzer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"rust-analyzer.rustc.source": null
|
||||
}
|
||||
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -5049,7 +5049,10 @@ dependencies = [
|
||||
"console-subscriber",
|
||||
"error_printer",
|
||||
"git-version",
|
||||
"rand 0.9.1",
|
||||
"sysinfo",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-appender",
|
||||
"tracing-subscriber",
|
||||
|
||||
@@ -26,3 +26,12 @@ console-subscriber = { version = "0.4.1", optional = true }
|
||||
[features]
|
||||
default = []
|
||||
tokio-console = ["dep:console-subscriber"]
|
||||
|
||||
[[bin]]
|
||||
name = "log_test_executable"
|
||||
path = "tests/bin/log_test_executable.rs"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.0"
|
||||
tokio = { version = "1.0", features = ["rt", "time"] }
|
||||
rand = { workspace = true }
|
||||
|
||||
@@ -3,4 +3,4 @@ mod constants;
|
||||
mod logging;
|
||||
|
||||
pub use config::{LogDirConfig, LoggingConfig, LoggingMode};
|
||||
pub use logging::init;
|
||||
pub use logging::{init, wait_for_log_directory_cleanup};
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use std::ffi::OsStr;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::OnceLock;
|
||||
use std::sync::{Mutex, OnceLock};
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, FixedOffset, Local, Utc};
|
||||
@@ -17,6 +18,19 @@ use utils::ByteSize;
|
||||
use crate::config::*;
|
||||
use crate::constants::{DEFAULT_LOG_LEVEL_CONSOLE, DEFAULT_LOG_LEVEL_FILE};
|
||||
|
||||
/// Global variable to hold the JoinHandle for the log cleanup thread
|
||||
static LOG_CLEANUP_HANDLE: Mutex<Option<JoinHandle<()>>> = Mutex::new(None);
|
||||
|
||||
/// Wait for the log directory cleanup to complete.
|
||||
/// This function blocks until the background cleanup thread finishes.
|
||||
pub fn wait_for_log_directory_cleanup() {
|
||||
if let Ok(mut handle_opt) = LOG_CLEANUP_HANDLE.lock()
|
||||
&& let Some(handle) = handle_opt.take()
|
||||
{
|
||||
let _ = handle.join();
|
||||
}
|
||||
}
|
||||
|
||||
/// The main entry point to set up logging. Should only be called once.
|
||||
pub fn init(cfg: LoggingConfig) {
|
||||
let mut dir_cleanup_task = None;
|
||||
@@ -112,6 +126,7 @@ fn init_logging_to_file(path: &Path, use_json: bool) -> Result<(), std::io::Erro
|
||||
let _ = FILE_GUARD.set(guard); // ignore error if already initialised
|
||||
|
||||
let registry = tracing_subscriber::registry();
|
||||
|
||||
#[cfg(feature = "tokio-console")]
|
||||
let registry = {
|
||||
// Console subscriber layer for tokio-console, custom filter for tokio trace level events
|
||||
@@ -189,11 +204,17 @@ struct CandidateLogFile {
|
||||
fn run_log_directory_cleanup_background(cfg: LogDirConfig, log_dir: &Path) {
|
||||
// Spawn run_log_directory_cleanup as background thread, logging any errors as a warn!
|
||||
let log_dir = log_dir.to_path_buf();
|
||||
std::thread::spawn(move || {
|
||||
let handle = std::thread::spawn(move || {
|
||||
if let Err(e) = run_log_directory_cleanup(cfg, &log_dir) {
|
||||
warn!("Error during log directory cleanup in {:?}: {}", log_dir, e);
|
||||
}
|
||||
});
|
||||
|
||||
// Store the JoinHandle in the global variable
|
||||
if let Ok(mut handle_opt) = LOG_CLEANUP_HANDLE.lock() {
|
||||
debug_assert!(handle_opt.is_none(), "Log directory cleanup called multiple times.");
|
||||
*handle_opt = Some(handle);
|
||||
}
|
||||
}
|
||||
|
||||
fn run_log_directory_cleanup(cfg: LogDirConfig, log_dir: &Path) -> io::Result<()> {
|
||||
|
||||
38
xet_logging/tests/bin/log_test_executable.rs
Normal file
38
xet_logging/tests/bin/log_test_executable.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
use std::env;
|
||||
use std::time::Duration;
|
||||
|
||||
use tracing::info;
|
||||
use xet_logging::{LoggingConfig, LoggingMode};
|
||||
|
||||
fn main() {
|
||||
let args: Vec<String> = env::args().collect();
|
||||
if args.len() != 3 {
|
||||
eprintln!("Usage: {} <log_directory> <num_lines>", args[0]);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
let log_directory = &args[1];
|
||||
let num_lines: usize = args[2].parse().expect("num_lines must be a number");
|
||||
|
||||
// Initialize logging to the specified directory
|
||||
let config = LoggingConfig {
|
||||
logging_mode: LoggingMode::Directory(log_directory.into()),
|
||||
use_json: true,
|
||||
enable_log_dir_cleanup: true,
|
||||
version: "test".to_string(),
|
||||
log_dir_config: xet_logging::LogDirConfig::default(),
|
||||
};
|
||||
|
||||
xet_logging::init(config);
|
||||
|
||||
// Generate log messages with 5ms delay between each
|
||||
for i in 0..num_lines {
|
||||
info!("Test log message number {} - this is a dummy log message for testing purposes", i + 1);
|
||||
|
||||
// Wait for 50 microseconds between each log message just to spread it out a little...
|
||||
std::thread::sleep(Duration::from_micros(50));
|
||||
}
|
||||
|
||||
// Wait for background cleanup to complete before exiting
|
||||
xet_logging::wait_for_log_directory_cleanup();
|
||||
}
|
||||
387
xet_logging/tests/integration_tests.rs
Normal file
387
xet_logging/tests/integration_tests.rs
Normal file
@@ -0,0 +1,387 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
use std::{fs, thread};
|
||||
|
||||
use rand::prelude::*;
|
||||
use tempfile::TempDir;
|
||||
|
||||
/// Helper function to run the test executable with environment variables
|
||||
fn run_test_executable(log_dir: &Path, num_lines: usize, env_vars: &[(&str, &str)]) {
|
||||
let executable_path = PathBuf::from(env!("CARGO_BIN_EXE_log_test_executable"));
|
||||
|
||||
let mut command = Command::new(&executable_path);
|
||||
command.arg(log_dir.to_string_lossy().as_ref()).arg(num_lines.to_string());
|
||||
|
||||
// Set environment variables
|
||||
for (key, value) in env_vars {
|
||||
command.env(key, value);
|
||||
}
|
||||
|
||||
// Use spawn() instead of output() to pipe stderr through
|
||||
let mut child = command.spawn().expect("Failed to execute test executable");
|
||||
let status = child.wait().expect("Failed to wait for test executable");
|
||||
|
||||
if !status.success() {
|
||||
panic!("Test executable failed with status: {}", status);
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to run the test executable in parallel
|
||||
fn run_test_executable_parallel(log_dir: &Path, num_lines: usize, env_vars: &[(&str, &str)]) -> thread::JoinHandle<()> {
|
||||
let executable_path = PathBuf::from(env!("CARGO_BIN_EXE_log_test_executable"));
|
||||
let log_dir = log_dir.to_path_buf();
|
||||
let env_vars: Vec<(String, String)> = env_vars.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut command = Command::new(&executable_path);
|
||||
command.arg(log_dir.to_string_lossy().as_ref()).arg(num_lines.to_string());
|
||||
|
||||
// Set environment variables
|
||||
for (key, value) in &env_vars {
|
||||
command.env(key, value);
|
||||
}
|
||||
|
||||
// Use spawn() instead of output() to pipe stderr through
|
||||
let mut child = command.spawn().expect("Failed to execute test executable");
|
||||
let status = child.wait().expect("Failed to wait for test executable");
|
||||
|
||||
if !status.success() {
|
||||
panic!("Test executable failed with status: {}", status);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper function to calculate total size of directory
|
||||
fn get_directory_size(dir: &Path) -> u64 {
|
||||
let mut total_size = 0;
|
||||
if let Ok(entries) = fs::read_dir(dir) {
|
||||
for entry in entries.flatten() {
|
||||
if let Ok(metadata) = entry.metadata() {
|
||||
if metadata.is_file() {
|
||||
total_size += metadata.len();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
total_size
|
||||
}
|
||||
|
||||
/// Helper function to count log files in directory
|
||||
fn count_log_files(dir: &Path) -> usize {
|
||||
if let Ok(entries) = fs::read_dir(dir) {
|
||||
entries
|
||||
.flatten()
|
||||
.filter(|entry| entry.path().extension().map_or(false, |ext| ext == "log"))
|
||||
.count()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_maximum_age_cleanup() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let log_dir = temp_dir.path();
|
||||
|
||||
// Set up environment variables for 500ms max age
|
||||
let env_vars = [
|
||||
("HF_XET_LOG_DIR_MAX_RETENTION_AGE", "500ms"),
|
||||
("HF_XET_LOG_DIR_MIN_DELETION_AGE", "100ms"),
|
||||
("HF_XET_LOG_DIR_MAX_SIZE", "1gb"), // Set high to avoid size-based cleanup
|
||||
("HF_XET_LOG_DIR_DISABLE_CLEANUP", "false"),
|
||||
];
|
||||
|
||||
// Run the test executable multiple times to create enough log files
|
||||
for _ in 0..5 {
|
||||
run_test_executable(log_dir, 100, &env_vars);
|
||||
}
|
||||
|
||||
run_test_executable(log_dir, 10, &env_vars);
|
||||
|
||||
// Wait for files to age beyond the retention period
|
||||
std::thread::sleep(Duration::from_millis(1000));
|
||||
|
||||
// Run the test executable again to trigger cleanup
|
||||
run_test_executable(log_dir, 5, &env_vars);
|
||||
|
||||
// Check that old files have been cleaned up
|
||||
let log_files = count_log_files(log_dir);
|
||||
assert!(log_files <= 1, "Expected at most 1 log file after age-based cleanup, found {}", log_files);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_maximum_size_cleanup() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let log_dir = temp_dir.path();
|
||||
|
||||
// Set up environment variables for 10kb max size
|
||||
let env_vars = [
|
||||
("HF_XET_LOG_DIR_MAX_SIZE", "10kb"),
|
||||
("HF_XET_LOG_DIR_MIN_DELETION_AGE", "1ms"), // Disable the minimum deletion guard
|
||||
("HF_XET_LOG_DIR_MAX_RETENTION_AGE", "1h"), // Set high to avoid age-based cleanup
|
||||
("HF_XET_LOG_DIR_DISABLE_CLEANUP", "false"),
|
||||
];
|
||||
|
||||
// Run the test executable multiple times to create enough log files
|
||||
for _ in 0..20 {
|
||||
run_test_executable(log_dir, 1000, &env_vars); // Small log files
|
||||
}
|
||||
|
||||
// Wait for disk to be synchronized.
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// Run one final tiny executable to trigger cleanup of the previous file
|
||||
// This file should be small enough to stay under 10KB even if it's not cleaned up
|
||||
run_test_executable(log_dir, 1, &env_vars);
|
||||
|
||||
// Wait for the final cleanup to complete
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// Check that directory size is within limits
|
||||
let total_size = get_directory_size(log_dir);
|
||||
assert!(total_size <= 10 * 1024, "Directory size {} exceeds 10kb limit", total_size);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_active_window_protection() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let log_dir = temp_dir.path();
|
||||
|
||||
// Set up environment variables for small size limit to trigger cleanup
|
||||
let env_vars = [
|
||||
("HF_XET_LOG_DIR_MAX_SIZE", "1kb"),
|
||||
("HF_XET_LOG_DIR_MIN_DELETION_AGE", "1s"),
|
||||
("HF_XET_LOG_DIR_MAX_RETENTION_AGE", "1h"),
|
||||
("HF_XET_LOG_DIR_DISABLE_CLEANUP", "false"),
|
||||
];
|
||||
|
||||
// Run the test executable to create log files
|
||||
for _ in 0..3 {
|
||||
run_test_executable(log_dir, 100, &env_vars);
|
||||
}
|
||||
|
||||
// Wait for disk to be synchronized.
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// Immediately check that the current log file still exists
|
||||
// (it should not be deleted because it's associated with an active process)
|
||||
let log_files = count_log_files(log_dir);
|
||||
assert!(log_files <= 5);
|
||||
|
||||
// Directory is larger than the minimum due to the above protection.
|
||||
let log_dir_size = get_directory_size(log_dir);
|
||||
assert!(log_dir_size > 1 * 1024);
|
||||
|
||||
// Wait a bit longer than the minimum deletion age and run again to create another log -- and clean the rest up.
|
||||
std::thread::sleep(Duration::from_secs(2));
|
||||
|
||||
for _ in 0..2 {
|
||||
run_test_executable(log_dir, 100, &env_vars);
|
||||
}
|
||||
|
||||
// Wait for disk to be synchronized.
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// All the previous ones should now have been cleaned up.
|
||||
let log_files = count_log_files(log_dir);
|
||||
assert_eq!(log_files, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cleanup_disabled() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let log_dir = temp_dir.path();
|
||||
|
||||
// Set up environment variables to disable cleanup
|
||||
let env_vars = [
|
||||
("HF_XET_LOG_DIR_DISABLE_CLEANUP", "true"),
|
||||
("HF_XET_LOG_DIR_MAX_SIZE", "1kb"),
|
||||
("HF_XET_LOG_DIR_MAX_RETENTION_AGE", "1s"),
|
||||
];
|
||||
|
||||
// Run the test executable multiple times
|
||||
for _ in 0..3 {
|
||||
run_test_executable(log_dir, 20, &env_vars);
|
||||
std::thread::sleep(Duration::from_millis(50));
|
||||
}
|
||||
|
||||
// Wait for disk to be synchronized.
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// All files should still be there since cleanup is disabled
|
||||
let log_files = count_log_files(log_dir);
|
||||
assert_eq!(log_files, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_maximum_age_cleanup_parallel() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let log_dir = temp_dir.path();
|
||||
std::fs::create_dir_all(log_dir).expect("Failed to create log directory");
|
||||
|
||||
let env_vars = [
|
||||
("HF_XET_LOG_DIR_MAX_RETENTION_AGE", "500ms"),
|
||||
("HF_XET_LOG_DIR_MIN_DELETION_AGE", "100ms"),
|
||||
("HF_XET_LOG_DIR_MAX_SIZE", "1gb"),
|
||||
("HF_XET_LOG_DIR_DISABLE_CLEANUP", "false"),
|
||||
];
|
||||
|
||||
// Spawn multiple background tasks
|
||||
let mut handles = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let handle = run_test_executable_parallel(log_dir, 100, &env_vars);
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
// Run one more to trigger cleanup
|
||||
run_test_executable(log_dir, 10, &env_vars);
|
||||
|
||||
// Wait for all background tasks to complete
|
||||
for handle in handles {
|
||||
handle.join().expect("Background task failed");
|
||||
}
|
||||
|
||||
std::thread::sleep(Duration::from_millis(1000));
|
||||
run_test_executable(log_dir, 5, &env_vars);
|
||||
std::thread::sleep(Duration::from_millis(500)); // Wait for background cleanup
|
||||
let log_files = count_log_files(log_dir);
|
||||
assert!(log_files <= 1, "Expected at most 1 log file after age-based cleanup, found {}", log_files);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_maximum_size_cleanup_parallel() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let log_dir = temp_dir.path();
|
||||
std::fs::create_dir_all(log_dir).expect("Failed to create log directory");
|
||||
|
||||
let env_vars = [
|
||||
("HF_XET_LOG_DIR_MAX_RETENTION_AGE", "1h"), // Set high to avoid age-based cleanup
|
||||
("HF_XET_LOG_DIR_MIN_DELETION_AGE", "1ms"), // Disable the minimum deletion guard
|
||||
("HF_XET_LOG_DIR_MAX_SIZE", "10kb"),
|
||||
("HF_XET_LOG_DIR_DISABLE_CLEANUP", "false"),
|
||||
];
|
||||
|
||||
// Spawn multiple background tasks
|
||||
let mut handles = Vec::new();
|
||||
for _ in 0..20 {
|
||||
let handle = run_test_executable_parallel(log_dir, 1000, &env_vars);
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
// Wait for all background tasks to complete
|
||||
for handle in handles {
|
||||
handle.join().expect("Background task failed");
|
||||
}
|
||||
|
||||
// Wait for disk to be synchronized.
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// Run one final tiny executable to trigger cleanup of the previous file
|
||||
// This file should be small enough to stay under 10KB even if it's not cleaned up
|
||||
run_test_executable(log_dir, 1, &env_vars);
|
||||
|
||||
// Wait for the final cleanup to complete
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// Check that directory size is within limits
|
||||
let total_size = get_directory_size(log_dir);
|
||||
assert!(total_size <= 10 * 1024, "Directory size {} exceeds 10kb limit", total_size);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_active_window_protection_parallel() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let log_dir = temp_dir.path();
|
||||
std::fs::create_dir_all(log_dir).expect("Failed to create log directory");
|
||||
|
||||
let env_vars = [
|
||||
("HF_XET_LOG_DIR_MAX_RETENTION_AGE", "1h"), // Set high to avoid age-based cleanup
|
||||
("HF_XET_LOG_DIR_MIN_DELETION_AGE", "1ms"), // Disable the minimum deletion guard
|
||||
("HF_XET_LOG_DIR_MAX_SIZE", "10kb"),
|
||||
("HF_XET_LOG_DIR_DISABLE_CLEANUP", "false"),
|
||||
];
|
||||
|
||||
// Spawn multiple background tasks that will run concurrently
|
||||
let mut handles = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let handle = run_test_executable_parallel(log_dir, 1000, &env_vars);
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
// Wait for all background tasks to complete
|
||||
for handle in handles {
|
||||
handle.join().expect("Background task failed");
|
||||
}
|
||||
|
||||
// Wait for disk to be synchronized.
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// Check that directory size is within limits (should be larger than 10kb due to active window protection)
|
||||
let total_size = get_directory_size(log_dir);
|
||||
assert!(
|
||||
total_size > 10 * 1024,
|
||||
"Expected directory size to exceed 10kb due to active window protection, found {}",
|
||||
total_size
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cleanup_stress_test() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let log_dir = temp_dir.path();
|
||||
|
||||
let env_vars = [
|
||||
("HF_XET_LOG_DIR_MAX_RETENTION_AGE", "1s"), // Set to 1 sec so that files are deleted properly
|
||||
("HF_XET_LOG_DIR_MIN_DELETION_AGE", "1ms"), // Disable the minimum deletion guard
|
||||
("HF_XET_LOG_DIR_MAX_SIZE", "10kb"), // Small size limit to trigger frequent cleanup
|
||||
("HF_XET_LOG_DIR_DISABLE_CLEANUP", "false"),
|
||||
];
|
||||
|
||||
// Spawn many background tasks to try to reproduce race conditions
|
||||
let mut handles = Vec::new();
|
||||
let mut rng = StdRng::seed_from_u64(42);
|
||||
for i in 0..100 {
|
||||
//
|
||||
let log_size = rng.random_range(1..=250);
|
||||
let handle = run_test_executable_parallel(log_dir, log_size, &env_vars);
|
||||
handles.push(handle);
|
||||
|
||||
// Small delay between spawns to create overlapping execution
|
||||
if i % 10 == 0 {
|
||||
std::thread::sleep(Duration::from_millis(20));
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all background tasks to complete
|
||||
for handle in handles {
|
||||
handle.join().expect("Background task failed");
|
||||
}
|
||||
|
||||
// Wait for any remaining cleanup to complete
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// Run a few more tasks to trigger final cleanup
|
||||
let mut handles = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let handle = run_test_executable_parallel(log_dir, 5, &env_vars);
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
handle.join().expect("Background task failed");
|
||||
}
|
||||
|
||||
// Wait for the above files to expire.
|
||||
std::thread::sleep(Duration::from_millis(2000));
|
||||
|
||||
run_test_executable(log_dir, 1, &env_vars);
|
||||
|
||||
// Check that directory size is within limits (should be under 50kb due to cleanup)
|
||||
let total_size = get_directory_size(log_dir);
|
||||
assert!(
|
||||
total_size <= 10 * 1024,
|
||||
"Expected directory size to be under 50kb after stress test cleanup, found {}",
|
||||
total_size
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user