mirror of
https://github.com/huggingface/xet-core.git
synced 2026-06-04 13:30:29 +08:00
[codex] Deduplicate shard file infos (#834)
## Summary
- Deduplicate MDBMinimalShard file infos by file hash during both the
synchronous and the asynchronous streaming parse.
- Keep only the first file info seen for a duplicate file hash; the async
file callback still fires for every parsed entry, including dropped duplicates.
- Add a focused streaming-shard test covering parse, async callbacks,
and reserialization.
## Why
Duplicate file infos can survive the minimal streaming shard
parse/re-serialize path because it stores file entries as a Vec. This
narrows canonicalization to that streaming path while leaving in-memory
shard and set-operation behavior unchanged.
## Impact
- MDBMinimalShard::num_files() now reports the number of unique file hashes
for parsed shards.
- Later duplicate file infos are ignored even if they contain richer
optional verification or metadata extension data.
- Raw full-section readers, MDBInMemoryShard behavior, and shard set
operations remain unchanged.
## Validation
- cargo test -p xet-core-structures metadata_shard
- cargo test -p xet-client test_global_dedup
- git diff --check
- rustfmt --edition 2024 --check
xet_core_structures/src/metadata_shard/set_operations.rs
xet_core_structures/src/metadata_shard/shard_in_memory.rs
xet_core_structures/src/metadata_shard/streaming_shard.rs
<!-- CURSOR_SUMMARY -->
---
> [!NOTE]
> **Medium Risk**
> Changes shard streaming parse semantics by dropping duplicate
`file_hash` entries, which can affect downstream counts/serialization
and may hide later entries’ richer metadata/verification.
>
> **Overview**
> `MDBMinimalShard` now **deduplicates file-info records by
`file_hash`** during both sync (`from_reader`) and async
(`from_reader_async_with_custom_callbacks`) streaming parses, keeping
only the *first* occurrence.
>
> Adds a focused test that constructs a shard stream with duplicate file
infos and asserts first-wins behavior, checks that the async parse yields the
same shard while the file callback still receives both entries, and confirms
re-serialization only emits the retained entry.
>
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit
1320ce36ce. Bugbot is set up for automated
code reviews on this repo. Configure
[here](https://www.cursor.com/dashboard/bugbot).</sup>
<!-- /CURSOR_SUMMARY -->
This commit is contained in:
@@ -185,9 +185,10 @@ impl MDBMinimalShard {
|
||||
let _ = MDBShardFileHeader::deserialize(reader)?;
|
||||
|
||||
let mut file_info_views = Vec::<MDBFileInfoView>::new();
|
||||
let mut seen_file_hashes = HashSet::new();
|
||||
process_shard_file_info_section(reader, |fiv: MDBFileInfoView| {
|
||||
// register the offset here to the file entries
|
||||
if include_files {
|
||||
if include_files && seen_file_hashes.insert(fiv.file_hash()) {
|
||||
file_info_views.push(fiv);
|
||||
}
|
||||
Ok(())
|
||||
@@ -232,11 +233,14 @@ impl MDBMinimalShard {
|
||||
let _ = MDBShardFileHeader::deserialize(&mut Cursor::new(&buf))?;
|
||||
|
||||
let mut file_info_views = Vec::<MDBFileInfoView>::new();
|
||||
let mut seen_file_hashes = HashSet::new();
|
||||
process_shard_file_info_section_async(reader, |fiv: MDBFileInfoView| {
|
||||
// register the offset here to the file entries
|
||||
if include_files {
|
||||
file_callback(&fiv)?;
|
||||
file_info_views.push(fiv);
|
||||
if seen_file_hashes.insert(fiv.file_hash()) {
|
||||
file_info_views.push(fiv);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
@@ -507,17 +511,28 @@ mod tests {
|
||||
use rand::rngs::SmallRng;
|
||||
use rand::{RngExt, SeedableRng};
|
||||
|
||||
use super::super::MDBShardInfo;
|
||||
use super::super::file_structs::MDBFileInfo;
|
||||
use super::super::file_structs::{FileDataSequenceHeader, MDBFileInfo};
|
||||
use super::super::shard_file::test_routines::{
|
||||
convert_to_file, gen_random_shard, gen_random_shard_with_xorb_references,
|
||||
convert_to_file, gen_random_file_info, gen_random_shard, gen_random_shard_with_xorb_references,
|
||||
};
|
||||
use super::super::shard_in_memory::MDBInMemoryShard;
|
||||
use super::super::xorb_structs::MDBXorbInfo;
|
||||
use super::super::xorb_structs::{MDBXorbInfo, XorbChunkSequenceHeader};
|
||||
use super::super::{MDBShardFileHeader, MDBShardInfo};
|
||||
use super::MDBMinimalShard;
|
||||
use crate::error::Result;
|
||||
use crate::merklehash::MerkleHash;
|
||||
|
||||
fn file_info_stream(file_infos: &[MDBFileInfo]) -> Vec<u8> {
|
||||
let mut buffer = Vec::new();
|
||||
MDBShardFileHeader::default().serialize(&mut buffer).unwrap();
|
||||
for file_info in file_infos {
|
||||
file_info.serialize(&mut buffer).unwrap();
|
||||
}
|
||||
FileDataSequenceHeader::bookend().serialize(&mut buffer).unwrap();
|
||||
XorbChunkSequenceHeader::bookend().serialize(&mut buffer).unwrap();
|
||||
buffer
|
||||
}
|
||||
|
||||
fn verify_serialization(min_shard: &MDBMinimalShard, mem_shard: &MDBInMemoryShard) -> Result<()> {
|
||||
for verification in [true, false] {
|
||||
// compute size, with verification if possible only
|
||||
@@ -665,6 +680,43 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_minimal_shard_deduplicates_file_infos_first_wins() {
|
||||
let mut rng = rand::rngs::StdRng::seed_from_u64(7);
|
||||
let first = gen_random_file_info(&mut rng, &2, false, false);
|
||||
let mut duplicate = gen_random_file_info(&mut rng, &3, true, true);
|
||||
duplicate.metadata.file_hash = first.metadata.file_hash;
|
||||
|
||||
let buffer = file_info_stream(&[first.clone(), duplicate.clone()]);
|
||||
|
||||
let min_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&buffer), true, true).unwrap();
|
||||
assert_eq!(min_shard.num_files(), 1);
|
||||
assert_eq!(MDBFileInfo::from(min_shard.file(0).unwrap()), first);
|
||||
|
||||
let mut callback_file_infos = Vec::new();
|
||||
let min_shard_async = MDBMinimalShard::from_reader_async_with_custom_callbacks(
|
||||
&mut &buffer[..],
|
||||
true,
|
||||
true,
|
||||
|f| {
|
||||
callback_file_infos.push(MDBFileInfo::from(f));
|
||||
Ok(())
|
||||
},
|
||||
|_| Ok(()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(min_shard, min_shard_async);
|
||||
assert_eq!(callback_file_infos, vec![first.clone(), duplicate]);
|
||||
|
||||
let mut reserialized = Vec::new();
|
||||
min_shard.serialize(&mut reserialized, false).unwrap();
|
||||
let shard_info = MDBShardInfo::load_from_reader(&mut Cursor::new(&reserialized)).unwrap();
|
||||
let file_infos = shard_info.read_all_file_info_sections(&mut Cursor::new(&reserialized)).unwrap();
|
||||
assert_eq!(file_infos, vec![first]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_shards() -> Result<()> {
|
||||
let shard = gen_random_shard(0, &[], &[0], false, false)?;
|
||||
|
||||
Reference in New Issue
Block a user