fix: no timeout for shard uploads (XET-885) (#685)

Fixes
[XET-885](https://linear.app/xet/issue/XET-885/investigate-unsloth-upload-failure-shard-upload-timeout-on-cas)

## Summary

Shard uploads to CAS can take a long time due to server-side processing
(DynamoDB writes scale with file entry count). The default
`read_timeout(120s)` on the reqwest client kills these uploads.

**Key insight:** reqwest's per-request `RequestBuilder::timeout()` does
NOT override the client-level `read_timeout()` — they are independent
mechanisms polled as separate futures. So the original approach of using
per-request timeouts was ineffective.

**Fix:** Create a dedicated `shard_upload_http_client` on `RemoteClient`
with **no `read_timeout`**, built once at construction time and reused
for all shard uploads. All other settings (connect timeout, pool config,
auth middleware) are identical to the standard client.

## Changes

### `cas_client/src/http_client.rs`
- Added `reqwest_client_no_read_timeout()` — creates a reqwest client
with no `read_timeout`
- Added `build_auth_http_client_no_read_timeout()` — public API wrapping
it with middleware
- 4 unit tests for the new builder

### `cas_client/src/remote_client.rs`
- Added `shard_upload_http_client` field to `RemoteClient` (cfg'd out on
wasm)
- `upload_shard()` uses the pre-built no-timeout client instead of
building one per request

### `cas_client/tests/test_shard_upload_timeout.rs`
- Updated: slow server test now asserts **success** (shard uploads
should wait as long as needed)

### `xet_config/src/groups/client.rs`
- Removed `shard_read_timeout` config field (no longer needed)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Rajat Arya
2026-03-11 09:05:40 -07:00
committed by GitHub
parent 9ba5fb3e5b
commit 83a28271ea
5 changed files with 173 additions and 13 deletions

1
Cargo.lock generated
View File

@@ -470,6 +470,7 @@ dependencies = [
"bytes",
"cas_types",
"clap",
"ctor",
"deduplication",
"duration-str",
"error_printer",

View File

@@ -77,6 +77,7 @@ web-time = { workspace = true }
[dev-dependencies]
approx = { workspace = true }
ctor = { workspace = true }
httpmock = { workspace = true }
rand_distr = { workspace = true }
tracing-test = { workspace = true }

View File

@@ -108,6 +108,46 @@ fn reqwest_client(
Ok(client)
}
/// Creates a reqwest client with no read_timeout. Used for shard uploads where server-side
/// processing time is unbounded and would otherwise be killed by the global read_timeout.
/// Not cached — uses a separate connection pool from the main client.
#[cfg(not(target_family = "wasm"))]
#[allow(unused_variables)]
fn reqwest_client_no_read_timeout(
unix_socket_path: Option<&str>,
custom_headers: Option<Arc<HeaderMap>>,
) -> Result<reqwest::Client, CasClientError> {
let socket_path = unix_socket_path
.map(|s| s.to_string())
.or_else(|| xet_config().client.unix_socket_path.clone());
let config = &xet_config().client;
let mut builder = reqwest::Client::builder()
.pool_idle_timeout(config.idle_connection_timeout)
.pool_max_idle_per_host(config.max_idle_connections)
.connect_timeout(config.connect_timeout)
// No read_timeout — shard processing time scales with entry count and is unbounded
.http1_only();
#[cfg(unix)]
if let Some(ref path) = socket_path {
builder = builder.unix_socket(path.clone());
}
if let Some(headers) = custom_headers {
builder = builder.default_headers((*headers).clone());
}
let client = builder.build()?;
info!(
connect_timeout=?config.connect_timeout,
"No-read-timeout HTTP client configured (for shard uploads)"
);
Ok(client)
}
#[cfg(target_family = "wasm")]
fn reqwest_client(
_unix_socket_path: Option<&str>,
@@ -149,6 +189,38 @@ pub fn build_auth_http_client(
.build())
}
/// Builds an authenticated HTTP Client with no read_timeout.
///
/// All other settings (connect timeout, pool config, middleware) are identical
/// to the standard client. Used for shard uploads where server-side processing time
/// scales with file entry count and can exceed the global read_timeout.
#[cfg(not(target_family = "wasm"))]
#[allow(unused_mut)]
pub fn build_auth_http_client_no_read_timeout(
auth_config: &Option<AuthConfig>,
session_id: &str,
unix_socket_path: Option<&str>,
custom_headers: Option<Arc<HeaderMap>>,
) -> Result<ClientWithMiddleware, CasClientError> {
let auth_middleware = auth_config.as_ref().map(AuthMiddleware::from).info_none("CAS auth disabled");
let logging_middleware = Some(LoggingMiddleware);
let session_middleware = (!session_id.is_empty()).then(|| SessionMiddleware(session_id.to_owned()));
let raw_client = reqwest_client_no_read_timeout(unix_socket_path, custom_headers)?;
let mut builder = ClientBuilder::new(raw_client);
#[cfg(unix)]
if unix_socket_path.is_some() {
builder = builder.with(HttpsToHttpMiddleware);
}
Ok(builder
.maybe_with(auth_middleware)
.maybe_with(logging_middleware)
.maybe_with(session_middleware)
.build())
}
/// Builds HTTP Client to talk to CAS.
pub fn build_http_client(
session_id: &str,
@@ -388,4 +460,42 @@ mod tests {
let result = build_http_client("test-session", None, None);
assert!(result.is_ok());
}
mod shard_upload_client_tests {
use super::*;
#[test]
fn test_build_no_read_timeout_succeeds() {
let result = build_auth_http_client_no_read_timeout(&None, "test-session", None, None);
assert!(result.is_ok());
}
#[test]
fn test_build_no_read_timeout_with_empty_session_id() {
let result = build_auth_http_client_no_read_timeout(&None, "", None, None);
assert!(result.is_ok());
}
#[test]
fn test_build_no_read_timeout_with_custom_headers() {
let mut headers = HeaderMap::new();
headers.insert("X-Custom-Header", HeaderValue::from_static("test-value"));
headers.insert(reqwest::header::USER_AGENT, HeaderValue::from_static("test-agent/1.0"));
let result = build_auth_http_client_no_read_timeout(&None, "test-session", None, Some(Arc::new(headers)));
assert!(result.is_ok());
}
#[test]
fn test_no_read_timeout_client_is_distinct_from_standard_client() {
let standard = build_auth_http_client(&None, "test-session", None, None).unwrap();
let no_timeout = build_auth_http_client_no_read_timeout(&None, "test-session", None, None).unwrap();
assert_ne!(
format!("{:p}", &standard),
format!("{:p}", &no_timeout),
"Standard and no-timeout clients should be distinct instances"
);
}
}
}

View File

@@ -42,6 +42,10 @@ pub struct RemoteClient {
dry_run: bool,
http_client: Arc<ClientWithMiddleware>,
authenticated_http_client: Arc<ClientWithMiddleware>,
/// Authenticated client with no read_timeout, used for shard uploads where server-side
/// processing time scales with file entry count and can exceed the global read_timeout.
#[cfg(not(target_family = "wasm"))]
shard_upload_http_client: Arc<ClientWithMiddleware>,
upload_concurrency_controller: Arc<AdaptiveConcurrencyController>,
download_concurrency_controller: Arc<AdaptiveConcurrencyController>,
}
@@ -72,7 +76,12 @@ impl RemoteClient {
.unwrap(),
),
http_client: Arc::new(
http_client::build_http_client(session_id, unix_socket_path, custom_headers).unwrap(),
http_client::build_http_client(session_id, unix_socket_path, custom_headers.clone()).unwrap(),
),
#[cfg(not(target_family = "wasm"))]
shard_upload_http_client: Arc::new(
http_client::build_auth_http_client_no_read_timeout(auth, session_id, unix_socket_path, custom_headers)
.unwrap(),
),
upload_concurrency_controller: AdaptiveConcurrencyController::new_upload("upload"),
download_concurrency_controller: AdaptiveConcurrencyController::new_download("download"),
@@ -97,16 +106,7 @@ impl RemoteClient {
dry_run: bool,
custom_headers: Option<Arc<HeaderMap>>,
) -> Arc<Self> {
Arc::new(Self {
endpoint: endpoint.to_string(),
dry_run,
authenticated_http_client: Arc::new(
http_client::build_auth_http_client(auth, session_id, None, custom_headers.clone()).unwrap(),
),
http_client: Arc::new(http_client::build_http_client(session_id, None, custom_headers).unwrap()),
upload_concurrency_controller: AdaptiveConcurrencyController::new_upload("upload"),
download_concurrency_controller: AdaptiveConcurrencyController::new_download("download"),
})
Self::new_with_socket(endpoint, auth, session_id, dry_run, None, custom_headers)
}
/// Get the endpoint URL.
@@ -414,10 +414,18 @@ impl Client for RemoteClient {
event!(INFORMATION_LOG_LEVEL, call_id, size = n_upload_bytes, "Starting upload_shard API call",);
let api_tag = "cas::upload_shard";
let client = self.authenticated_http_client.clone();
let url = Url::parse(&format!("{}/shards", self.endpoint))?;
// Use the no-read-timeout client for shard uploads. reqwest's per-request timeout()
// does NOT override the client-level read_timeout(), so we use a separate client
// with no read_timeout. Server-side shard processing scales linearly with file entry
// count and can exceed the global read_timeout (120s) for large shards.
#[cfg(not(target_family = "wasm"))]
let client = self.shard_upload_http_client.clone();
#[cfg(target_family = "wasm")]
let client = self.authenticated_http_client.clone();
let response: UploadShardResponse = RetryWrapper::new(api_tag)
.with_connection_permit(upload_permit, Some(shard_data.len() as u64))
.run_and_extract_json(move || {

View File

@@ -0,0 +1,40 @@
//! Integration tests for the shard upload no-read-timeout client (XET-885).
//!
//! Verifies that shard uploads succeed even when the server takes a long time to process,
//! since the shard upload client has no read_timeout.
use std::time::Duration;
use cas_client::simulation::ClientTestingUtils;
use cas_client::{DirectAccessClient, LocalTestServerBuilder};
use utils::test_set_config;
test_set_config! {
client {
retry_max_attempts = 1usize;
retry_base_delay = Duration::from_millis(10);
}
}
const CHUNK_SIZE: usize = 123;
#[tokio::test]
async fn test_shard_upload_succeeds_with_no_server_delay() {
let server = LocalTestServerBuilder::new().start().await;
let result = server.remote_client().upload_random_file(&[(1, (0, 5))], CHUNK_SIZE).await;
assert!(result.is_ok(), "Shard upload should succeed with no server delay: {result:?}");
}
#[tokio::test]
async fn test_shard_upload_succeeds_with_slow_server() {
let server = LocalTestServerBuilder::new().start().await;
// Server takes 3s to respond — shard upload client has no read_timeout so this should succeed
server.set_api_delay_range(Some(Duration::from_secs(3)..Duration::from_secs(3)));
let result = server.remote_client().upload_random_file(&[(1, (0, 5))], CHUNK_SIZE).await;
assert!(result.is_ok(), "Shard upload should succeed even with slow server (no read_timeout): {result:?}");
}