mirror of
https://github.com/huggingface/xet-core.git
synced 2026-06-04 13:30:29 +08:00
feat: add skip_sha256 option to SingleFileCleaner (#679)
## Summary

- Add `ShaGenerator::Skip` variant that skips SHA-256 computation entirely
- `ShaGenerator::finalize()` now returns `Option<Sha256>` (None when skipped)
- `SingleFileCleaner::new()` and `FileUploadSession::start_clean()` accept a `skip_sha256` boolean
- When skipped, no `FileMetadataExt` is included in the shard

## Context

Bucket uploads don't need SHA-256 in the shard metadata — the `sha_index` GSI is only used for LFS pointer resolution, which doesn't apply to buckets. Skipping SHA-256 for bucket uploads removes the main CPU bottleneck in the upload pipeline on non-SHA-NI instances.

## Alternative: dummy SHA-256

Instead of skipping entirely, the client could send a zeroed/dummy `FileMetadataExt`. The server would still store it but queries would never match. This avoids the server-side schema change (xetcas PR) but pollutes the GSI with dummy entries.

Companion PRs:

- xetcas: huggingface-internal/xetcas#498 (make `FileIdItem.sha256` optional server-side)
This commit is contained in:
@@ -6,7 +6,7 @@ use std::sync::{Arc, OnceLock};
|
||||
use anyhow::Result;
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
use data::configurations::*;
|
||||
use data::{FileUploadSession, XetFileInfo};
|
||||
use data::{FileUploadSession, Sha256Policy, XetFileInfo};
|
||||
use ulid::Ulid;
|
||||
use xet_runtime::XetRuntime;
|
||||
|
||||
@@ -92,7 +92,7 @@ async fn clean(mut reader: impl Read, mut writer: impl Write, size: u64) -> Resu
|
||||
FileUploadSession::new(TranslatorConfig::local_config(std::env::current_dir()?)?.into(), None).await?;
|
||||
|
||||
let mut size_read = 0;
|
||||
let mut handle = translator.start_clean(None, size, None, Ulid::new()).await;
|
||||
let mut handle = translator.start_clean(None, size, Sha256Policy::Compute, Ulid::new()).await;
|
||||
|
||||
loop {
|
||||
let bytes = reader.read(&mut read_buf)?;
|
||||
|
||||
@@ -21,6 +21,7 @@ use xorb_object::CompressionScheme;
|
||||
|
||||
use crate::configurations::*;
|
||||
use crate::errors::DataProcessingError;
|
||||
use crate::file_cleaner::Sha256Policy;
|
||||
use crate::file_download_session::FileDownloadSession;
|
||||
use crate::{FileUploadSession, XetFileInfo, errors};
|
||||
|
||||
@@ -264,8 +265,9 @@ pub async fn clean_bytes(
|
||||
) -> errors::Result<(XetFileInfo, DeduplicationMetrics)> {
|
||||
#[allow(clippy::unwrap_or_default)] // Ulid::default is Ulid::nil
|
||||
let tracking_id = tracking_id.unwrap_or_else(Ulid::new);
|
||||
|
||||
let mut handle = processor.start_clean(None, bytes.len() as u64, None, tracking_id).await;
|
||||
let mut handle = processor
|
||||
.start_clean(None, bytes.len() as u64, Sha256Policy::Compute, tracking_id)
|
||||
.await;
|
||||
handle.add_data(&bytes).await?;
|
||||
handle.finish().await
|
||||
}
|
||||
@@ -292,7 +294,7 @@ pub async fn clean_file(
|
||||
.start_clean(
|
||||
Some(filename.as_ref().to_string_lossy().into()),
|
||||
filesize,
|
||||
Sha256::from_hex(sha256.as_ref()).ok(),
|
||||
Sha256::from_hex(sha256.as_ref()).ok().into(),
|
||||
tracking_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -15,7 +15,26 @@ use crate::XetFileInfo;
|
||||
use crate::deduplication_interface::UploadSessionDataManager;
|
||||
use crate::errors::Result;
|
||||
use crate::file_upload_session::FileUploadSession;
|
||||
use crate::sha256::ShaGenerator;
|
||||
use crate::sha256::Sha256Generator;
|
||||
|
||||
/// Controls how SHA-256 is handled during file cleaning.
///
/// Passed to `SingleFileCleaner::new` and `FileUploadSession::start_clean`; the cleaner
/// uses it to decide whether a `Sha256Generator` is created for the file and whether a
/// `FileMetadataExt` is produced when the file is finished.
|
||||
pub enum Sha256Policy {
|
||||
/// Compute SHA-256 from the file data.
///
/// The cleaner creates a `Sha256Generator` and feeds it the data added to the file.
|
||||
Compute,
|
||||
/// Use a pre-computed SHA-256 value.
///
/// No generator is created; the supplied hash is used as-is for the metadata extension.
|
||||
Provided(Sha256),
|
||||
/// Skip SHA-256 entirely; no metadata_ext is written to the shard.
|
||||
Skip,
|
||||
}
|
||||
|
||||
/// Maps an optional pre-computed hash onto a [`Sha256Policy`].
///
/// `Some(hash)` becomes [`Sha256Policy::Provided`] and `None` becomes
/// [`Sha256Policy::Compute`]. This conversion never yields [`Sha256Policy::Skip`],
/// so skipping must always be requested explicitly by the caller.
impl From<Option<Sha256>> for Sha256Policy {
|
||||
fn from(sha256: Option<Sha256>) -> Self {
|
||||
match sha256 {
|
||||
// A caller-supplied hash is reused instead of being recomputed.
Some(hash) => Self::Provided(hash),
|
||||
// No hash supplied: fall back to computing it from the data.
None => Self::Compute,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A class that encapsulates the clean and data task around a single file.
|
||||
pub struct SingleFileCleaner {
|
||||
@@ -35,30 +54,39 @@ pub struct SingleFileCleaner {
|
||||
// on await so that we can background this part.
|
||||
dedup_manager_fut: Pin<Box<dyn Future<Output = Result<FileDeduper<UploadSessionDataManager>>> + Send + 'static>>,
|
||||
|
||||
// Generating the sha256 hash
|
||||
sha_generator: ShaGenerator,
|
||||
// SHA-256 generator, present only when computing from file data.
|
||||
sha_generator: Option<Sha256Generator>,
|
||||
|
||||
// Pre-computed SHA-256 supplied via `Sha256Policy::Provided`; `None` when computing or skipping.
|
||||
provided_sha256: Option<Sha256>,
|
||||
|
||||
// Start time
|
||||
start_time: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl SingleFileCleaner {
|
||||
// The `sha256` policy selects whether the cleaner hashes the data itself, reuses a provided SHA-256, or skips SHA-256 entirely.
|
||||
pub(crate) fn new(
|
||||
file_name: Option<Arc<str>>,
|
||||
file_id: CompletionTrackerFileId,
|
||||
sha256: Option<Sha256>,
|
||||
sha256: Sha256Policy,
|
||||
session: Arc<FileUploadSession>,
|
||||
) -> Self {
|
||||
let deduper = FileDeduper::new(UploadSessionDataManager::new(session.clone()), file_id);
|
||||
|
||||
let (sha_generator, provided_sha256) = match sha256 {
|
||||
Sha256Policy::Compute => (Some(Sha256Generator::default()), None),
|
||||
Sha256Policy::Provided(hash) => (None, Some(hash)),
|
||||
Sha256Policy::Skip => (None, None),
|
||||
};
|
||||
|
||||
Self {
|
||||
file_name,
|
||||
file_id,
|
||||
dedup_manager_fut: Box::pin(async move { Ok(deduper) }),
|
||||
session,
|
||||
chunker: deduplication::Chunker::default(),
|
||||
sha_generator: sha256.map(ShaGenerator::ProvidedValue).unwrap_or_else(ShaGenerator::generate),
|
||||
sha_generator,
|
||||
provided_sha256,
|
||||
start_time: Utc::now(),
|
||||
}
|
||||
}
|
||||
@@ -123,7 +151,9 @@ impl SingleFileCleaner {
|
||||
};
|
||||
|
||||
// Update the sha256 hasher, which hands this off to be done in the background.
|
||||
self.sha_generator.update(data.clone()).await?;
|
||||
if let Some(ref mut generator) = self.sha_generator {
|
||||
generator.update(data.clone()).await?;
|
||||
}
|
||||
|
||||
// Get the chunk data and start processing it.
|
||||
let (chunks, chunker) = chunk_data_jh.await?;
|
||||
@@ -142,7 +172,7 @@ impl SingleFileCleaner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ensures all current background work is completed.
|
||||
/// Ensures all current background work is completed.
|
||||
pub async fn checkpoint(&mut self) -> Result<()> {
|
||||
// Flush the background process by sending it a dummy bit of data.
|
||||
self.deduper_process_chunks(Arc::new([])).await
|
||||
@@ -157,12 +187,16 @@ impl SingleFileCleaner {
|
||||
self.deduper_process_chunks(data).await?;
|
||||
}
|
||||
|
||||
// Finalize the sha256 hashing and create the metadata extension
|
||||
let sha256: Sha256 = self.sha_generator.finalize().await?;
|
||||
let metadata_ext = FileMetadataExt::new(sha256);
|
||||
// Resolve the SHA-256: computed, provided, or skipped.
|
||||
let sha256 = if let Some(generator) = self.sha_generator.take() {
|
||||
Some(generator.finalize().await?)
|
||||
} else {
|
||||
self.provided_sha256
|
||||
};
|
||||
let metadata_ext = sha256.map(FileMetadataExt::new);
|
||||
|
||||
let (file_hash, remaining_file_data, deduplication_metrics) =
|
||||
self.dedup_manager_fut.await?.finalize(Some(metadata_ext));
|
||||
self.dedup_manager_fut.await?.finalize(metadata_ext);
|
||||
|
||||
let file_info = XetFileInfo::new(file_hash.hex(), deduplication_metrics.total_bytes);
|
||||
|
||||
|
||||
@@ -230,6 +230,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::configurations::TranslatorConfig;
|
||||
use crate::file_cleaner::Sha256Policy;
|
||||
use crate::{FileUploadSession, XetFileInfo};
|
||||
|
||||
fn get_threadpool() -> Arc<XetRuntime> {
|
||||
@@ -245,7 +246,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let mut cleaner = upload_session
|
||||
.start_clean(Some("test".into()), data.len() as u64, None, Ulid::new())
|
||||
.start_clean(Some("test".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new())
|
||||
.await;
|
||||
cleaner.add_data(data).await.unwrap();
|
||||
let (xfi, _metrics) = cleaner.finish().await.unwrap();
|
||||
|
||||
@@ -26,7 +26,7 @@ use xorb_object::SerializedXorbObject;
|
||||
|
||||
use crate::configurations::*;
|
||||
use crate::errors::*;
|
||||
use crate::file_cleaner::SingleFileCleaner;
|
||||
use crate::file_cleaner::{Sha256Policy, SingleFileCleaner};
|
||||
use crate::remote_client_interface::create_remote_client;
|
||||
use crate::shard_interface::SessionShardInterface;
|
||||
use crate::{XetFileInfo, prometheus_metrics};
|
||||
@@ -185,7 +185,7 @@ impl FileUploadSession {
|
||||
let mut reader = File::open(&file_path)?;
|
||||
|
||||
// Start the clean process for each file.
|
||||
let mut cleaner = SingleFileCleaner::new(Some(file_name), file_id, sha256, session);
|
||||
let mut cleaner = SingleFileCleaner::new(Some(file_name), file_id, sha256.into(), session);
|
||||
let mut bytes_read = 0;
|
||||
|
||||
while bytes_read < file_size {
|
||||
@@ -251,13 +251,14 @@ impl FileUploadSession {
|
||||
/// The caller is responsible for memory usage management, the parameter "buffer_size"
|
||||
/// indicates the maximum number of Vec<u8> in the internal buffer.
|
||||
///
|
||||
/// If a sha256 is provided, the value will be directly used in shard upload to
|
||||
/// avoid redundant computation.
|
||||
/// If a sha256 is provided via [`Sha256Policy::Provided`], the value will be directly
|
||||
/// used in shard upload to avoid redundant computation. [`Sha256Policy::Skip`] skips
|
||||
/// SHA-256 computation entirely and no metadata_ext is included in the shard.
|
||||
pub async fn start_clean(
|
||||
self: &Arc<Self>,
|
||||
tracking_name: Option<Arc<str>>,
|
||||
size: u64,
|
||||
sha256: Option<Sha256>,
|
||||
sha256: Sha256Policy,
|
||||
tracking_id: Ulid,
|
||||
) -> SingleFileCleaner {
|
||||
// Get a new file id for the completion tracking
|
||||
@@ -576,7 +577,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let mut cleaner = upload_session
|
||||
.start_clean(Some("test".into()), read_data.len() as u64, None, Ulid::new())
|
||||
.start_clean(Some("test".into()), read_data.len() as u64, Sha256Policy::Compute, Ulid::new())
|
||||
.await;
|
||||
|
||||
// Read blocks from the source file and hand them to the cleaning handle
|
||||
@@ -644,4 +645,35 @@ mod tests {
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// Cleans a small buffer with `Sha256Policy::Skip` and checks that the single resulting
/// file info carries no `metadata_ext`.
#[test]
|
||||
fn test_clean_skip_sha256_no_metadata_ext() {
|
||||
let temp = tempdir().unwrap();
|
||||
let data = b"Hello, skip sha256!";
|
||||
|
||||
let runtime = get_threadpool();
|
||||
|
||||
// Drive the async upload flow to completion on the test runtime.
runtime
|
||||
.clone()
|
||||
.external_run_async_task(async move {
|
||||
let cas_path = temp.path().join("cas");
|
||||
|
||||
// Upload session configured from `cas_path` inside the temporary directory.
let upload_session =
|
||||
FileUploadSession::new(TranslatorConfig::local_config(&cas_path).unwrap().into(), None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Clean the whole buffer with SHA-256 explicitly skipped.
let mut cleaner = upload_session
|
||||
.start_clean(Some("test".into()), data.len() as u64, Sha256Policy::Skip, Ulid::new())
|
||||
.await;
|
||||
cleaner.add_data(data).await.unwrap();
|
||||
cleaner.finish().await.unwrap();
|
||||
|
||||
// Verify that the shard has no metadata_ext (no SHA-256).
|
||||
let (_metrics, file_infos) = upload_session.finalize_with_file_info().await.unwrap();
|
||||
assert_eq!(file_infos.len(), 1);
|
||||
assert!(file_infos[0].metadata_ext.is_none(), "Skip should produce no metadata_ext");
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ mod xet_file;
|
||||
|
||||
// Reexport this one for now
|
||||
pub use deduplication::RawXorbData;
|
||||
pub use file_cleaner::SingleFileCleaner;
|
||||
pub use file_cleaner::{Sha256Policy, SingleFileCleaner};
|
||||
pub use file_download_session::FileDownloadSession;
|
||||
pub use file_reconstruction::DownloadStream;
|
||||
pub use file_upload_session::FileUploadSession;
|
||||
|
||||
@@ -3,34 +3,9 @@ use sha2::{Digest, Sha256 as sha2Sha256};
|
||||
use tokio::task::{JoinError, JoinHandle};
|
||||
use xet_runtime::XetRuntime;
|
||||
|
||||
pub enum ShaGenerator {
|
||||
Generate(Sha256Generator),
|
||||
ProvidedValue(Sha256),
|
||||
}
|
||||
|
||||
impl ShaGenerator {
|
||||
pub async fn update(&mut self, new_data: impl AsRef<[u8]> + Send + Sync + 'static) -> Result<(), JoinError> {
|
||||
match self {
|
||||
Self::Generate(generator) => generator.update(new_data).await,
|
||||
Self::ProvidedValue(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn finalize(self) -> Result<Sha256, JoinError> {
|
||||
match self {
|
||||
Self::Generate(generator) => generator.finalize().await,
|
||||
Self::ProvidedValue(hash) => Ok(hash),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn generate() -> Self {
|
||||
Self::Generate(Sha256Generator::default())
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper struct to generate a sha256 hash.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Sha256Generator {
|
||||
pub(crate) struct Sha256Generator {
|
||||
hasher: Option<JoinHandle<Result<sha2Sha256, JoinError>>>,
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::time::Duration;
|
||||
use cas_client::LocalTestServerBuilder;
|
||||
// Run tests that determine deduplication, especially across different test subjects.
|
||||
use data::FileUploadSession;
|
||||
use data::Sha256Policy;
|
||||
use data::configurations::TranslatorConfig;
|
||||
use deduplication::constants::{MAX_XORB_BYTES, MAX_XORB_CHUNKS, TARGET_CHUNK_SIZE};
|
||||
use tempfile::TempDir;
|
||||
@@ -68,7 +69,7 @@ mod tests {
|
||||
|
||||
// Feed it half the data, and checkpoint.
|
||||
let mut cleaner = file_upload_session
|
||||
.start_clean(Some("data".into()), data.len() as u64, None, Ulid::new())
|
||||
.start_clean(Some("data".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new())
|
||||
.await;
|
||||
cleaner.add_data(&data[..half_n]).await.unwrap();
|
||||
cleaner.checkpoint().await.unwrap();
|
||||
@@ -86,7 +87,7 @@ mod tests {
|
||||
|
||||
// Start a new cleaner that will be fed the full data.
|
||||
let mut cleaner = file_upload_session
|
||||
.start_clean(Some("data".into()), data.len() as u64, None, Ulid::new())
|
||||
.start_clean(Some("data".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new())
|
||||
.await;
|
||||
|
||||
// Add all the data. Roughly the first half should dedup.
|
||||
@@ -141,7 +142,7 @@ mod tests {
|
||||
|
||||
// Feed it half the data, and checkpoint.
|
||||
let mut cleaner = file_upload_session
|
||||
.start_clean(Some("data".into()), data.len() as u64, None, Ulid::new())
|
||||
.start_clean(Some("data".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new())
|
||||
.await;
|
||||
cleaner.add_data(&data[..rn]).await.unwrap();
|
||||
cleaner.checkpoint().await.unwrap();
|
||||
@@ -173,7 +174,7 @@ mod tests {
|
||||
|
||||
// Start a new cleaner that will be fed the full data.
|
||||
let mut cleaner = file_upload_session
|
||||
.start_clean(Some("data".into()), data.len() as u64, None, Ulid::new())
|
||||
.start_clean(Some("data".into()), data.len() as u64, Sha256Policy::Compute, Ulid::new())
|
||||
.await;
|
||||
|
||||
// Add all the data. Roughly the first half should dedup.
|
||||
|
||||
@@ -47,6 +47,9 @@ pub struct SingleFileCleaner {
|
||||
dedup_manager: FileDeduper<UploadSessionDataManager>,
|
||||
}
|
||||
|
||||
// NOTE: Unlike the native `data` crate, this WASM implementation always computes SHA-256
|
||||
// and does not support Sha256Policy::Skip. If skip support is needed for WASM uploads,
|
||||
// this should be updated to use a shared Sha256Policy enum (see data::Sha256Policy).
|
||||
impl SingleFileCleaner {
|
||||
pub fn new(
|
||||
session: Arc<FileUploadSession>,
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
|
||||
|
||||
use data::data_client::{clean_bytes, clean_file};
|
||||
use data::{FileUploadSession, SingleFileCleaner, XetFileInfo};
|
||||
use data::{FileUploadSession, Sha256Policy, SingleFileCleaner, XetFileInfo};
|
||||
use tokio::task::JoinHandle;
|
||||
use ulid::Ulid;
|
||||
use xet_runtime::XetRuntime;
|
||||
@@ -336,7 +336,9 @@ impl UploadCommitInner {
|
||||
|
||||
let tracking_name: Option<Arc<str>> = tracking_name.as_deref().map(Arc::from);
|
||||
let cleaner = self.runtime().external_run_async_task(async move {
|
||||
upload_session.start_clean(tracking_name, file_size, None, tracking_id).await
|
||||
upload_session
|
||||
.start_clean(tracking_name, file_size, Sha256Policy::Compute, tracking_id)
|
||||
.await
|
||||
})?;
|
||||
|
||||
Ok((task_handle, cleaner))
|
||||
|
||||
Reference in New Issue
Block a user