mirror of
https://github.com/huggingface/xet-core.git
synced 2026-06-04 13:30:29 +08:00
Merge remote-tracking branch 'origin/feat/file-chunk-hashes-and-compose' into feat/file-chunk-hashes-and-compose
# Conflicts: # api_changes/update_260424_next_stable_chunk_boundary.md # xet_data/src/deduplication/chunking.rs
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
This update adds a new public deduplication helper for computing restart-safe chunk boundaries from existing chunk boundary metadata.
|
||||
|
||||
What changed
|
||||
- Added `xet_data::deduplication::next_stable_chunk_boundary(starting_position, chunk_boundaries) -> Option<usize>`.
|
||||
- Re-exported it from `xet_data::deduplication` so downstream crates can use it directly.
|
||||
- Added `next_stable_chunk_boundary(starting_position, chunk_boundaries) -> Option<usize>`.
|
||||
- Canonical implementation lives in `xet_core_structures::xorb_object::constants` (alongside the chunk-size constants it uses).
|
||||
- Re-exported from `xet_data::deduplication` for convenience.
|
||||
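- The function scans forward from the first chunk boundary at or after `starting_position` and returns the next chunk boundary that satisfies the stable-boundary condition (or `None` when no such boundary exists in `chunk_boundaries`):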
|
||||
- two consecutive chunk sizes in `[2 * min_chunk, max_chunk - min_chunk)`,
|
||||
- where `min_chunk = TARGET_CHUNK_SIZE / MINIMUM_CHUNK_DIVISOR` and `max_chunk = TARGET_CHUNK_SIZE * MAXIMUM_CHUNK_MULTIPLIER`.
|
||||
@@ -10,6 +11,7 @@ What changed
|
||||
Why this matters
|
||||
- Callers that already have chunk-boundary metadata can locate a stable resume boundary without re-reading file bytes.
|
||||
- This enables deterministic alignment behavior for resumed/partial workflows that need chunk boundaries robust to prefix changes.
|
||||
- The server-side `build_file_chunk_hashes_response` now extends dirty ranges to stable chunk boundaries before building windows, so that the client's rechunking around each dirty window is guaranteed to converge to the same chunk boundaries as the original file.
|
||||
|
||||
Usage notes
|
||||
- `chunk_boundaries` should be monotonically increasing chunk-end offsets produced by the same chunking configuration.
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
use xet_core_structures::merklehash::{MerkleHash, MerkleHashSubtree};
|
||||
use xet_core_structures::metadata_shard::file_structs::MDBFileInfo;
|
||||
use xet_core_structures::xorb_object::constants::next_stable_chunk_boundary;
|
||||
|
||||
use crate::cas_types::{ChunkWindow, FileChunkHashesResponse, FileRange};
|
||||
use crate::error::{ClientError, Result};
|
||||
@@ -20,6 +21,44 @@ pub struct ChunkWindowBuilder<'a> {
|
||||
hash_ranges: Vec<Option<MerkleHashSubtree>>,
|
||||
}
|
||||
|
||||
fn next_stable_end_for_range(range_end: u64, file_size: u64, chunk_boundaries: &[usize]) -> u64 {
|
||||
if range_end >= file_size {
|
||||
return file_size;
|
||||
}
|
||||
let Ok(starting_position) = usize::try_from(range_end) else {
|
||||
return file_size;
|
||||
};
|
||||
next_stable_chunk_boundary(starting_position, chunk_boundaries)
|
||||
.and_then(|boundary| u64::try_from(boundary).ok())
|
||||
.map(|boundary| boundary.min(file_size))
|
||||
.unwrap_or(file_size)
|
||||
}
|
||||
|
||||
fn extend_dirty_ranges_to_stable_windows(
|
||||
mut dirty_ranges: Vec<FileRange>,
|
||||
file_size: u64,
|
||||
chunk_boundaries: &[usize],
|
||||
) -> Vec<FileRange> {
|
||||
for range in &mut dirty_ranges {
|
||||
if range.end < file_size {
|
||||
range.end = next_stable_end_for_range(range.end, file_size, chunk_boundaries);
|
||||
}
|
||||
}
|
||||
|
||||
dirty_ranges.sort_by_key(|range| range.start);
|
||||
let mut coalesced: Vec<FileRange> = Vec::with_capacity(dirty_ranges.len());
|
||||
for range in dirty_ranges {
|
||||
if let Some(last) = coalesced.last_mut()
|
||||
&& range.start <= last.end
|
||||
{
|
||||
last.end = last.end.max(range.end);
|
||||
continue;
|
||||
}
|
||||
coalesced.push(range);
|
||||
}
|
||||
coalesced
|
||||
}
|
||||
|
||||
impl<'a> ChunkWindowBuilder<'a> {
|
||||
pub fn new(dirty_ranges: &'a [FileRange]) -> Self {
|
||||
debug_assert!(
|
||||
@@ -153,13 +192,33 @@ pub fn build_file_chunk_hashes_response(
|
||||
return Err(ClientError::Other("no valid dirty ranges".into()));
|
||||
}
|
||||
|
||||
let chunks: Vec<(MerkleHash, u64)> = chunks.into_iter().collect();
|
||||
let mut chunk_boundaries: Vec<usize> = Vec::with_capacity(chunks.len());
|
||||
let mut stable_ranges_supported = true;
|
||||
{
|
||||
let mut boundary_cursor: u64 = 0;
|
||||
for (_, size) in &chunks {
|
||||
boundary_cursor += *size;
|
||||
let Ok(boundary) = usize::try_from(boundary_cursor) else {
|
||||
stable_ranges_supported = false;
|
||||
break;
|
||||
};
|
||||
chunk_boundaries.push(boundary);
|
||||
}
|
||||
}
|
||||
|
||||
let dirty_ranges = if stable_ranges_supported {
|
||||
extend_dirty_ranges_to_stable_windows(dirty_ranges, file_size, &chunk_boundaries)
|
||||
} else {
|
||||
dirty_ranges
|
||||
};
|
||||
|
||||
let total_chunks = chunks.len() as u64;
|
||||
let mut builder = ChunkWindowBuilder::new(&dirty_ranges);
|
||||
let mut cumulative_bytes: u64 = 0;
|
||||
let mut total_chunks: u64 = 0;
|
||||
for (hash, size) in chunks {
|
||||
cumulative_bytes += size;
|
||||
builder.process_chunk(hash, size, cumulative_bytes);
|
||||
total_chunks += 1;
|
||||
}
|
||||
|
||||
let (windows, hash_ranges) = builder.finish();
|
||||
@@ -215,3 +274,120 @@ pub fn build_file_chunk_hashes_response(
|
||||
gap_verification,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use xet_core_structures::merklehash::MerkleHash;
    use xet_core_structures::metadata_shard::file_structs::{FileDataSequenceEntry, MDBFileInfo};
    use xet_core_structures::xorb_object::constants::{
        MAXIMUM_CHUNK_MULTIPLIER, MINIMUM_CHUNK_DIVISOR, TARGET_CHUNK_SIZE,
    };

    use super::*;

    /// Smallest chunk size inside the stable interval
    /// `[2 * min_chunk, max_chunk - min_chunk)`; asserts the interval is non-empty.
    fn stable_chunk_size() -> u64 {
        let minimum_chunk = *TARGET_CHUNK_SIZE / *MINIMUM_CHUNK_DIVISOR;
        let maximum_chunk = *TARGET_CHUNK_SIZE * *MAXIMUM_CHUNK_MULTIPLIER;
        let size = 2 * minimum_chunk;
        assert!(size < maximum_chunk - minimum_chunk);
        size as u64
    }

    /// Builds a single-segment file description made of `n_chunks` chunks that
    /// are all `chunk_size` bytes long, plus the matching `(hash, size)` list.
    fn build_test_file_info_and_chunks(n_chunks: usize, chunk_size: u64) -> (MDBFileInfo, Vec<(MerkleHash, u64)>) {
        let mut chunks: Vec<(MerkleHash, u64)> = Vec::with_capacity(n_chunks);
        for i in 0..n_chunks {
            chunks.push((MerkleHash::random_from_seed(i as u64 + 1), chunk_size));
        }

        let file_size = chunk_size * n_chunks as u64;
        let only_segment = FileDataSequenceEntry {
            xorb_hash: MerkleHash::random_from_seed(999),
            xorb_flags: 0,
            unpacked_segment_bytes: file_size as u32,
            chunk_index_start: 0,
            chunk_index_end: n_chunks as u32,
        };
        let file_info = MDBFileInfo {
            segments: vec![only_segment],
            ..Default::default()
        };

        (file_info, chunks)
    }

    /// A range ending just inside the second chunk is pushed out to the end of
    /// the fourth chunk.
    #[test]
    fn test_server_extends_dirty_window_to_next_stable_boundary() {
        let chunk_size = stable_chunk_size();
        let (file_info, chunks) = build_test_file_info_and_chunks(6, chunk_size);
        let requested = vec![FileRange::new(1, chunk_size + 1)];

        let response = build_file_chunk_hashes_response(&file_info, requested, chunks).unwrap();

        assert_eq!(response.windows.len(), 1);
        assert_eq!(response.windows[0].dirty_byte_range, [0, 4 * chunk_size]);
    }

    /// Two ranges that overlap once extended come back as a single window.
    #[test]
    fn test_server_coalesces_ranges_after_stable_extension() {
        let chunk_size = stable_chunk_size();
        let (file_info, chunks) = build_test_file_info_and_chunks(8, chunk_size);
        let first = FileRange::new(1, chunk_size + 1);
        let second = FileRange::new(3 * chunk_size + 7, 3 * chunk_size + 42);

        let response = build_file_chunk_hashes_response(&file_info, vec![first, second], chunks).unwrap();

        assert_eq!(response.windows.len(), 1);
        assert_eq!(response.windows[0].dirty_byte_range, [0, 6 * chunk_size]);
    }

    /// A range that already ends at the file size is returned unchanged.
    #[test]
    fn test_server_no_extension_when_range_already_at_file_end() {
        let chunk_size = stable_chunk_size();
        let (file_info, chunks) = build_test_file_info_and_chunks(6, chunk_size);
        let file_size = chunk_size * 6;
        let requested = vec![FileRange::new(4 * chunk_size, file_size)];

        let response = build_file_chunk_hashes_response(&file_info, requested, chunks).unwrap();

        assert_eq!(response.windows.len(), 1);
        assert_eq!(response.windows[0].dirty_byte_range, [4 * chunk_size, file_size]);
    }

    /// Ranges many chunks apart must not be merged by the extension step.
    #[test]
    fn test_server_separate_ranges_stay_separate_when_far_apart() {
        let chunk_size = stable_chunk_size();
        let n_chunks = 20;
        let (file_info, chunks) = build_test_file_info_and_chunks(n_chunks, chunk_size);
        let near_start = FileRange::new(0, chunk_size);
        let near_end = FileRange::new(14 * chunk_size, 15 * chunk_size);

        let response = build_file_chunk_hashes_response(&file_info, vec![near_start, near_end], chunks).unwrap();

        let window_count = response.windows.len();
        assert!(
            window_count >= 2,
            "far-apart ranges should remain separate, got {} window(s)",
            window_count
        );
        assert_eq!(response.hash_ranges.len(), window_count + 1);
    }

    /// Chunks of exactly `maximum_chunk` bytes are outside the stable interval,
    /// so the window has to run to the end of the file.
    #[test]
    fn test_server_extension_falls_through_to_file_end_when_no_stable_boundary() {
        let minimum_chunk = *TARGET_CHUNK_SIZE / *MINIMUM_CHUNK_DIVISOR;
        let maximum_chunk = *TARGET_CHUNK_SIZE * *MAXIMUM_CHUNK_MULTIPLIER;
        let forced_size = maximum_chunk as u64;
        let (file_info, chunks) = build_test_file_info_and_chunks(4, forced_size);
        let file_size = forced_size * 4;
        let requested = vec![FileRange::new(0, forced_size)];

        let response = build_file_chunk_hashes_response(&file_info, requested, chunks).unwrap();

        assert_eq!(response.windows.len(), 1);
        let window_end = response.windows[0].dirty_byte_range[1];
        assert_eq!(
            window_end, file_size,
            "when no stable boundary exists, window should extend to file end; \
             minimum_chunk={minimum_chunk}, maximum_chunk={maximum_chunk}"
        );
    }
}
|
||||
|
||||
@@ -256,12 +256,7 @@ impl ShardFileManager {
|
||||
s.verify_shard_integrity_debug_only();
|
||||
|
||||
// Make sure the shard is in the shard directory
|
||||
debug_assert!(
|
||||
s.path.starts_with(&self.shard_directory),
|
||||
"{:?} not in {:?}",
|
||||
&s.path,
|
||||
&self.shard_directory
|
||||
);
|
||||
debug_assert!(s.path.starts_with(&self.shard_directory), "{:?} not in {:?}", s.path, self.shard_directory);
|
||||
|
||||
if self
|
||||
.shard_bookkeeper
|
||||
|
||||
@@ -90,14 +90,14 @@ impl MDBInMemoryShard {
|
||||
pub fn recalculate_shard_size(&mut self) {
|
||||
// Calculate the size
|
||||
let mut num_bytes = 0u64;
|
||||
for (_, xorb_block_contents) in self.xorb_content.iter() {
|
||||
for xorb_block_contents in self.xorb_content.values() {
|
||||
num_bytes += xorb_block_contents.num_bytes();
|
||||
|
||||
// The xorb lookup table
|
||||
num_bytes += (size_of::<u64>() + size_of::<u32>()) as u64;
|
||||
}
|
||||
|
||||
for (_, file_info) in self.file_content.iter() {
|
||||
for file_info in self.file_content.values() {
|
||||
num_bytes += file_info.num_bytes();
|
||||
num_bytes += (size_of::<u64>() + size_of::<u32>()) as u64;
|
||||
}
|
||||
|
||||
@@ -187,8 +187,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
|
||||
let writer = Arc::new(Mutex::new(wtr));
|
||||
|
||||
eprintln!("Output File: {}", &args.output);
|
||||
eprintln!("Input Files: {:?}", &args.files);
|
||||
eprintln!("Output File: {}", args.output);
|
||||
eprintln!("Input Files: {:?}", args.files);
|
||||
|
||||
let max_limiter = Arc::new(Semaphore::new(32 * 1024));
|
||||
|
||||
|
||||
@@ -27,3 +27,50 @@ lazy_static::lazy_static! {
|
||||
/// The maximum chunk size, calculated from the configurable constants above
|
||||
pub static ref MAX_CHUNK_SIZE: usize = (*TARGET_CHUNK_SIZE) * (*MAXIMUM_CHUNK_MULTIPLIER);
|
||||
}
|
||||
|
||||
/// Given a list of chunk boundaries in a file and an arbitrary reference position,
|
||||
/// returns the next stable chunk boundary at or after that position.
|
||||
///
|
||||
/// `starting_position` may be any byte offset in the file; it does not need to
|
||||
/// be an existing chunk boundary. The search starts at the first chunk boundary
|
||||
/// `>= starting_position`.
|
||||
///
|
||||
/// A stable chunk boundary is defined such that any possible changes in the data
|
||||
/// before `starting_position` would produce the same chunk boundaries at the
|
||||
/// stable boundary and later. The fixed data between `starting_position` and
|
||||
/// the returned stable boundary is always sufficient to restore the chunker to
|
||||
/// its original chunk boundaries.
|
||||
///
|
||||
/// The stability condition requires two consecutive chunks after `starting_position`,
|
||||
/// both with sizes in `[2 * min_chunk, max_chunk - min_chunk)`. The boundary
|
||||
/// at the end of the second such chunk is the stable chunk boundary.
|
||||
///
|
||||
/// The lower bound is `2 * min_chunk` rather than `min_chunk` (as used in
|
||||
/// `find_partitions` in the chunking module) because this function operates on
|
||||
/// existing chunk boundaries without data access, and cannot verify the absence
|
||||
/// of hidden hash triggers in the `[c_k, c_k + min_chunk)` skip zone. A
|
||||
/// shadow-zone trigger can advance a modified chunker by up to `min_chunk`, so
|
||||
/// the next chunk must be at least `2 * min_chunk` to remain reachable.
|
||||
///
|
||||
/// See `parallel chunking.lyx` for the full proof and `find_stable_start` in
|
||||
/// `merkle_hash_subtree.rs` for the analogous construction in merkle hashing.
|
||||
pub fn next_stable_chunk_boundary(starting_position: usize, chunk_boundaries: &[usize]) -> Option<usize> {
|
||||
let minimum_chunk = *TARGET_CHUNK_SIZE / *MINIMUM_CHUNK_DIVISOR;
|
||||
let maximum_chunk = *TARGET_CHUNK_SIZE * *MAXIMUM_CHUNK_MULTIPLIER;
|
||||
|
||||
let start_idx = chunk_boundaries.partition_point(|&x| x < starting_position);
|
||||
|
||||
for i in start_idx..chunk_boundaries.len().saturating_sub(2) {
|
||||
let size_a = chunk_boundaries[i + 1] - chunk_boundaries[i];
|
||||
let size_b = chunk_boundaries[i + 2] - chunk_boundaries[i + 1];
|
||||
|
||||
if size_a >= 2 * minimum_chunk
|
||||
&& size_a < maximum_chunk - minimum_chunk
|
||||
&& size_b >= 2 * minimum_chunk
|
||||
&& size_b < maximum_chunk - minimum_chunk
|
||||
{
|
||||
return Some(chunk_boundaries[i + 2]);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
@@ -18,10 +18,10 @@ use crate::metadata_shard::chunk_verification::range_hash_from_chunks;
|
||||
use crate::serialization_utils::*;
|
||||
|
||||
pub type XorbObjectIdent = [u8; 7];
|
||||
pub(crate) const XORB_OBJECT_FORMAT_IDENT: XorbObjectIdent = [b'X', b'E', b'T', b'B', b'L', b'O', b'B'];
|
||||
pub(crate) const XORB_OBJECT_FORMAT_IDENT: XorbObjectIdent = *b"XETBLOB";
|
||||
pub(crate) const XORB_OBJECT_FORMAT_VERSION_V0: u8 = 0;
|
||||
pub(crate) const XORB_OBJECT_FORMAT_IDENT_HASHES: XorbObjectIdent = [b'X', b'B', b'L', b'B', b'H', b'S', b'H'];
|
||||
pub(crate) const XORB_OBJECT_FORMAT_IDENT_BOUNDARIES: XorbObjectIdent = [b'X', b'B', b'L', b'B', b'B', b'N', b'D'];
|
||||
pub(crate) const XORB_OBJECT_FORMAT_IDENT_HASHES: XorbObjectIdent = *b"XBLBHSH";
|
||||
pub(crate) const XORB_OBJECT_FORMAT_IDENT_BOUNDARIES: XorbObjectIdent = *b"XBLBBND";
|
||||
pub(crate) const XORB_OBJECT_FORMAT_VERSION: u8 = 1;
|
||||
pub(crate) const XORB_OBJECT_FORMAT_HASHES_VERSION: u8 = 0;
|
||||
|
||||
|
||||
@@ -359,52 +359,10 @@ pub fn find_partitions<R: Read + Seek>(
|
||||
Ok(partitions)
|
||||
}
|
||||
|
||||
/// Given a list of chunk boundaries in a file and an arbitrary reference position,
|
||||
/// returns the next stable chunk boundary at or after that position.
|
||||
///
|
||||
/// `starting_position` may be any byte offset in the file; it does not need to
|
||||
/// be an existing chunk boundary. The search starts at the first chunk boundary
|
||||
/// `>= starting_position`.
|
||||
///
|
||||
/// A stable chunk boundary is defined such that any possible changes in the data
|
||||
/// before `starting_position` would produce the same chunk boundaries at the
|
||||
/// stable boundary and later. The fixed data between `starting_position` and
|
||||
/// the returned stable boundary is always sufficient to restore the chunker to
|
||||
/// its original chunk boundaries.
|
||||
///
|
||||
/// The stability condition requires two consecutive chunks after `starting_position`,
|
||||
/// both with sizes in `[2 * min_chunk, max_chunk - min_chunk)`. The boundary
|
||||
/// at the end of the second such chunk is the stable chunk boundary.
|
||||
///
|
||||
/// The lower bound is `2 * min_chunk` rather than `min_chunk` (as used in
|
||||
/// [`find_partitions`]) because this function operates on existing chunk
|
||||
/// boundaries without data access, and cannot verify the absence of hidden
|
||||
/// hash triggers in the `[c_k, c_k + min_chunk)` skip zone. A shadow-zone
|
||||
/// trigger can advance a modified chunker by up to `min_chunk`, so the next
|
||||
/// chunk must be at least `2 * min_chunk` to remain reachable.
|
||||
///
|
||||
/// See `parallel chunking.lyx` for the full proof and `find_stable_start` in
|
||||
/// `merkle_hash_subtree.rs` for the analogous construction in merkle hashing.
|
||||
pub fn next_stable_chunk_boundary(starting_position: usize, chunk_boundaries: &[usize]) -> Option<usize> {
|
||||
let minimum_chunk = *TARGET_CHUNK_SIZE / *MINIMUM_CHUNK_DIVISOR;
|
||||
let maximum_chunk = *TARGET_CHUNK_SIZE * *MAXIMUM_CHUNK_MULTIPLIER;
|
||||
|
||||
let start_idx = chunk_boundaries.partition_point(|&x| x < starting_position);
|
||||
|
||||
for i in start_idx..chunk_boundaries.len().saturating_sub(2) {
|
||||
let size_a = chunk_boundaries[i + 1] - chunk_boundaries[i];
|
||||
let size_b = chunk_boundaries[i + 2] - chunk_boundaries[i + 1];
|
||||
|
||||
if size_a >= 2 * minimum_chunk
|
||||
&& size_a < maximum_chunk - minimum_chunk
|
||||
&& size_b >= 2 * minimum_chunk
|
||||
&& size_b < maximum_chunk - minimum_chunk
|
||||
{
|
||||
return Some(chunk_boundaries[i + 2]);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
// Re-exported from xet_core_structures where the canonical implementation lives,
|
||||
// so that downstream users of xet_data::deduplication::next_stable_chunk_boundary
|
||||
// continue to work without a source change.
|
||||
pub use xet_core_structures::xorb_object::constants::next_stable_chunk_boundary;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -168,22 +168,17 @@ pub async fn upload_ranges(
|
||||
return Err(DataError::InternalError("internal: non-empty dirty_inputs produced no server query".into()));
|
||||
}
|
||||
|
||||
let n_windows = server_query.len();
|
||||
let response: FileChunkHashesResponse = cas_client.get_file_chunk_hashes(&original_hash, server_query).await?;
|
||||
// These invariants are part of the server contract; violating them silently truncates
|
||||
// the merge sequence (`zip` would drop windows) and produces a wrong file hash, so we
|
||||
// bail loudly instead of trusting the response.
|
||||
if response.windows.len() != n_windows {
|
||||
return Err(DataError::InternalError(format!(
|
||||
"server returned {} windows, expected {n_windows} (one per dirty range)",
|
||||
response.windows.len()
|
||||
)));
|
||||
// The server may coalesce adjacent/overlapping dirty ranges after extending them to
|
||||
// stable boundaries, so only enforce shape invariants on the returned payload itself.
|
||||
if response.windows.is_empty() {
|
||||
return Err(DataError::InternalError("server returned no windows".into()));
|
||||
}
|
||||
if response.hash_ranges.len() != n_windows + 1 {
|
||||
if response.hash_ranges.len() != response.windows.len() + 1 {
|
||||
return Err(DataError::InternalError(format!(
|
||||
"server returned {} hash_ranges, expected {} (n_windows + 1)",
|
||||
response.hash_ranges.len(),
|
||||
n_windows + 1
|
||||
response.windows.len() + 1
|
||||
)));
|
||||
}
|
||||
let gap_verification = response.gap_verification;
|
||||
@@ -191,7 +186,7 @@ pub async fn upload_ranges(
|
||||
let ctx = config.ctx.clone();
|
||||
let session = FileUploadSession::new(config.clone()).await?;
|
||||
let mut input_idx = 0usize;
|
||||
let mut uploaded: Vec<UploadedWindow> = Vec::with_capacity(n_windows);
|
||||
let mut uploaded: Vec<UploadedWindow> = Vec::with_capacity(response.windows.len());
|
||||
|
||||
let mut buf = vec![0u8; STREAM_BLOCK_SIZE];
|
||||
for window in response.windows.iter() {
|
||||
@@ -283,7 +278,7 @@ pub async fn upload_ranges(
|
||||
let last_window_at_end = trailing_gap.is_none();
|
||||
let last_idx = uploaded.len() - 1;
|
||||
|
||||
let mut merge_seq: Vec<MerkleHashSubtree> = Vec::with_capacity(2 * n_windows + 1);
|
||||
let mut merge_seq: Vec<MerkleHashSubtree> = Vec::with_capacity(2 * uploaded.len() + 1);
|
||||
for (i, (w, gap)) in uploaded.iter().zip(hash_ranges).enumerate() {
|
||||
if let Some(g) = gap {
|
||||
merge_seq.push(g);
|
||||
@@ -533,6 +528,7 @@ fn snap_to_segment_end(seg_byte_starts: &[u64], byte: u64) -> u64 {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io::Cursor;
|
||||
use std::ops::Range;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -1311,6 +1307,135 @@ mod tests {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Tiny seedable generator used to build reproducible test inputs.
///
/// Each step is `state * 6364136223846793005 + 1` with wrapping arithmetic,
/// so a given seed always yields the same sequence.
#[derive(Debug, Clone)]
struct DeterministicRng {
    /// Value returned by the most recent step (or the seed before any step).
    state: u64,
}

impl DeterministicRng {
    /// Creates a generator whose first step starts from `seed`.
    fn new(seed: u64) -> Self {
        DeterministicRng { state: seed }
    }

    /// Advances the generator by one step and returns the new state.
    fn next_u64(&mut self) -> u64 {
        let advanced = self.state.wrapping_mul(6364136223846793005).wrapping_add(1);
        self.state = advanced;
        advanced
    }

    /// Returns a value in `start..end`.
    ///
    /// When the interval is empty (`end <= start`) this returns `start`
    /// without advancing the generator.
    fn gen_range(&mut self, start: usize, end: usize) -> usize {
        match end.checked_sub(start) {
            Some(width) if width > 0 => start + (self.next_u64() as usize % width),
            _ => start,
        }
    }

    /// Produces `len` bytes, one generator step per byte, taking the top
    /// eight bits of each step.
    fn gen_bytes(&mut self, len: usize) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(len);
        for _ in 0..len {
            bytes.push((self.next_u64() >> 56) as u8);
        }
        bytes
    }
}
|
||||
|
||||
/// One replacement to apply to an original byte buffer in the tests.
#[derive(Debug, Clone)]
struct PlannedEdit {
    /// Half-open byte range of the original that is replaced; an empty range
    /// describes a pure insertion at that offset.
    original_range: Range<usize>,
    /// Bytes written in place of `original_range`; empty for a pure deletion.
    replacement: Vec<u8>,
}
|
||||
|
||||
fn build_random_non_overlapping_edits(
|
||||
rng: &mut DeterministicRng,
|
||||
original_len: usize,
|
||||
max_edits: usize,
|
||||
) -> Vec<PlannedEdit> {
|
||||
if original_len == 0 {
|
||||
let replacement_len = 1 + rng.gen_range(0, 8 * 1024);
|
||||
return vec![PlannedEdit {
|
||||
original_range: 0..0,
|
||||
replacement: rng.gen_bytes(replacement_len),
|
||||
}];
|
||||
}
|
||||
|
||||
let target_edits = 1 + rng.gen_range(0, max_edits.max(1));
|
||||
let mut edits: Vec<PlannedEdit> = Vec::with_capacity(target_edits);
|
||||
let mut cursor = 0usize;
|
||||
|
||||
while edits.len() < target_edits && cursor <= original_len {
|
||||
let remaining = original_len - cursor;
|
||||
let max_gap = remaining.min(64 * 1024);
|
||||
let start = cursor + rng.gen_range(0, max_gap + 1);
|
||||
|
||||
let (end, replacement_len) = if start == original_len {
|
||||
(start, 1 + rng.gen_range(0, 32 * 1024))
|
||||
} else {
|
||||
let op = rng.gen_range(0, 5);
|
||||
let max_span = (original_len - start).clamp(1, 64 * 1024);
|
||||
let span = 1 + rng.gen_range(0, max_span);
|
||||
let end = start + span;
|
||||
match op {
|
||||
0 => (start, 1 + rng.gen_range(0, 32 * 1024)),
|
||||
1 => (end, span),
|
||||
2 => (end, span + 1 + rng.gen_range(0, 16 * 1024)),
|
||||
3 => (end, rng.gen_range(0, span + 1)),
|
||||
_ => (end, 0),
|
||||
}
|
||||
};
|
||||
|
||||
edits.push(PlannedEdit {
|
||||
original_range: start..end,
|
||||
replacement: rng.gen_bytes(replacement_len),
|
||||
});
|
||||
cursor = if end > start { end } else { start.saturating_add(1) };
|
||||
}
|
||||
|
||||
if edits.is_empty() {
|
||||
let replacement_len = 1 + rng.gen_range(0, 32 * 1024);
|
||||
edits.push(PlannedEdit {
|
||||
original_range: original_len..original_len,
|
||||
replacement: rng.gen_bytes(replacement_len),
|
||||
});
|
||||
}
|
||||
|
||||
for w in edits.windows(2) {
|
||||
assert!(w[0].original_range.end <= w[1].original_range.start);
|
||||
}
|
||||
|
||||
edits
|
||||
}
|
||||
|
||||
fn apply_planned_edits(original: &[u8], edits: &[PlannedEdit]) -> Vec<u8> {
|
||||
let removed: usize = edits.iter().map(|e| e.original_range.end - e.original_range.start).sum();
|
||||
let added: usize = edits.iter().map(|e| e.replacement.len()).sum();
|
||||
let mut out: Vec<u8> = Vec::with_capacity(original.len() + added.saturating_sub(removed));
|
||||
let mut cursor = 0usize;
|
||||
|
||||
for edit in edits {
|
||||
assert!(edit.original_range.start >= cursor);
|
||||
out.extend_from_slice(&original[cursor..edit.original_range.start]);
|
||||
out.extend_from_slice(&edit.replacement);
|
||||
cursor = edit.original_range.end;
|
||||
}
|
||||
|
||||
out.extend_from_slice(&original[cursor..]);
|
||||
out
|
||||
}
|
||||
|
||||
fn edits_to_dirty_inputs(edits: &[PlannedEdit]) -> Vec<DirtyInput> {
|
||||
edits
|
||||
.iter()
|
||||
.map(|e| DirtyInput {
|
||||
original_range: e.original_range.start as u64..e.original_range.end as u64,
|
||||
new_length: e.replacement.len() as u64,
|
||||
reader: Box::pin(Cursor::new(e.replacement.clone())),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn summarize_edits(edits: &[PlannedEdit]) -> String {
|
||||
edits
|
||||
.iter()
|
||||
.map(|e| format!("[{}..{}, new_len={}]", e.original_range.start, e.original_range.end, e.replacement.len()))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
}
|
||||
|
||||
async fn upload_file(config: &Arc<TranslatorConfig>, data: &[u8]) -> MerkleHash {
|
||||
let session = FileUploadSession::new(config.clone()).await.unwrap();
|
||||
let (_id, mut cleaner) = session
|
||||
@@ -1807,4 +1932,174 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[ignore = "stress test"]
|
||||
async fn test_stress_random_resize_sequences() {
|
||||
let server = LocalTestServerBuilder::new().start().await;
|
||||
let base_dir = TempDir::new().unwrap();
|
||||
let config = test_config(server.http_endpoint(), base_dir.path());
|
||||
let cas_client: Arc<dyn Client> = Arc::new(server);
|
||||
|
||||
for seed in 0..6u64 {
|
||||
let mut rng = DeterministicRng::new(0x9E37_79B9_7F4A_7C15 ^ seed.wrapping_mul(0xD1B5_4A32_D192_ED03));
|
||||
let mut expected = random_data(10_000 + seed, 1_048_576 + (seed as usize * 91_117 % 262_144));
|
||||
let mut original_hash = upload_file(&config, &expected).await;
|
||||
let mut original_size = expected.len() as u64;
|
||||
|
||||
for round in 0..25usize {
|
||||
let edits = build_random_non_overlapping_edits(&mut rng, expected.len(), 8);
|
||||
let expected_next = apply_planned_edits(&expected, &edits);
|
||||
let inputs = edits_to_dirty_inputs(&edits);
|
||||
let result = upload_ranges(config.clone(), cas_client.clone(), original_hash, original_size, inputs)
|
||||
.await
|
||||
.unwrap();
|
||||
let result_hash = MerkleHash::from_hex(result.hash()).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
result.file_size(),
|
||||
Some(expected_next.len() as u64),
|
||||
"seed={seed}, round={round}: size mismatch"
|
||||
);
|
||||
let clean_hash = upload_file(&config, &expected_next).await;
|
||||
assert_eq!(result.hash(), clean_hash.hex(), "seed={seed}, round={round}: hash mismatch");
|
||||
|
||||
let downloaded = download_file(&config, result_hash, expected_next.len() as u64).await;
|
||||
assert_eq!(downloaded, expected_next, "seed={seed}, round={round}: content mismatch");
|
||||
|
||||
expected = expected_next;
|
||||
original_hash = result_hash;
|
||||
original_size = expected.len() as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "smoke-test"))]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_regression_hash_matches_clean_upload_seed1_round17() {
|
||||
let server = LocalTestServerBuilder::new().start().await;
|
||||
let base_dir = TempDir::new().unwrap();
|
||||
let config = test_config(server.http_endpoint(), base_dir.path());
|
||||
let cas_client: Arc<dyn Client> = Arc::new(server);
|
||||
|
||||
let seed = 1u64;
|
||||
let mut rng = DeterministicRng::new(0x9E37_79B9_7F4A_7C15 ^ seed.wrapping_mul(0xD1B5_4A32_D192_ED03));
|
||||
let mut expected = random_data(10_000 + seed, 1_048_576 + (seed as usize * 91_117 % 262_144));
|
||||
let mut original_hash = upload_file(&config, &expected).await;
|
||||
let mut original_size = expected.len() as u64;
|
||||
|
||||
for round in 0..=17usize {
|
||||
let edits = build_random_non_overlapping_edits(&mut rng, expected.len(), 8);
|
||||
let edits_summary = summarize_edits(&edits);
|
||||
let expected_next = apply_planned_edits(&expected, &edits);
|
||||
let inputs = edits_to_dirty_inputs(&edits);
|
||||
let result = upload_ranges(config.clone(), cas_client.clone(), original_hash, original_size, inputs)
|
||||
.await
|
||||
.unwrap();
|
||||
let result_hash = MerkleHash::from_hex(result.hash()).unwrap();
|
||||
|
||||
let clean_hash = upload_file(&config, &expected_next).await;
|
||||
assert_eq!(
|
||||
result.hash(),
|
||||
clean_hash.hex(),
|
||||
"seed={seed}, round={round}: hash mismatch; original_size={original_size}, expected_size={}, edits={edits_summary}",
|
||||
expected_next.len()
|
||||
);
|
||||
|
||||
let downloaded = download_file(&config, result_hash, expected_next.len() as u64).await;
|
||||
assert_eq!(downloaded, expected_next, "seed={seed}, round={round}: content mismatch");
|
||||
|
||||
expected = expected_next;
|
||||
original_hash = result_hash;
|
||||
original_size = expected.len() as u64;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[ignore = "stress test"]
|
||||
async fn test_stress_many_sparse_windows_single_call() {
|
||||
let server = LocalTestServerBuilder::new().start().await;
|
||||
let base_dir = TempDir::new().unwrap();
|
||||
let config = test_config(server.http_endpoint(), base_dir.path());
|
||||
let cas_client: Arc<dyn Client> = Arc::new(server);
|
||||
|
||||
let original = random_data(13_337, 16 * 1024 * 1024);
|
||||
let mut rng = DeterministicRng::new(0xA5A5_5A5A_0123_4567);
|
||||
let mut edits: Vec<PlannedEdit> = Vec::new();
|
||||
let stride = original.len() / 200;
|
||||
let mut cursor = stride / 2;
|
||||
|
||||
while edits.len() < 128 && cursor < original.len() {
|
||||
let start = cursor;
|
||||
let max_span = (original.len() - start).clamp(1, 1536);
|
||||
let span = 128 + rng.gen_range(0, max_span);
|
||||
let end = (start + span).min(original.len());
|
||||
let replacement_len = match rng.gen_range(0, 4) {
|
||||
0 => end - start,
|
||||
1 => (end - start) + 64 + rng.gen_range(0, 512),
|
||||
2 => rng.gen_range(0, end - start + 1),
|
||||
_ => 0,
|
||||
};
|
||||
edits.push(PlannedEdit {
|
||||
original_range: start..end,
|
||||
replacement: rng.gen_bytes(replacement_len),
|
||||
});
|
||||
cursor = cursor.saturating_add(stride.max(1));
|
||||
}
|
||||
|
||||
let expected = apply_planned_edits(&original, &edits);
|
||||
let inputs = edits_to_dirty_inputs(&edits);
|
||||
assert_edits(&config, &cas_client, &original, inputs, &expected).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
|
||||
#[ignore = "stress test"]
|
||||
async fn test_stress_parallel_random_resize_sequences() {
|
||||
let server = LocalTestServerBuilder::new().start().await;
|
||||
let base_dir = TempDir::new().unwrap();
|
||||
let config = test_config(server.http_endpoint(), base_dir.path());
|
||||
let cas_client: Arc<dyn Client> = Arc::new(server);
|
||||
|
||||
let mut handles = Vec::new();
|
||||
for worker in 0..8u64 {
|
||||
let config = config.clone();
|
||||
let cas_client = cas_client.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
let mut rng = DeterministicRng::new(0xC0FF_EE00_1234_5678 ^ worker.wrapping_mul(0x94D0_49BB_1331_11EB));
|
||||
let mut expected = random_data(20_000 + worker, 786_432 + worker as usize * 17_321);
|
||||
let mut original_hash = upload_file(&config, &expected).await;
|
||||
let mut original_size = expected.len() as u64;
|
||||
|
||||
for round in 0..18usize {
|
||||
let edits = build_random_non_overlapping_edits(&mut rng, expected.len(), 6);
|
||||
let expected_next = apply_planned_edits(&expected, &edits);
|
||||
let inputs = edits_to_dirty_inputs(&edits);
|
||||
let result =
|
||||
upload_ranges(config.clone(), cas_client.clone(), original_hash, original_size, inputs)
|
||||
.await
|
||||
.unwrap();
|
||||
let result_hash = MerkleHash::from_hex(result.hash()).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
result.file_size(),
|
||||
Some(expected_next.len() as u64),
|
||||
"worker={worker}, round={round}: size mismatch"
|
||||
);
|
||||
let clean_hash = upload_file(&config, &expected_next).await;
|
||||
assert_eq!(result.hash(), clean_hash.hex(), "worker={worker}, round={round}: hash mismatch");
|
||||
|
||||
let downloaded = download_file(&config, result_hash, expected_next.len() as u64).await;
|
||||
assert_eq!(downloaded, expected_next, "worker={worker}, round={round}: content mismatch");
|
||||
|
||||
expected = expected_next;
|
||||
original_hash = result_hash;
|
||||
original_size = expected.len() as u64;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
handle.await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -178,7 +178,7 @@ impl XetFileDownloadGroup {
|
||||
pub fn abort(&self) -> Result<(), XetError> {
|
||||
info!(group_id = %self.id(), "Download group abort");
|
||||
self.task_runtime.cancel_subtree()?;
|
||||
for (_tracking_id, handle) in self.inner.active_tasks.read()?.iter() {
|
||||
for handle in self.inner.active_tasks.read()?.values() {
|
||||
handle.cancel();
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -138,7 +138,7 @@ impl SafeFileCreator {
|
||||
Some(wr) => Ok(wr),
|
||||
None => Err(io::Error::new(
|
||||
io::ErrorKind::BrokenPipe,
|
||||
format!("Writing to {:?} already completed.", &self.dest_path),
|
||||
format!("Writing to {:?} already completed.", self.dest_path),
|
||||
)),
|
||||
}
|
||||
}
|
||||
@@ -163,7 +163,7 @@ impl Seek for SafeFileCreator {
|
||||
impl Drop for SafeFileCreator {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = self.close() {
|
||||
eprintln!("Error: Failed to close writer for {:?}: {}", &self.dest_path, e);
|
||||
eprintln!("Error: Failed to close writer for {:?}: {}", self.dest_path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user