feat: range aware file write (#717)

## Summary

APIs for range-aware file writes: instead of re-uploading an entire file
when only part of it changed, compose a new CAS file from stable
segments + re-chunked dirty windows. Supports resize edits (insert /
delete / arbitrary replace) in addition to in-place rewrites.

### API: `upload_ranges`

```rust
pub async fn upload_ranges(
    config: Arc<TranslatorConfig>,
    cas_client: Arc<dyn Client>,
    original_hash: MerkleHash,
    original_size: u64,
    dirty_inputs: Vec<DirtyInput>,
) -> Result<XetFileInfo>
```

```rust
/// A single edit applied to the original file: replace `original_range` with
/// `new_length` bytes from `reader`. Edits are expressed in original-file coordinates.
pub struct DirtyInput {
    pub original_range: Range<u64>,
    pub reader: Pin<Box<dyn AsyncRead + Send>>,
    pub new_length: u64,
}
```

The output file size is **derived** from the inputs (no `total_size`
parameter): `original_size - removed + added`.

### Edit shapes (all expressible with the same struct)

| Operation | `original_range` | `new_length` |
|---|---|---|
| In-place edit | `a..b` | `b - a` |
| Resize replace | `a..b` | any |
| Pure insert | `p..p` | `> 0` |
| Pure delete | `a..b` | `0` |
| Append | `original_size..original_size` | `> 0` |
| Truncate to N | `N..original_size` | `0` |
| No-op | empty `dirty_inputs` | — |

Motivating example:

```text
abc + upload_ranges([0..1), "foo", 3) = foobc
abc + upload_ranges([0..0), "foo", 3) = fooabc
abc + upload_ranges([0..1), "",    0) = bc
```

**Per-range `AsyncRead` instead of `ReadSeek` over the staging file.**
The earlier prototype took `dirty_ranges: &[(u64, u64)] + dirty_source:
&mut dyn ReadSeek`. That had a subtle bug: for truncation we silently
extended the dirty set with a boundary chunk and read those bytes from
the staging file, but if the file was never opened for write the staging
file contains zeros at those positions (real bytes are in CAS) → silent
corruption on the truncation boundary chunk. Pairing each edit with its
own reader makes that structurally impossible: any byte not provided by
the caller is fetched from CAS.

<details>
<summary>How it works</summary>

### High level

```
                           upload_ranges
   +----------------------+   |   +----------------------+
   |  original file (CAS) |---+-->|  composed file (CAS) |
   +----------------------+       +----------------------+
   only the dirty windows are re-uploaded; everything else
   is reused as whole CAS segments.
```

### Step 1 — coalesce + snap edits to segment boundaries

Edits are user-coordinates (byte ranges). We snap each edit's
`original_range` to the **enclosing CAS segments** so composition can
swap whole segments instead of truncating one mid-chunk. Adjacent /
overlapping snapped ranges are then coalesced.

Pure inserts (`start == end`) snap to the segment that owns `start`; an
insert at `original_size` snaps to the last segment.

### Step 2 — server returns windows + gap subtrees

Single CAS call: `GET /v2/file-chunk-hashes/{file_id}` with the
segment-aligned ranges in an `X-Range-Dirty: bytes=A-B,C-D` header.
Response shape (xetcas#987):

```rust
struct FileChunkHashesResponse {
    windows:      Vec<ChunkWindow>,         // one per dirty range
    hash_ranges:  Vec<Option<MerkleHashSubtree>>, // N+1 entries: [gap0, gap1, ..., gapN]
}
```

`windows[i].chunks` carries the chunk hashes the server actually owns
for that window (we re-upload these bytes). `hash_ranges[i]` is the
**MerkleHashSubtree** for the i-th unmodified gap, or `None` when there
is no gap there. This is the key to composing the final file hash
without touching unmodified bytes.

### Step 3 — for each window, stream `[CAS prefix | edits | CAS suffix]`
through a fresh cleaner

```
window = [w_start ............................................. w_end]
edits in this window:        [edit_a]    [edit_b]
                                ^           ^
streamed input to the cleaner:
  CAS bytes [w_start, edit_a.start)
  reader bytes for edit_a (new_length bytes)
  CAS bytes [edit_a.end, edit_b.start)
  reader bytes for edit_b
  CAS bytes [edit_b.end, w_end)
```

Pure inserts contribute zero original bytes but still emit `new_length`
reader bytes. Pure deletes contribute zero reader bytes. The cleaner
produces a new `MDBFileInfo` per window and a `ChunkHashList`.

### Step 4 — compose the file hash via `MerkleHashSubtree::merge`

```text
merge_seq = [gap0, w0, gap1, w1, ..., wN, gapN]   // skip None gaps

merged          = MerkleHashSubtree::merge(merge_seq)
aggregated_hash = merged.final_hash()
combined_hash   = aggregated_hash.hmac(zero)      // matches cleaner's file_hash
```

Special-case: if `total_size == 0` (e.g. truncate to empty) the result
is `MerkleHash::default()` *without* HMAC, mirroring `file_hash([])`.

### Step 5 — splice segments + register

Walk the original `MDBFileInfo.segments` and replace any segment that
falls inside a window with that window's freshly-uploaded segments.
Verification entries follow segment-for-segment when present.
`metadata_ext = None` (no SHA-256, see Limitations). Then
`register_composed_file` + `finalize`.

### Multi-window example

Two edits: replace `[50MB, 51MB)` and `[150MB, 151MB)` on a 200MB file:

```
+-----------+-------+------------+-------+-----------+
|  GAP 0    |  W0   |   GAP 1    |  W1   |  GAP 2    |
|  reused   |upload |  reused    |upload |  reused   |
| (subtree) | ~1MB  | (subtree)  | ~1MB  | (subtree) |
+-----------+-------+------------+-------+-----------+

Wire transfer: ~2MB upload + a few hundred KB of CAS reads for window
boundary chunks. Old approach: 200MB download + 200MB upload.
```

### Empty original short-circuit

When `original_size == 0` there is nothing to compose against — every
edit's `original_range` must be `0..0` (validated). We just stream the
new bytes through a fresh cleaner (`upload_fresh_file`).

</details>

### Reviewer note: `chunk_window_builder` is a re-implementation of
xetcas

`xet_client/src/cas_client/chunk_window_builder.rs` is a port of the
same window-building state machine that already lives in xetcas — it's
only used by the local / in-memory simulation clients (`local_client`,
`memory_client`) so the mock CAS server returns the same shape as the
real one in tests. **No need to re-review it as part of this PR**: it
mirrors logic already reviewed and merged in xetcas#987. A follow-up
xetcas PR will deduplicate by removing the server-side copy and pulling
this one in (or vice versa); the duplication is intentional and
temporary.

### Limitations

- **No SHA-256 metadata**: composed files have `metadata_ext = None`
since recomputing SHA-256 would require reading the full file. Only
suitable for contexts that don't require SHA-256 verification (HF
buckets, xet-native repos), not for Git LFS-backed repos.
- **Memory**: for very large files, the per-window in-memory state
(chunk hash list + composed segments) is bounded by the dirty regions,
not the whole file. The chunk-hashes response is paginated by the
server-defined window granularity.

### Tests (27)

Covering all edit shapes + edge cases. Notable:

| Test | Purpose |
|---|---|
| `test_resize_edits_abc` | The 3 motivating FUSE examples |
| `test_resize_large_replace_grows_file` | Replace `[a..b)` with much
more data |
| `test_resize_large_replace_shrinks_file` | Replace `[a..b)` with much
less data |
| `test_resize_mid_file_insert` | Pure insert in the middle |
| `test_resize_mid_file_delete` | Pure delete in the middle |
| `test_resize_multi_edit_mix` | Insert + replace + delete in one call |
| `test_resize_insert_at_segment_boundary` | Snapping correctness for
inserts |
| `test_upload_ranges_mid_file_edit` | In-place edit |
| `test_upload_ranges_truncation` | Pure truncate (sub-segment) |
| `test_upload_ranges_truncation_empty_staging` | Truncate when staging
is all-zero (boundary read from CAS) |
| `test_upload_ranges_truncation_with_overlapping_dirty` | Truncate +
dirty range overlapping the boundary |
| `test_truncate_to_empty_matches_clean_empty` | Truncating to 0 hashes
to `MerkleHash::default()` (matches a fresh empty cleaner) |
| `test_upload_ranges_append` | Pure append |
| `test_append_with_gap_before_dirty_range` | Append where reader covers
a sparse gap too |
| `test_append_sparse_staging_file` | Append on a sparse staging file |
| `test_mid_edit_plus_append` | Mid-file edit *and* append in one call
(P1 codex regression) |
| `test_empty_original_append` | `original_size == 0` + append falls
into the fresh-file path (P2 codex regression) |
| `test_empty_original_validates_ranges` | `original_size == 0` still
runs validation (reviewer regression) |
| `test_upload_ranges_at_file_start` | Edit at offset 0 (no stable
prefix) |
| `test_upload_ranges_multiple_regions` | Two non-adjacent dirty windows
with stable gap |
| `test_single_input_spanning_many_chunks` | One edit covering many CDC
chunks |
| `test_data_integrity_scenarios` | 5 sub-scenarios covering composition
correctness |
| `test_noop_returns_original_hash` | Empty `dirty_inputs` → no CAS
call, original hash returned |
| `test_rejects_dirty_range_past_total_size` | Validation: range past
`original_size` |
| `test_rejects_overlapping_dirty_ranges` | Validation: overlapping
edits |
| `test_rejects_unsorted_dirty_ranges` | Validation: unsorted edits |
| `test_upload_ranges_small_file_mid_edit` | Small files (single
segment) |

### Dependencies

- xetcas: `GET /v2/file-chunk-hashes/{file_id}` with `windows[] +
hash_ranges[]` response shape — huggingface-internal/xetcas#987
(merged).
- Consumer: huggingface-internal/hf-mount#41.


<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **High Risk**
> High risk because it adds a new partial-upload composition path that
splices CAS segments and recomputes file hashes from window subtrees,
touching core data integrity and client/server chunk-boundary logic.
> 
> **Overview**
> Adds range-aware file writes via new `upload_ranges`, letting callers
apply insert/delete/replace edits and upload only re-chunked dirty
windows while reusing stable CAS segments.
> 
> Introduces a new CAS API `get_file_chunk_hashes` (`GET
/v2/file-chunk-hashes/{file_id}` with `X-Range-Dirty`) plus response
types (`FileChunkHashesResponse`, `ChunkWindow`) and simulation support
(`chunk_window_builder`) that extends dirty ranges to *stable* chunk
boundaries and returns gap `MerkleHashSubtree` summaries +
stable-segment verification.
> 
> Refactors dedup/cleaning plumbing to expose per-chunk hash lists
(`ChunkHashList`), adds detached cleaner/session completion and
`register_composed_file` to avoid orphan shard entries, and
moves/re-exports `next_stable_chunk_boundary` into `xet_core_structures`
for shared stable-window computations.
> 
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit
2f4cee46df. Bugbot is set up for automated
code reviews on this repo. Configure
[here](https://www.cursor.com/dashboard/bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

---------

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Arpit Jain <arpitjain099@gmail.com>
Co-authored-by: Hoyt Koepke <hoytak@huggingface.co>
Co-authored-by: tison <wander4096@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Di Xiao <seanses@users.noreply.github.com>
Co-authored-by: Arpit Jain <3242828+arpitjain099@users.noreply.github.com>
Co-authored-by: Assaf Vayner <assaf@huggingface.co>
Co-authored-by: Rajat Arya <rajatarya@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Adrien
2026-05-21 21:27:59 +02:00
committed by GitHub
parent 51a20410c0
commit 40f9530753
31 changed files with 2903 additions and 117 deletions

View File

@@ -1,8 +1,9 @@
This update adds a new public deduplication helper for computing restart-safe chunk boundaries from existing chunk boundary metadata.
What changed
- Added `xet_data::deduplication::next_stable_chunk_boundary(starting_position, chunk_boundaries) -> Option<usize>`.
- Re-exported it from `xet_data::deduplication` so downstream crates can use it directly.
- Added `next_stable_chunk_boundary(starting_position, chunk_boundaries) -> Option<usize>`.
- Canonical implementation lives in `xet_core_structures::xorb_object::constants` (alongside the chunk-size constants it uses).
- Re-exported from `xet_data::deduplication` for convenience.
- The function scans forward from `starting_position` and returns the next chunk boundary that satisfies the stable-boundary condition:
- two consecutive chunk sizes in `[2 * min_chunk, max_chunk - min_chunk)`,
- where `min_chunk` and `max_chunk` are derived from chunking constants.
@@ -10,6 +11,7 @@ What changed
Why this matters
- Callers that already have chunk-boundary metadata can locate a stable resume boundary without re-reading file bytes.
- This enables deterministic alignment behavior for resumed/partial workflows that need chunk boundaries robust to prefix changes.
- The server-side `build_file_chunk_hashes_response` now extends dirty ranges to stable chunk boundaries before building windows, so that the client's rechunking around each dirty window is guaranteed to converge to the same chunk boundaries as the original file.
Usage notes
- `chunk_boundaries` should be monotonically increasing chunk-end offsets produced by the same chunking configuration.

View File

@@ -184,7 +184,8 @@ impl SingleFileCleaner {
let metadata_ext = FileMetadataExt::new(sha256);
// Now finish the deduplication process.
let (file_hash, remaining_file_data, deduplication_metrics) = self.dedup_manager.finalize(Some(metadata_ext));
let (file_hash, _chunk_hashes, remaining_file_data, deduplication_metrics) =
self.dedup_manager.finalize(Some(metadata_ext));
// Let's check some things that should be invariants
{

View File

@@ -0,0 +1,393 @@
//! Server-side state machine for `GET /v2/file-chunk-hashes/{file_id}` (mirrored from
//! xetcas PR #987) plus a small driver helper used by simulation clients to produce a
//! [`FileChunkHashesResponse`] without routing through HTTP.
use xet_core_structures::merklehash::{MerkleHash, MerkleHashSubtree};
use xet_core_structures::metadata_shard::file_structs::MDBFileInfo;
use xet_core_structures::xorb_object::constants::next_stable_chunk_boundary;
use crate::cas_types::{ChunkWindow, FileChunkHashesResponse, FileRange};
use crate::error::{ClientError, Result};
pub struct ChunkWindowBuilder<'a> {
dirty_ranges: &'a [FileRange],
dirty_idx: usize,
in_dirty_zone: bool,
gap_is_first: bool,
/// End-byte cursor: the start of the next chunk fed to the builder.
cursor: u64,
gap_chunks: Vec<(MerkleHash, u64)>,
windows: Vec<FileRange>,
hash_ranges: Vec<Option<MerkleHashSubtree>>,
}
fn next_stable_end_for_range(range_end: u64, file_size: u64, chunk_boundaries: &[usize]) -> u64 {
if range_end >= file_size {
return file_size;
}
let Ok(starting_position) = usize::try_from(range_end) else {
return file_size;
};
next_stable_chunk_boundary(starting_position, chunk_boundaries)
.and_then(|boundary| u64::try_from(boundary).ok())
.map(|boundary| boundary.min(file_size))
.unwrap_or(file_size)
}
fn extend_dirty_ranges_to_stable_windows(
mut dirty_ranges: Vec<FileRange>,
file_size: u64,
chunk_boundaries: &[usize],
) -> Vec<FileRange> {
for range in &mut dirty_ranges {
if range.end < file_size {
range.end = next_stable_end_for_range(range.end, file_size, chunk_boundaries);
}
}
dirty_ranges.sort_by_key(|range| range.start);
let mut coalesced: Vec<FileRange> = Vec::with_capacity(dirty_ranges.len());
for range in dirty_ranges {
if let Some(last) = coalesced.last_mut()
&& range.start <= last.end
{
last.end = last.end.max(range.end);
continue;
}
coalesced.push(range);
}
coalesced
}
impl<'a> ChunkWindowBuilder<'a> {
pub fn new(dirty_ranges: &'a [FileRange]) -> Self {
debug_assert!(
dirty_ranges.windows(2).all(|w| w[0].end <= w[1].start),
"dirty_ranges must be sorted and non-overlapping"
);
Self {
dirty_ranges,
dirty_idx: 0,
in_dirty_zone: false,
gap_is_first: true,
cursor: 0,
gap_chunks: Vec::new(),
windows: Vec::with_capacity(dirty_ranges.len()),
hash_ranges: Vec::new(),
}
}
pub fn process_chunk(&mut self, hash: MerkleHash, size: u64, byte_end: u64) {
let byte_start = self.cursor;
let overlaps_dirty = self.overlaps_current_dirty(byte_start, byte_end);
if !self.in_dirty_zone {
if overlaps_dirty {
self.open_window(byte_end);
self.in_dirty_zone = true;
} else {
self.gap_chunks.push((hash, size));
}
} else if overlaps_dirty {
self.windows
.last_mut()
.expect("in_dirty_zone implies a window has been opened")
.end = byte_end;
self.merge_ahead(byte_end);
} else {
self.dirty_idx += 1;
self.gap_is_first = false;
// The first clean chunk after a dirty zone may itself overlap the next
// dirty range (back-to-back dirty ranges on adjacent chunks).
let overlaps_next = self.overlaps_current_dirty(byte_start, byte_end);
if overlaps_next {
self.open_window(byte_end);
} else {
self.in_dirty_zone = false;
self.gap_chunks.push((hash, size));
}
}
self.cursor = byte_end;
}
/// Returns true when the entry ending at `byte_end` (and starting at the cursor)
/// is fully contained within the current dirty range.
pub fn entry_fully_dirty(&self, byte_end: u64) -> bool {
self.dirty_idx < self.dirty_ranges.len()
&& self.dirty_ranges[self.dirty_idx].start <= self.cursor
&& byte_end <= self.dirty_ranges[self.dirty_idx].end
}
/// Process a fully-dirty shard entry without iterating its individual chunks.
pub fn skip_dirty_entry(&mut self, byte_end: u64) {
if !self.in_dirty_zone {
self.open_window(byte_end);
self.in_dirty_zone = true;
} else {
self.windows
.last_mut()
.expect("in_dirty_zone implies a window has been opened")
.end = byte_end;
self.merge_ahead(byte_end);
}
self.cursor = byte_end;
}
/// Consume the builder and return the dirty windows + N+1 gap hash ranges.
pub fn finish(mut self) -> (Vec<FileRange>, Vec<Option<MerkleHashSubtree>>) {
let trailing = MerkleHashSubtree::from_chunks(self.gap_is_first, &self.gap_chunks, true);
self.hash_ranges.push(Self::to_option(trailing));
(self.windows, self.hash_ranges)
}
fn open_window(&mut self, byte_end: u64) {
let gap = MerkleHashSubtree::from_chunks(self.gap_is_first, &self.gap_chunks, false);
self.hash_ranges.push(Self::to_option(gap));
self.gap_chunks.clear();
self.windows.push(FileRange::new(self.cursor, byte_end));
self.merge_ahead(byte_end);
}
fn overlaps_current_dirty(&self, byte_start: u64, byte_end: u64) -> bool {
self.dirty_idx < self.dirty_ranges.len()
&& byte_end > self.dirty_ranges[self.dirty_idx].start
&& byte_start < self.dirty_ranges[self.dirty_idx].end
}
fn merge_ahead(&mut self, byte_end: u64) {
while self.dirty_idx + 1 < self.dirty_ranges.len() && byte_end > self.dirty_ranges[self.dirty_idx + 1].start {
self.dirty_idx += 1;
}
}
fn to_option(hr: MerkleHashSubtree) -> Option<MerkleHashSubtree> {
if hr.is_empty() { None } else { Some(hr) }
}
}
/// Drive [`ChunkWindowBuilder`] over a flat list of `(chunk_hash, size)` pairs and
/// assemble the [`FileChunkHashesResponse`]. Simulation clients pre-collect the chunks for
/// the file (typically by walking segments + xorb metadata) and call this to answer
/// `Client::get_file_chunk_hashes` locally.
///
/// Also emits `gap_verification`: for each **stable** original segment (one that lies
/// entirely outside the dirty windows), the corresponding `FileVerificationEntry` from
/// `file_info.verification` is copied into `gap_verification` in segment order. The
/// composed shard built by `upload_ranges` uses these to reconstruct its own verification
/// section without needing per-chunk hashes for the stable segments. When `file_info`
/// has no verification entries (legacy / test files), `gap_verification` is empty.
pub fn build_file_chunk_hashes_response(
file_info: &MDBFileInfo,
dirty_ranges: Vec<FileRange>,
chunks: impl IntoIterator<Item = (MerkleHash, u64)>,
) -> Result<FileChunkHashesResponse> {
let file_size = file_info.file_size();
let dirty_ranges: Vec<FileRange> = dirty_ranges
.into_iter()
.map(|r| FileRange::new(r.start, r.end.min(file_size)))
.filter(|r| r.start < r.end && r.start < file_size)
.collect();
if dirty_ranges.is_empty() {
return Err(ClientError::Other("no valid dirty ranges".into()));
}
let chunks: Vec<(MerkleHash, u64)> = chunks.into_iter().collect();
let mut chunk_boundaries: Vec<usize> = Vec::with_capacity(chunks.len());
let mut stable_ranges_supported = true;
{
let mut boundary_cursor: u64 = 0;
for (_, size) in &chunks {
boundary_cursor += *size;
let Ok(boundary) = usize::try_from(boundary_cursor) else {
stable_ranges_supported = false;
break;
};
chunk_boundaries.push(boundary);
}
}
let dirty_ranges = if stable_ranges_supported {
extend_dirty_ranges_to_stable_windows(dirty_ranges, file_size, &chunk_boundaries)
} else {
dirty_ranges
};
let total_chunks = chunks.len() as u64;
let mut builder = ChunkWindowBuilder::new(&dirty_ranges);
let mut cumulative_bytes: u64 = 0;
for (hash, size) in chunks {
cumulative_bytes += size;
builder.process_chunk(hash, size, cumulative_bytes);
}
let (windows, hash_ranges) = builder.finish();
if windows.is_empty() {
return Err(ClientError::Other("dirty ranges do not overlap any chunks".into()));
}
// Emit one range hash per stable segment (= no overlap with any window).
// Segments and windows are both monotonic, so a two-pointer walk is O(S+W).
//
// Contract: `verification` is either empty (legacy / test files without verification
// entries) or 1:1 with `segments`. A partially-populated mismatch is a real bug we
// want loud here, rather than as a confusing "ran out of gap_verification entries"
// error later in `compose_mdb`.
let gap_verification = if file_info.verification.is_empty() {
Vec::new()
} else if file_info.verification.len() == file_info.segments.len() {
let mut gv = Vec::new();
let mut acc = 0u64;
let mut wi = 0usize;
for (idx, seg) in file_info.segments.iter().enumerate() {
let seg_start = acc;
let seg_end = acc + seg.unpacked_segment_bytes as u64;
acc = seg_end;
while wi < windows.len() && windows[wi].end <= seg_start {
wi += 1;
}
let overlaps = wi < windows.len() && windows[wi].start < seg_end;
if !overlaps {
gv.push(crate::cas_types::HexMerkleHash::from(file_info.verification[idx].range_hash));
}
}
gv
} else {
return Err(ClientError::Other(format!(
"file_info has {} verification entries but {} segments; \
expected either zero or a 1:1 mapping",
file_info.verification.len(),
file_info.segments.len()
)));
};
Ok(FileChunkHashesResponse {
total_chunks,
file_size,
windows: windows
.into_iter()
.map(|r| ChunkWindow {
dirty_byte_range: [r.start, r.end],
})
.collect(),
hash_ranges,
gap_verification,
})
}
#[cfg(test)]
mod tests {
use xet_core_structures::merklehash::MerkleHash;
use xet_core_structures::metadata_shard::file_structs::{FileDataSequenceEntry, MDBFileInfo};
use xet_core_structures::xorb_object::constants::{
MAXIMUM_CHUNK_MULTIPLIER, MINIMUM_CHUNK_DIVISOR, TARGET_CHUNK_SIZE,
};
use super::*;
fn stable_chunk_size() -> u64 {
let minimum_chunk = *TARGET_CHUNK_SIZE / *MINIMUM_CHUNK_DIVISOR;
let maximum_chunk = *TARGET_CHUNK_SIZE * *MAXIMUM_CHUNK_MULTIPLIER;
let size = 2 * minimum_chunk;
assert!(size < maximum_chunk - minimum_chunk);
size as u64
}
fn build_test_file_info_and_chunks(n_chunks: usize, chunk_size: u64) -> (MDBFileInfo, Vec<(MerkleHash, u64)>) {
let chunks: Vec<(MerkleHash, u64)> = (0..n_chunks)
.map(|i| (MerkleHash::random_from_seed(i as u64 + 1), chunk_size))
.collect();
let file_size = chunk_size * n_chunks as u64;
let file_info = MDBFileInfo {
segments: vec![FileDataSequenceEntry {
xorb_hash: MerkleHash::random_from_seed(999),
xorb_flags: 0,
unpacked_segment_bytes: file_size as u32,
chunk_index_start: 0,
chunk_index_end: n_chunks as u32,
}],
..Default::default()
};
(file_info, chunks)
}
#[test]
fn test_server_extends_dirty_window_to_next_stable_boundary() {
let chunk_size = stable_chunk_size();
let (file_info, chunks) = build_test_file_info_and_chunks(6, chunk_size);
let dirty_ranges = vec![FileRange::new(1, chunk_size + 1)];
let response = build_file_chunk_hashes_response(&file_info, dirty_ranges, chunks).unwrap();
assert_eq!(response.windows.len(), 1);
assert_eq!(response.windows[0].dirty_byte_range, [0, 4 * chunk_size]);
}
#[test]
fn test_server_coalesces_ranges_after_stable_extension() {
let chunk_size = stable_chunk_size();
let (file_info, chunks) = build_test_file_info_and_chunks(8, chunk_size);
let dirty_ranges = vec![
FileRange::new(1, chunk_size + 1),
FileRange::new(3 * chunk_size + 7, 3 * chunk_size + 42),
];
let response = build_file_chunk_hashes_response(&file_info, dirty_ranges, chunks).unwrap();
assert_eq!(response.windows.len(), 1);
assert_eq!(response.windows[0].dirty_byte_range, [0, 6 * chunk_size]);
}
#[test]
fn test_server_no_extension_when_range_already_at_file_end() {
let chunk_size = stable_chunk_size();
let (file_info, chunks) = build_test_file_info_and_chunks(6, chunk_size);
let file_size = chunk_size * 6;
let dirty_ranges = vec![FileRange::new(4 * chunk_size, file_size)];
let response = build_file_chunk_hashes_response(&file_info, dirty_ranges, chunks).unwrap();
assert_eq!(response.windows.len(), 1);
assert_eq!(response.windows[0].dirty_byte_range, [4 * chunk_size, file_size]);
}
#[test]
fn test_server_separate_ranges_stay_separate_when_far_apart() {
let chunk_size = stable_chunk_size();
let n_chunks = 20;
let (file_info, chunks) = build_test_file_info_and_chunks(n_chunks, chunk_size);
let dirty_ranges = vec![
FileRange::new(0, chunk_size),
FileRange::new(14 * chunk_size, 15 * chunk_size),
];
let response = build_file_chunk_hashes_response(&file_info, dirty_ranges, chunks).unwrap();
assert!(
response.windows.len() >= 2,
"far-apart ranges should remain separate, got {} window(s)",
response.windows.len()
);
assert_eq!(response.hash_ranges.len(), response.windows.len() + 1);
}
#[test]
fn test_server_extension_falls_through_to_file_end_when_no_stable_boundary() {
let minimum_chunk = *TARGET_CHUNK_SIZE / *MINIMUM_CHUNK_DIVISOR;
let maximum_chunk = *TARGET_CHUNK_SIZE * *MAXIMUM_CHUNK_MULTIPLIER;
let forced_size = maximum_chunk as u64;
let (file_info, chunks) = build_test_file_info_and_chunks(4, forced_size);
let file_size = forced_size * 4;
let dirty_ranges = vec![FileRange::new(0, forced_size)];
let response = build_file_chunk_hashes_response(&file_info, dirty_ranges, chunks).unwrap();
assert_eq!(response.windows.len(), 1);
let w = &response.windows[0];
assert_eq!(
w.dirty_byte_range[1], file_size,
"when no stable boundary exists, window should extend to file end; \
minimum_chunk={minimum_chunk}, maximum_chunk={maximum_chunk}"
);
}
}

View File

@@ -5,7 +5,9 @@ use xet_core_structures::xorb_object::SerializedXorbObject;
use super::adaptive_concurrency::ConnectionPermit;
use super::progress_tracked_streams::ProgressCallback;
use crate::cas_types::{BatchQueryReconstructionResponse, FileRange, HttpRange, QueryReconstructionResponseV2};
use crate::cas_types::{
BatchQueryReconstructionResponse, FileChunkHashesResponse, FileRange, HttpRange, QueryReconstructionResponseV2,
};
use crate::error::Result;
#[async_trait::async_trait]
@@ -70,4 +72,17 @@ pub trait Client: Send + Sync {
progress_callback: Option<ProgressCallback>,
upload_permit: ConnectionPermit,
) -> Result<u64>;
/// Compute chunk-aligned dirty windows + opaque gap [`MerkleHashSubtree`] summaries for the
/// given file, narrowed to `dirty_ranges`.
///
/// `dirty_ranges` must be sorted and non-overlapping. Per-chunk hashes are never returned;
/// the response carries only `windows.len()` dirty windows and `windows.len() + 1` gap
/// subtrees, which the client merges with locally-recomputed window subtrees to obtain the
/// new file hash.
async fn get_file_chunk_hashes(
&self,
file_id: &MerkleHash,
dirty_ranges: Vec<FileRange>,
) -> Result<FileChunkHashesResponse>;
}

View File

@@ -14,6 +14,7 @@ pub use crate::common::http_client::{Api, ResponseErrorLogger, build_auth_http_c
pub mod adaptive_concurrency;
pub mod auth;
pub mod chunk_window_builder;
pub mod exports;
mod interface;
pub mod multipart;

View File

@@ -23,8 +23,9 @@ use super::progress_tracked_streams::{
use super::retry_wrapper::{RetryWrapper, RetryableReqwestError};
use super::{Client, INFORMATION_LOG_LEVEL};
use crate::cas_types::{
BatchQueryReconstructionResponse, FileRange, HttpRange, Key, QueryReconstructionResponse,
BatchQueryReconstructionResponse, FileChunkHashesResponse, FileRange, HttpRange, Key, QueryReconstructionResponse,
QueryReconstructionResponseV2, UploadShardResponse, UploadShardResponseType, UploadXorbResponse,
X_RANGE_DIRTY_HEADER,
};
use crate::common::http_client::{self, Api};
use crate::error::{ClientError, Result};
@@ -748,6 +749,48 @@ impl Client for RemoteClient {
Ok(n_upload_bytes)
}
#[instrument(skip_all, name = "RemoteClient::get_file_chunk_hashes", fields(file.hash = file_id.hex(), n_ranges = dirty_ranges.len()))]
async fn get_file_chunk_hashes(
&self,
file_id: &MerkleHash,
dirty_ranges: Vec<FileRange>,
) -> Result<FileChunkHashesResponse> {
if dirty_ranges.is_empty() {
return Err(ClientError::Other("get_file_chunk_hashes requires at least one dirty range".into()));
}
let url = Url::parse(&format!("{}/v2/file-chunk-hashes/{}", self.endpoint, file_id.hex()))?;
// Multi-range `bytes=A-B,C-D` value. `HttpRange` is inclusive-end and `Display`s as
// `start-end`; conversion from `FileRange` does the +1/-1 for us.
let header_value = HeaderValue::from_str(&format!(
"bytes={}",
dirty_ranges
.iter()
.copied()
.map(HttpRange::from)
.map(|r| r.to_string())
.collect::<Vec<_>>()
.join(",")
))
.map_err(|err| ClientError::Other(format!("invalid X-Range-Dirty header value: {err}")))?;
let api_tag = "cas::get_file_chunk_hashes";
let client = self.authenticated_http_client.clone();
let response: FileChunkHashesResponse = RetryWrapper::new(self.ctx.clone(), api_tag)
.run_and_extract_json(move || {
client
.get(url.clone())
.header(X_RANGE_DIRTY_HEADER, header_value.clone())
.with_extension(Api(api_tag))
.send()
})
.await?;
Ok(response)
}
}
#[cfg(test)]

View File

@@ -17,6 +17,7 @@ use tokio::time::{Duration, Instant};
use tracing::{error, info, warn};
use xet_core_structures::merklehash::{MerkleHash, compute_data_hash};
use xet_core_structures::metadata_shard::file_structs::{FileDataSequenceHeader, MDBFileInfo, MDBFileInfoView};
use xet_core_structures::metadata_shard::shard_file_reconstructor::FileReconstructor;
use xet_core_structures::metadata_shard::shard_format::MDB_FILE_INFO_ENTRY_SIZE;
use xet_core_structures::metadata_shard::shard_in_memory::MDBInMemoryShard;
use xet_core_structures::metadata_shard::streaming_shard::MDBMinimalShard;
@@ -35,10 +36,12 @@ use super::direct_access_client::DirectAccessClient;
use super::xorb_utils::{self, REFERENCE_INSTANT, duration_to_expiration_secs_ceil};
use crate::cas_client::Client;
use crate::cas_client::adaptive_concurrency::AdaptiveConcurrencyController;
use crate::cas_client::chunk_window_builder::build_file_chunk_hashes_response;
use crate::cas_client::progress_tracked_streams::ProgressCallback;
use crate::cas_types::{
BatchQueryReconstructionResponse, FileRange, HexMerkleHash, HttpRange, QueryReconstructionResponse,
QueryReconstructionResponseV2, XorbMultiRangeFetch, XorbRangeDescriptor, XorbReconstructionFetchInfo,
BatchQueryReconstructionResponse, FileChunkHashesResponse, FileRange, HexMerkleHash, HttpRange,
QueryReconstructionResponse, QueryReconstructionResponseV2, XorbMultiRangeFetch, XorbRangeDescriptor,
XorbReconstructionFetchInfo,
};
use crate::error::{ClientError, Result};
@@ -1692,6 +1695,30 @@ impl Client for LocalClient {
// Should not reach here, but return error if we do.
Err(ClientError::PresignedUrlExpirationError)
}
async fn get_file_chunk_hashes(
&self,
file_id: &MerkleHash,
dirty_ranges: Vec<FileRange>,
) -> Result<FileChunkHashesResponse> {
self.apply_api_delay().await;
let Some((file_info, _)) = self.shard_manager.get_file_reconstruction_info(file_id).await? else {
return Err(ClientError::FileNotFound(*file_id));
};
let mut chunks: Vec<(MerkleHash, u64)> = Vec::new();
for segment in &file_info.segments {
let xorb_obj = self.xorb_footer(&segment.xorb_hash).await?;
chunks.extend(
xorb_obj
.chunk_hash_sizes(segment.chunk_index_start, segment.chunk_index_end)
.map_err(|err| ClientError::Other(format!("chunk_hash_sizes error: {err}")))?,
);
}
build_file_chunk_hashes_response(&file_info, dirty_ranges, chunks)
}
}
fn map_redb_db_error(e: impl std::fmt::Debug) -> ClientError {

View File

@@ -493,6 +493,14 @@ impl Client for LocalTestServer {
.upload_xorb(prefix, serialized_xorb_object, progress_callback, upload_permit)
.await
}
async fn get_file_chunk_hashes(
&self,
file_id: &xet_core_structures::merklehash::MerkleHash,
dirty_ranges: Vec<crate::cas_types::FileRange>,
) -> Result<crate::cas_types::FileChunkHashesResponse> {
self.client.get_file_chunk_hashes(file_id, dirty_ranges).await
}
}
#[cfg(test)]

View File

@@ -19,7 +19,9 @@ use crate::cas_client::interface::Client;
use crate::cas_client::simulation::deletion_controls::ObjectTag;
use crate::cas_client::simulation::xorb_utils::duration_to_expiration_secs_ceil;
use crate::cas_client::simulation::{DeletionControlableClient, DirectAccessClient};
use crate::cas_types::{FileRange, HexMerkleHash, QueryReconstructionResponseV2, XorbReconstructionFetchInfo};
use crate::cas_types::{
FileChunkHashesResponse, FileRange, HexMerkleHash, QueryReconstructionResponseV2, XorbReconstructionFetchInfo,
};
use crate::error::{ClientError, Result};
const CONFIG_POST_MAX_ATTEMPTS: usize = 4;
@@ -243,6 +245,14 @@ impl Client for SimulationControlClient {
.upload_xorb(prefix, serialized_xorb_object, progress_callback, upload_permit)
.await
}
async fn get_file_chunk_hashes(
&self,
file_id: &MerkleHash,
dirty_ranges: Vec<FileRange>,
) -> Result<FileChunkHashesResponse> {
self.remote_client.get_file_chunk_hashes(file_id, dirty_ranges).await
}
}
#[async_trait]

View File

@@ -30,9 +30,11 @@ use super::deletion_controls::ObjectTag;
use super::direct_access_client::DirectAccessClient;
use super::random_xorb::RandomXorb;
use super::xorb_utils::{self, REFERENCE_INSTANT, duration_to_expiration_secs_ceil};
use crate::cas_client::chunk_window_builder::build_file_chunk_hashes_response;
use crate::cas_types::{
BatchQueryReconstructionResponse, FileRange, HexMerkleHash, HttpRange, QueryReconstructionResponse,
QueryReconstructionResponseV2, XorbMultiRangeFetch, XorbRangeDescriptor, XorbReconstructionFetchInfo,
BatchQueryReconstructionResponse, FileChunkHashesResponse, FileRange, HexMerkleHash, HttpRange,
QueryReconstructionResponse, QueryReconstructionResponseV2, XorbMultiRangeFetch, XorbRangeDescriptor,
XorbReconstructionFetchInfo,
};
use crate::error::{ClientError, Result};
@@ -950,6 +952,40 @@ impl Client for MemoryClient {
}
Ok((Bytes::from(all_decompressed), all_chunk_indices))
}
async fn get_file_chunk_hashes(
&self,
file_id: &MerkleHash,
dirty_ranges: Vec<FileRange>,
) -> Result<FileChunkHashesResponse> {
self.apply_api_delay().await;
let file_info = {
let shard = self.shard.read().await;
shard
.get_file_reconstruction_info(file_id)
.ok_or(ClientError::FileNotFound(*file_id))?
};
let xorbs = self.xorbs.read().await;
let mut chunks: Vec<(MerkleHash, u64)> = Vec::new();
for segment in &file_info.segments {
let storage = xorbs
.get(&segment.xorb_hash)
.ok_or(ClientError::XORBNotFound(segment.xorb_hash))?;
let xorb_obj = match storage {
XorbStorage::Materialized { entry, .. } => std::borrow::Cow::Borrowed(&entry.xorb_object),
XorbStorage::Random { xorb, .. } => std::borrow::Cow::Owned(xorb.get_xorb_object()),
};
chunks.extend(
xorb_obj
.chunk_hash_sizes(segment.chunk_index_start, segment.chunk_index_end)
.map_err(|err| ClientError::Other(format!("chunk_hash_sizes error: {err}")))?,
);
}
build_file_chunk_hashes_response(&file_info, dirty_ranges, chunks)
}
}
#[cfg(not(target_family = "wasm"))]

View File

@@ -234,4 +234,12 @@ impl Client for RemoteSimulationClient {
.upload_xorb(prefix, serialized_xorb_object, progress_callback, upload_permit)
.await
}
async fn get_file_chunk_hashes(
&self,
file_id: &xet_core_structures::merklehash::MerkleHash,
dirty_ranges: Vec<crate::cas_types::FileRange>,
) -> Result<crate::cas_types::FileChunkHashesResponse> {
self.inner.get_file_chunk_hashes(file_id, dirty_ranges).await
}
}

View File

@@ -541,6 +541,14 @@ impl Client for LocalTestServer {
.upload_xorb(prefix, serialized_xorb_object, progress_callback, upload_permit)
.await
}
async fn get_file_chunk_hashes(
&self,
file_id: &xet_core_structures::merklehash::MerkleHash,
dirty_ranges: Vec<crate::cas_types::FileRange>,
) -> Result<crate::cas_types::FileChunkHashesResponse> {
self.client.get_file_chunk_hashes(file_id, dirty_ranges).await
}
}
#[async_trait]

View File

@@ -7,7 +7,7 @@ use std::str::FromStr;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use thiserror::Error;
use xet_core_structures::merklehash::MerkleHash;
use xet_core_structures::merklehash::{MerkleHash, MerkleHashSubtree};
mod key;
pub use key::*;
@@ -311,6 +311,42 @@ pub struct QueryChunkResponse {
pub shard: MerkleHash,
}
/// HTTP header carrying the dirty byte ranges to feed to `GET /v2/file-chunk-hashes/{file_id}`.
///
/// Distinct from the standard `Range` header (which scopes the response body): this header tags
/// regions that the client intends to re-chunk, and the response covers the whole file (windows +
/// gap subtrees). Value uses the same `bytes=A-B,C-D` syntax as `Range`.
pub const X_RANGE_DIRTY_HEADER: &str = "X-Range-Dirty";
/// One chunk-aligned dirty window of a file, returned by `GET /v2/file-chunk-hashes/{file_id}`.
///
/// `dirty_byte_range` is `[start, end)` and is expanded outward to the chunk boundaries that
/// fully contain the requested dirty range, so the client must re-chunk the entire span.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ChunkWindow {
pub dirty_byte_range: [u64; 2],
}
/// Response shape for `GET /v2/file-chunk-hashes/{file_id}`.
///
/// Contains `windows.len()` dirty windows interleaved with `windows.len() + 1` opaque
/// `MerkleHashSubtree` summaries for the surrounding gaps. To reconstruct the new file hash,
/// merge `[hash_ranges[0], window0_subtree, hash_ranges[1], window1_subtree, ..., hash_ranges[N]]`
/// using `MerkleHashSubtree::merge`. Per-chunk hashes are never transferred.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct FileChunkHashesResponse {
pub total_chunks: u64,
pub file_size: u64,
pub windows: Vec<ChunkWindow>,
pub hash_ranges: Vec<Option<MerkleHashSubtree>>,
/// One range hash per **stable original segment** (= a segment that lies in a gap
/// between dirty windows or before/after them, in segment order). Wraps each into a
/// `FileVerificationEntry` to populate the composed shard's verification section.
pub gap_verification: Vec<HexMerkleHash>,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -48,6 +48,9 @@ pub mod data_hash;
pub use data_hash::*;
pub type MerkleHash = DataHash;
/// List of (chunk_hash, chunk_uncompressed_size) pairs for a file or xorb range.
pub type ChunkHashList = Vec<(MerkleHash, u64)>;
mod aggregated_hashes;
pub mod merkle_hash_subtree;
pub mod passthrough_hasher;

View File

@@ -256,12 +256,7 @@ impl ShardFileManager {
s.verify_shard_integrity_debug_only();
// Make sure the shard is in the shard directory
debug_assert!(
s.path.starts_with(&self.shard_directory),
"{:?} not in {:?}",
&s.path,
&self.shard_directory
);
debug_assert!(s.path.starts_with(&self.shard_directory), "{:?} not in {:?}", s.path, self.shard_directory);
if self
.shard_bookkeeper

View File

@@ -90,14 +90,14 @@ impl MDBInMemoryShard {
pub fn recalculate_shard_size(&mut self) {
// Calculate the size
let mut num_bytes = 0u64;
for (_, xorb_block_contents) in self.xorb_content.iter() {
for xorb_block_contents in self.xorb_content.values() {
num_bytes += xorb_block_contents.num_bytes();
// The xorb lookup table
num_bytes += (size_of::<u64>() + size_of::<u32>()) as u64;
}
for (_, file_info) in self.file_content.iter() {
for file_info in self.file_content.values() {
num_bytes += file_info.num_bytes();
num_bytes += (size_of::<u64>() + size_of::<u32>()) as u64;
}

View File

@@ -187,8 +187,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let writer = Arc::new(Mutex::new(wtr));
eprintln!("Output File: {}", &args.output);
eprintln!("Input Files: {:?}", &args.files);
eprintln!("Output File: {}", args.output);
eprintln!("Input Files: {:?}", args.files);
let max_limiter = Arc::new(Semaphore::new(32 * 1024));

View File

@@ -27,3 +27,50 @@ lazy_static::lazy_static! {
/// The maximum chunk size, calculated from the configurable constants above
pub static ref MAX_CHUNK_SIZE: usize = (*TARGET_CHUNK_SIZE) * (*MAXIMUM_CHUNK_MULTIPLIER);
}
/// Given a list of chunk boundaries in a file and an arbitrary reference position,
/// returns the next stable chunk boundary at or after that position.
///
/// `starting_position` may be any byte offset in the file; it does not need to
/// be an existing chunk boundary. The search starts at the first chunk boundary
/// `>= starting_position`.
///
/// A stable chunk boundary is defined such that any possible changes in the data
/// before `starting_position` would produce the same chunk boundaries at the
/// stable boundary and later. The fixed data between `starting_position` and
/// the returned stable boundary is always sufficient to restore the chunker to
/// its original chunk boundaries.
///
/// The stability condition requires two consecutive chunks after `starting_position`,
/// both with sizes in `[2 * min_chunk, max_chunk - min_chunk)`. The boundary
/// at the end of the second such chunk is the stable chunk boundary.
///
/// The lower bound is `2 * min_chunk` rather than `min_chunk` (as used in
/// `find_partitions` in the chunking module) because this function operates on
/// existing chunk boundaries without data access, and cannot verify the absence
/// of hidden hash triggers in the `[c_k, c_k + min_chunk)` skip zone. A
/// shadow-zone trigger can advance a modified chunker by up to `min_chunk`, so
/// the next chunk must be at least `2 * min_chunk` to remain reachable.
///
/// See `parallel chunking.lyx` for the full proof and `find_stable_start` in
/// `merkle_hash_subtree.rs` for the analogous construction in merkle hashing.
pub fn next_stable_chunk_boundary(starting_position: usize, chunk_boundaries: &[usize]) -> Option<usize> {
let minimum_chunk = *TARGET_CHUNK_SIZE / *MINIMUM_CHUNK_DIVISOR;
let maximum_chunk = *TARGET_CHUNK_SIZE * *MAXIMUM_CHUNK_MULTIPLIER;
let start_idx = chunk_boundaries.partition_point(|&x| x < starting_position);
for i in start_idx..chunk_boundaries.len().saturating_sub(2) {
let size_a = chunk_boundaries[i + 1] - chunk_boundaries[i];
let size_b = chunk_boundaries[i + 2] - chunk_boundaries[i + 1];
if size_a >= 2 * minimum_chunk
&& size_a < maximum_chunk - minimum_chunk
&& size_b >= 2 * minimum_chunk
&& size_b < maximum_chunk - minimum_chunk
{
return Some(chunk_boundaries[i + 2]);
}
}
None
}

View File

@@ -13,15 +13,15 @@ use super::constants::{TARGET_CHUNK_SIZE, XORB_BLOCK_SIZE};
use super::xorb_chunk_format::{deserialize_chunk, deserialize_chunk_header, serialize_chunk, write_chunk_header};
use super::{CompressionScheme, RawXorbData, XorbChunkHeader};
use crate::error::{CoreError, Validate};
use crate::merklehash::{DataHash, MerkleHash};
use crate::merklehash::{ChunkHashList, DataHash, MerkleHash};
use crate::metadata_shard::chunk_verification::range_hash_from_chunks;
use crate::serialization_utils::*;
pub type XorbObjectIdent = [u8; 7];
pub(crate) const XORB_OBJECT_FORMAT_IDENT: XorbObjectIdent = [b'X', b'E', b'T', b'B', b'L', b'O', b'B'];
pub(crate) const XORB_OBJECT_FORMAT_IDENT: XorbObjectIdent = *b"XETBLOB";
pub(crate) const XORB_OBJECT_FORMAT_VERSION_V0: u8 = 0;
pub(crate) const XORB_OBJECT_FORMAT_IDENT_HASHES: XorbObjectIdent = [b'X', b'B', b'L', b'B', b'H', b'S', b'H'];
pub(crate) const XORB_OBJECT_FORMAT_IDENT_BOUNDARIES: XorbObjectIdent = [b'X', b'B', b'L', b'B', b'B', b'N', b'D'];
pub(crate) const XORB_OBJECT_FORMAT_IDENT_HASHES: XorbObjectIdent = *b"XBLBHSH";
pub(crate) const XORB_OBJECT_FORMAT_IDENT_BOUNDARIES: XorbObjectIdent = *b"XBLBBND";
pub(crate) const XORB_OBJECT_FORMAT_VERSION: u8 = 1;
pub(crate) const XORB_OBJECT_FORMAT_HASHES_VERSION: u8 = 0;
@@ -1240,6 +1240,21 @@ impl XorbObject {
Ok(incl_end - before_start)
}
/// Returns (chunk_hash, uncompressed_size) pairs for chunks in [start, end).
pub fn chunk_hash_sizes(&self, start: u32, end: u32) -> Result<ChunkHashList, CoreError> {
self.validate_xorb_object_info()?;
if end > self.info.num_chunks || start > end {
return Err(CoreError::InvalidArguments);
}
(start..end)
.map(|i| {
let hash = self.info.chunk_hashes[i as usize];
let size = self.uncompressed_chunk_length(i)? as u64;
Ok((hash, size))
})
.collect()
}
/// Helper method to verify that info object is complete
fn validate_xorb_object_info(&self) -> Result<(), CoreError> {
if self.info.num_chunks == 0 {

View File

@@ -359,52 +359,10 @@ pub fn find_partitions<R: Read + Seek>(
Ok(partitions)
}
/// Given a list of chunk boundaries in a file and an arbitrary reference position,
/// returns the next stable chunk boundary at or after that position.
///
/// `starting_position` may be any byte offset in the file; it does not need to
/// be an existing chunk boundary. The search starts at the first chunk boundary
/// `>= starting_position`.
///
/// A stable chunk boundary is defined such that any possible changes in the data
/// before `starting_position` would produce the same chunk boundaries at the
/// stable boundary and later. The fixed data between `starting_position` and
/// the returned stable boundary is always sufficient to restore the chunker to
/// its original chunk boundaries.
///
/// The stability condition requires two consecutive chunks after `starting_position`,
/// both with sizes in `[2 * min_chunk, max_chunk - min_chunk)`. The boundary
/// at the end of the second such chunk is the stable chunk boundary.
///
/// The lower bound is `2 * min_chunk` rather than `min_chunk` (as used in
/// [`find_partitions`]) because this function operates on existing chunk
/// boundaries without data access, and cannot verify the absence of hidden
/// hash triggers in the `[c_k, c_k + min_chunk)` skip zone. A shadow-zone
/// trigger can advance a modified chunker by up to `min_chunk`, so the next
/// chunk must be at least `2 * min_chunk` to remain reachable.
///
/// See `parallel chunking.lyx` for the full proof and `find_stable_start` in
/// `merkle_hash_subtree.rs` for the analogous construction in merkle hashing.
pub fn next_stable_chunk_boundary(starting_position: usize, chunk_boundaries: &[usize]) -> Option<usize> {
let minimum_chunk = *TARGET_CHUNK_SIZE / *MINIMUM_CHUNK_DIVISOR;
let maximum_chunk = *TARGET_CHUNK_SIZE * *MAXIMUM_CHUNK_MULTIPLIER;
let start_idx = chunk_boundaries.partition_point(|&x| x < starting_position);
for i in start_idx..chunk_boundaries.len().saturating_sub(2) {
let size_a = chunk_boundaries[i + 1] - chunk_boundaries[i];
let size_b = chunk_boundaries[i + 2] - chunk_boundaries[i + 1];
if size_a >= 2 * minimum_chunk
&& size_a < maximum_chunk - minimum_chunk
&& size_b >= 2 * minimum_chunk
&& size_b < maximum_chunk - minimum_chunk
{
return Some(chunk_boundaries[i + 2]);
}
}
None
}
// Re-exported from xet_core_structures where the canonical implementation lives,
// so that downstream users of xet_data::deduplication::next_stable_chunk_boundary
// continue to work without a source change.
pub use xet_core_structures::xorb_object::constants::next_stable_chunk_boundary;
#[cfg(test)]
mod tests {

View File

@@ -2,7 +2,7 @@ use std::result::Result;
use more_asserts::{debug_assert_le, debug_assert_lt};
use xet_core_structures::MerkleHashMap;
use xet_core_structures::merklehash::{MerkleHash, file_hash};
use xet_core_structures::merklehash::{ChunkHashList, MerkleHash, file_hash};
use xet_core_structures::metadata_shard::file_structs::{
FileDataSequenceEntry, FileDataSequenceHeader, FileMetadataExt, FileVerificationEntry, MDBFileInfo,
};
@@ -36,7 +36,7 @@ pub struct FileDeduper<DataInterfaceType: DeduplicationDataInterface> {
new_data_hash_lookup: MerkleHashMap<usize>,
/// The current chunk hashes for this file.
chunk_hashes: Vec<(MerkleHash, u64)>,
chunk_hashes: ChunkHashList,
/// The current file data entries.
file_info: Vec<FileDataSequenceEntry>,
@@ -399,8 +399,11 @@ impl<DataInterfaceType: DeduplicationDataInterface> FileDeduper<DataInterfaceTyp
/// and remaining data. Also returns the aggregated deduplication metrics and the list of xorb hashes that were
/// registered as part of this run.
///
/// Returns (file hash, data aggregation, deduplication metrics)
pub fn finalize(self, metadata_ext: Option<FileMetadataExt>) -> (MerkleHash, DataAggregator, DeduplicationMetrics) {
/// Returns (file hash, chunk_hashes, data aggregation, deduplication metrics)
pub fn finalize(
self,
metadata_ext: Option<FileMetadataExt>,
) -> (MerkleHash, ChunkHashList, DataAggregator, DeduplicationMetrics) {
let file_hash = file_hash(&self.chunk_hashes);
let metadata = FileDataSequenceHeader::new(file_hash, self.file_info.len(), true, metadata_ext.is_some());
@@ -434,6 +437,6 @@ impl<DataInterfaceType: DeduplicationDataInterface> FileDeduper<DataInterfaceTyp
let remaining_data = DataAggregator::new(self.new_data, fi, self.internally_referencing_entries, self.file_id);
(file_hash, remaining_data, self.deduplication_metrics)
(file_hash, self.chunk_hashes, remaining_data, self.deduplication_metrics)
}
}

View File

@@ -103,7 +103,7 @@ async fn clean(mut reader: impl Read, mut writer: impl Write, size: u64) -> Resu
debug_assert_eq!(size_read, size);
let (file_info, _) = handle.finish().await?;
let (file_info, _metrics) = handle.finish().await?;
translator.finalize().await?;

View File

@@ -47,7 +47,8 @@ pub async fn clean_bytes(
) -> Result<(XetFileInfo, DeduplicationMetrics)> {
let (_id, mut handle) = processor.start_clean(None, Some(bytes.len() as u64), sha256_policy)?;
handle.add_data(&bytes).await?;
handle.finish().await
let (info, metrics) = handle.finish().await?;
Ok((info, metrics))
}
#[instrument(skip_all, name = "clean_file", fields(file.name = tracing::field::Empty, file.len = tracing::field::Empty))]
@@ -76,7 +77,8 @@ pub async fn clean_file(
handle.add_data(&buffer[0..bytes]).await?;
}
handle.finish().await
let (info, metrics) = handle.finish().await?;
Ok((info, metrics))
}
/// Computes the xet hash for a single file without uploading.

View File

@@ -5,8 +5,9 @@ use std::sync::Arc;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use tracing::{Instrument, debug_span, info, instrument};
use xet_core_structures::merklehash::ChunkHashList;
use xet_core_structures::metadata_shard::Sha256;
use xet_core_structures::metadata_shard::file_structs::FileMetadataExt;
use xet_core_structures::metadata_shard::file_structs::{FileMetadataExt, MDBFileInfo};
use xet_runtime::core::XetContext;
use super::XetFileInfo;
@@ -201,15 +202,39 @@ impl SingleFileCleaner {
}
/// Return the representation of the file after clean as a pointer file instance.
#[instrument(skip_all, name = "FileCleaner::finish", fields(file_name=self.file_name.as_ref().map(|s|s.to_string())))]
pub async fn finish(mut self) -> Result<(XetFileInfo, DeduplicationMetrics)> {
// Chunk the rest of the data.
pub async fn finish(self) -> Result<(XetFileInfo, DeduplicationMetrics)> {
let (info, _chunks, metrics) = self.finish_with_chunks().await?;
Ok((info, metrics))
}
/// Same as [`finish`], but also returns the per-chunk hash list produced during CDC.
/// Only needed by composition flows (e.g. `upload_ranges`) that build partial
/// `MerkleHashSubtree` nodes for newly-uploaded windows; regular uploads should call
/// [`finish`] instead.
#[instrument(skip_all, name = "FileCleaner::finish_with_chunks", fields(file_name=self.file_name.as_ref().map(|s|s.to_string())))]
pub async fn finish_with_chunks(self) -> Result<(XetFileInfo, ChunkHashList, DeduplicationMetrics)> {
let (file_info, chunk_hashes, _, deduplication_metrics) = Self::finish_inner(self, true).await?;
Ok((file_info, chunk_hashes, deduplication_metrics))
}
/// Like `finish_with_chunks`, but does NOT register the file's MDBFileInfo in the
/// session shard. Returns the MDBFileInfo directly so the caller can compose it into a
/// larger file without creating orphan shard entries.
pub async fn finish_with_chunks_detached(
self,
) -> Result<(XetFileInfo, ChunkHashList, MDBFileInfo, DeduplicationMetrics)> {
Self::finish_inner(self, false).await
}
async fn finish_inner(
mut self,
register: bool,
) -> Result<(XetFileInfo, ChunkHashList, MDBFileInfo, DeduplicationMetrics)> {
if let Some(chunk) = self.chunker.finish() {
let data = Arc::new([chunk]);
self.deduper_process_chunks(data).await?;
}
// Resolve the SHA-256: computed, provided, or skipped.
let sha256 = if let Some(generator) = self.sha_generator.take() {
Some(generator.finalize().await?)
} else {
@@ -217,7 +242,7 @@ impl SingleFileCleaner {
};
let metadata_ext = sha256.map(FileMetadataExt::new);
let (file_hash, remaining_file_data, deduplication_metrics) =
let (file_hash, chunk_hashes, remaining_file_data, deduplication_metrics) =
self.dedup_manager_fut.await?.finalize(metadata_ext);
let file_info = XetFileInfo {
@@ -226,34 +251,33 @@ impl SingleFileCleaner {
sha256: sha256.map(|s| s.hex()),
};
// Let's check some things that should be invariants
#[cfg(debug_assertions)]
{
// There should be exactly one file referenced in the remaining file data.
debug_assert_eq!(remaining_file_data.pending_file_info.len(), 1);
// The size should be total bytes
debug_assert_eq!(remaining_file_data.pending_file_info[0].0.file_size(), deduplication_metrics.total_bytes)
}
// Now, return all this information to the
self.session
.register_single_file_clean_completion(remaining_file_data, &deduplication_metrics)
.await?;
// NB: xorb upload is happening in the background, this number is optimistic since it does
// not count transfer time of the uploaded xorbs, which is why `end_processing_ts`
let mdb_file_info = if register {
self.session
.register_single_file_clean_completion(remaining_file_data, &deduplication_metrics)
.await?;
MDBFileInfo::default()
} else {
self.session
.register_single_file_clean_completion_detached(remaining_file_data, &deduplication_metrics)
.await?
};
info!(
target: "client_telemetry",
action = "clean",
file_name = self.file_name.unwrap_or_default().to_string(),
file_name = self.file_name.as_deref().unwrap_or_default().to_string(),
file_size_count = deduplication_metrics.total_bytes,
new_bytes_count = deduplication_metrics.new_bytes,
start_ts = self.start_time.to_rfc3339(),
end_processing_ts = Utc::now().to_rfc3339(),
);
Ok((file_info, deduplication_metrics))
Ok((file_info, chunk_hashes, mdb_file_info, deduplication_metrics))
}
}

View File

@@ -467,36 +467,74 @@ impl FileUploadSession {
Ok(())
}
/// Like `register_single_file_clean_completion`, but does NOT register the MDBFileInfo
/// in the session shard. Returns the finalized MDBFileInfo instead.
/// Used by composition flows where only the final composed file should appear in the shard.
pub(crate) async fn register_single_file_clean_completion_detached(
self: &Arc<Self>,
file_data: DataAggregator,
dedup_metrics: &DeduplicationMetrics,
) -> Result<MDBFileInfo> {
// Always cut a dedicated xorb for detached files. This avoids mixing with other
// files in current_session_data whose MDBFileInfo must still be registered.
let file_infos = self.process_aggregated_data_as_xorb_detached(file_data).await?;
self.deduplication_metrics.lock().await.merge_in(dedup_metrics);
debug_assert_eq!(file_infos.len(), 1);
file_infos
.into_iter()
.next()
.ok_or_else(|| DataError::InternalError("detached completion produced no file info".into()))
}
/// Process the aggregated data, uploading the data as a xorb and registering the files
async fn process_aggregated_data_as_xorb(self: &Arc<Self>, data_agg: DataAggregator) -> Result<()> {
self.process_aggregated_data_as_xorb_impl(data_agg, true).await.map(|_| ())
}
/// Upload the xorb data but do NOT register file reconstruction info in the shard.
/// Returns the finalized MDBFileInfo for each file in the aggregator.
async fn process_aggregated_data_as_xorb_detached(
self: &Arc<Self>,
data_agg: DataAggregator,
) -> Result<Vec<MDBFileInfo>> {
self.process_aggregated_data_as_xorb_impl(data_agg, false).await
}
async fn process_aggregated_data_as_xorb_impl(
self: &Arc<Self>,
data_agg: DataAggregator,
register_files: bool,
) -> Result<Vec<MDBFileInfo>> {
let (xorb, new_files) = data_agg.finalize();
let xorb_hash = xorb.hash();
debug_assert_le!(xorb.num_bytes(), *MAX_XORB_BYTES);
debug_assert_le!(xorb.data.len(), *MAX_XORB_CHUNKS);
// Now, we need to scan all the file dependencies for dependencies on this xorb, as
// these would not have been registered yet as we just got the xorb hash.
let mut new_dependencies = Vec::with_capacity(new_files.len());
let mut file_infos = Vec::with_capacity(new_files.len());
{
for (file_id, fi, bytes_in_xorb) in new_files {
new_dependencies.push(FileXorbDependency {
file_id,
xorb_hash,
n_bytes: bytes_in_xorb,
is_external: false,
});
for (file_id, fi, bytes_in_xorb) in new_files {
new_dependencies.push(FileXorbDependency {
file_id,
xorb_hash,
n_bytes: bytes_in_xorb,
is_external: false,
});
// Record the reconstruction.
if register_files {
self.shard_interface.add_file_reconstruction_info(fi).await?;
} else {
file_infos.push(fi);
}
}
// Register the xorb and start the upload process.
self.register_new_xorb(xorb, &new_dependencies).await?;
Ok(())
Ok(file_infos)
}
/// Register a xorb dependencies that is given as part of the dedup process.
@@ -570,6 +608,13 @@ impl FileUploadSession {
Ok(())
}
/// Register a pre-composed file reconstruction plan (MDBFileInfo) with this session.
/// Used for append-aware writes where the caller builds the reconstruction plan
/// from existing segments + newly uploaded segments.
pub(crate) async fn register_composed_file(self: &Arc<Self>, file_info: MDBFileInfo) -> Result<()> {
self.shard_interface.add_file_reconstruction_info(file_info).await
}
fn check_not_finalized(&self) -> Result<()> {
if self.finalized.load(Ordering::Acquire) {
return Err(DataError::InvalidOperation("FileUploadSession already finalized".to_string()));
@@ -730,7 +775,7 @@ mod tests {
.start_clean(Some("test".into()), Some(data.len() as u64), Sha256Policy::Skip)
.unwrap();
cleaner.add_data(data).await.unwrap();
cleaner.finish().await.unwrap();
let _ = cleaner.finish().await.unwrap();
// Verify that the shard has no metadata_ext (no SHA-256).
let (_metrics, file_infos) = upload_session.finalize_with_file_info().await.unwrap();

View File

@@ -5,6 +5,7 @@ mod file_cleaner;
mod file_download_session;
mod file_upload_session;
pub mod migration_tool;
pub mod range_upload;
mod remote_client_interface;
mod sha256;
mod shard_interface;
@@ -14,9 +15,11 @@ mod xet_file;
pub use file_cleaner::{Sha256Policy, SingleFileCleaner};
pub use file_download_session::FileDownloadSession;
pub use file_upload_session::FileUploadSession;
pub use range_upload::{DirtyInput, upload_ranges};
pub use remote_client_interface::create_remote_client;
pub use xet_client::cas_client::Client as CasClient;
pub use xet_client::chunk_cache::{CacheConfig, ChunkCache, get_cache};
pub use xet_core_structures::merklehash::ChunkHashList;
pub use xet_file::XetFileInfo;
pub use crate::deduplication::RawXorbData;

File diff suppressed because it is too large Load Diff

View File

@@ -145,7 +145,7 @@ mod tests {
// Add all the data. Roughly the first half should dedup.
cleaner.add_data(&data).await.unwrap();
cleaner.finish().await.unwrap();
let _ = cleaner.finish().await.unwrap();
let report = file_upload_session.report();
assert!(report.total_bytes > 0);

View File

@@ -178,7 +178,7 @@ impl XetFileDownloadGroup {
pub fn abort(&self) -> Result<(), XetError> {
info!(group_id = %self.id(), "Download group abort");
self.task_runtime.cancel_subtree()?;
for (_tracking_id, handle) in self.inner.active_tasks.read()?.iter() {
for handle in self.inner.active_tasks.read()?.values() {
handle.cancel();
}
Ok(())

View File

@@ -430,8 +430,6 @@ impl XetUploadCommit {
.await
}
/// Queue raw bytes for upload, starting the transfer immediately.
///
/// Returns a [`XetFileUpload`] whose
/// [`finalize_ingestion`](XetFileUpload::finalize_ingestion) method yields
/// per-file [`XetFileMetadata`] once ingestion completes.

View File

@@ -138,7 +138,7 @@ impl SafeFileCreator {
Some(wr) => Ok(wr),
None => Err(io::Error::new(
io::ErrorKind::BrokenPipe,
format!("Writing to {:?} already completed.", &self.dest_path),
format!("Writing to {:?} already completed.", self.dest_path),
)),
}
}
@@ -163,7 +163,7 @@ impl Seek for SafeFileCreator {
impl Drop for SafeFileCreator {
fn drop(&mut self) {
if let Err(e) = self.close() {
eprintln!("Error: Failed to close writer for {:?}: {}", &self.dest_path, e);
eprintln!("Error: Failed to close writer for {:?}: {}", self.dest_path, e);
}
}
}