From 83a28271ea1b93e045f1d351b52ad846a6e96320 Mon Sep 17 00:00:00 2001
From: Rajat Arya
Date: Wed, 11 Mar 2026 09:05:40 -0700
Subject: [PATCH] fix: no timeout for shard uploads (XET-885) (#685)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixes [XET-885](https://linear.app/xet/issue/XET-885/investigate-unsloth-upload-failure-shard-upload-timeout-on-cas)

## Summary

Shard uploads to CAS can take a long time due to server-side processing (DynamoDB writes scale with file entry count). The default `read_timeout(120s)` on the reqwest client kills these uploads.

**Key insight:** reqwest's per-request `RequestBuilder::timeout()` does NOT override the client-level `read_timeout()` — they are independent mechanisms polled as separate futures. So the original approach of using per-request timeouts was ineffective.

**Fix:** Create a dedicated `shard_upload_http_client` on `RemoteClient` with **no `read_timeout`**, built once at construction time and reused for all shard uploads. All other settings (connect timeout, pool config, auth middleware) are identical to the standard client.

## Changes ### `cas_client/src/http_client.rs` - Added `reqwest_client_no_read_timeout()` — creates a reqwest client with no `read_timeout` - Added `build_auth_http_client_no_read_timeout()` — public API wrapping it with middleware - 4 unit tests for the new builder ### `cas_client/src/remote_client.rs` - Added `shard_upload_http_client` field to `RemoteClient` (cfg'd out on wasm) - `upload_shard()` uses the pre-built no-timeout client instead of building one per request ### `cas_client/tests/test_shard_upload_timeout.rs` - Updated: slow server test now asserts **success** (shard uploads should wait as long as needed) ### `xet_config/src/groups/client.rs` - Removed `shard_read_timeout` config field (no longer needed) --------- Co-authored-by: Claude Opus 4.6 (1M context) --- Cargo.lock | 1 + cas_client/Cargo.toml | 1 + cas_client/src/http_client.rs | 110 ++++++++++++++++++ cas_client/src/remote_client.rs | 34 +++--- cas_client/tests/test_shard_upload_timeout.rs | 40 +++++++ 5 files changed, 173 insertions(+), 13 deletions(-) create mode 100644 cas_client/tests/test_shard_upload_timeout.rs diff --git a/Cargo.lock b/Cargo.lock index acafe3a0..da0886d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -470,6 +470,7 @@ dependencies = [ "bytes", "cas_types", "clap", + "ctor", "deduplication", "duration-str", "error_printer", diff --git a/cas_client/Cargo.toml b/cas_client/Cargo.toml index 5f9d5385..669488e6 100644 --- a/cas_client/Cargo.toml +++ b/cas_client/Cargo.toml @@ -77,6 +77,7 @@ web-time = { workspace = true } [dev-dependencies] approx = { workspace = true } +ctor = { workspace = true } httpmock = { workspace = true } rand_distr = { workspace = true } tracing-test = { workspace = true } diff --git a/cas_client/src/http_client.rs b/cas_client/src/http_client.rs index f0efc946..9c102fc7 100644 --- a/cas_client/src/http_client.rs +++ b/cas_client/src/http_client.rs @@ -108,6 +108,46 @@ fn reqwest_client( Ok(client) } +/// Creates a reqwest client with no read_timeout. 
Used for shard uploads where server-side +/// processing time is unbounded and would otherwise be killed by the global read_timeout. +/// Not cached — uses a separate connection pool from the main client. +#[cfg(not(target_family = "wasm"))] +#[allow(unused_variables)] +fn reqwest_client_no_read_timeout( + unix_socket_path: Option<&str>, + custom_headers: Option>, +) -> Result { + let socket_path = unix_socket_path + .map(|s| s.to_string()) + .or_else(|| xet_config().client.unix_socket_path.clone()); + + let config = &xet_config().client; + let mut builder = reqwest::Client::builder() + .pool_idle_timeout(config.idle_connection_timeout) + .pool_max_idle_per_host(config.max_idle_connections) + .connect_timeout(config.connect_timeout) + // No read_timeout — shard processing time scales with entry count and is unbounded + .http1_only(); + + #[cfg(unix)] + if let Some(ref path) = socket_path { + builder = builder.unix_socket(path.clone()); + } + + if let Some(headers) = custom_headers { + builder = builder.default_headers((*headers).clone()); + } + + let client = builder.build()?; + + info!( + connect_timeout=?config.connect_timeout, + "No-read-timeout HTTP client configured (for shard uploads)" + ); + + Ok(client) +} + #[cfg(target_family = "wasm")] fn reqwest_client( _unix_socket_path: Option<&str>, @@ -149,6 +189,38 @@ pub fn build_auth_http_client( .build()) } +/// Builds an authenticated HTTP Client with no read_timeout. +/// +/// All other settings (connect timeout, pool config, middleware) are identical +/// to the standard client. Used for shard uploads where server-side processing time +/// scales with file entry count and can exceed the global read_timeout. 
+#[cfg(not(target_family = "wasm"))] +#[allow(unused_mut)] +pub fn build_auth_http_client_no_read_timeout( + auth_config: &Option, + session_id: &str, + unix_socket_path: Option<&str>, + custom_headers: Option>, +) -> Result { + let auth_middleware = auth_config.as_ref().map(AuthMiddleware::from).info_none("CAS auth disabled"); + let logging_middleware = Some(LoggingMiddleware); + let session_middleware = (!session_id.is_empty()).then(|| SessionMiddleware(session_id.to_owned())); + + let raw_client = reqwest_client_no_read_timeout(unix_socket_path, custom_headers)?; + let mut builder = ClientBuilder::new(raw_client); + + #[cfg(unix)] + if unix_socket_path.is_some() { + builder = builder.with(HttpsToHttpMiddleware); + } + + Ok(builder + .maybe_with(auth_middleware) + .maybe_with(logging_middleware) + .maybe_with(session_middleware) + .build()) +} + /// Builds HTTP Client to talk to CAS. pub fn build_http_client( session_id: &str, @@ -388,4 +460,42 @@ mod tests { let result = build_http_client("test-session", None, None); assert!(result.is_ok()); } + + mod shard_upload_client_tests { + use super::*; + + #[test] + fn test_build_no_read_timeout_succeeds() { + let result = build_auth_http_client_no_read_timeout(&None, "test-session", None, None); + assert!(result.is_ok()); + } + + #[test] + fn test_build_no_read_timeout_with_empty_session_id() { + let result = build_auth_http_client_no_read_timeout(&None, "", None, None); + assert!(result.is_ok()); + } + + #[test] + fn test_build_no_read_timeout_with_custom_headers() { + let mut headers = HeaderMap::new(); + headers.insert("X-Custom-Header", HeaderValue::from_static("test-value")); + headers.insert(reqwest::header::USER_AGENT, HeaderValue::from_static("test-agent/1.0")); + + let result = build_auth_http_client_no_read_timeout(&None, "test-session", None, Some(Arc::new(headers))); + assert!(result.is_ok()); + } + + #[test] + fn test_no_read_timeout_client_is_distinct_from_standard_client() { + let standard = 
build_auth_http_client(&None, "test-session", None, None).unwrap(); + let no_timeout = build_auth_http_client_no_read_timeout(&None, "test-session", None, None).unwrap(); + + assert_ne!( + format!("{:p}", &standard), + format!("{:p}", &no_timeout), + "Standard and no-timeout clients should be distinct instances" + ); + } + } } diff --git a/cas_client/src/remote_client.rs b/cas_client/src/remote_client.rs index ed44ae81..8079a49b 100644 --- a/cas_client/src/remote_client.rs +++ b/cas_client/src/remote_client.rs @@ -42,6 +42,10 @@ pub struct RemoteClient { dry_run: bool, http_client: Arc, authenticated_http_client: Arc, + /// Authenticated client with no read_timeout, used for shard uploads where server-side + /// processing time scales with file entry count and can exceed the global read_timeout. + #[cfg(not(target_family = "wasm"))] + shard_upload_http_client: Arc, upload_concurrency_controller: Arc, download_concurrency_controller: Arc, } @@ -72,7 +76,12 @@ impl RemoteClient { .unwrap(), ), http_client: Arc::new( - http_client::build_http_client(session_id, unix_socket_path, custom_headers).unwrap(), + http_client::build_http_client(session_id, unix_socket_path, custom_headers.clone()).unwrap(), + ), + #[cfg(not(target_family = "wasm"))] + shard_upload_http_client: Arc::new( + http_client::build_auth_http_client_no_read_timeout(auth, session_id, unix_socket_path, custom_headers) + .unwrap(), ), upload_concurrency_controller: AdaptiveConcurrencyController::new_upload("upload"), download_concurrency_controller: AdaptiveConcurrencyController::new_download("download"), @@ -97,16 +106,7 @@ impl RemoteClient { dry_run: bool, custom_headers: Option>, ) -> Arc { - Arc::new(Self { - endpoint: endpoint.to_string(), - dry_run, - authenticated_http_client: Arc::new( - http_client::build_auth_http_client(auth, session_id, None, custom_headers.clone()).unwrap(), - ), - http_client: Arc::new(http_client::build_http_client(session_id, None, custom_headers).unwrap()), - 
upload_concurrency_controller: AdaptiveConcurrencyController::new_upload("upload"), - download_concurrency_controller: AdaptiveConcurrencyController::new_download("download"), - }) + Self::new_with_socket(endpoint, auth, session_id, dry_run, None, custom_headers) } /// Get the endpoint URL. @@ -414,10 +414,18 @@ impl Client for RemoteClient { event!(INFORMATION_LOG_LEVEL, call_id, size = n_upload_bytes, "Starting upload_shard API call",); let api_tag = "cas::upload_shard"; - let client = self.authenticated_http_client.clone(); - let url = Url::parse(&format!("{}/shards", self.endpoint))?; + // Use the no-read-timeout client for shard uploads. reqwest's per-request timeout() + // does NOT override the client-level read_timeout(), so we use a separate client + // with no read_timeout. Server-side shard processing scales linearly with file entry + // count and can exceed the global read_timeout (120s) for large shards. + #[cfg(not(target_family = "wasm"))] + let client = self.shard_upload_http_client.clone(); + + #[cfg(target_family = "wasm")] + let client = self.authenticated_http_client.clone(); + let response: UploadShardResponse = RetryWrapper::new(api_tag) .with_connection_permit(upload_permit, Some(shard_data.len() as u64)) .run_and_extract_json(move || { diff --git a/cas_client/tests/test_shard_upload_timeout.rs b/cas_client/tests/test_shard_upload_timeout.rs new file mode 100644 index 00000000..8b962497 --- /dev/null +++ b/cas_client/tests/test_shard_upload_timeout.rs @@ -0,0 +1,40 @@ +//! Integration tests for the shard upload no-read-timeout client (XET-885). +//! +//! Verifies that shard uploads succeed even when the server takes a long time to process, +//! since the shard upload client has no read_timeout. + +use std::time::Duration; + +use cas_client::simulation::ClientTestingUtils; +use cas_client::{DirectAccessClient, LocalTestServerBuilder}; +use utils::test_set_config; + +test_set_config! 
{ + client { + retry_max_attempts = 1usize; + retry_base_delay = Duration::from_millis(10); + } +} + +const CHUNK_SIZE: usize = 123; + +#[tokio::test] +async fn test_shard_upload_succeeds_with_no_server_delay() { + let server = LocalTestServerBuilder::new().start().await; + + let result = server.remote_client().upload_random_file(&[(1, (0, 5))], CHUNK_SIZE).await; + + assert!(result.is_ok(), "Shard upload should succeed with no server delay: {result:?}"); +} + +#[tokio::test] +async fn test_shard_upload_succeeds_with_slow_server() { + let server = LocalTestServerBuilder::new().start().await; + + // Server takes 3s to respond — shard upload client has no read_timeout so this should succeed + server.set_api_delay_range(Some(Duration::from_secs(3)..Duration::from_secs(3))); + + let result = server.remote_client().upload_random_file(&[(1, (0, 5))], CHUNK_SIZE).await; + + assert!(result.is_ok(), "Shard upload should succeed even with slow server (no read_timeout): {result:?}"); +}