Unify sync and async download/upload groups in session interface. (#719)

Currently, `UploadCommitSync` and `DownloadGroupSync` are thin wrappers
around `UploadCommit` and `DownloadGroup` that delegate every method
through `external_run_async_task`. This means two types, two sets of doc
comments, and two test suites covering the same underlying behavior.

This PR removes the separate sync types and adds `_blocking` suffixed
methods directly on `UploadCommit` and `DownloadGroup`. The session
factory methods `new_upload_commit_blocking()` and
`new_download_group_blocking()` now return the same types as their async
counterparts, and the entire `xet_session::sync` module is deleted (~680
lines removed).

This also fixes a minor bug: `UploadCommitSync::upload_from_path` did
not call `std::path::absolute()` on the file path before dispatching,
unlike the async version. The new `upload_from_path_blocking` includes
the `std::path::absolute()` call, matching the async version's behavior.
This commit is contained in:
Hoyt Koepke
2026-03-16 13:33:46 -07:00
committed by GitHub
parent 71f8570a0e
commit 79df99ad01
9 changed files with 594 additions and 764 deletions

View File

@@ -0,0 +1,123 @@
# API Update: Unify UploadCommit and DownloadGroup async/sync types (2026-03-16)
## Overview
`UploadCommitSync` and `DownloadGroupSync` have been removed. Their blocking
methods now live directly on `UploadCommit` and `DownloadGroup` as `_blocking`
suffixed methods. The session factory methods `new_upload_commit_blocking()`
and `new_download_group_blocking()` now return the same types as their async
counterparts.
---
## Removed Types
| Removed Type | Replacement |
|---|---|
| `UploadCommitSync` (was in `xet::xet_session::sync::UploadCommitSync`) | `UploadCommit` with `_blocking` methods |
| `DownloadGroupSync` (was in `xet::xet_session::sync::DownloadGroupSync`) | `DownloadGroup` with `_blocking` methods |
The entire `xet::xet_session::sync` module has been deleted.
---
## Changed Return Types
| Method | Old Return Type | New Return Type |
|---|---|---|
| `XetSession::new_upload_commit_blocking()` | `Result<UploadCommitSync, SessionError>` | `Result<UploadCommit, SessionError>` |
| `XetSession::new_download_group_blocking()` | `Result<DownloadGroupSync, SessionError>` | `Result<DownloadGroup, SessionError>` |
---
## New Blocking Methods on `UploadCommit`
| New Method | Async Equivalent |
|---|---|
| `upload_from_path_blocking(&self, PathBuf, Sha256Policy) -> Result<UploadTaskHandle, SessionError>` | `upload_from_path(&self, PathBuf, Sha256Policy).await` |
| `upload_bytes_blocking(&self, Vec<u8>, Sha256Policy, Option<String>) -> Result<UploadTaskHandle, SessionError>` | `upload_bytes(&self, Vec<u8>, Sha256Policy, Option<String>).await` |
| `upload_file_blocking(&self, Option<String>, u64, Sha256Policy) -> Result<(TaskHandle, SingleFileCleaner), SessionError>` | `upload_file(&self, Option<String>, u64, Sha256Policy).await` |
| `commit_blocking(self) -> Result<HashMap<Ulid, UploadResult>, SessionError>` | `commit(self).await` |
All blocking methods use `runtime.external_run_async_task()` internally and
**must not be called from within a tokio runtime** (they will panic).
---
## New Blocking Method on `DownloadGroup`
| New Method | Async Equivalent |
|---|---|
| `finish_blocking(self) -> Result<HashMap<Ulid, DownloadResult>, SessionError>` | `finish(self).await` |
`download_file_to_path` and `get_progress` were already synchronous — no
changes needed.
---
## Migration Guide
### Sync callers (old `UploadCommitSync` / `DownloadGroupSync` usage)
```rust
// Old
let commit: UploadCommitSync = session.new_upload_commit_blocking()?;
let handle = commit.upload_from_path(path, sha256)?;
let handle2 = commit.upload_bytes(bytes, sha256, name)?;
let (_h, cleaner) = commit.upload_file(name, size, sha256)?;
let results = commit.commit()?;
let group: DownloadGroupSync = session.new_download_group_blocking()?;
group.download_file_to_path(info, dest)?;
let results = group.finish()?;
// New
let commit: UploadCommit = session.new_upload_commit_blocking()?;
let handle = commit.upload_from_path_blocking(path, sha256)?;
let handle2 = commit.upload_bytes_blocking(bytes, sha256, name)?;
let (_h, cleaner) = commit.upload_file_blocking(name, size, sha256)?;
let results = commit.commit_blocking()?;
let group: DownloadGroup = session.new_download_group_blocking()?;
group.download_file_to_path(info, dest)?; // unchanged — already sync
let results = group.finish_blocking()?;
```
### Async callers
No changes needed. `UploadCommit` and `DownloadGroup` retain all their
existing async methods (`upload_from_path`, `upload_bytes`, `upload_file`,
`commit`, `finish`).
### Import changes
```rust
// Old
use xet::xet_session::{UploadCommitSync, DownloadGroupSync};
use xet::xet_session::sync::{UploadCommitSync, DownloadGroupSync};
// New — remove the imports entirely; use UploadCommit / DownloadGroup instead
use xet::xet_session::{UploadCommit, DownloadGroup};
```
---
## Bug Fix
`UploadCommitSync::upload_from_path` did not call `std::path::absolute()` on
the file path before dispatching, unlike the async `UploadCommit::upload_from_path`.
The new `upload_from_path_blocking` includes the `std::path::absolute()` call,
matching the async version's behavior.
---
## Files Changed
| File | Change |
|---|---|
| `xet_pkg/src/xet_session/upload_commit.rs` | Added `_blocking` methods, updated doc comments |
| `xet_pkg/src/xet_session/download_group.rs` | Added `finish_blocking`, updated doc comments |
| `xet_pkg/src/xet_session/session.rs` | Changed `new_*_blocking()` return types from `*Sync` to unified types |
| `xet_pkg/src/xet_session/mod.rs` | Removed `pub mod sync` and sync type re-exports, updated doc comments |
| `xet_pkg/src/xet_session/sync/` | Entire directory deleted |
| `xet_pkg/examples/example_sync.rs` | Updated to use `_blocking` methods |

View File

@@ -60,7 +60,7 @@ fn upload_files(files: Vec<PathBuf>, endpoint: Option<String>) -> Result<()> {
let n_files = files.len();
let mut handles = Vec::with_capacity(n_files);
for f in &files {
handles.push(commit.upload_from_path(f.clone(), Sha256Policy::Compute)?);
handles.push(commit.upload_from_path_blocking(f.clone(), Sha256Policy::Compute)?);
}
// Spawn a task to print progress; the main thread blocks in commit() below.
@@ -80,7 +80,7 @@ fn upload_files(files: Vec<PathBuf>, endpoint: Option<String>) -> Result<()> {
});
// Block until all uploads finish and metadata is finalized.
let results = commit.commit()?;
let results = commit.commit_blocking()?;
for m in results.values().filter_map(|m| m.as_ref().as_ref().ok()) {
println!(" {} -> {} ({} bytes)", m.tracking_name.as_deref().unwrap_or("?"), m.hash, m.file_size);
@@ -138,7 +138,7 @@ fn download_files(metadata_file: PathBuf, output_dir: PathBuf, endpoint: Option<
});
// Block until all downloads finish.
let results = group.finish()?;
let results = group.finish_blocking()?;
for (_task_id, result) in &results {
if let Ok(r) = result.as_ref() {

View File

@@ -14,16 +14,17 @@ use super::errors::SessionError;
use super::progress::{DownloadTaskHandle, GroupProgress, ProgressSnapshot, TaskHandle, TaskStatus};
use super::session::XetSession;
/// Async API for grouping related file downloads into a single unit of work.
/// API for grouping related file downloads into a single unit of work.
///
/// Obtain via [`XetSession::new_download_group`] from an `async` context.
/// For sync / non-async code use [`DownloadGroupSync`] from
/// [`XetSession::new_download_group_blocking`] instead.
/// Obtain via [`XetSession::new_download_group`] (async) or
/// [`XetSession::new_download_group_blocking`] (sync).
///
/// Queue files with [`download_file_to_path`](Self::download_file_to_path) (they start
/// downloading immediately in the background), poll progress with
/// [`get_progress`](Self::get_progress), then `await`
/// [`finish`](Self::finish) to wait for all downloads to complete.
/// [`get_progress`](Self::get_progress), then call
/// [`finish`](Self::finish) (async) or
/// [`finish_blocking`](Self::finish_blocking) (sync) to wait for all
/// downloads to complete.
///
/// # Cloning
///
@@ -35,8 +36,6 @@ use super::session::XetSession;
/// Methods return [`SessionError::Aborted`] if the parent session has been
/// aborted, and [`SessionError::AlreadyFinished`] if
/// [`finish`](Self::finish) has already been called.
///
/// [`DownloadGroupSync`]: crate::xet_session::sync::DownloadGroupSync
#[derive(Clone)]
pub struct DownloadGroup {
pub(super) inner: Arc<DownloadGroupInner>,
@@ -149,6 +148,16 @@ impl DownloadGroup {
Err(_) => false,
}
}
/// Blocking version of [`finish`](Self::finish).
///
/// # Panics
///
/// Panics if called from within a tokio async runtime.
pub fn finish_blocking(self) -> Result<HashMap<Ulid, DownloadResult>, SessionError> {
let group = self.clone();
self.runtime().external_run_async_task(group.finish())?
}
}
/// Per-file result type returned by [`DownloadGroup::finish`].
@@ -801,4 +810,163 @@ mod tests {
assert_eq!(std::fs::read(&dest).unwrap(), data);
});
}
// ── Blocking API tests ────────────────────────────────────────────────────
fn local_session_sync(temp: &TempDir) -> Result<XetSession, Box<dyn std::error::Error>> {
let cas_path = temp.path().join("cas");
Ok(XetSessionBuilder::new()
.with_endpoint(format!("local://{}", cas_path.display()))
.build()?)
}
fn upload_bytes_blocking(
session: &XetSession,
data: &[u8],
name: &str,
) -> Result<XetFileInfo, Box<dyn std::error::Error>> {
let commit = session.new_upload_commit_blocking()?;
let handle = commit.upload_bytes_blocking(data.to_vec(), Sha256Policy::Compute, Some(name.into()))?;
let results = commit.commit_blocking()?;
let meta = results.get(&handle.task_id).unwrap().as_ref().as_ref().unwrap();
Ok(XetFileInfo {
hash: meta.hash.clone(),
file_size: meta.file_size,
})
}
#[test]
fn test_blocking_download_file_round_trip() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let original = b"Hello, download round-trip!";
let file_info = upload_bytes_blocking(&session, original, "payload.bin")?;
let dest = temp.path().join("downloaded.bin");
let group = session.new_download_group_blocking()?;
group.download_file_to_path(file_info, dest.clone())?;
group.finish_blocking()?;
assert_eq!(std::fs::read(&dest)?, original);
Ok(())
}
#[test]
fn test_blocking_download_multiple_files() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let data_a = b"First file content";
let data_b = b"Second file content - different";
let commit = session.new_upload_commit_blocking()?;
let handle_a = commit.upload_bytes_blocking(data_a.to_vec(), Sha256Policy::Compute, Some("a.bin".into()))?;
let handle_b = commit.upload_bytes_blocking(data_b.to_vec(), Sha256Policy::Compute, Some("b.bin".into()))?;
let results = commit.commit_blocking()?;
let to_file_info = |handle: &crate::xet_session::progress::UploadTaskHandle| -> XetFileInfo {
let meta = results.get(&handle.task_id).unwrap().as_ref().as_ref().unwrap();
XetFileInfo {
hash: meta.hash.clone(),
file_size: meta.file_size,
}
};
let dest_a = temp.path().join("a_out.bin");
let dest_b = temp.path().join("b_out.bin");
let group = session.new_download_group_blocking()?;
group.download_file_to_path(to_file_info(&handle_a), dest_a.clone())?;
group.download_file_to_path(to_file_info(&handle_b), dest_b.clone())?;
group.finish_blocking()?;
assert_eq!(std::fs::read(&dest_a)?, data_a);
assert_eq!(std::fs::read(&dest_b)?, data_b);
Ok(())
}
#[test]
fn test_blocking_download_progress_reflects_bytes_after_finish() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let original = b"download progress tracking data";
let file_info = upload_bytes_blocking(&session, original, "prog.bin")?;
let dest = temp.path().join("out.bin");
let group = session.new_download_group_blocking()?;
let progress_observer = group.clone();
group.download_file_to_path(file_info, dest)?;
group.finish_blocking()?;
std::thread::sleep(
session
.runtime
.config()
.data
.progress_update_interval
.saturating_add(Duration::from_secs(1)),
);
let snapshot = progress_observer.get_progress()?;
assert!(snapshot.total().total_bytes_completed > 0);
Ok(())
}
#[test]
fn test_blocking_download_result_access_patterns() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let data = b"download result access patterns";
let file_info = upload_bytes_blocking(&session, data, "file.bin")?;
let dest = temp.path().join("out.bin");
let group = session.new_download_group_blocking()?;
let handle = group.download_file_to_path(file_info.clone(), dest)?;
// Before finish, per-task result is not available yet.
assert!(handle.result().is_none());
let results = group.finish_blocking()?;
// Result should be available in the finish map by task id.
let map_result = results.get(&handle.task_id).expect("task_id must be present in results");
assert_eq!(map_result.as_ref().as_ref().unwrap().file_info.file_size, data.len() as u64);
// Result should also be available via the task handle.
let result = handle.result().expect("result must be set after finish");
let dl = result.as_ref().as_ref().unwrap();
assert_eq!(dl.file_info.file_size, data.len() as u64);
assert_eq!(dl.file_info.hash, file_info.hash);
Ok(())
}
fn assert_blocking_download_round_trip<R>(run: R)
where
R: FnOnce(std::pin::Pin<Box<dyn std::future::Future<Output = ()>>>),
{
let temp = tempdir().unwrap();
let session = local_session_sync(&temp).unwrap();
run(Box::pin(async move {
let data = b"download from smol executor";
let file_info = upload_bytes_blocking(&session, data, "test.bin").unwrap();
let dest = temp.path().join("out_smol.bin");
let group = session.new_download_group_blocking().unwrap();
group.download_file_to_path(file_info, dest.clone()).unwrap();
group.finish_blocking().unwrap();
assert_eq!(std::fs::read(&dest).unwrap(), data);
}));
}
#[test]
fn test_blocking_download_round_trip_in_smol() {
assert_blocking_download_round_trip(|fut| smol::block_on(fut));
}
#[test]
fn test_blocking_download_round_trip_in_futures_executor() {
assert_blocking_download_round_trip(|fut| futures::executor::block_on(fut));
}
#[test]
fn test_blocking_download_round_trip_in_async_std() {
assert_blocking_download_round_trip(|fut| async_std::task::block_on(fut));
}
}

View File

@@ -16,16 +16,16 @@
//!
//! ## Uploads
//!
//! For **sync** callers: create an [`UploadCommitSync`] with
//! [`XetSession::new_upload_commit_blocking`], queue files with
//! [`upload_from_path`](UploadCommitSync::upload_from_path) or
//! [`upload_bytes`](UploadCommitSync::upload_bytes), then call
//! [`commit`](UploadCommitSync::commit) to block until all transfers finish and
//! receive a `HashMap<Ulid, `[`UploadResult`]`>` keyed by task ID.
//!
//! For **async** callers: create an [`UploadCommit`] with
//! [`XetSession::new_upload_commit`], queue files the same way, then
//! `await` [`commit`](UploadCommit::commit).
//! Create an [`UploadCommit`] with [`XetSession::new_upload_commit`] (async)
//! or [`XetSession::new_upload_commit_blocking`] (sync), queue files with
//! [`upload_from_path`](UploadCommit::upload_from_path) /
//! [`upload_from_path_blocking`](UploadCommit::upload_from_path_blocking) or
//! [`upload_bytes`](UploadCommit::upload_bytes) /
//! [`upload_bytes_blocking`](UploadCommit::upload_bytes_blocking), then call
//! [`commit`](UploadCommit::commit) or
//! [`commit_blocking`](UploadCommit::commit_blocking) to wait for all
//! transfers to finish and receive a `HashMap<Ulid, `[`UploadResult`]`>`
//! keyed by task ID.
//!
//! `UploadResult` = `Arc<Result<`[`FileMetadata`]`, `[`SessionError`]`>>`.
//! Per-task results can also be read from the returned [`UploadTaskHandle`]
@@ -33,15 +33,13 @@
//!
//! ## Downloads
//!
//! For **sync** callers: create a [`DownloadGroupSync`] with
//! [`XetSession::new_download_group_blocking`], queue files with
//! [`download_file_to_path`](DownloadGroupSync::download_file_to_path), then
//! call [`finish`](DownloadGroupSync::finish) to block until all transfers
//! complete and receive a `HashMap<Ulid, `[`DownloadResult`]`>` keyed by task ID.
//!
//! For **async** callers: create a [`DownloadGroup`] with
//! [`XetSession::new_download_group`], queue files the same way, then
//! `await` [`finish`](DownloadGroup::finish).
//! Create a [`DownloadGroup`] with [`XetSession::new_download_group`] (async)
//! or [`XetSession::new_download_group_blocking`] (sync), queue files with
//! [`download_file_to_path`](DownloadGroup::download_file_to_path), then call
//! [`finish`](DownloadGroup::finish) (async) or
//! [`finish_blocking`](DownloadGroup::finish_blocking) (sync) to wait for all
//! transfers to complete and receive a `HashMap<Ulid, `[`DownloadResult`]`>`
//! keyed by task ID.
//!
//! `DownloadResult` = `Arc<Result<`[`DownloadedFile`]`, `[`SessionError`]`>>`.
//! Per-task results can also be read from the returned [`DownloadTaskHandle`]
@@ -49,12 +47,11 @@
//!
//! ## Progress tracking
//!
//! All four types ([`UploadCommit`], [`UploadCommitSync`], [`DownloadGroup`],
//! [`DownloadGroupSync`]) expose `get_progress()`, which returns a
//! [`ProgressSnapshot`] without acquiring a lock on the calling thread
//! (useful for Python bindings that must release the GIL). Poll it from a
//! background thread/task while the main thread/task blocks in
//! `commit()` / `finish()`.
//! Both [`UploadCommit`] and [`DownloadGroup`] expose `get_progress()`,
//! which returns a [`ProgressSnapshot`] without acquiring a lock on the
//! calling thread (useful for Python bindings that must release the GIL).
//! Poll it from a background thread/task while the main thread/task blocks
//! in `commit()` / `finish()`.
//!
//! ## Error handling
//!
@@ -76,21 +73,21 @@
//! .with_token_info("my-token".into(), 1_700_000_000)
//! .build()?;
//!
//! // 2. Upload — use the _blocking factory; returns UploadCommitSync
//! // 2. Upload — use the _blocking factory and _blocking methods
//! let commit = session.new_upload_commit_blocking()?;
//! let handle = commit.upload_from_path("file.bin".into(), Sha256Policy::Compute)?;
//! let handle = commit.upload_from_path_blocking("file.bin".into(), Sha256Policy::Compute)?;
//! // UploadResult = Arc<Result<FileMetadata, SessionError>>
//! let results = commit.commit()?;
//! let results = commit.commit_blocking()?;
//! let m = results.values().next().unwrap().as_ref().as_ref().unwrap();
//!
//! // 3. Download — use the _blocking factory; returns DownloadGroupSync
//! // 3. Download — use the _blocking factory and finish_blocking
//! let group = session.new_download_group_blocking()?;
//! let info = XetFileInfo {
//! hash: m.hash.clone(),
//! file_size: m.file_size,
//! };
//! let dl_handle = group.download_file_to_path(info, "out/file.bin".into())?;
//! let finish_results = group.finish()?;
//! let finish_results = group.finish_blocking()?;
//! // DownloadResult = Arc<Result<DownloadedFile, SessionError>>
//! let r = finish_results.get(&dl_handle.task_id).unwrap().as_ref().as_ref().unwrap();
//!
@@ -112,14 +109,14 @@
//! .build_async()
//! .await?;
//!
//! // 2. Upload — use the async factory; returns UploadCommit
//! // 2. Upload — use the async factory and async methods
//! let commit = session.new_upload_commit().await?;
//! let handle = commit.upload_from_path("file.bin".into(), Sha256Policy::Compute).await?;
//! // UploadResult = Arc<Result<FileMetadata, SessionError>>
//! let results = commit.commit().await?;
//! let m = results.values().next().unwrap().as_ref().as_ref().unwrap();
//!
//! // 3. Download — use the async factory; returns DownloadGroup
//! // 3. Download — use the async factory and async finish
//! let group = session.new_download_group().await?;
//! let info = XetFileInfo {
//! hash: m.hash.clone(),
@@ -138,7 +135,6 @@ mod download_group;
mod errors;
mod progress;
mod session;
pub mod sync;
mod upload_commit;
pub use download_group::{DownloadGroup, DownloadResult, DownloadedFile};
@@ -147,8 +143,6 @@ pub use progress::{
DownloadTaskHandle, FileProgress, ProgressSnapshot, TaskHandle, TaskStatus, TotalProgressSnapshot, UploadTaskHandle,
};
pub use session::{XetSession, XetSessionBuilder};
pub use sync::{DownloadGroupSync, UploadCommitSync};
pub use upload_commit::{FileMetadata, UploadCommit, UploadResult};
// Re-export for convenience
pub use xet_data::processing::{Sha256Policy, XetFileInfo};
pub use xet_runtime::config::XetConfig;

View File

@@ -16,7 +16,6 @@ use xet_runtime::core::XetRuntime;
use super::download_group::DownloadGroup;
use super::errors::SessionError;
use super::sync::{DownloadGroupSync, UploadCommitSync};
use super::upload_commit::UploadCommit;
/// Session state
@@ -387,9 +386,13 @@ impl XetSession {
/// Create a new [`UploadCommit`] from a **sync** (non-async) context.
///
/// The returned [`UploadCommit`] supports both async methods (`upload_from_path`,
/// `commit`) and blocking methods (`upload_from_path_blocking`, `commit_blocking`).
///
/// Returns `Err(SessionError::Aborted)` if the session has been aborted.
/// Returns `Err(SessionError::WrongRuntimeMode)` if the session was built with
/// [`XetSessionBuilder::with_tokio_handle`] / [`XetSessionBuilder::build_async`].
/// Returns `Err(SessionError::WrongRuntimeMode)` if the session uses an external
/// tokio runtime (from [`XetSessionBuilder::with_tokio_handle`] or tokio-detected
/// [`XetSessionBuilder::build_async`]).
///
/// # Panics
///
@@ -398,11 +401,12 @@ impl XetSession {
/// async-std, `futures::executor`) do not set this context, so calling from those is
/// safe — it blocks the executor thread until the task completes. Use
/// [`new_upload_commit`](Self::new_upload_commit) from async contexts instead.
pub fn new_upload_commit_blocking(&self) -> Result<UploadCommitSync, SessionError> {
pub fn new_upload_commit_blocking(&self) -> Result<UploadCommit, SessionError> {
if matches!(self.runtime_mode, RuntimeMode::External) {
return Err(SessionError::wrong_mode(
"new_upload_commit_blocking() cannot be called on a session built with \
with_tokio_handle() / build_async(); use new_upload_commit().await instead",
"new_upload_commit_blocking() cannot be called on a session using an \
external tokio runtime (with_tokio_handle() or tokio build_async()); \
use new_upload_commit().await instead",
));
}
{
@@ -412,11 +416,9 @@ impl XetSession {
}
}
let sync_commit = UploadCommitSync::new(self.clone())?;
self.active_upload_commits
.lock()?
.insert(sync_commit.inner.id(), sync_commit.inner.clone());
Ok(sync_commit)
let commit = self.runtime.external_run_async_task(UploadCommit::new(self.clone()))??;
self.active_upload_commits.lock()?.insert(commit.id(), commit.clone());
Ok(commit)
}
/// Create a new [`DownloadGroup`] that groups related file downloads.
@@ -449,9 +451,13 @@ impl XetSession {
/// Create a new [`DownloadGroup`] from a **sync** (non-async) context.
///
/// The returned [`DownloadGroup`] supports both the async [`finish`](DownloadGroup::finish)
/// and blocking [`finish_blocking`](DownloadGroup::finish_blocking) methods.
///
/// Returns `Err(SessionError::Aborted)` if the session has been aborted.
/// Returns `Err(SessionError::WrongRuntimeMode)` if the session was built with
/// [`XetSessionBuilder::with_tokio_handle`] / [`XetSessionBuilder::build_async`].
/// Returns `Err(SessionError::WrongRuntimeMode)` if the session uses an external
/// tokio runtime (from [`XetSessionBuilder::with_tokio_handle`] or tokio-detected
/// [`XetSessionBuilder::build_async`]).
///
/// # Panics
///
@@ -460,11 +466,12 @@ impl XetSession {
/// async-std, `futures::executor`) do not set this context, so calling from those is
/// safe — it blocks the executor thread until the task completes. Use
/// [`new_download_group`](Self::new_download_group) from async contexts instead.
pub fn new_download_group_blocking(&self) -> Result<DownloadGroupSync, SessionError> {
pub fn new_download_group_blocking(&self) -> Result<DownloadGroup, SessionError> {
if matches!(self.runtime_mode, RuntimeMode::External) {
return Err(SessionError::wrong_mode(
"new_download_group_blocking() cannot be called on a session built with \
with_tokio_handle() / build_async(); use new_download_group().await instead",
"new_download_group_blocking() cannot be called on a session using an \
external tokio runtime (with_tokio_handle() or tokio build_async()); \
use new_download_group().await instead",
));
}
{
@@ -474,11 +481,9 @@ impl XetSession {
}
}
let sync_group = DownloadGroupSync::new(self.clone())?;
self.active_download_groups
.lock()?
.insert(sync_group.inner.id(), sync_group.inner.clone());
Ok(sync_group)
let group = self.runtime.external_run_async_task(DownloadGroup::new(self.clone()))??;
self.active_download_groups.lock()?.insert(group.id(), group.clone());
Ok(group)
}
/// Abort the session - cancel all currently running tasks
@@ -621,7 +626,7 @@ mod tests {
let c1 = session.new_upload_commit_blocking().unwrap();
let _c2 = session.new_upload_commit_blocking().unwrap();
assert_eq!(session.active_upload_commits.lock().unwrap().len(), 2);
session.finish_upload_commit(c1.inner.id()).unwrap();
session.finish_upload_commit(c1.id()).unwrap();
assert_eq!(session.active_upload_commits.lock().unwrap().len(), 1);
}
@@ -632,7 +637,7 @@ mod tests {
let g1 = session.new_download_group_blocking().unwrap();
let _g2 = session.new_download_group_blocking().unwrap();
assert_eq!(session.active_download_groups.lock().unwrap().len(), 2);
session.finish_download_group(g1.inner.id()).unwrap();
session.finish_download_group(g1.id()).unwrap();
assert_eq!(session.active_download_groups.lock().unwrap().len(), 1);
}

View File

@@ -1,306 +0,0 @@
//! Sync-context download group wrapper.
//!
//! [`DownloadGroupSync`] is obtained from [`XetSession::new_download_group_blocking`] and
//! provides a fully blocking API suitable for sync Rust or Python (PyO3) callers.
//! For async Rust use [`DownloadGroup`] instead.
use std::collections::HashMap;
use std::path::PathBuf;
use ulid::Ulid;
use xet_data::processing::XetFileInfo;
use super::super::download_group::{DownloadGroup, DownloadResult};
use super::super::errors::SessionError;
use super::super::progress::{DownloadTaskHandle, ProgressSnapshot};
use super::super::session::XetSession;
/// Sync-context handle for grouping related file downloads.
///
/// Obtained from [`XetSession::new_download_group_blocking`]. All methods block
/// the calling thread — **do not use from within a tokio async runtime** (it will panic).
/// For async Rust code use [`DownloadGroup`] from [`XetSession::new_download_group`].
///
/// # Cloning
///
/// Cloning is cheap — it simply increments an atomic reference count.
/// All clones share the same background worker and task state.
#[derive(Clone)]
pub struct DownloadGroupSync {
pub(in super::super) inner: DownloadGroup,
}
impl DownloadGroupSync {
/// Create a new download group from a **sync** (non-async) context.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime — use
/// [`XetSession::new_download_group`] instead.
pub(in super::super) fn new(session: XetSession) -> Result<Self, SessionError> {
let group = session.runtime.external_run_async_task(DownloadGroup::new(session.clone()))??;
Ok(Self { inner: group })
}
/// Queue a file for download, starting the transfer immediately if system resource permits.
///
/// This is the sync-context equivalent of [`DownloadGroup::download_file_to_path`].
///
/// # Parameters
///
/// - `file_info`: identifies the file to download (hash and size).
/// - `dest_path`: path where the downloaded content will be written.
///
/// # Errors
///
/// Returns [`SessionError::Aborted`] if the session has been aborted, or
/// [`SessionError::AlreadyFinished`] if [`finish`](Self::finish) has already been called.
pub fn download_file_to_path(
&self,
file_info: XetFileInfo,
dest_path: PathBuf,
) -> Result<DownloadTaskHandle, SessionError> {
self.inner.download_file_to_path(file_info, dest_path)
}
/// Return a snapshot of progress for every queued download.
pub fn get_progress(&self) -> Result<ProgressSnapshot, SessionError> {
self.inner.get_progress()
}
/// Wait for all downloads to complete and return their results.
///
/// Returns a `HashMap` keyed by task ID where each value is
/// [`DownloadResult`] (= `Arc<Result<`[`DownloadMetadata`](xet_data::processing::DownloadMetadata)`,
/// [`SessionError`]`>>`). A single failed download does not prevent the others from being collected.
///
/// Consumes `self` — subsequent calls on any clone will return
/// [`SessionError::AlreadyFinished`].
///
/// # Panics
///
/// Panics if called from within a tokio async runtime. Use [`DownloadGroup::finish`] instead.
pub fn finish(self) -> Result<HashMap<Ulid, DownloadResult>, SessionError> {
let group_inner = self.inner.inner.clone();
self.inner
.runtime()
.external_run_async_task(async move { group_inner.handle_finish().await })?
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tempfile::{TempDir, tempdir};
use xet_data::processing::Sha256Policy;
use super::*;
use crate::xet_session::progress::UploadTaskHandle;
use crate::xet_session::session::{XetSession, XetSessionBuilder};
fn local_session(temp: &TempDir) -> Result<XetSession, Box<dyn std::error::Error>> {
let cas_path = temp.path().join("cas");
Ok(XetSessionBuilder::new()
.with_endpoint(format!("local://{}", cas_path.display()))
.build()?)
}
fn upload_bytes(session: &XetSession, data: &[u8], name: &str) -> Result<XetFileInfo, Box<dyn std::error::Error>> {
let commit = session.new_upload_commit_blocking()?;
let handle = commit.upload_bytes(data.to_vec(), Sha256Policy::Compute, Some(name.into()))?;
let results = commit.commit()?;
let meta = results.get(&handle.task_id).unwrap().as_ref().as_ref().unwrap();
Ok(XetFileInfo {
hash: meta.hash.clone(),
file_size: meta.file_size,
})
}
// ── Round-trip tests ─────────────────────────────────────────────────────
#[test]
// Downloading a previously uploaded file produces byte-identical content at the destination.
fn test_download_file_round_trip() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let original = b"Hello, download round-trip!";
let file_info = upload_bytes(&session, original, "payload.bin")?;
let dest = temp.path().join("downloaded.bin");
let group = session.new_download_group_blocking()?;
group.download_file_to_path(file_info, dest.clone())?;
group.finish()?;
assert_eq!(std::fs::read(&dest)?, original);
Ok(())
}
#[test]
// Downloading multiple files from a single group produces correct content for each.
fn test_download_multiple_files() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let data_a = b"First file content";
let data_b = b"Second file content - different";
let commit = session.new_upload_commit_blocking()?;
let handle_a = commit.upload_bytes(data_a.to_vec(), Sha256Policy::Compute, Some("a.bin".into()))?;
let handle_b = commit.upload_bytes(data_b.to_vec(), Sha256Policy::Compute, Some("b.bin".into()))?;
let results = commit.commit()?;
let to_file_info = |handle: &UploadTaskHandle| -> XetFileInfo {
let meta = results.get(&handle.task_id).unwrap().as_ref().as_ref().unwrap();
XetFileInfo {
hash: meta.hash.clone(),
file_size: meta.file_size,
}
};
let dest_a = temp.path().join("a_out.bin");
let dest_b = temp.path().join("b_out.bin");
let group = session.new_download_group_blocking()?;
group.download_file_to_path(to_file_info(&handle_a), dest_a.clone())?;
group.download_file_to_path(to_file_info(&handle_b), dest_b.clone())?;
group.finish()?;
assert_eq!(std::fs::read(&dest_a)?, data_a);
assert_eq!(std::fs::read(&dest_b)?, data_b);
Ok(())
}
#[test]
// After a successful finish the aggregate download progress reflects bytes received.
fn test_download_progress_reflects_bytes_after_finish() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let original = b"download progress tracking data";
let file_info = upload_bytes(&session, original, "prog.bin")?;
let dest = temp.path().join("out.bin");
let group = session.new_download_group_blocking()?;
let progress_observer = group.clone();
group.download_file_to_path(file_info, dest)?;
group.finish()?;
std::thread::sleep(
session
.runtime
.config()
.data
.progress_update_interval
.saturating_add(Duration::from_secs(1)),
);
let snapshot = progress_observer.get_progress()?;
assert!(snapshot.total().total_bytes_completed > 0);
Ok(())
}
// ── Per-task result access patterns ──────────────────────────────────────
#[test]
// Pattern 1: per-task result is accessible via task_id in the finish() HashMap.
fn test_download_result_accessible_via_task_id_in_finish_map() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let data = b"result via task_id in finish map";
let file_info = upload_bytes(&session, data, "file.bin")?;
let dest = temp.path().join("out.bin");
let group = session.new_download_group_blocking()?;
let handle = group.download_file_to_path(file_info, dest)?;
let results = group.finish()?;
let result = results.get(&handle.task_id).expect("task_id must be present in results");
assert_eq!(result.as_ref().as_ref().unwrap().file_info.file_size, data.len() as u64);
Ok(())
}
#[test]
// DownloadTaskHandle::result() returns None before finish() is called.
fn test_download_result_none_before_finish() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let file_info = upload_bytes(&session, b"some data", "file.bin")?;
let dest = temp.path().join("out.bin");
let group = session.new_download_group_blocking()?;
let handle = group.download_file_to_path(file_info, dest)?;
assert!(handle.result().is_none(), "result must be None before finish()");
group.finish()?;
Ok(())
}
#[test]
// DownloadTaskHandle::result() returns Some after finish() completes.
fn test_download_result_some_after_finish() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let data = b"download result test data";
let file_info = upload_bytes(&session, data, "file.bin")?;
let dest = temp.path().join("out.bin");
let group = session.new_download_group_blocking()?;
let handle = group.download_file_to_path(file_info.clone(), dest)?;
group.finish()?;
let result = handle.result().unwrap();
let dl = result.as_ref().as_ref().unwrap();
assert_eq!(dl.file_info.file_size, data.len() as u64);
assert_eq!(dl.file_info.hash, file_info.hash);
Ok(())
}
// ── Non-tokio executor (no-panic + round-trip) ────────────────────────────
//
// smol, async-std, and futures::executor do not set tokio's thread-local
// runtime context, so Handle::block_on inside external_run_async_task does
// not panic — it just blocks the calling executor thread.
#[test]
// new_download_group_blocking completes a full upload+download round-trip inside smol.
fn test_download_blocking_round_trip_in_smol() {
let temp = tempdir().unwrap();
let session = local_session(&temp).unwrap();
smol::block_on(async {
let data = b"download from smol executor";
let file_info = upload_bytes(&session, data, "test.bin").unwrap();
let dest = temp.path().join("out_smol.bin");
let group = session.new_download_group_blocking().unwrap();
group.download_file_to_path(file_info, dest.clone()).unwrap();
group.finish().unwrap();
assert_eq!(std::fs::read(&dest).unwrap(), data);
});
}
#[test]
// new_download_group_blocking completes a full upload+download round-trip inside futures::executor.
fn test_download_blocking_round_trip_in_futures_executor() {
let temp = tempdir().unwrap();
let session = local_session(&temp).unwrap();
futures::executor::block_on(async {
let data = b"download from futures executor";
let file_info = upload_bytes(&session, data, "test.bin").unwrap();
let dest = temp.path().join("out_futures.bin");
let group = session.new_download_group_blocking().unwrap();
group.download_file_to_path(file_info, dest.clone()).unwrap();
group.finish().unwrap();
assert_eq!(std::fs::read(&dest).unwrap(), data);
});
}
#[test]
// new_download_group_blocking completes a full upload+download round-trip inside async-std.
fn test_download_blocking_round_trip_in_async_std() {
let temp = tempdir().unwrap();
let session = local_session(&temp).unwrap();
async_std::task::block_on(async {
let data = b"download from async-std executor";
let file_info = upload_bytes(&session, data, "test.bin").unwrap();
let dest = temp.path().join("out_async_std.bin");
let group = session.new_download_group_blocking().unwrap();
group.download_file_to_path(file_info, dest.clone()).unwrap();
group.finish().unwrap();
assert_eq!(std::fs::read(&dest).unwrap(), data);
});
}
}

View File

@@ -1,5 +0,0 @@
pub mod download_group_sync;
pub mod upload_commit_sync;
pub use download_group_sync::DownloadGroupSync;
pub use upload_commit_sync::UploadCommitSync;

View File

@@ -1,373 +0,0 @@
//! Sync-context upload commit wrapper.
//!
//! [`UploadCommitSync`] is obtained from [`XetSession::new_upload_commit_blocking`] and
//! provides a fully blocking API suitable for sync Rust or Python (PyO3) callers.
//! For async Rust use [`UploadCommit`] instead.
use std::collections::HashMap;
use std::path::PathBuf;
use ulid::Ulid;
use xet_data::processing::{Sha256Policy, SingleFileCleaner};
use super::super::errors::SessionError;
use super::super::progress::{ProgressSnapshot, TaskHandle, UploadTaskHandle};
use super::super::session::XetSession;
use super::super::upload_commit::{UploadCommit, UploadResult};
/// Sync-context handle for grouping related file uploads.
///
/// Obtained from [`XetSession::new_upload_commit_blocking`]. All methods block
/// the calling thread — **do not use from within a tokio async runtime** (it will panic).
/// For async Rust code use [`UploadCommit`] from [`XetSession::new_upload_commit`].
///
/// # Cloning
///
/// Cloning is cheap — it simply increments an atomic reference count.
/// All clones share the same upload session and task state.
#[derive(Clone)]
pub struct UploadCommitSync {
pub(in super::super) inner: UploadCommit,
}
impl UploadCommitSync {
/// Create a new upload commit from a **sync** (non-async) context.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime — use
/// [`XetSession::new_upload_commit`] instead.
pub(in super::super) fn new(session: XetSession) -> Result<Self, SessionError> {
let commit = session.runtime.external_run_async_task(UploadCommit::new(session.clone()))??;
Ok(Self { inner: commit })
}
/// Queue a file for upload, starting the transfer immediately if system resource permits.
///
/// This is the sync-context equivalent of [`UploadCommit::upload_from_path`].
///
/// # Parameters
///
/// - `file_path`: path to the file on disk. Resolved to an absolute path internally so the upload is not affected
/// by subsequent changes to the process working directory.
/// - `sha256`: controls SHA-256 handling during upload. Use [`Sha256Policy::Compute`] to compute it from the data,
/// [`Sha256Policy::Provided`] to supply a pre-computed digest, or [`Sha256Policy::Skip`] to omit it entirely.
///
/// # Errors
///
/// Returns [`SessionError::Aborted`] if the session has been aborted, or
/// [`SessionError::AlreadyCommitted`] if [`commit`](Self::commit) has already been called.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime. Use [`UploadCommit::upload_from_path`] instead.
pub fn upload_from_path(&self, file_path: PathBuf, sha256: Sha256Policy) -> Result<UploadTaskHandle, SessionError> {
self.inner.session.check_alive()?;
let commit_inner = self.inner.inner.clone();
self.inner
.runtime()
.external_run_async_task(async move { commit_inner.start_upload_file_from_path(file_path, sha256).await })?
}
/// Queue raw bytes for upload, starting the transfer immediately if system resource permits.
///
/// This is the sync-context equivalent of [`UploadCommit::upload_bytes`].
///
/// # Parameters
///
/// - `bytes`: the raw byte content to upload.
/// - `sha256`: controls SHA-256 handling during upload. Use [`Sha256Policy::Compute`] to compute it from the data,
/// [`Sha256Policy::Provided`] to supply a pre-computed digest, or [`Sha256Policy::Skip`] to omit it entirely.
/// - `tracking_name`: optional display name used for progress and telemetry reporting; does not affect the upload
/// itself.
///
/// # Errors
///
/// Returns [`SessionError::Aborted`] if the session has been aborted, or
/// [`SessionError::AlreadyCommitted`] if [`commit`](Self::commit) has already been called.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime. Use [`UploadCommit::upload_bytes`] instead.
pub fn upload_bytes(
&self,
bytes: Vec<u8>,
sha256: Sha256Policy,
tracking_name: Option<String>,
) -> Result<UploadTaskHandle, SessionError> {
self.inner.session.check_alive()?;
let commit_inner = self.inner.inner.clone();
self.inner.runtime().external_run_async_task(async move {
commit_inner.start_upload_bytes(bytes, sha256, tracking_name).await
})?
}
/// Begin an incremental file upload, returning a [`SingleFileCleaner`] to stream bytes.
///
/// This is the sync-context equivalent of [`UploadCommit::upload_file`].
///
/// # Parameters
///
/// - `file_name`: optional display name used for progress and telemetry reporting; does not affect the upload
/// itself.
/// - `file_size`: expected total size in bytes, used for progress tracking. Pass `0` when the size is not known in
/// advance.
/// - `sha256`: controls SHA-256 handling during upload. Use [`Sha256Policy::Compute`] to compute it from the data,
/// [`Sha256Policy::Provided`] to supply a pre-computed digest, or [`Sha256Policy::Skip`] to omit it entirely.
///
/// # Errors
///
/// Returns [`SessionError::Aborted`] if the session has been aborted, or
/// [`SessionError::AlreadyCommitted`] if [`commit`](Self::commit) has already been called.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime. Use [`UploadCommit::upload_file`] instead.
pub fn upload_file(
&self,
file_name: Option<String>,
file_size: u64,
sha256: Sha256Policy,
) -> Result<(TaskHandle, SingleFileCleaner), SessionError> {
self.inner.session.check_alive()?;
let commit_inner = self.inner.clone();
self.inner.runtime().external_run_async_task(async move {
commit_inner.start_upload_file(file_name, file_size, sha256).await
})?
}
/// Return a snapshot of progress for every queued upload.
pub fn get_progress(&self) -> Result<ProgressSnapshot, SessionError> {
self.inner.get_progress()
}
/// Wait for all uploads to complete and push metadata to the CAS server.
///
/// Returns a `HashMap` keyed by task ID. See [`UploadCommit::commit`] for full documentation.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime. Use [`UploadCommit::commit`] instead.
pub fn commit(self) -> Result<HashMap<Ulid, UploadResult>, SessionError> {
let commit_inner = self.inner.inner.clone();
self.inner
.runtime()
.external_run_async_task(async move { commit_inner.handle_commit().await })?
}
}
#[cfg(test)]
mod tests {
use tempfile::{TempDir, tempdir};
use xet_data::processing::Sha256Policy;
use crate::xet_session::session::{XetSession, XetSessionBuilder};
fn local_session(temp: &TempDir) -> Result<XetSession, Box<dyn std::error::Error>> {
let cas_path = temp.path().join("cas");
Ok(XetSessionBuilder::new()
.with_endpoint(format!("local://{}", cas_path.display()))
.build()?)
}
// ── Round-trip tests ─────────────────────────────────────────────────────
#[test]
// Uploading raw bytes and committing returns a non-empty hash and the correct file size.
fn test_upload_bytes_round_trip() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let data = b"Hello, upload commit round-trip!";
let commit = session.new_upload_commit_blocking()?;
let task_handle = commit.upload_bytes(data.to_vec(), Sha256Policy::Compute, Some("hello.bin".into()))?;
let results = commit.commit()?;
assert_eq!(results.len(), 1);
let meta = results.get(&task_handle.task_id).unwrap().as_ref().as_ref().unwrap();
assert_eq!(meta.file_size, data.len() as u64);
assert!(!meta.hash.is_empty());
Ok(())
}
#[test]
// Uploading a file from disk and committing returns the correct file size.
fn test_upload_from_path_round_trip() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let src = temp.path().join("data.bin");
let data = b"file path upload content";
std::fs::write(&src, data)?;
let commit = session.new_upload_commit_blocking()?;
let handle = commit.upload_from_path(src, Sha256Policy::Compute)?;
commit.commit()?;
let meta = handle.result().unwrap();
let meta = meta.as_ref().as_ref().unwrap();
assert_eq!(meta.file_size, data.len() as u64);
assert!(!meta.hash.is_empty());
Ok(())
}
// ── Per-task result access patterns ──────────────────────────────────────
#[test]
// UploadTaskHandle::result() returns None before commit() is called.
fn test_upload_result_none_before_commit() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let src = temp.path().join("data.bin");
std::fs::write(&src, b"content")?;
let commit = session.new_upload_commit_blocking()?;
let handle = commit.upload_from_path(src, Sha256Policy::Compute)?;
assert!(handle.result().is_none(), "result must be None before commit()");
commit.commit()?;
Ok(())
}
#[test]
// Pattern 1: per-task result is accessible via task_id in the commit() HashMap.
fn test_upload_result_accessible_via_task_id_in_commit_map() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let data = b"result via task_id";
let src = temp.path().join("data.bin");
std::fs::write(&src, data)?;
let commit = session.new_upload_commit_blocking()?;
let handle = commit.upload_from_path(src, Sha256Policy::Compute)?;
let results = commit.commit()?;
let result = results.get(&handle.task_id).expect("task_id must be present in results");
assert_eq!(result.as_ref().as_ref().unwrap().file_size, data.len() as u64);
Ok(())
}
#[test]
// Pattern 2: per-task result is accessible directly from the UploadTaskHandle after commit().
fn test_upload_result_accessible_via_handle_after_commit() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let data = b"result via handle";
let src = temp.path().join("data.bin");
std::fs::write(&src, data)?;
let commit = session.new_upload_commit_blocking()?;
let handle = commit.upload_from_path(src, Sha256Policy::Compute)?;
commit.commit()?;
let result = handle.result().expect("result must be set after commit");
assert_eq!(result.as_ref().as_ref().unwrap().file_size, data.len() as u64);
Ok(())
}
#[test]
// Streaming upload via upload_file + SingleFileCleaner.
fn test_upload_streaming_round_trip() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let data = b"streamed upload bytes";
let runtime = session.runtime.clone();
let commit = session.new_upload_commit_blocking()?;
let (_handle, mut cleaner) =
commit.upload_file(Some("stream.bin".into()), data.len() as u64, Sha256Policy::Compute)?;
let (hash, file_size) = runtime.external_run_async_task(async move {
cleaner.add_data(data).await.unwrap();
let (xfi, _) = cleaner.finish().await.unwrap();
(xfi.hash, xfi.file_size)
})?;
let results = commit.commit()?;
assert!(results.is_empty());
assert_eq!(file_size, data.len() as u64);
assert!(!hash.is_empty());
Ok(())
}
#[test]
// Uploading multiple blobs in one commit returns one result per upload.
fn test_upload_multiple_files_in_one_commit() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let commit = session.new_upload_commit_blocking()?;
commit.upload_bytes(b"file one".to_vec(), Sha256Policy::Compute, Some("a.bin".into()))?;
commit.upload_bytes(b"file two".to_vec(), Sha256Policy::Compute, Some("b.bin".into()))?;
commit.upload_bytes(b"file three".to_vec(), Sha256Policy::Compute, Some("c.bin".into()))?;
let results = commit.commit()?;
assert_eq!(results.len(), 3);
Ok(())
}
#[test]
// After a successful commit the aggregate progress reflects bytes processed.
fn test_upload_progress_reflects_bytes_after_commit() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session(&temp)?;
let data = b"progress tracking upload data";
let commit = session.new_upload_commit_blocking()?;
let progress_observer = commit.clone();
commit.upload_bytes(data.to_vec(), Sha256Policy::Compute, Some("prog.bin".into()))?;
commit.commit()?;
let snapshot = progress_observer.get_progress()?;
assert!(snapshot.total().total_bytes_completed > 0);
Ok(())
}
// ── Non-tokio executor (no-panic + round-trip) ────────────────────────────
//
// smol, async-std, and futures::executor do not set tokio's thread-local
// runtime context, so Handle::block_on inside external_run_async_task does
// not panic — it just blocks the calling executor thread.
#[test]
// new_upload_commit_blocking completes a full upload round-trip inside smol.
fn test_upload_blocking_round_trip_in_smol() {
let temp = tempdir().unwrap();
let session = local_session(&temp).unwrap();
smol::block_on(async {
let data = b"upload from smol executor";
let commit = session.new_upload_commit_blocking().unwrap();
let handle = commit
.upload_bytes(data.to_vec(), Sha256Policy::Compute, Some("test.bin".into()))
.unwrap();
let results = commit.commit().unwrap();
let meta = results.get(&handle.task_id).unwrap().as_ref().as_ref().unwrap();
assert_eq!(meta.file_size, data.len() as u64);
assert!(!meta.hash.is_empty());
});
}
#[test]
// new_upload_commit_blocking completes a full upload round-trip inside futures::executor.
fn test_upload_blocking_round_trip_in_futures_executor() {
let temp = tempdir().unwrap();
let session = local_session(&temp).unwrap();
futures::executor::block_on(async {
let data = b"upload from futures executor";
let commit = session.new_upload_commit_blocking().unwrap();
let handle = commit
.upload_bytes(data.to_vec(), Sha256Policy::Compute, Some("test.bin".into()))
.unwrap();
let results = commit.commit().unwrap();
let meta = results.get(&handle.task_id).unwrap().as_ref().as_ref().unwrap();
assert_eq!(meta.file_size, data.len() as u64);
assert!(!meta.hash.is_empty());
});
}
#[test]
// new_upload_commit_blocking completes a full upload round-trip inside async-std.
fn test_upload_blocking_round_trip_in_async_std() {
let temp = tempdir().unwrap();
let session = local_session(&temp).unwrap();
async_std::task::block_on(async {
let data = b"upload from async-std executor";
let commit = session.new_upload_commit_blocking().unwrap();
let handle = commit
.upload_bytes(data.to_vec(), Sha256Policy::Compute, Some("test.bin".into()))
.unwrap();
let results = commit.commit().unwrap();
let meta = results.get(&handle.task_id).unwrap().as_ref().as_ref().unwrap();
assert_eq!(meta.file_size, data.len() as u64);
assert!(!meta.hash.is_empty());
});
}
}

View File

@@ -15,17 +15,20 @@ use super::errors::SessionError;
use super::progress::{GroupProgress, ProgressSnapshot, TaskHandle, TaskStatus, UploadTaskHandle};
use super::session::XetSession;
/// Async API for grouping related file uploads into a single atomic commit.
/// API for grouping related file uploads into a single atomic commit.
///
/// Obtain via [`XetSession::new_upload_commit`] from an `async` context.
/// For sync / non-async code use [`UploadCommitSync`] from
/// [`XetSession::new_upload_commit_blocking`] instead.
/// Obtain via [`XetSession::new_upload_commit`] (async) or
/// [`XetSession::new_upload_commit_blocking`] (sync).
///
/// Enqueue files with [`upload_from_path`](Self::upload_from_path) or stream
/// bytes with [`upload_file`](Self::upload_file) — transfers start immediately
/// in the background. Poll progress with [`get_progress`](Self::get_progress),
/// then `await` [`commit`](Self::commit) to wait for all uploads to finish and
/// push the final metadata to the CAS server.
/// Enqueue files with [`upload_from_path`](Self::upload_from_path) /
/// [`upload_from_path_blocking`](Self::upload_from_path_blocking) or stream
/// bytes with [`upload_file`](Self::upload_file) /
/// [`upload_file_blocking`](Self::upload_file_blocking) — transfers start
/// immediately in the background. Poll progress with
/// [`get_progress`](Self::get_progress), then call
/// [`commit`](Self::commit) (async) or
/// [`commit_blocking`](Self::commit_blocking) (sync) to wait for all uploads
/// to finish and push the final metadata to the CAS server.
///
/// # Cloning
///
@@ -37,8 +40,6 @@ use super::session::XetSession;
/// Methods return [`SessionError::Aborted`] if the parent session has been
/// aborted, and [`SessionError::AlreadyCommitted`] if [`commit`](Self::commit)
/// has already been called.
///
/// [`UploadCommitSync`]: crate::xet_session::sync::UploadCommitSync
#[derive(Clone)]
pub struct UploadCommit {
pub(super) inner: Arc<UploadCommitInner>,
@@ -233,6 +234,79 @@ impl UploadCommit {
async fn is_committed(&self) -> bool {
*self.state.lock().await == GroupState::Finished
}
// ===== Blocking (sync) variants =====
//
// These methods block the calling thread via `external_run_async_task`.
// **Do not call from within a tokio runtime** — it will panic.
// Use the async counterparts instead.
/// Blocking version of [`upload_from_path`](Self::upload_from_path).
///
/// # Panics
///
/// Panics if called from within a tokio async runtime.
pub fn upload_from_path_blocking(
&self,
file_path: PathBuf,
sha256: Sha256Policy,
) -> Result<UploadTaskHandle, SessionError> {
self.session.check_alive()?;
let absolute_path = std::path::absolute(file_path)?;
let commit_inner = self.inner.clone();
self.runtime().external_run_async_task(async move {
commit_inner.start_upload_file_from_path(absolute_path, sha256).await
})?
}
/// Blocking version of [`upload_bytes`](Self::upload_bytes).
///
/// # Panics
///
/// Panics if called from within a tokio async runtime.
pub fn upload_bytes_blocking(
&self,
bytes: Vec<u8>,
sha256: Sha256Policy,
tracking_name: Option<String>,
) -> Result<UploadTaskHandle, SessionError> {
self.session.check_alive()?;
let commit_inner = self.inner.clone();
self.runtime().external_run_async_task(async move {
commit_inner.start_upload_bytes(bytes, sha256, tracking_name).await
})?
}
/// Blocking version of [`upload_file`](Self::upload_file).
///
/// # Panics
///
/// Panics if called from within a tokio async runtime.
pub fn upload_file_blocking(
&self,
file_name: Option<String>,
file_size: u64,
sha256: Sha256Policy,
) -> Result<(TaskHandle, SingleFileCleaner), SessionError> {
self.session.check_alive()?;
let commit_inner = self.inner.clone();
self.runtime().external_run_async_task(async move {
commit_inner.start_upload_file(file_name, file_size, sha256).await
})?
}
/// Blocking version of [`commit`](Self::commit).
///
/// # Panics
///
/// Panics if called from within a tokio async runtime.
pub fn commit_blocking(self) -> Result<HashMap<Ulid, UploadResult>, SessionError> {
let commit = self.clone();
self.runtime().external_run_async_task(commit.commit())?
}
}
/// Per-file result type returned by [`UploadCommit::commit`].
@@ -1068,4 +1142,154 @@ mod tests {
assert_eq!(meta.file_size, data.len() as u64);
});
}
// ── Blocking API tests ────────────────────────────────────────────────────
fn local_session_sync(temp: &TempDir) -> Result<XetSession, Box<dyn std::error::Error>> {
let cas_path = temp.path().join("cas");
Ok(XetSessionBuilder::new()
.with_endpoint(format!("local://{}", cas_path.display()))
.build()?)
}
#[test]
fn test_blocking_upload_bytes_round_trip() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let data = b"Hello, upload commit round-trip!";
let commit = session.new_upload_commit_blocking()?;
let task_handle =
commit.upload_bytes_blocking(data.to_vec(), Sha256Policy::Compute, Some("hello.bin".into()))?;
let results = commit.commit_blocking()?;
assert_eq!(results.len(), 1);
let meta = results.get(&task_handle.task_id).unwrap().as_ref().as_ref().unwrap();
assert_eq!(meta.file_size, data.len() as u64);
assert!(!meta.hash.is_empty());
Ok(())
}
#[test]
fn test_blocking_upload_from_path_round_trip() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let src = temp.path().join("data.bin");
let data = b"file path upload content";
std::fs::write(&src, data)?;
let commit = session.new_upload_commit_blocking()?;
let handle = commit.upload_from_path_blocking(src, Sha256Policy::Compute)?;
commit.commit_blocking()?;
let meta = handle.result().unwrap();
let meta = meta.as_ref().as_ref().unwrap();
assert_eq!(meta.file_size, data.len() as u64);
assert!(!meta.hash.is_empty());
Ok(())
}
#[test]
fn test_blocking_upload_result_access_patterns() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let data = b"result access patterns";
let src = temp.path().join("data.bin");
std::fs::write(&src, data)?;
let commit = session.new_upload_commit_blocking()?;
let handle = commit.upload_from_path_blocking(src, Sha256Policy::Compute)?;
// Before commit, per-task result is not available yet.
assert!(handle.result().is_none());
let results = commit.commit_blocking()?;
// Result should be available in the commit map by task id.
let map_result = results.get(&handle.task_id).expect("task_id must be present in results");
assert_eq!(map_result.as_ref().as_ref().unwrap().file_size, data.len() as u64);
// Result should also be available via the task handle.
let handle_result = handle.result().expect("result must be set after commit");
assert_eq!(handle_result.as_ref().as_ref().unwrap().file_size, data.len() as u64);
Ok(())
}
#[test]
fn test_blocking_upload_streaming_round_trip() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let data = b"streamed upload bytes";
let runtime = session.runtime.clone();
let commit = session.new_upload_commit_blocking()?;
let (_handle, mut cleaner) =
commit.upload_file_blocking(Some("stream.bin".into()), data.len() as u64, Sha256Policy::Compute)?;
let (hash, file_size) = runtime.external_run_async_task(async move {
cleaner.add_data(data).await.unwrap();
let (xfi, _) = cleaner.finish().await.unwrap();
(xfi.hash, xfi.file_size)
})?;
let results = commit.commit_blocking()?;
assert!(results.is_empty());
assert_eq!(file_size, data.len() as u64);
assert!(!hash.is_empty());
Ok(())
}
#[test]
fn test_blocking_upload_multiple_files_in_one_commit() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let commit = session.new_upload_commit_blocking()?;
commit.upload_bytes_blocking(b"file one".to_vec(), Sha256Policy::Compute, Some("a.bin".into()))?;
commit.upload_bytes_blocking(b"file two".to_vec(), Sha256Policy::Compute, Some("b.bin".into()))?;
commit.upload_bytes_blocking(b"file three".to_vec(), Sha256Policy::Compute, Some("c.bin".into()))?;
let results = commit.commit_blocking()?;
assert_eq!(results.len(), 3);
Ok(())
}
#[test]
fn test_blocking_upload_progress_reflects_bytes_after_commit() -> Result<(), Box<dyn std::error::Error>> {
let temp = tempdir()?;
let session = local_session_sync(&temp)?;
let data = b"progress tracking upload data";
let commit = session.new_upload_commit_blocking()?;
let progress_observer = commit.clone();
commit.upload_bytes_blocking(data.to_vec(), Sha256Policy::Compute, Some("prog.bin".into()))?;
commit.commit_blocking()?;
let snapshot = progress_observer.get_progress()?;
assert!(snapshot.total().total_bytes_completed > 0);
Ok(())
}
fn assert_blocking_upload_round_trip<R>(run: R)
where
R: FnOnce(std::pin::Pin<Box<dyn std::future::Future<Output = ()>>>),
{
let temp = tempdir().unwrap();
let session = local_session_sync(&temp).unwrap();
run(Box::pin(async move {
let data = b"upload from smol executor";
let commit = session.new_upload_commit_blocking().unwrap();
let handle = commit
.upload_bytes_blocking(data.to_vec(), Sha256Policy::Compute, Some("test.bin".into()))
.unwrap();
let results = commit.commit_blocking().unwrap();
let meta = results.get(&handle.task_id).unwrap().as_ref().as_ref().unwrap();
assert_eq!(meta.file_size, data.len() as u64);
assert!(!meta.hash.is_empty());
}));
}
#[test]
fn test_blocking_upload_round_trip_in_smol() {
assert_blocking_upload_round_trip(|fut| smol::block_on(fut));
}
#[test]
fn test_blocking_upload_round_trip_in_futures_executor() {
assert_blocking_upload_round_trip(|fut| futures::executor::block_on(fut));
}
#[test]
fn test_blocking_upload_round_trip_in_async_std() {
assert_blocking_upload_round_trip(|fut| async_std::task::block_on(fut));
}
}