Early check RuntimeMode in blocking apis (#739)

Since a previous PR merges the async and blocking APIs under one struct,
the blocking APIs become accessible from `UploadCommit` /
`DownloadGroup` created by the async APIs. This PR adds the similar
`External` runtime mode checking to these blocking APIs as done for the
`new_upload_commit_blocking` and `new_download_group_blocking`
functions, so that they return an `Err` gracefully if possible instead
of panic.

However, this doesn't guard users from "deliberately" creating an
`UploadCommit` / `DownloadGroup` instance with `Owned` runtime mode and
send it to an async context and call the blocking APIs, in which case it
will still panic.

Added unit tests and updated docs for the above changes.
This commit is contained in:
Di Xiao
2026-03-19 13:31:55 -07:00
committed by GitHub
parent fb83178d28
commit 44566cc288
2 changed files with 184 additions and 6 deletions

View File

@@ -12,7 +12,7 @@ use xet_runtime::core::XetRuntime;
use super::common::{GroupState, create_translator_config};
use super::errors::SessionError;
use super::session::XetSession;
use super::session::{RuntimeMode, XetSession};
use super::tasks::{DownloadTaskHandle, TaskHandle, TaskStatus};
/// API for grouping related file downloads into a single unit of work.
@@ -152,14 +152,28 @@ impl DownloadGroup {
/// Blocking version of [`download_file_to_path`](Self::download_file_to_path).
///
/// # Errors
///
/// Returns [`SessionError::WrongRuntimeMode`] if the session was created with an external
/// tokio runtime ([`XetSessionBuilder::with_tokio_handle`] / [`XetSessionBuilder::build_async`]
/// inside a tokio context). Use [`download_file_to_path`](Self::download_file_to_path)`.await`
/// instead.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime.
/// Panics if called from within a tokio async runtime on an Owned-mode session.
pub fn download_file_to_path_blocking(
&self,
file_info: XetFileInfo,
dest_path: PathBuf,
) -> Result<DownloadTaskHandle, SessionError> {
if matches!(self.session.runtime_mode, RuntimeMode::External) {
return Err(SessionError::wrong_mode(
"download_file_to_path_blocking() cannot be called on a session using an \
external tokio runtime (with_tokio_handle() or tokio build_async()); \
use download_file_to_path().await instead",
));
}
let group = self.clone();
self.runtime()
.external_run_async_task(async move { group.download_file_to_path(file_info, dest_path).await })?
@@ -370,6 +384,7 @@ pub struct DownloadedFile {
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::sync::mpsc;
use std::time::Duration;
@@ -1040,4 +1055,46 @@ mod tests {
fn test_blocking_download_round_trip_in_async_std() {
assert_blocking_download_round_trip(|fut| async_std::task::block_on(fut));
}
// ── RuntimeMode checks ────────────────────────────────────────────────────
#[tokio::test(flavor = "multi_thread")]
// download_file_to_path_blocking returns WrongRuntimeMode on an External-mode session.
async fn test_download_file_to_path_blocking_errors_in_external_mode() {
let session = XetSessionBuilder::new().build_async().await.unwrap();
assert_eq!(session.runtime_mode, RuntimeMode::External);
let group = session.new_download_group().await.unwrap();
let file_info = XetFileInfo {
hash: String::new(),
file_size: 0,
sha256: None,
};
let err = group
.download_file_to_path_blocking(file_info, PathBuf::from("/nonexistent"))
.err()
.unwrap();
assert!(matches!(err, SessionError::WrongRuntimeMode(_)));
}
// ── Owned-mode _blocking panic guard ─────────────────────────────────────
#[test]
// download_file_to_path_blocking panics when called from within a tokio runtime on an
// Owned-mode session: external_run_async_task calls handle.block_on(), which panics
// because tokio sets a thread-local runtime context that it detects and rejects.
fn test_download_file_to_path_blocking_panics_in_async_context() {
let session = XetSessionBuilder::new().build().unwrap();
assert_eq!(session.runtime_mode, RuntimeMode::Owned);
let group = session.new_download_group_blocking().unwrap();
let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
let file_info = XetFileInfo {
hash: String::new(),
file_size: 0,
sha256: None,
};
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
rt.block_on(async { group.download_file_to_path_blocking(file_info, PathBuf::from("/nonexistent")) })
}));
assert!(result.is_err(), "download_file_to_path_blocking() must panic when called from async");
}
}

View File

@@ -12,7 +12,7 @@ use xet_runtime::core::XetRuntime;
use super::common::{GroupState, create_translator_config};
use super::errors::SessionError;
use super::session::XetSession;
use super::session::{RuntimeMode, XetSession};
use super::tasks::{TaskHandle, TaskStatus, UploadTaskHandle};
/// API for grouping related file uploads into a single atomic commit.
@@ -247,14 +247,27 @@ impl UploadCommit {
/// Blocking version of [`upload_from_path`](Self::upload_from_path).
///
/// # Errors
///
/// Returns [`SessionError::WrongRuntimeMode`] if the session was created with an external
/// tokio runtime ([`XetSessionBuilder::with_tokio_handle`] / [`XetSessionBuilder::build_async`]
/// inside a tokio context). Use [`upload_from_path`](Self::upload_from_path)`.await` instead.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime.
/// Panics if called from within a tokio async runtime on an Owned-mode session.
pub fn upload_from_path_blocking(
&self,
file_path: PathBuf,
sha256: Sha256Policy,
) -> Result<UploadTaskHandle, SessionError> {
if matches!(self.session.runtime_mode, RuntimeMode::External) {
return Err(SessionError::wrong_mode(
"upload_from_path_blocking() cannot be called on a session using an \
external tokio runtime (with_tokio_handle() or tokio build_async()); \
use upload_from_path().await instead",
));
}
self.session.check_alive()?;
let absolute_path = std::path::absolute(file_path)?;
@@ -266,15 +279,28 @@ impl UploadCommit {
/// Blocking version of [`upload_bytes`](Self::upload_bytes).
///
/// # Errors
///
/// Returns [`SessionError::WrongRuntimeMode`] if the session was created with an external
/// tokio runtime ([`XetSessionBuilder::with_tokio_handle`] / [`XetSessionBuilder::build_async`]
/// inside a tokio context). Use [`upload_bytes`](Self::upload_bytes)`.await` instead.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime.
/// Panics if called from within a tokio async runtime on an Owned-mode session.
pub fn upload_bytes_blocking(
&self,
bytes: Vec<u8>,
sha256: Sha256Policy,
tracking_name: Option<String>,
) -> Result<UploadTaskHandle, SessionError> {
if matches!(self.session.runtime_mode, RuntimeMode::External) {
return Err(SessionError::wrong_mode(
"upload_bytes_blocking() cannot be called on a session using an \
external tokio runtime (with_tokio_handle() or tokio build_async()); \
use upload_bytes().await instead",
));
}
self.session.check_alive()?;
let commit_inner = self.inner.clone();
@@ -285,15 +311,28 @@ impl UploadCommit {
/// Blocking version of [`upload_file`](Self::upload_file).
///
/// # Errors
///
/// Returns [`SessionError::WrongRuntimeMode`] if the session was created with an external
/// tokio runtime ([`XetSessionBuilder::with_tokio_handle`] / [`XetSessionBuilder::build_async`]
/// inside a tokio context). Use [`upload_file`](Self::upload_file)`.await` instead.
///
/// # Panics
///
/// Panics if called from within a tokio async runtime.
/// Panics if called from within a tokio async runtime on an Owned-mode session.
pub fn upload_file_blocking(
&self,
file_name: Option<String>,
file_size: u64,
sha256: Sha256Policy,
) -> Result<(TaskHandle, SingleFileCleaner), SessionError> {
if matches!(self.session.runtime_mode, RuntimeMode::External) {
return Err(SessionError::wrong_mode(
"upload_file_blocking() cannot be called on a session using an \
external tokio runtime (with_tokio_handle() or tokio build_async()); \
use upload_file().await instead",
));
}
self.session.check_alive()?;
let commit_inner = self.inner.clone();
@@ -1343,4 +1382,86 @@ mod tests {
fn test_blocking_upload_round_trip_in_async_std() {
assert_blocking_upload_round_trip(|fut| async_std::task::block_on(fut));
}
// ── RuntimeMode checks ────────────────────────────────────────────────────
#[tokio::test(flavor = "multi_thread")]
// upload_from_path_blocking returns WrongRuntimeMode on an External-mode session.
async fn test_upload_from_path_blocking_errors_in_external_mode() {
let session = XetSessionBuilder::new().build_async().await.unwrap();
assert_eq!(session.runtime_mode, RuntimeMode::External);
let commit = session.new_upload_commit().await.unwrap();
let err = commit
.upload_from_path_blocking(PathBuf::from("/nonexistent"), Sha256Policy::Compute)
.err()
.unwrap();
assert!(matches!(err, SessionError::WrongRuntimeMode(_)));
}
#[tokio::test(flavor = "multi_thread")]
// upload_bytes_blocking returns WrongRuntimeMode on an External-mode session.
async fn test_upload_bytes_blocking_errors_in_external_mode() {
let session = XetSessionBuilder::new().build_async().await.unwrap();
assert_eq!(session.runtime_mode, RuntimeMode::External);
let commit = session.new_upload_commit().await.unwrap();
let err = commit.upload_bytes_blocking(vec![], Sha256Policy::Compute, None).err().unwrap();
assert!(matches!(err, SessionError::WrongRuntimeMode(_)));
}
#[tokio::test(flavor = "multi_thread")]
// upload_file_blocking returns WrongRuntimeMode on an External-mode session.
async fn test_upload_file_blocking_errors_in_external_mode() {
let session = XetSessionBuilder::new().build_async().await.unwrap();
assert_eq!(session.runtime_mode, RuntimeMode::External);
let commit = session.new_upload_commit().await.unwrap();
let err = commit.upload_file_blocking(None, 0, Sha256Policy::Compute).err().unwrap();
assert!(matches!(err, SessionError::WrongRuntimeMode(_)));
}
// ── Owned-mode _blocking panic guard ─────────────────────────────────────
#[test]
// upload_from_path_blocking panics when called from within a tokio runtime on an
// Owned-mode session: external_run_async_task calls handle.block_on(), which panics
// because tokio sets a thread-local runtime context that it detects and rejects.
fn test_upload_from_path_blocking_panics_in_async_context() {
let session = XetSessionBuilder::new().build().unwrap();
assert_eq!(session.runtime_mode, RuntimeMode::Owned);
let commit = session.new_upload_commit_blocking().unwrap();
let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
rt.block_on(async {
commit.upload_from_path_blocking(PathBuf::from("/nonexistent"), Sha256Policy::Compute)
})
}));
assert!(result.is_err(), "upload_from_path_blocking() must panic when called from async");
}
#[test]
// upload_bytes_blocking panics when called from within a tokio runtime on an
// Owned-mode session: same mechanism as the path variant above.
fn test_upload_bytes_blocking_panics_in_async_context() {
let session = XetSessionBuilder::new().build().unwrap();
assert_eq!(session.runtime_mode, RuntimeMode::Owned);
let commit = session.new_upload_commit_blocking().unwrap();
let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
rt.block_on(async { commit.upload_bytes_blocking(vec![], Sha256Policy::Compute, None) })
}));
assert!(result.is_err(), "upload_bytes_blocking() must panic when called from async");
}
#[test]
// upload_file_blocking panics when called from within a tokio runtime on an
// Owned-mode session: same mechanism as the path variant above.
fn test_upload_file_blocking_panics_in_async_context() {
let session = XetSessionBuilder::new().build().unwrap();
assert_eq!(session.runtime_mode, RuntimeMode::Owned);
let commit = session.new_upload_commit_blocking().unwrap();
let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
rt.block_on(async { commit.upload_file_blocking(None, 0, Sha256Policy::Compute) })
}));
assert!(result.is_err(), "upload_file_blocking() must panic when called from async");
}
}