diff --git a/data/src/bin/example.rs b/data/src/bin/example.rs index 08278fe3..9adcc730 100644 --- a/data/src/bin/example.rs +++ b/data/src/bin/example.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, OnceLock}; use anyhow::Result; use clap::{Args, Parser, Subcommand}; use data::configurations::*; -use data::{FileUploadSession, XetFileInfo}; +use data::{FileUploadSession, Sha256Policy, XetFileInfo}; use ulid::Ulid; use xet_runtime::XetRuntime; @@ -92,7 +92,7 @@ async fn clean(mut reader: impl Read, mut writer: impl Write, size: u64) -> Resu FileUploadSession::new(TranslatorConfig::local_config(std::env::current_dir()?)?.into(), None).await?; let mut size_read = 0; - let mut handle = translator.start_clean(None, size, None, Ulid::new()).await; + let mut handle = translator.start_clean(None, size, Sha256Policy::Compute, Ulid::new()).await; loop { let bytes = reader.read(&mut read_buf)?; diff --git a/data/src/data_client.rs b/data/src/data_client.rs index cc775a3e..66bd2b52 100644 --- a/data/src/data_client.rs +++ b/data/src/data_client.rs @@ -21,6 +21,7 @@ use xorb_object::CompressionScheme; use crate::configurations::*; use crate::errors::DataProcessingError; +use crate::file_cleaner::Sha256Policy; use crate::file_download_session::FileDownloadSession; use crate::{FileUploadSession, XetFileInfo, errors}; @@ -264,8 +265,9 @@ pub async fn clean_bytes( ) -> errors::Result<(XetFileInfo, DeduplicationMetrics)> { #[allow(clippy::unwrap_or_default)] // Ulid::default is Ulid::nil let tracking_id = tracking_id.unwrap_or_else(Ulid::new); - - let mut handle = processor.start_clean(None, bytes.len() as u64, None, tracking_id).await; + let mut handle = processor + .start_clean(None, bytes.len() as u64, Sha256Policy::Compute, tracking_id) + .await; handle.add_data(&bytes).await?; handle.finish().await } @@ -292,7 +294,7 @@ pub async fn clean_file( .start_clean( Some(filename.as_ref().to_string_lossy().into()), filesize, - Sha256::from_hex(sha256.as_ref()).ok(), + Sha256::from_hex(sha256.as_ref()).ok().into(), tracking_id, ) .await; diff --git a/data/src/file_cleaner.rs b/data/src/file_cleaner.rs index bfdd7cf3..0b7e0184 100644 --- a/data/src/file_cleaner.rs +++ b/data/src/file_cleaner.rs @@ -15,7 +15,26 @@ use crate::XetFileInfo; use crate::deduplication_interface::UploadSessionDataManager; use crate::errors::Result; use crate::file_upload_session::FileUploadSession; -use crate::sha256::ShaGenerator; +use crate::sha256::Sha256Generator; + +/// Controls how SHA-256 is handled during file cleaning. +pub enum Sha256Policy { + /// Compute SHA-256 from the file data. + Compute, + /// Use a pre-computed SHA-256 value. + Provided(Sha256), + /// Skip SHA-256 entirely; no metadata_ext is written to the shard. + Skip, +} + +impl From> for Sha256Policy { + fn from(sha256: Option) -> Self { + match sha256 { + Some(hash) => Self::Provided(hash), + None => Self::Compute, + } + } +} /// A class that encapsulates the clean and data task around a single file. pub struct SingleFileCleaner { @@ -35,30 +54,39 @@ pub struct SingleFileCleaner { // on await so that we can background this part. dedup_manager_fut: Pin>> + Send + 'static>>, - // Generating the sha256 hash - sha_generator: ShaGenerator, + // SHA-256 generator, present only when computing from file data. + sha_generator: Option, + + // Pre-computed or finalized SHA-256 value. + provided_sha256: Option, // Start time start_time: DateTime, } impl SingleFileCleaner { - // If a sha256 value is given in the parameter, the cleaner avoids computing the sha256 again internally. pub(crate) fn new( file_name: Option>, file_id: CompletionTrackerFileId, - sha256: Option, + sha256: Sha256Policy, session: Arc, ) -> Self { let deduper = FileDeduper::new(UploadSessionDataManager::new(session.clone()), file_id); + let (sha_generator, provided_sha256) = match sha256 { + Sha256Policy::Compute => (Some(Sha256Generator::default()), None), + Sha256Policy::Provided(hash) => (None, Some(hash)), + Sha256Policy::Skip => (None, None), + }; + Self { file_name, file_id, dedup_manager_fut: Box::pin(async move { Ok(deduper) }), session, chunker: deduplication::Chunker::default(), - sha_generator: sha256.map(ShaGenerator::ProvidedValue).unwrap_or_else(ShaGenerator::generate), + sha_generator, + provided_sha256, start_time: Utc::now(), } } @@ -123,7 +151,9 @@ impl SingleFileCleaner { }; // Update the sha256 hasher, which hands this off to be done in the background. - self.sha_generator.update(data.clone()).await?; + if let Some(ref mut generator) = self.sha_generator { + generator.update(data.clone()).await?; + } // Get the chunk data and start processing it. let (chunks, chunker) = chunk_data_jh.await?; @@ -142,7 +172,7 @@ impl SingleFileCleaner { Ok(()) } - /// Ensures all current background work is completed. + /// Ensures all current background work is completed. pub async fn checkpoint(&mut self) -> Result<()> { // Flush the background process by sending it a dummy bit of data. self.deduper_process_chunks(Arc::new([])).await @@ -157,12 +187,16 @@ impl SingleFileCleaner { self.deduper_process_chunks(data).await?; } - // Finalize the sha256 hashing and create the metadata extension - let sha256: Sha256 = self.sha_generator.finalize().await?; - let metadata_ext = FileMetadataExt::new(sha256); + // Resolve the SHA-256: computed, provided, or skipped. + let sha256 = if let Some(generator) = self.sha_generator.take() { + Some(generator.finalize().await?) + } else { + self.provided_sha256 + }; + let metadata_ext = sha256.map(FileMetadataExt::new); let (file_hash, remaining_file_data, deduplication_metrics) = - self.dedup_manager_fut.await?.finalize(Some(metadata_ext)); + self.dedup_manager_fut.await?.finalize(metadata_ext); let file_info = XetFileInfo::new(file_hash.hex(), deduplication_metrics.total_bytes); diff --git a/data/src/file_download_session.rs b/data/src/file_download_session.rs index e1bad093..c5c9a2a4 100644 --- a/data/src/file_download_session.rs +++ b/data/src/file_download_session.rs @@ -230,6 +230,7 @@ mod tests { use super::*; use crate::configurations::TranslatorConfig; + use crate::file_cleaner::Sha256Policy; use crate::{FileUploadSession, XetFileInfo}; fn get_threadpool() -> Arc { @@ -245,7 +246,7 @@ mod tests { .unwrap(); let mut cleaner = upload_session - .start_clean(Some("test".into()), data.len() as u64, None, Ulid::new()) + .start_clean(Some("test".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new()) .await; cleaner.add_data(data).await.unwrap(); let (xfi, _metrics) = cleaner.finish().await.unwrap(); diff --git a/data/src/file_upload_session.rs b/data/src/file_upload_session.rs index 0e9f88e5..ac12bc76 100644 --- a/data/src/file_upload_session.rs +++ b/data/src/file_upload_session.rs @@ -26,7 +26,7 @@ use xorb_object::SerializedXorbObject; use crate::configurations::*; use crate::errors::*; -use crate::file_cleaner::SingleFileCleaner; +use crate::file_cleaner::{Sha256Policy, SingleFileCleaner}; use crate::remote_client_interface::create_remote_client; use crate::shard_interface::SessionShardInterface; use crate::{XetFileInfo, prometheus_metrics}; @@ -185,7 +185,7 @@ impl FileUploadSession { let mut reader = File::open(&file_path)?; // Start the clean process for each file. - let mut cleaner = SingleFileCleaner::new(Some(file_name), file_id, sha256, session); + let mut cleaner = SingleFileCleaner::new(Some(file_name), file_id, sha256.into(), session); let mut bytes_read = 0; while bytes_read < file_size { @@ -251,13 +251,14 @@ impl FileUploadSession { /// The caller is responsible for memory usage management, the parameter "buffer_size" /// indicates the maximum number of Vec in the internal buffer. /// - /// If a sha256 is provided, the value will be directly used in shard upload to - /// avoid redundant computation. + /// If a sha256 is provided via [`Sha256Policy::Provided`], the value will be directly + /// used in shard upload to avoid redundant computation. [`Sha256Policy::Skip`] skips + /// SHA-256 computation entirely and no metadata_ext is included in the shard. pub async fn start_clean( self: &Arc, tracking_name: Option>, size: u64, - sha256: Option, + sha256: Sha256Policy, tracking_id: Ulid, ) -> SingleFileCleaner { // Get a new file id for the completion tracking @@ -576,7 +577,7 @@ mod tests { .unwrap(); let mut cleaner = upload_session - .start_clean(Some("test".into()), read_data.len() as u64, None, Ulid::new()) + .start_clean(Some("test".into()), read_data.len() as u64, Sha256Policy::Compute, Ulid::new()) .await; // Read blocks from the source file and hand them to the cleaning handle @@ -644,4 +645,35 @@ mod tests { }) .unwrap(); } + + #[test] + fn test_clean_skip_sha256_no_metadata_ext() { + let temp = tempdir().unwrap(); + let data = b"Hello, skip sha256!"; + + let runtime = get_threadpool(); + + runtime + .clone() + .external_run_async_task(async move { + let cas_path = temp.path().join("cas"); + + let upload_session = + FileUploadSession::new(TranslatorConfig::local_config(&cas_path).unwrap().into(), None) + .await + .unwrap(); + + let mut cleaner = upload_session + .start_clean(Some("test".into()), data.len() as u64, Sha256Policy::Skip, Ulid::new()) + .await; + cleaner.add_data(data).await.unwrap(); + cleaner.finish().await.unwrap(); + + // Verify that the shard has no metadata_ext (no SHA-256). + let (_metrics, file_infos) = upload_session.finalize_with_file_info().await.unwrap(); + assert_eq!(file_infos.len(), 1); + assert!(file_infos[0].metadata_ext.is_none(), "Skip should produce no metadata_ext"); + }) + .unwrap(); + } } diff --git a/data/src/lib.rs b/data/src/lib.rs index 83aba049..253597aa 100644 --- a/data/src/lib.rs +++ b/data/src/lib.rs @@ -14,7 +14,7 @@ mod xet_file; // Reexport this one for now pub use deduplication::RawXorbData; -pub use file_cleaner::SingleFileCleaner; +pub use file_cleaner::{Sha256Policy, SingleFileCleaner}; pub use file_download_session::FileDownloadSession; pub use file_reconstruction::DownloadStream; pub use file_upload_session::FileUploadSession; diff --git a/data/src/sha256.rs b/data/src/sha256.rs index 3b744e8b..ecb9d74c 100644 --- a/data/src/sha256.rs +++ b/data/src/sha256.rs @@ -3,34 +3,9 @@ use sha2::{Digest, Sha256 as sha2Sha256}; use tokio::task::{JoinError, JoinHandle}; use xet_runtime::XetRuntime; -pub enum ShaGenerator { - Generate(Sha256Generator), - ProvidedValue(Sha256), -} - -impl ShaGenerator { - pub async fn update(&mut self, new_data: impl AsRef<[u8]> + Send + Sync + 'static) -> Result<(), JoinError> { - match self { - Self::Generate(generator) => generator.update(new_data).await, - Self::ProvidedValue(_) => Ok(()), - } - } - - pub async fn finalize(self) -> Result { - match self { - Self::Generate(generator) => generator.finalize().await, - Self::ProvidedValue(hash) => Ok(hash), - } - } - - pub fn generate() -> Self { - Self::Generate(Sha256Generator::default()) - } -} - /// Helper struct to generate a sha256 hash. #[derive(Debug, Default)] -pub struct Sha256Generator { +pub(crate) struct Sha256Generator { hasher: Option>>, } diff --git a/data/tests/test_session_resume.rs b/data/tests/test_session_resume.rs index 713fc01a..f4e36786 100644 --- a/data/tests/test_session_resume.rs +++ b/data/tests/test_session_resume.rs @@ -3,6 +3,7 @@ use std::time::Duration; use cas_client::LocalTestServerBuilder; // Run tests that determine deduplication, especially across different test subjects. use data::FileUploadSession; +use data::Sha256Policy; use data::configurations::TranslatorConfig; use deduplication::constants::{MAX_XORB_BYTES, MAX_XORB_CHUNKS, TARGET_CHUNK_SIZE}; use tempfile::TempDir; @@ -68,7 +69,7 @@ mod tests { // Feed it half the data, and checkpoint. let mut cleaner = file_upload_session - .start_clean(Some("data".into()), data.len() as u64, None, Ulid::new()) + .start_clean(Some("data".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new()) .await; cleaner.add_data(&data[..half_n]).await.unwrap(); cleaner.checkpoint().await.unwrap(); @@ -86,7 +87,7 @@ mod tests { // Feed it half the data, and checkpoint. let mut cleaner = file_upload_session - .start_clean(Some("data".into()), data.len() as u64, None, Ulid::new()) + .start_clean(Some("data".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new()) .await; // Add all the data. Roughly the first half should dedup. @@ -141,7 +142,7 @@ mod tests { // Feed it half the data, and checkpoint. let mut cleaner = file_upload_session - .start_clean(Some("data".into()), data.len() as u64, None, Ulid::new()) + .start_clean(Some("data".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new()) .await; cleaner.add_data(&data[..rn]).await.unwrap(); cleaner.checkpoint().await.unwrap(); @@ -173,7 +174,7 @@ mod tests { // Feed it half the data, and checkpoint. let mut cleaner = file_upload_session - .start_clean(Some("data".into()), data.len() as u64, None, Ulid::new()) + .start_clean(Some("data".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new()) .await; // Add all the data. Roughly the first half should dedup. diff --git a/hf_xet_wasm/src/wasm_file_cleaner.rs b/hf_xet_wasm/src/wasm_file_cleaner.rs index 746d77ed..b6ea9237 100644 --- a/hf_xet_wasm/src/wasm_file_cleaner.rs +++ b/hf_xet_wasm/src/wasm_file_cleaner.rs @@ -47,6 +47,9 @@ pub struct SingleFileCleaner { dedup_manager: FileDeduper, } +// NOTE: Unlike the native `data` crate, this WASM implementation always computes SHA-256 +// and does not support Sha256Policy::Skip. If skip support is needed for WASM uploads, +// this should be updated to use a shared Sha256Policy enum (see data::file_upload_session::Sha256Policy). impl SingleFileCleaner { pub fn new( session: Arc, diff --git a/xet_session/src/upload_commit.rs b/xet_session/src/upload_commit.rs index f370919f..7970e10b 100644 --- a/xet_session/src/upload_commit.rs +++ b/xet_session/src/upload_commit.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use data::data_client::{clean_bytes, clean_file}; -use data::{FileUploadSession, SingleFileCleaner, XetFileInfo}; +use data::{FileUploadSession, Sha256Policy, SingleFileCleaner, XetFileInfo}; use tokio::task::JoinHandle; use ulid::Ulid; use xet_runtime::XetRuntime; @@ -336,7 +336,9 @@ impl UploadCommitInner { let tracking_name: Option> = tracking_name.as_deref().map(Arc::from); let cleaner = self.runtime().external_run_async_task(async move { - upload_session.start_clean(tracking_name, file_size, None, tracking_id).await + upload_session + .start_clean(tracking_name, file_size, Sha256Policy::Compute, tracking_id) + .await })?; Ok((task_handle, cleaner))