feat: optional chunk cache in download path for cross-file dedup

Add an optional ChunkCache to the download path (FileDownloadSession,
FileReconstructor, XorbBlock). When provided, xorb blocks are looked up
in cache before HTTP requests and stored after download. Cache hits skip
permit acquisition, so they don't consume network concurrency slots.

This enables cross-file deduplication for mount-style workloads.

Breaking change: FileDownloadSession::new() and from_client() signatures
now take an additional chunk_cache parameter.
This commit is contained in:
Adrien
2026-03-18 10:25:07 +01:00
parent a81fc5800a
commit f4fdea5175
13 changed files with 129 additions and 56 deletions

View File

@@ -8,6 +8,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use xet_client::cas_client::Client;
use xet_client::cas_types::FileRange;
use xet_client::chunk_cache::ChunkCache;
use xet_core_structures::merklehash::MerkleHash;
use xet_runtime::config::ReconstructionConfig;
use xet_runtime::core::{XetRuntime, xet_config};
@@ -30,6 +31,9 @@ pub struct FileReconstructor {
progress_updater: Option<Arc<ItemProgressUpdater>>,
config: Arc<ReconstructionConfig>,
/// Optional on-disk chunk cache for cross-file deduplication.
chunk_cache: Option<Arc<dyn ChunkCache>>,
/// Custom buffer semaphore for testing or specialized use cases.
custom_buffer_semaphore: Option<Arc<AdjustableSemaphore>>,
@@ -47,6 +51,7 @@ impl FileReconstructor {
byte_range: None,
progress_updater: default_progress_updater(),
config: Arc::new(xet_config().reconstruction.clone()),
chunk_cache: None,
custom_buffer_semaphore: None,
cancellation_token: CancellationToken::new(),
}
@@ -66,6 +71,13 @@ impl FileReconstructor {
}
}
pub fn with_chunk_cache(self, cache: Arc<dyn ChunkCache>) -> Self {
Self {
chunk_cache: Some(cache),
..self
}
}
pub fn with_config(self, config: impl AsRef<ReconstructionConfig>) -> Self {
Self {
config: Arc::new(config.as_ref().clone()),
@@ -220,6 +232,7 @@ impl FileReconstructor {
client,
byte_range,
config,
chunk_cache,
custom_buffer_semaphore,
..
} = self;
@@ -335,7 +348,7 @@ impl FileReconstructor {
};
let data_future = file_term
.get_data_task(client.clone(), run_state.progress_updater().cloned())
.get_data_task(client.clone(), run_state.progress_updater().cloned(), chunk_cache.clone())
.await?;
#[cfg(debug_assertions)]

View File

@@ -6,6 +6,7 @@ use bytes::Bytes;
use tokio::sync::OnceCell;
use xet_client::cas_client::Client;
use xet_client::cas_types::{ChunkRange, FileRange, HttpRange};
use xet_client::chunk_cache::ChunkCache;
use xet_core_structures::merklehash::MerkleHash;
use xet_runtime::core::xet_config;
use xet_runtime::utils::UniqueId;
@@ -59,6 +60,7 @@ impl FileTerm {
&self,
client: Arc<dyn Client>,
progress_updater: Option<Arc<ItemProgressUpdater>>,
chunk_cache: Option<Arc<dyn ChunkCache>>,
) -> Result<DataFuture> {
// Fast path: data already cached, no need to spawn a task.
if let Some(xorb_block_data) = self.xorb_block.data.get() {
@@ -71,7 +73,9 @@ impl FileTerm {
let xorb_block = self.xorb_block.clone();
let task = tokio::task::spawn(async move {
let xorb_block_data = xorb_block.retrieve_data(client, url_info, progress_updater).await?;
let xorb_block_data = xorb_block
.retrieve_data(client, url_info, progress_updater, chunk_cache)
.await?;
Ok(file_term.extract_bytes(&xorb_block_data))
});
@@ -475,7 +479,7 @@ mod tests {
assert!(file_contents.xorbs.contains_key(&file_term.xorb_block.xorb_hash));
// Get the data task and await it.
let data_future = file_term.get_data_task(dyn_client.clone(), None).await.unwrap();
let data_future = file_term.get_data_task(dyn_client.clone(), None, None).await.unwrap();
let data = data_future.await.unwrap();
// Verify the data size matches the byte range.

View File

@@ -4,8 +4,10 @@ use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::{Mutex, OnceCell};
use xet_client::cas_client::{Client, ProgressCallback};
use xet_client::cas_types::ChunkRange;
use xet_client::cas_types::{ChunkRange, Key};
use xet_client::chunk_cache::ChunkCache;
use xet_core_structures::merklehash::MerkleHash;
use xet_runtime::core::xet_config;
use xet_runtime::utils::UniqueId;
use super::super::error::Result;
@@ -70,6 +72,19 @@ impl PartialEq for XorbBlock {
impl Eq for XorbBlock {}
/// Builds chunk offset pairs from chunk ranges and a flat byte-offset slice.
fn build_chunk_offsets(chunk_ranges: &[ChunkRange], byte_offsets: &[u32]) -> Vec<(usize, usize)> {
let mut chunk_offsets = Vec::new();
let mut offset_idx = 0;
for range in chunk_ranges {
for chunk_idx in range.start..range.end {
chunk_offsets.push((chunk_idx as usize, byte_offsets[offset_idx] as usize));
offset_idx += 1;
}
}
chunk_offsets
}
impl XorbBlock {
/// Retrieve the xorb block data from the client, caching it for subsequent calls.
///
@@ -82,6 +97,7 @@ impl XorbBlock {
client: Arc<dyn Client>,
url_info: Arc<TermBlockRetrievalURLs>,
progress_updater: Option<Arc<ItemProgressUpdater>>,
chunk_cache: Option<Arc<dyn ChunkCache>>,
) -> Result<Arc<XorbBlockData>> {
let xorb_block_index = self.xorb_block_index;
let uncompressed_size_if_known = self.uncompressed_size_if_known;
@@ -89,7 +105,31 @@ impl XorbBlock {
self.data
.get_or_try_init(|| async {
// Acquire a CAS download permit only when actually downloading.
// Try the on-disk chunk cache before hitting the network.
// NOTE: cache key uses only the first ChunkRange. This works when each
// XorbBlock has a single range, but will need rework if multi-range
// blocks (multiple disjoint chunk ranges per block) are cached.
if let Some(ref cache) = chunk_cache {
let cache_key = Key {
prefix: xet_config().data.default_prefix.clone(),
hash: self.xorb_hash,
};
let chunk_range = chunk_ranges.first().copied().unwrap_or_default();
if let Ok(Some(cache_range)) = cache.get(&cache_key, &chunk_range).await {
// Report cached bytes as completed so progress tracking stays consistent.
if let Some(ref updater) = progress_updater {
let (_, _, http_ranges) = url_info.get_retrieval_url(xorb_block_index).await;
let transfer_bytes: u64 = http_ranges.iter().map(|r| r.length()).sum();
updater.report_transfer_progress(transfer_bytes);
}
let chunk_offsets = build_chunk_offsets(&chunk_ranges, &cache_range.offsets);
let data = Bytes::from(cache_range.data);
return Ok(Arc::new(XorbBlockData { chunk_offsets, data }));
}
}
// Cache miss or no cache configured - download from CAS.
let permit = client.acquire_download_permit().await?;
let url_provider = XorbURLProvider {
@@ -112,17 +152,24 @@ impl XorbBlock {
.get_file_term_data(Box::new(url_provider), permit, progress_callback, uncompressed_size_if_known)
.await?;
// Build chunk_offsets by zipping each chunk index (from all chunk_ranges)
// with the corresponding byte offset from the returned data.
let mut chunk_offsets = Vec::new();
let mut offset_idx = 0;
for range in &chunk_ranges {
for chunk_idx in range.start..range.end {
chunk_offsets.push((chunk_idx as usize, chunk_byte_offsets[offset_idx] as usize));
offset_idx += 1;
}
// Store in chunk cache (best-effort, non-blocking).
if let Some(cache) = chunk_cache {
let cache_key = Key {
prefix: xet_config().data.default_prefix.clone(),
hash: self.xorb_hash,
};
let chunk_range = chunk_ranges.first().copied().unwrap_or_default();
let data = data.clone();
let chunk_byte_offsets = chunk_byte_offsets.clone();
tokio::spawn(async move {
if let Err(err) = cache.put(&cache_key, &chunk_range, &chunk_byte_offsets, &data).await {
tracing::warn!("chunk cache put failed: {err}");
}
});
}
let chunk_offsets = build_chunk_offsets(&chunk_ranges, &chunk_byte_offsets);
Ok(Arc::new(XorbBlockData { chunk_offsets, data }))
})
.await

View File

@@ -133,7 +133,7 @@ async fn smudge(_name: Arc<str>, mut reader: impl Read, output_path: PathBuf) ->
// Use local config pointing to current directory
let cas_path = std::env::current_dir()?;
let config = TranslatorConfig::local_config(cas_path)?;
let session = xet_data::processing::FileDownloadSession::new(config.into()).await?;
let session = xet_data::processing::FileDownloadSession::new(config.into(), None).await?;
let (_id, _n_bytes) = session.download_file(&xet_file, &output_path).await?;

View File

@@ -10,6 +10,7 @@ use tokio::task::JoinHandle;
use tracing::instrument;
use xet_client::cas_client::Client;
use xet_client::cas_types::FileRange;
use xet_client::chunk_cache::ChunkCache;
use xet_runtime::core::{XetRuntime, xet_config};
use super::configurations::TranslatorConfig;
@@ -25,13 +26,14 @@ use crate::progress_tracking::{GroupProgress, ItemProgressUpdater, UniqueID};
/// CAS client and a shared progress group for all downloads in the session.
pub struct FileDownloadSession {
client: Arc<dyn Client>,
chunk_cache: Option<Arc<dyn ChunkCache>>,
progress: Arc<GroupProgress>,
active_stream_abort_callbacks: Mutex<HashMap<UniqueID, Box<dyn Fn() + Send + Sync>>>,
finalized: AtomicBool,
}
impl FileDownloadSession {
pub async fn new(config: Arc<TranslatorConfig>) -> Result<Arc<Self>> {
pub async fn new(config: Arc<TranslatorConfig>, chunk_cache: Option<Arc<dyn ChunkCache>>) -> Result<Arc<Self>> {
let session_id = config
.session
.session_id
@@ -47,6 +49,7 @@ impl FileDownloadSession {
Ok(Arc::new(Self {
client,
chunk_cache,
progress,
active_stream_abort_callbacks: Mutex::new(HashMap::new()),
finalized: AtomicBool::new(false),
@@ -57,10 +60,11 @@ impl FileDownloadSession {
///
/// This path uses default progress speed settings. Use [`Self::new`] when the
/// session should inherit the configured speed parameters from `xet_config`.
pub fn from_client(client: Arc<dyn Client>) -> Arc<Self> {
pub fn from_client(client: Arc<dyn Client>, chunk_cache: Option<Arc<dyn ChunkCache>>) -> Arc<Self> {
let progress = GroupProgress::new();
Arc::new(Self {
client,
chunk_cache,
progress,
active_stream_abort_callbacks: Mutex::new(HashMap::new()),
finalized: AtomicBool::new(false),
@@ -325,6 +329,11 @@ impl FileDownloadSession {
if let Some(updater) = progress_updater {
reconstructor = reconstructor.with_progress_updater(updater);
}
if let Some(ref cache) = self.chunk_cache {
reconstructor = reconstructor.with_chunk_cache(cache.clone());
}
Ok(reconstructor)
}
}
@@ -405,7 +414,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("output.txt");
let (_id, n_bytes) = session.download_file(&xfi, &out_path).await.unwrap();
@@ -429,7 +438,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("deep").join("nested").join("dir").join("output.txt");
assert!(!out_path.parent().unwrap().exists());
@@ -454,7 +463,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("partial_writer.txt");
write(&out_path, vec![0u8; original_data.len()]).unwrap();
@@ -483,7 +492,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("partitioned.txt");
write(&out_path, vec![0u8; original_data.len()]).unwrap();
@@ -535,7 +544,7 @@ mod tests {
let xfi_b = upload_data(&cas_path, data_b).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_a = temp.path().join("out_a.txt");
let out_b = temp.path().join("out_b.txt");
@@ -574,7 +583,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, mut stream) = session.download_stream(&xfi, None).await.unwrap();
@@ -601,7 +610,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, stream) = session.download_stream(&xfi, None).await.unwrap();
@@ -634,7 +643,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, mut stream) = session.download_stream(&xfi, None).await.unwrap();
@@ -663,7 +672,7 @@ mod tests {
let xfi_b = upload_data(&cas_path, data_b).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id_a, mut stream_a) = session.download_stream(&xfi_a, None).await.unwrap();
let (_id_b, mut stream_b) = session.download_stream(&xfi_b, None).await.unwrap();
@@ -706,7 +715,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, stream) = session.download_stream(&xfi, None).await.unwrap();
drop(stream);
@@ -732,7 +741,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
for i in 0..5u32 {
let (_id, mut stream) = session.download_stream(&xfi, None).await.unwrap();
@@ -763,7 +772,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, stream) = session.download_stream(&xfi, None).await.unwrap();
@@ -796,7 +805,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, mut stream) = session.download_stream(&xfi, None).await.unwrap();
stream.cancel();
@@ -819,7 +828,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, mut stream) = session.download_stream(&xfi, None).await.unwrap();
let _ = stream.next().await.unwrap();
@@ -849,7 +858,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("range_from.bin");
let file = std::fs::File::create(&out_path).unwrap();
@@ -874,7 +883,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("range_to.bin");
let file = std::fs::File::create(&out_path).unwrap();
@@ -899,7 +908,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("full_range.bin");
let file = std::fs::File::create(&out_path).unwrap();
@@ -924,7 +933,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("range_incl.bin");
let file = std::fs::File::create(&out_path).unwrap();
@@ -951,7 +960,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, mut stream) = session.download_stream_range(&xfi, 4..12).await.unwrap();
@@ -978,7 +987,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, mut stream) = session.download_stream_range(&xfi, 10..).await.unwrap();
@@ -1005,7 +1014,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, mut stream) = session.download_stream_range(&xfi, ..6).await.unwrap();
@@ -1035,7 +1044,7 @@ mod tests {
let xfi_no_size = XetFileInfo::new_hash_only(xfi.hash().to_string());
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("output_unknown.txt");
let (_id, n_bytes) = session.download_file(&xfi_no_size, &out_path).await.unwrap();
@@ -1060,7 +1069,7 @@ mod tests {
let xfi_no_size = XetFileInfo::new_hash_only(xfi.hash().to_string());
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, mut stream) = session.download_stream(&xfi_no_size, None).await.unwrap();
@@ -1089,7 +1098,7 @@ mod tests {
let wrong_size_xfi = XetFileInfo::new(xfi.hash().to_string(), 999);
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("output_mismatch.txt");
let err = session.download_file(&wrong_size_xfi, &out_path).await.unwrap_err();
@@ -1136,7 +1145,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("empty_range.bin");
let file = std::fs::File::create(&out_path).unwrap();
@@ -1161,7 +1170,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("inverted_range.bin");
let file = std::fs::File::create(&out_path).unwrap();
@@ -1186,7 +1195,7 @@ mod tests {
let xfi = upload_data(&cas_path, original_data).await;
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let out_path = temp.path().join("beyond_size.bin");
let file = std::fs::File::create(&out_path).unwrap();

View File

@@ -641,7 +641,7 @@ mod tests {
let xet_file = serde_json::from_str::<XetFileInfo>(&input).unwrap();
let config = TranslatorConfig::local_config(cas_path).unwrap();
let session = FileDownloadSession::new(config.into()).await.unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();
let (_id, _n_bytes) = session.download_file(&xet_file, output_path).await.unwrap();
}

View File

@@ -345,7 +345,7 @@ impl HydrateDehydrateTest {
pub async fn hydrate(&mut self) {
let client = self.get_or_create_client().await;
let session = FileDownloadSession::from_client(client);
let session = FileDownloadSession::from_client(client, None);
for entry in read_dir(&self.ptr_dir).unwrap() {
let entry = entry.unwrap();
@@ -358,7 +358,7 @@ impl HydrateDehydrateTest {
pub async fn hydrate_partitioned_writers(&mut self, partitions: usize) {
let client = self.get_or_create_client().await;
let session = FileDownloadSession::from_client(client);
let session = FileDownloadSession::from_client(client, None);
for entry in read_dir(&self.ptr_dir).unwrap() {
let entry = entry.unwrap();
@@ -402,7 +402,7 @@ impl HydrateDehydrateTest {
pub async fn hydrate_stream(&mut self) {
let client = self.get_or_create_client().await;
let session = FileDownloadSession::from_client(client);
let session = FileDownloadSession::from_client(client, None);
for entry in read_dir(&self.ptr_dir).unwrap() {
let entry = entry.unwrap();

View File

@@ -47,7 +47,7 @@ mod tests {
upload_session.finalize().await.unwrap();
// Now download and verify each file.
let download_session = FileDownloadSession::new(env.config.clone()).await.unwrap();
let download_session = FileDownloadSession::new(env.config.clone(), None).await.unwrap();
for (name, data, xfi) in &xfis {
let out_path = env.base_dir.join(format!("out_{name}"));
let (_id, n_bytes) = download_session.download_file(xfi, &out_path).await.unwrap();

View File

@@ -35,7 +35,7 @@ mod tests {
let xfi = upload_bytes(&upload_session, "range_test", &data).await;
upload_session.finalize().await.unwrap();
let download_session = FileDownloadSession::new(env.config.clone()).await.unwrap();
let download_session = FileDownloadSession::new(env.config.clone(), None).await.unwrap();
TestHarness {
_env: env,
session: download_session,

View File

@@ -44,7 +44,7 @@ mod tests {
}
upload_session.finalize().await.unwrap();
let download_session = FileDownloadSession::new(env.config.clone()).await.unwrap();
let download_session = FileDownloadSession::new(env.config.clone(), None).await.unwrap();
(env, download_session, xfis)
}
@@ -54,7 +54,7 @@ mod tests {
let upload_session = FileUploadSession::new(env.config.clone()).await.unwrap();
let xfi = upload_bytes(&upload_session, name, data).await;
upload_session.finalize().await.unwrap();
let download_session = FileDownloadSession::new(env.config.clone()).await.unwrap();
let download_session = FileDownloadSession::new(env.config.clone(), None).await.unwrap();
(env, download_session, xfi)
}
@@ -70,7 +70,7 @@ mod tests {
let xfi_a = upload_bytes(&upload_session, name_a, data_a).await;
let xfi_b = upload_bytes(&upload_session, name_b, data_b).await;
upload_session.finalize().await.unwrap();
let download_session = FileDownloadSession::new(env.config.clone()).await.unwrap();
let download_session = FileDownloadSession::new(env.config.clone(), None).await.unwrap();
(env, download_session, xfi_a, xfi_b)
}

View File

@@ -152,7 +152,7 @@ pub async fn download_async(
Some(updaters) => updaters.into_iter().map(Some).collect(),
};
let session = FileDownloadSession::new(config).await?;
let session = FileDownloadSession::new(config, None).await?;
let mut tasks = Vec::with_capacity(file_infos.len());
let mut bridges: Vec<Option<ItemProgressCallbackUpdater>> = Vec::with_capacity(file_infos.len());

View File

@@ -173,7 +173,7 @@ impl XetDownloadStreamGroup {
) -> Result<Self, XetError> {
let group_id = UniqueID::new();
let config = create_translator_config(&session, token_info, token_refresh.as_ref())?;
let download_session = FileDownloadSession::new(Arc::new(config)).await?;
let download_session = FileDownloadSession::new(Arc::new(config), None).await?;
Ok(Self {
inner: Arc::new(XetDownloadStreamGroupInner {

View File

@@ -164,7 +164,7 @@ impl XetFileDownloadGroup {
) -> Result<Self, XetError> {
let group_id = UniqueID::new();
let config = create_translator_config(&session, token_info, token_refresh.as_ref())?;
let download_session = FileDownloadSession::new(Arc::new(config)).await?;
let download_session = FileDownloadSession::new(Arc::new(config), None).await?;
let inner = Arc::new(XetFileDownloadGroupInner {
group_id,