mirror of
https://github.com/huggingface/xet-core.git
synced 2026-06-04 13:30:29 +08:00
download terms once and write where needed (#320)
Fixes XET-493. Changes the parallel download process so that each fetch info term is downloaded only once, and all terms fulfilled by it are written at once.
This commit is contained in:
38
Cargo.lock
generated
38
Cargo.lock
generated
@@ -488,6 +488,7 @@ dependencies = [
|
||||
"cas_types",
|
||||
"chunk_cache",
|
||||
"deduplication",
|
||||
"derivative",
|
||||
"error_printer",
|
||||
"file_utils",
|
||||
"futures",
|
||||
@@ -972,6 +973,17 @@ dependencies = [
|
||||
"powerfmt",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derivative"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.10.7"
|
||||
@@ -1640,9 +1652,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.11"
|
||||
version = "0.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2"
|
||||
checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
@@ -1731,9 +1743,9 @@ checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3"
|
||||
|
||||
[[package]]
|
||||
name = "icu_properties"
|
||||
version = "2.0.0"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2549ca8c7241c82f59c80ba2a6f415d931c5b58d24fb8412caa1a1f02c49139a"
|
||||
checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b"
|
||||
dependencies = [
|
||||
"displaydoc",
|
||||
"icu_collections",
|
||||
@@ -1747,9 +1759,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "icu_properties_data"
|
||||
version = "2.0.0"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8197e866e47b68f8f7d95249e172903bec06004b18b2937f1095d40a0c57de04"
|
||||
checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632"
|
||||
|
||||
[[package]]
|
||||
name = "icu_provider"
|
||||
@@ -4177,15 +4189,15 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.61.1"
|
||||
version = "0.61.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46ec44dc15085cea82cf9c78f85a9114c463a369786585ad2882d1ff0b0acf40"
|
||||
checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
|
||||
dependencies = [
|
||||
"windows-implement",
|
||||
"windows-interface",
|
||||
"windows-link",
|
||||
"windows-result",
|
||||
"windows-strings 0.4.1",
|
||||
"windows-strings 0.4.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4229,9 +4241,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "windows-result"
|
||||
version = "0.3.3"
|
||||
version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4b895b5356fc36103d0f64dd1e94dfa7ac5633f1c9dd6e80fe9ec4adef69e09d"
|
||||
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
]
|
||||
@@ -4247,9 +4259,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "windows-strings"
|
||||
version = "0.4.1"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a7ab927b2637c19b3dbe0965e75d8f2d30bdd697a1516191cad2ec4df8fb28a"
|
||||
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
]
|
||||
|
||||
@@ -46,6 +46,7 @@ countio = { version = "0.2", features = ["futures"] }
|
||||
crc32fast = "1.4"
|
||||
csv = "1"
|
||||
ctor = "0.4"
|
||||
derivative = "2.2.0"
|
||||
dirs = "5.0"
|
||||
futures = "0.3"
|
||||
futures-io = "0.3"
|
||||
@@ -58,7 +59,6 @@ http = "1"
|
||||
itertools = "0.14"
|
||||
jsonwebtoken = "9.3"
|
||||
lazy_static = "1.5"
|
||||
len-trait = "0.6"
|
||||
libc = "0.2"
|
||||
lz4_flex = "0.11"
|
||||
mockall = "0.13"
|
||||
|
||||
@@ -24,6 +24,7 @@ xet_threadpool = { path = "../xet_threadpool" }
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
derivative = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
heed = { workspace = true }
|
||||
http = { workspace = true }
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::io::Write;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use cas_object::error::CasObjectError;
|
||||
use cas_types::{CASReconstructionFetchInfo, CASReconstructionTerm, ChunkRange, FileRange, HexMerkleHash, Key};
|
||||
use chunk_cache::ChunkCache;
|
||||
use chunk_cache::{CacheRange, ChunkCache};
|
||||
use deduplication::constants::MAX_XORB_BYTES;
|
||||
use derivative::Derivative;
|
||||
use error_printer::ErrorPrinter;
|
||||
use futures::TryStreamExt;
|
||||
use http::header::RANGE;
|
||||
@@ -26,7 +27,7 @@ use crate::OutputProvider;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum DownloadRangeResult {
|
||||
Data(Vec<u8>, Vec<u32>),
|
||||
Data(TermDownloadOutput),
|
||||
// This is a workaround to propagate the underlying request error as
|
||||
// the underlying reqwest_middleware::Error doesn't impl Clone.
|
||||
// Otherwise, if two download tasks with the same key in the single flight
|
||||
@@ -40,7 +41,7 @@ pub(crate) type RangeDownloadSingleFlight = Arc<Group<DownloadRangeResult, CasCl
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct FetchInfo {
|
||||
file_hash: MerkleHash,
|
||||
file_range: FileRange,
|
||||
pub(crate) file_range: FileRange,
|
||||
endpoint: String,
|
||||
client: Arc<ClientWithMiddleware>, // only used for fetching file info
|
||||
inner: RwLock<HashMap<HexMerkleHash, Vec<CASReconstructionFetchInfo>>>,
|
||||
@@ -137,54 +138,109 @@ impl FetchInfo {
|
||||
|
||||
/// Helper object containing the structs needed when downloading a term in parallel
|
||||
/// during reconstruction.
|
||||
pub(crate) struct TermDownload {
|
||||
pub term: CASReconstructionTerm,
|
||||
pub skip_bytes: u64, // number of bytes to skip at the front
|
||||
pub take: u64, // number of bytes to take after skipping bytes,
|
||||
// effectively taking [skip_bytes..skip_bytes+take]
|
||||
// out of the downloaded range
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Debug)]
|
||||
pub(crate) struct FetchTermDownload {
|
||||
pub hash: MerkleHash,
|
||||
pub range: ChunkRange,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub fetch_info: Arc<FetchInfo>, // utility to get URL to download this term
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub chunk_cache: Option<Arc<dyn ChunkCache>>,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub client: Arc<ClientWithMiddleware>, // only used for downloading range
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub range_download_single_flight: RangeDownloadSingleFlight,
|
||||
}
|
||||
|
||||
impl fmt::Debug for TermDownload {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// print out everything except the chunk cache
|
||||
f.debug_struct("TermDownload")
|
||||
.field("term", &self.term)
|
||||
.field("skip_bytes", &self.skip_bytes)
|
||||
.field("take", &self.take)
|
||||
.field("fetch_info", &self.fetch_info)
|
||||
.field("client", &self.client)
|
||||
.field("range_download_single_flight", &self.range_download_single_flight)
|
||||
.finish()
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SequentialTermDownload {
|
||||
pub term: CASReconstructionTerm,
|
||||
pub download: FetchTermDownload,
|
||||
pub skip_bytes: u64, // number of bytes to skip at the front
|
||||
pub take: u64, /* number of bytes to take after skipping bytes,
|
||||
* effectively taking [skip_bytes..skip_bytes+take]
|
||||
* out of the downloaded range */
|
||||
}
|
||||
|
||||
impl SequentialTermDownload {
|
||||
pub async fn run(self) -> Result<TermDownloadResult<Vec<u8>>> {
|
||||
let TermDownloadResult {
|
||||
payload:
|
||||
TermDownloadOutput {
|
||||
data,
|
||||
chunk_byte_indices,
|
||||
chunk_range,
|
||||
},
|
||||
duration,
|
||||
n_retries_on_403,
|
||||
} = self.download.run().await?;
|
||||
|
||||
// if the requested range is smaller than the fetched range, trim it down to the right data
|
||||
// the requested range cannot be larger than the fetched range.
|
||||
// "else" case data matches exact, save some work, return whole data.
|
||||
let start_idx = (self.term.range.start - chunk_range.start) as usize;
|
||||
let end_idx = (self.term.range.end - chunk_range.start) as usize;
|
||||
|
||||
let start_byte_index = chunk_byte_indices[start_idx] as usize;
|
||||
let end_byte_index = chunk_byte_indices[end_idx] as usize;
|
||||
debug_assert!(start_byte_index < data.len());
|
||||
debug_assert!(end_byte_index <= data.len());
|
||||
debug_assert!(start_byte_index < end_byte_index);
|
||||
let data_slice = &data[start_byte_index..end_byte_index];
|
||||
|
||||
// extract just the actual range data out of the term download output
|
||||
let start = self.skip_bytes as usize;
|
||||
let end = start + self.take as usize;
|
||||
let final_term_data = &data_slice[start..end];
|
||||
|
||||
Ok(TermDownloadResult {
|
||||
payload: final_term_data.to_vec(),
|
||||
duration,
|
||||
n_retries_on_403,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TermDownloadResult<T> {
|
||||
pub data: T, // download result
|
||||
pub payload: T, // download result
|
||||
pub duration: Duration, // duration to download
|
||||
pub n_retries_on_403: u32,
|
||||
}
|
||||
|
||||
impl TermDownload {
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct TermDownloadOutput {
|
||||
pub data: Vec<u8>,
|
||||
pub chunk_byte_indices: Vec<u32>,
|
||||
pub chunk_range: ChunkRange,
|
||||
}
|
||||
|
||||
impl From<CacheRange> for TermDownloadOutput {
|
||||
fn from(CacheRange { data, offsets, range }: CacheRange) -> Self {
|
||||
Self {
|
||||
data,
|
||||
chunk_byte_indices: offsets,
|
||||
chunk_range: range,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FetchTermDownload {
|
||||
// Download and return results, retry on 403
|
||||
pub async fn run(self) -> Result<TermDownloadResult<Vec<u8>>> {
|
||||
pub async fn run(self) -> Result<TermDownloadResult<TermDownloadOutput>> {
|
||||
let instant = Instant::now();
|
||||
let mut n_retries_on_403 = 0;
|
||||
|
||||
let key = (self.term.hash, self.term.range);
|
||||
let mut data = loop {
|
||||
let key = (self.hash.into(), self.range);
|
||||
let data = loop {
|
||||
let (fetch_info, v) = self.fetch_info.find(key).await?;
|
||||
|
||||
let range_data = get_one_term(
|
||||
let range_data = get_one_fetch_term_data(
|
||||
self.hash,
|
||||
fetch_info,
|
||||
self.client.clone(),
|
||||
self.chunk_cache.clone(),
|
||||
self.term.clone(),
|
||||
fetch_info,
|
||||
self.range_download_single_flight.clone(),
|
||||
)
|
||||
.await;
|
||||
@@ -198,43 +254,79 @@ impl TermDownload {
|
||||
break range_data?;
|
||||
};
|
||||
|
||||
let skip_bytes = self.skip_bytes.try_into().log_error("incorrect offset into range")?;
|
||||
let take = self.take.try_into().log_error("incorrect take bytes")?;
|
||||
if skip_bytes > 0 {
|
||||
data = data[skip_bytes..skip_bytes + take].to_vec()
|
||||
} else {
|
||||
data.truncate(take);
|
||||
}
|
||||
|
||||
Ok(TermDownloadResult {
|
||||
data,
|
||||
payload: data,
|
||||
duration: instant.elapsed(),
|
||||
n_retries_on_403,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ChunkRangeWrite {
|
||||
pub chunk_range: ChunkRange,
|
||||
pub unpacked_length: u32,
|
||||
pub skip_bytes: u64, // number of bytes to skip at the front
|
||||
pub take: u64, // number of bytes to take after skipping bytes,
|
||||
pub writer_offset: u64,
|
||||
}
|
||||
|
||||
/// Helper object containing the structs needed when downloading and writing a term in parallel
|
||||
/// during reconstruction.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TermDownloadAndWrite {
|
||||
pub download: TermDownload,
|
||||
pub write_offset: u64, // start position of the writer to write to
|
||||
pub(crate) struct FetchTermDownloadOnceAndWriteEverywhereUsed {
|
||||
pub download: FetchTermDownload,
|
||||
// pub write_offset: u64, // start position of the writer to write to
|
||||
pub output: OutputProvider,
|
||||
pub writes: Vec<ChunkRangeWrite>,
|
||||
}
|
||||
|
||||
impl TermDownloadAndWrite {
|
||||
impl FetchTermDownloadOnceAndWriteEverywhereUsed {
|
||||
/// Download the term and write it to the underlying storage, retry on 403
|
||||
pub async fn run(self) -> Result<TermDownloadResult<usize>> {
|
||||
pub async fn run(self) -> Result<TermDownloadResult<u64>> {
|
||||
let download_result = self.download.run().await?;
|
||||
let TermDownloadOutput {
|
||||
data,
|
||||
chunk_byte_indices,
|
||||
chunk_range,
|
||||
} = download_result.payload;
|
||||
debug_assert_eq!(chunk_byte_indices.len(), (chunk_range.end - chunk_range.start + 1) as usize);
|
||||
debug_assert_eq!(*chunk_byte_indices.last().expect("checked len is something") as usize, data.len());
|
||||
|
||||
// write out the term
|
||||
let mut writer = self.output.get_writer_at(self.write_offset)?;
|
||||
writer.write_all(&download_result.data)?;
|
||||
writer.flush()?;
|
||||
// write out the data
|
||||
let mut total_written = 0;
|
||||
for write in self.writes {
|
||||
debug_assert!(write.chunk_range.start >= chunk_range.start);
|
||||
debug_assert!(write.chunk_range.end > chunk_range.start);
|
||||
debug_assert!(
|
||||
write.chunk_range.start < chunk_range.end,
|
||||
"{} < {} ;;; write {:?} term {:?}",
|
||||
write.chunk_range.start,
|
||||
chunk_range.end,
|
||||
write.chunk_range,
|
||||
chunk_range
|
||||
);
|
||||
debug_assert!(write.chunk_range.end <= chunk_range.end);
|
||||
|
||||
let start_chunk_offset_index = write.chunk_range.start - chunk_range.start;
|
||||
let end_chunk_offset_index = write.chunk_range.end - chunk_range.start;
|
||||
let start_chunk_offset = chunk_byte_indices[start_chunk_offset_index as usize] as usize;
|
||||
let end_chunk_offset = chunk_byte_indices[end_chunk_offset_index as usize] as usize;
|
||||
let data_sub_range = &data[start_chunk_offset..end_chunk_offset];
|
||||
debug_assert_eq!(data_sub_range.len(), write.unpacked_length as usize);
|
||||
|
||||
debug_assert!(data_sub_range.len() as u64 >= write.skip_bytes + write.take);
|
||||
let data_sub_range_sliced =
|
||||
&data_sub_range[(write.skip_bytes as usize)..((write.skip_bytes + write.take) as usize)];
|
||||
|
||||
let mut writer = self.output.get_writer_at(write.writer_offset)?;
|
||||
writer.write_all(data_sub_range_sliced)?;
|
||||
writer.flush()?;
|
||||
total_written += write.take;
|
||||
}
|
||||
|
||||
Ok(TermDownloadResult {
|
||||
data: download_result.data.len(),
|
||||
payload: total_written,
|
||||
duration: download_result.duration,
|
||||
n_retries_on_403: download_result.n_retries_on_403,
|
||||
})
|
||||
@@ -243,7 +335,7 @@ impl TermDownloadAndWrite {
|
||||
|
||||
pub(crate) enum DownloadQueueItem<T> {
|
||||
End,
|
||||
Term(T),
|
||||
DownloadTask(T),
|
||||
Metadata(FetchInfo),
|
||||
}
|
||||
|
||||
@@ -300,16 +392,15 @@ impl DownloadScheduler {
|
||||
///
|
||||
/// If the fetch_info section (provided as in the QueryReconstructionResponse) fails to contain a term
|
||||
/// that matches our requested CASReconstructionTerm, it is considered a bad output from the CAS API.
|
||||
pub(crate) async fn get_one_term(
|
||||
pub(crate) async fn get_one_fetch_term_data(
|
||||
hash: MerkleHash,
|
||||
fetch_term: CASReconstructionFetchInfo,
|
||||
http_client: Arc<ClientWithMiddleware>,
|
||||
chunk_cache: Option<Arc<dyn ChunkCache>>,
|
||||
term: CASReconstructionTerm,
|
||||
fetch_term: CASReconstructionFetchInfo,
|
||||
range_download_single_flight: RangeDownloadSingleFlight,
|
||||
) -> Result<Vec<u8>> {
|
||||
debug!("term: {term:?}");
|
||||
|
||||
if term.range.end < term.range.start {
|
||||
) -> Result<TermDownloadOutput> {
|
||||
debug!("getting {hash} {fetch_term:?}");
|
||||
if fetch_term.range.end < fetch_term.range.start {
|
||||
return Err(CasClientError::InvalidRange);
|
||||
}
|
||||
|
||||
@@ -317,20 +408,20 @@ pub(crate) async fn get_one_term(
|
||||
if let Some(cache) = &chunk_cache {
|
||||
let key = Key {
|
||||
prefix: PREFIX_DEFAULT.to_string(),
|
||||
hash: term.hash.into(),
|
||||
hash,
|
||||
};
|
||||
if let Ok(Some(cached)) = cache.get(&key, &term.range).await.log_error("cache error") {
|
||||
return Ok(cached.data.to_vec());
|
||||
if let Ok(Some(cached)) = cache.get(&key, &fetch_term.range).await.log_error("cache error") {
|
||||
return Ok(cached.into());
|
||||
}
|
||||
}
|
||||
|
||||
// fetch the range from blob store and deserialize the chunks
|
||||
// then put into the cache if used
|
||||
let download_range_result = range_download_single_flight
|
||||
.work_dump_caller_info(&fetch_term.url, download_range(http_client, fetch_term.clone(), term.hash))
|
||||
.work_dump_caller_info(&fetch_term.url, download_fetch_term_data(hash.into(), fetch_term.clone(), http_client))
|
||||
.await?;
|
||||
|
||||
let DownloadRangeResult::Data(mut data, chunk_byte_indices) = download_range_result else {
|
||||
let DownloadRangeResult::Data(term_download_output) = download_range_result else {
|
||||
return Err(CasClientError::PresignedUrlExpirationError);
|
||||
};
|
||||
|
||||
@@ -338,39 +429,17 @@ pub(crate) async fn get_one_term(
|
||||
if let Some(cache) = chunk_cache {
|
||||
let key = Key {
|
||||
prefix: PREFIX_DEFAULT.to_string(),
|
||||
hash: term.hash.into(),
|
||||
hash,
|
||||
};
|
||||
if let Err(e) = cache.put(&key, &fetch_term.range, &chunk_byte_indices, &data).await {
|
||||
if let Err(e) = cache
|
||||
.put(&key, &fetch_term.range, &term_download_output.chunk_byte_indices, &term_download_output.data)
|
||||
.await
|
||||
{
|
||||
info!("Writing to local cache failed, continuing. Error: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// if the requested range is smaller than the fetched range, trim it down to the right data
|
||||
// the requested range cannot be larger than the fetched range.
|
||||
// "else" case data matches exact, save some work, return whole data.
|
||||
if term.range != fetch_term.range {
|
||||
let start_idx = term.range.start - fetch_term.range.start;
|
||||
let end_idx = term.range.end - fetch_term.range.start;
|
||||
let start_byte_index = chunk_byte_indices[start_idx as usize] as usize;
|
||||
let end_byte_index = chunk_byte_indices[end_idx as usize] as usize;
|
||||
debug_assert!(start_byte_index < data.len());
|
||||
debug_assert!(end_byte_index <= data.len());
|
||||
debug_assert!(start_byte_index < end_byte_index);
|
||||
// [0, len] -> [0, end_byte_index)
|
||||
data.truncate(end_byte_index);
|
||||
// [0, end_byte_index) -> [start_byte_index, end_byte_index)
|
||||
data = data.split_off(start_byte_index);
|
||||
}
|
||||
|
||||
if data.len() != term.unpacked_length as usize {
|
||||
return Err(CasClientError::Other(format!(
|
||||
"result term data length {} did not match expected value {}",
|
||||
data.len(),
|
||||
term.unpacked_length
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(data)
|
||||
Ok(term_download_output)
|
||||
}
|
||||
|
||||
struct ChunkRangeDeserializeFromBytesStreamRetryCondition;
|
||||
@@ -398,10 +467,10 @@ impl tokio_retry::Condition<CasClientError> for ChunkRangeDeserializeFromBytesSt
|
||||
/// use the provided http_client to make requests to S3/blob store using the url and url_range
|
||||
/// parts of a CASReconstructionFetchInfo. The url_range part is used directly in a http Range header
|
||||
/// value (see fn `range_header`).
|
||||
async fn download_range(
|
||||
http_client: Arc<ClientWithMiddleware>,
|
||||
fetch_term: CASReconstructionFetchInfo,
|
||||
async fn download_fetch_term_data(
|
||||
hash: HexMerkleHash,
|
||||
fetch_term: CASReconstructionFetchInfo,
|
||||
http_client: Arc<ClientWithMiddleware>,
|
||||
) -> Result<DownloadRangeResult> {
|
||||
trace!("{hash},{},{}", fetch_term.range.start, fetch_term.range.end);
|
||||
|
||||
@@ -445,7 +514,11 @@ async fn download_range(
|
||||
response.bytes_stream().map_err(std::io::Error::other),
|
||||
)
|
||||
.await?;
|
||||
Ok(DownloadRangeResult::Data(data, chunk_byte_indices))
|
||||
Ok(DownloadRangeResult::Data(TermDownloadOutput {
|
||||
data,
|
||||
chunk_byte_indices,
|
||||
chunk_range: fetch_term.range,
|
||||
}))
|
||||
},
|
||||
ChunkRangeDeserializeFromBytesStreamRetryCondition,
|
||||
)
|
||||
@@ -637,14 +710,18 @@ mod tests {
|
||||
|
||||
let (offset_info_first_range, terms) = fetch_info.query().await?.unwrap();
|
||||
|
||||
let download_task = TermDownload {
|
||||
let download_task = SequentialTermDownload {
|
||||
download: FetchTermDownload {
|
||||
hash: xorb1.into(),
|
||||
range: x1range[0].range,
|
||||
fetch_info: Arc::new(fetch_info),
|
||||
chunk_cache: None,
|
||||
client: Arc::new(build_http_client(RetryConfig::default(), "")?),
|
||||
range_download_single_flight: Arc::new(Group::new()),
|
||||
},
|
||||
term: terms[0].clone(),
|
||||
skip_bytes: offset_info_first_range,
|
||||
take: file_range.length(),
|
||||
fetch_info: Arc::new(fetch_info),
|
||||
chunk_cache: None,
|
||||
client: Arc::new(build_http_client(RetryConfig::default(), "")?),
|
||||
range_download_single_flight: Arc::new(Group::new()),
|
||||
};
|
||||
|
||||
let handle = tokio::spawn(async move { download_task.run().await });
|
||||
|
||||
@@ -24,9 +24,9 @@ pub(crate) const BASE_RETRY_DELAY_MS: u64 = 3000; // 3s
|
||||
pub(crate) const BASE_RETRY_MAX_DURATION_MS: u64 = 6 * 60 * 1000; // 6m
|
||||
|
||||
/// A strategy that doesn't retry on 429, and defaults to `DefaultRetryableStrategy` otherwise.
|
||||
pub struct No429RetryStratey;
|
||||
pub struct No429RetryStrategy;
|
||||
|
||||
impl RetryableStrategy for No429RetryStratey {
|
||||
impl RetryableStrategy for No429RetryStrategy {
|
||||
fn handle(&self, res: &Result<reqwest::Response, reqwest_middleware::Error>) -> Option<Retryable> {
|
||||
if let Ok(success) = res {
|
||||
if success.status() == StatusCode::TOO_MANY_REQUESTS {
|
||||
@@ -65,13 +65,13 @@ impl Default for RetryConfig<DefaultRetryableStrategy> {
|
||||
}
|
||||
}
|
||||
|
||||
impl RetryConfig<No429RetryStratey> {
|
||||
impl RetryConfig<No429RetryStrategy> {
|
||||
pub fn no429retry() -> Self {
|
||||
Self {
|
||||
num_retries: NUM_RETRIES,
|
||||
min_retry_interval_ms: BASE_RETRY_DELAY_MS,
|
||||
max_retry_interval_ms: BASE_RETRY_MAX_DURATION_MS,
|
||||
strategy: No429RetryStratey,
|
||||
strategy: No429RetryStrategy,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -90,7 +90,7 @@ pub fn build_auth_http_client<R: RetryableStrategy + Send + Sync + 'static>(
|
||||
let reqwest_client = reqwest::Client::builder().build()?;
|
||||
Ok(ClientBuilder::new(reqwest_client)
|
||||
.maybe_with(auth_middleware)
|
||||
.maybe_with(Some(retry_middleware))
|
||||
.with(retry_middleware)
|
||||
.maybe_with(logging_middleware)
|
||||
.maybe_with(session_middleware)
|
||||
.build())
|
||||
@@ -123,7 +123,7 @@ pub fn build_http_client<R: RetryableStrategy + Send + Sync + 'static>(
|
||||
let session_middleware = (!session_id.is_empty()).then(|| SessionMiddleware(session_id.to_owned()));
|
||||
let reqwest_client = reqwest::Client::builder().build()?;
|
||||
Ok(ClientBuilder::new(reqwest_client)
|
||||
.maybe_with(Some(retry_middleware))
|
||||
.with(retry_middleware)
|
||||
.maybe_with(logging_middleware)
|
||||
.maybe_with(session_middleware)
|
||||
.build())
|
||||
@@ -350,7 +350,7 @@ mod tests {
|
||||
num_retries: 1,
|
||||
min_retry_interval_ms: 0,
|
||||
max_retry_interval_ms: 3000,
|
||||
strategy: No429RetryStratey,
|
||||
strategy: No429RetryStrategy,
|
||||
};
|
||||
let client = build_auth_http_client(&None, retry_config, "").unwrap();
|
||||
|
||||
@@ -404,7 +404,7 @@ mod tests {
|
||||
num_retries: 2,
|
||||
min_retry_interval_ms: 0,
|
||||
max_retry_interval_ms: 3000,
|
||||
strategy: No429RetryStratey,
|
||||
strategy: No429RetryStrategy,
|
||||
};
|
||||
let client = build_auth_http_client(&None, retry_config, "").unwrap();
|
||||
|
||||
@@ -461,7 +461,7 @@ mod tests {
|
||||
num_retries: 2,
|
||||
min_retry_interval_ms: 1000,
|
||||
max_retry_interval_ms: 6000,
|
||||
strategy: No429RetryStratey,
|
||||
strategy: No429RetryStrategy,
|
||||
};
|
||||
let client = build_auth_http_client(&None, retry_config, "").unwrap();
|
||||
|
||||
@@ -490,7 +490,7 @@ mod tests {
|
||||
num_retries: 10,
|
||||
min_retry_interval_ms: 1000,
|
||||
max_retry_interval_ms: 6000,
|
||||
strategy: No429RetryStratey,
|
||||
strategy: No429RetryStrategy,
|
||||
};
|
||||
let client = build_auth_http_client(&None, retry_config, "").unwrap();
|
||||
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use std::mem::take;
|
||||
use std::path::PathBuf;
|
||||
use std::result::Result as stdResult;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use async_trait::async_trait;
|
||||
use cas_object::SerializedCasObject;
|
||||
use cas_types::{
|
||||
BatchQueryReconstructionResponse, FileRange, HttpRange, Key, QueryReconstructionResponse, UploadShardResponse,
|
||||
UploadShardResponseType, UploadXorbResponse,
|
||||
BatchQueryReconstructionResponse, CASReconstructionTerm, ChunkRange, FileRange, HttpRange, Key,
|
||||
QueryReconstructionResponse, UploadShardResponse, UploadShardResponseType, UploadXorbResponse,
|
||||
};
|
||||
use chunk_cache::{CacheConfig, ChunkCache};
|
||||
use error_printer::ErrorPrinter;
|
||||
@@ -25,7 +25,7 @@ use progress_tracking::upload_tracking::CompletionTracker;
|
||||
use reqwest::{Body, StatusCode, Url};
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use tokio::sync::{mpsc, OwnedSemaphorePermit};
|
||||
use tokio::task::{JoinError, JoinHandle, JoinSet};
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tracing::{debug, info, instrument};
|
||||
use utils::auth::AuthConfig;
|
||||
use utils::singleflight::Group;
|
||||
@@ -125,8 +125,9 @@ impl RemoteClient {
|
||||
|
||||
#[async_trait]
|
||||
impl UploadClient for RemoteClient {
|
||||
#[instrument(skip_all, name="RemoteClient::upload_xorb", fields(key = Key{prefix : prefix.to_string(), hash : serialized_cas_object.hash}.to_string(),
|
||||
xorb.len = serialized_cas_object.serialized_data.len(), xorb.num_chunks = serialized_cas_object.num_chunks))]
|
||||
#[instrument(skip_all, name = "RemoteClient::upload_xorb", fields(key = Key{prefix : prefix.to_string(), hash : serialized_cas_object.hash}.to_string(),
|
||||
xorb.len = serialized_cas_object.serialized_data.len(), xorb.num_chunks = serialized_cas_object.num_chunks
|
||||
))]
|
||||
async fn upload_xorb(
|
||||
&self,
|
||||
prefix: &str,
|
||||
@@ -339,7 +340,8 @@ impl RemoteClient {
|
||||
// at the beginning of the download, but queried in segments. Range downloads are executed with
|
||||
// a certain degree of parallelism, but writing out to storage is sequential. Ideal when the external
|
||||
// storage uses HDDs.
|
||||
#[instrument(skip_all, name="RemoteClient::reconstruct_file_segmented", fields(file.hash = file_hash.hex()))]
|
||||
#[instrument(skip_all, name = "RemoteClient::reconstruct_file_segmented", fields(file.hash = file_hash.hex()
|
||||
))]
|
||||
async fn reconstruct_file_to_writer_segmented(
|
||||
&self,
|
||||
file_hash: &MerkleHash,
|
||||
@@ -348,7 +350,7 @@ impl RemoteClient {
|
||||
progress_updater: Option<Arc<SingleItemProgressUpdater>>,
|
||||
) -> Result<u64> {
|
||||
// queue size is inherently bounded by degree of concurrency.
|
||||
let (task_tx, mut task_rx) = mpsc::unbounded_channel::<DownloadQueueItem<TermDownload>>();
|
||||
let (task_tx, mut task_rx) = mpsc::unbounded_channel::<DownloadQueueItem<SequentialTermDownload>>();
|
||||
let (running_downloads_tx, mut running_downloads_rx) =
|
||||
mpsc::unbounded_channel::<JoinHandle<Result<(TermDownloadResult<Vec<u8>>, OwnedSemaphorePermit)>>>();
|
||||
|
||||
@@ -387,7 +389,7 @@ impl RemoteClient {
|
||||
drop(running_downloads_tx);
|
||||
break;
|
||||
},
|
||||
DownloadQueueItem::Term(term_download) => {
|
||||
DownloadQueueItem::DownloadTask(term_download) => {
|
||||
// acquire the permit before spawning the task, so that there's limited
|
||||
// number of active downloads.
|
||||
let permit = download_scheduler_clone.download_permit().await?;
|
||||
@@ -420,21 +422,26 @@ impl RemoteClient {
|
||||
let take = remaining_total_len
|
||||
.min(remaining_segment_len)
|
||||
.min(term.unpacked_length as u64 - skip_bytes);
|
||||
let (individual_fetch_info, _) = segment.find((term.hash, term.range)).await?;
|
||||
|
||||
let download_task = TermDownload {
|
||||
let download_task = SequentialTermDownload {
|
||||
download: FetchTermDownload {
|
||||
hash: term.hash.into(),
|
||||
range: individual_fetch_info.range,
|
||||
fetch_info: segment.clone(),
|
||||
chunk_cache: chunk_cache.clone(),
|
||||
client: term_download_client.clone(),
|
||||
range_download_single_flight: range_download_single_flight.clone(),
|
||||
},
|
||||
term,
|
||||
skip_bytes,
|
||||
take,
|
||||
fetch_info: segment.clone(),
|
||||
chunk_cache: chunk_cache.clone(),
|
||||
client: term_download_client.clone(),
|
||||
range_download_single_flight: range_download_single_flight.clone(),
|
||||
};
|
||||
|
||||
remaining_total_len -= take;
|
||||
remaining_segment_len -= take;
|
||||
debug!("enqueueing {download_task:?}");
|
||||
task_tx.send(DownloadQueueItem::Term(download_task))?;
|
||||
task_tx.send(DownloadQueueItem::DownloadTask(download_task))?;
|
||||
}
|
||||
|
||||
// enqueue the remainder of file info fetch task
|
||||
@@ -455,7 +462,7 @@ impl RemoteClient {
|
||||
while let Some(result) = running_downloads_rx.recv().await {
|
||||
match result.await {
|
||||
Ok(Ok((mut download_result, permit))) => {
|
||||
let data = take(&mut download_result.data);
|
||||
let data = take(&mut download_result.payload);
|
||||
writer.write_all(&data)?;
|
||||
// drop permit after data written out so they don't accumulate in memory unbounded
|
||||
drop(permit);
|
||||
@@ -484,7 +491,8 @@ impl RemoteClient {
|
||||
// at the beginning of the download, but queried in segments. Range downloads are executed with
|
||||
// a certain degree of parallelism, and so does writing out to storage. Ideal when the external
|
||||
// storage is fast at seeks, e.g. RAM or SSDs.
|
||||
#[instrument(skip_all, name="RemoteClient::reconstruct_file_segmented_parallel", fields(file.hash = file_hash.hex()))]
|
||||
#[instrument(skip_all, name = "RemoteClient::reconstruct_file_segmented_parallel", fields(file.hash = file_hash.hex()
|
||||
))]
|
||||
async fn reconstruct_file_to_writer_segmented_parallel_write(
|
||||
&self,
|
||||
file_hash: &MerkleHash,
|
||||
@@ -493,14 +501,15 @@ impl RemoteClient {
|
||||
progress_updater: Option<Arc<SingleItemProgressUpdater>>,
|
||||
) -> Result<u64> {
|
||||
// queue size is inherently bounded by degree of concurrency.
|
||||
let (task_tx, mut task_rx) = mpsc::unbounded_channel::<DownloadQueueItem<TermDownloadAndWrite>>();
|
||||
let mut running_downloads = JoinSet::<Result<TermDownloadResult<usize>>>::new();
|
||||
let (task_tx, mut task_rx) =
|
||||
mpsc::unbounded_channel::<DownloadQueueItem<FetchTermDownloadOnceAndWriteEverywhereUsed>>();
|
||||
let mut running_downloads = JoinSet::<Result<TermDownloadResult<u64>>>::new();
|
||||
|
||||
// derive the actual range to reconstruct
|
||||
let file_reconstruct_range = byte_range.unwrap_or_else(FileRange::full);
|
||||
let total_len = file_reconstruct_range.length();
|
||||
let base_write_negative_offset = file_reconstruct_range.start;
|
||||
|
||||
// kick start the download by enqueue the fetch info task.
|
||||
// kick-start the download by enqueue the fetch info task.
|
||||
task_tx.send(DownloadQueueItem::Metadata(FetchInfo::new(
|
||||
*file_hash,
|
||||
file_reconstruct_range,
|
||||
@@ -518,31 +527,23 @@ impl RemoteClient {
|
||||
let term_download_client = self.http_client.clone();
|
||||
let download_scheduler = DownloadScheduler::new(*NUM_CONCURRENT_RANGE_GETS);
|
||||
|
||||
let process_result =
|
||||
move |result: stdResult<stdResult<TermDownloadResult<usize>, CasClientError>, JoinError>,
|
||||
total_written: &mut u64,
|
||||
download_scheduler: &DownloadScheduler|
|
||||
-> Result<u64> {
|
||||
match result {
|
||||
Ok(Ok(download_result)) => {
|
||||
let write_len = download_result.data as u64;
|
||||
*total_written += write_len;
|
||||
let process_result = move |result: TermDownloadResult<u64>,
|
||||
total_written: &mut u64,
|
||||
download_scheduler: &DownloadScheduler|
|
||||
-> Result<u64> {
|
||||
let write_len = result.payload;
|
||||
*total_written += write_len;
|
||||
|
||||
// Now inspect the download metrics and tune the download degree of concurrency
|
||||
download_scheduler.tune_on(download_result)?;
|
||||
Ok(write_len)
|
||||
},
|
||||
Ok(Err(e)) => Err(e)?,
|
||||
Err(e) => Err(anyhow!("{e:?}"))?,
|
||||
}
|
||||
};
|
||||
// Now inspect the download metrics and tune the download degree of concurrency
|
||||
download_scheduler.tune_on(result)?;
|
||||
Ok(write_len)
|
||||
};
|
||||
|
||||
let mut total_written = 0;
|
||||
let mut remaining_total_len = total_len;
|
||||
while let Some(item) = task_rx.recv().await {
|
||||
// first try to join some tasks
|
||||
while let Some(result) = running_downloads.try_join_next() {
|
||||
let write_len = process_result(result, &mut total_written, &download_scheduler)?;
|
||||
let write_len = process_result(result??, &mut total_written, &download_scheduler)?;
|
||||
if let Some(updater) = progress_updater.as_ref() {
|
||||
updater.update(write_len).await;
|
||||
}
|
||||
@@ -551,10 +552,10 @@ impl RemoteClient {
|
||||
match item {
|
||||
DownloadQueueItem::End => {
|
||||
// everything processed
|
||||
debug!("download queue emptyed");
|
||||
debug!("download queue emptied");
|
||||
break;
|
||||
},
|
||||
DownloadQueueItem::Term(term_download) => {
|
||||
DownloadQueueItem::DownloadTask(term_download) => {
|
||||
// acquire the permit before spawning the task, so that there's limited
|
||||
// number of active downloads.
|
||||
let permit = download_scheduler.download_permit().await?;
|
||||
@@ -578,33 +579,23 @@ impl RemoteClient {
|
||||
};
|
||||
|
||||
let segment = Arc::new(segment);
|
||||
|
||||
// define the term download tasks
|
||||
let mut remaining_segment_len = segment_size;
|
||||
debug!("enqueueing {} download tasks", terms.len());
|
||||
for (i, term) in terms.into_iter().enumerate() {
|
||||
let skip_bytes = if i == 0 { offset_into_first_range } else { 0 };
|
||||
let take = remaining_total_len
|
||||
.min(remaining_segment_len)
|
||||
.min(term.unpacked_length as u64 - skip_bytes);
|
||||
let tasks = map_fetch_info_into_download_tasks(
|
||||
segment.clone(),
|
||||
terms,
|
||||
offset_into_first_range,
|
||||
base_write_negative_offset,
|
||||
self.chunk_cache.clone(),
|
||||
term_download_client.clone(),
|
||||
self.range_download_single_flight.clone(),
|
||||
writer,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let download_and_write_task = TermDownloadAndWrite {
|
||||
download: TermDownload {
|
||||
term,
|
||||
skip_bytes,
|
||||
take,
|
||||
fetch_info: segment.clone(),
|
||||
chunk_cache: self.chunk_cache.clone(),
|
||||
client: term_download_client.clone(),
|
||||
range_download_single_flight: self.range_download_single_flight.clone(),
|
||||
},
|
||||
write_offset: total_len - remaining_total_len,
|
||||
output: writer.clone(),
|
||||
};
|
||||
|
||||
remaining_total_len -= take;
|
||||
remaining_segment_len -= take;
|
||||
debug!("enqueueing {download_and_write_task:?}");
|
||||
task_tx.send(DownloadQueueItem::Term(download_and_write_task))?;
|
||||
debug!("enqueueing {} download tasks", tasks.len());
|
||||
for task_def in tasks {
|
||||
task_tx.send(DownloadQueueItem::DownloadTask(task_def))?;
|
||||
}
|
||||
|
||||
// enqueue the remainder of file info fetch task
|
||||
@@ -618,7 +609,7 @@ impl RemoteClient {
|
||||
}
|
||||
|
||||
while let Some(result) = running_downloads.join_next().await {
|
||||
let write_len = process_result(result, &mut total_written, &download_scheduler)?;
|
||||
let write_len = process_result(result??, &mut total_written, &download_scheduler)?;
|
||||
if let Some(updater) = progress_updater.as_ref() {
|
||||
updater.update(write_len).await;
|
||||
}
|
||||
@@ -628,9 +619,76 @@ impl RemoteClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn map_fetch_info_into_download_tasks(
|
||||
segment: Arc<FetchInfo>,
|
||||
terms: Vec<CASReconstructionTerm>,
|
||||
offset_into_first_range: u64,
|
||||
base_write_negative_offset: u64,
|
||||
chunk_cache: Option<Arc<dyn ChunkCache>>,
|
||||
client: Arc<ClientWithMiddleware>,
|
||||
range_download_single_flight: Arc<Group<DownloadRangeResult, CasClientError>>,
|
||||
output_provider: &OutputProvider,
|
||||
) -> Result<Vec<FetchTermDownloadOnceAndWriteEverywhereUsed>> {
|
||||
// the actual segment length.
|
||||
// the file_range end may actually exceed the file total length for the last segment.
|
||||
// in that case, the maximum length of this segment will be the total of all terms given
|
||||
// minus the start offset
|
||||
let seg_len = segment
|
||||
.file_range
|
||||
.length()
|
||||
.min(terms.iter().fold(0, |acc, term| acc + term.unpacked_length as u64) - offset_into_first_range);
|
||||
|
||||
let initial_writer_offset = segment.file_range.start - base_write_negative_offset;
|
||||
let mut total_taken = 0;
|
||||
|
||||
let mut fetch_info_term_map: HashMap<(MerkleHash, ChunkRange), FetchTermDownloadOnceAndWriteEverywhereUsed> =
|
||||
HashMap::new();
|
||||
for (i, term) in terms.into_iter().enumerate() {
|
||||
let (individual_fetch_info, _) = segment.find((term.hash, term.range)).await?;
|
||||
|
||||
let skip_bytes = if i == 0 { offset_into_first_range } else { 0 };
|
||||
// amount to take is min of the whole term after skipped bytes or the remainder of the segment
|
||||
let take = (term.unpacked_length as u64 - skip_bytes).min(seg_len - total_taken);
|
||||
let write_term = ChunkRangeWrite {
|
||||
// term details
|
||||
chunk_range: term.range,
|
||||
unpacked_length: term.unpacked_length,
|
||||
|
||||
// write details
|
||||
skip_bytes,
|
||||
take,
|
||||
writer_offset: initial_writer_offset + total_taken,
|
||||
};
|
||||
|
||||
let task = fetch_info_term_map
|
||||
.entry((term.hash.into(), individual_fetch_info.range))
|
||||
.or_insert_with(|| FetchTermDownloadOnceAndWriteEverywhereUsed {
|
||||
download: FetchTermDownload {
|
||||
hash: term.hash.into(),
|
||||
range: individual_fetch_info.range,
|
||||
fetch_info: segment.clone(),
|
||||
chunk_cache: chunk_cache.clone(),
|
||||
client: client.clone(),
|
||||
range_download_single_flight: range_download_single_flight.clone(),
|
||||
},
|
||||
writes: vec![],
|
||||
output: output_provider.clone(),
|
||||
});
|
||||
task.writes.push(write_term);
|
||||
|
||||
total_taken += take;
|
||||
}
|
||||
|
||||
let tasks = fetch_info_term_map.into_values().collect();
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RegistrationClient for RemoteClient {
|
||||
#[instrument(skip_all, name="RemoteClient::upload_shard", fields(shard.hash = hash.hex(), shard.len = shard_data.len()))]
|
||||
#[instrument(skip_all, name = "RemoteClient::upload_shard", fields(shard.hash = hash.hex(), shard.len = shard_data.len()
|
||||
))]
|
||||
async fn upload_shard(
|
||||
&self,
|
||||
prefix: &str,
|
||||
@@ -676,7 +734,8 @@ impl RegistrationClient for RemoteClient {
|
||||
|
||||
#[async_trait]
|
||||
impl FileReconstructor<CasClientError> for RemoteClient {
|
||||
#[instrument(skip_all, name="RemoteClient::get_file_reconstruction", fields(file.hash = file_hash.hex()))]
|
||||
#[instrument(skip_all, name = "RemoteClient::get_file_reconstruction", fields(file.hash = file_hash.hex()
|
||||
))]
|
||||
async fn get_file_reconstruction_info(
|
||||
&self,
|
||||
file_hash: &MerkleHash,
|
||||
@@ -1204,7 +1263,11 @@ mod tests {
|
||||
assert_eq!(test.expect_error, resp.is_err());
|
||||
if !test.expect_error {
|
||||
assert_eq!(test.expected_data.len() as u64, resp.unwrap());
|
||||
assert_eq!(test.expected_data, buf.value());
|
||||
let value = buf.value();
|
||||
assert_eq!(&test.expected_data[..100], &value[..100]);
|
||||
let idx = test.expected_data.len() - 100;
|
||||
assert_eq!(&test.expected_data[idx..], &value[idx..]);
|
||||
assert_eq!(test.expected_data, value);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -74,23 +74,22 @@ impl CacheState {
|
||||
let mut ret = Vec::new();
|
||||
|
||||
while self.total_bytes > max_total_bytes {
|
||||
if let Some((key, idx)) = self.random_item() {
|
||||
let items = self.inner.get_mut(&key).ok_or(ChunkCacheError::Infallible)?;
|
||||
let cache_item = items.swap_remove(idx);
|
||||
let len = cache_item.len;
|
||||
|
||||
if items.is_empty() {
|
||||
self.inner.remove(&key);
|
||||
}
|
||||
|
||||
ret.push((key, cache_item));
|
||||
|
||||
self.total_bytes -= len;
|
||||
self.num_items -= 1;
|
||||
} else {
|
||||
let Some((key, idx)) = self.random_item() else {
|
||||
error!("attempted to evict item, but no item could be found to be evicted");
|
||||
break;
|
||||
};
|
||||
let items = self.inner.get_mut(&key).ok_or(ChunkCacheError::Infallible)?;
|
||||
let cache_item = items.swap_remove(idx);
|
||||
let len = cache_item.len;
|
||||
|
||||
if items.is_empty() {
|
||||
self.inner.remove(&key);
|
||||
}
|
||||
|
||||
ret.push((key, cache_item));
|
||||
|
||||
self.total_bytes -= len;
|
||||
self.num_items -= 1;
|
||||
}
|
||||
debug!(
|
||||
"cache evicting {} items totaling {}",
|
||||
@@ -515,7 +514,7 @@ impl DiskCache {
|
||||
}
|
||||
|
||||
let stored = get_range_from_cache_file(&header, &mut reader, range, cache_item.range.start)?;
|
||||
if data != stored.data.as_ref() {
|
||||
if data != stored.data {
|
||||
return Err(ChunkCacheError::InvalidArguments);
|
||||
}
|
||||
Ok(true)
|
||||
@@ -610,8 +609,8 @@ fn get_range_from_cache_file<R: Read + Seek>(
|
||||
debug_assert_eq!(range.end - range.start, offsets.len() as u32 - 1);
|
||||
|
||||
Ok(CacheRange {
|
||||
offsets: offsets.into(),
|
||||
data: data.into(),
|
||||
offsets,
|
||||
data,
|
||||
range: *range,
|
||||
})
|
||||
}
|
||||
@@ -868,9 +867,9 @@ mod tests {
|
||||
let cache_result = cache.get(&key, &range).await.unwrap();
|
||||
assert!(cache_result.is_some());
|
||||
let cache_range = cache_result.unwrap();
|
||||
assert_eq!(cache_range.data.as_ref(), data.as_slice());
|
||||
assert_eq!(cache_range.data, data);
|
||||
assert_eq!(cache_range.range, range);
|
||||
assert_eq!(cache_range.offsets.as_ref(), chunk_byte_indices.as_slice());
|
||||
assert_eq!(cache_range.offsets, chunk_byte_indices);
|
||||
|
||||
let miss_range = ChunkRange::new(100, 101);
|
||||
// miss
|
||||
@@ -918,7 +917,7 @@ mod tests {
|
||||
let start_byte = chunk_byte_indices[sub_range.start as usize] as usize;
|
||||
let end_byte = chunk_byte_indices[sub_range.end as usize] as usize;
|
||||
let data_portion = &data[start_byte..end_byte];
|
||||
assert_eq!(data_portion, cache_range.data.as_ref());
|
||||
assert_eq!(data_portion, &cache_range.data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ mod disk;
|
||||
pub mod error;
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
pub use cache_manager::get_cache;
|
||||
@@ -25,10 +24,10 @@ utils::configurable_constants! {
|
||||
/// [0, 2000, 4000, 6000] where chunk 2 is made of bytes [0, 2000)
|
||||
/// chunk 3 [2000, 4000) and chunk 4 is [4000, 6000).
|
||||
/// It is guaranteed that the first number in offsets is 0 and the last number is data.len()
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
pub struct CacheRange {
|
||||
pub offsets: Arc<[u32]>,
|
||||
pub data: Arc<[u8]>,
|
||||
pub offsets: Vec<u32>,
|
||||
pub data: Vec<u8>,
|
||||
pub range: ChunkRange,
|
||||
}
|
||||
|
||||
|
||||
@@ -18,12 +18,12 @@ path = "src/bin/xtool.rs"
|
||||
cas_client = { path = "../cas_client" }
|
||||
cas_object = { path = "../cas_object" }
|
||||
cas_types = { path = "../cas_types" }
|
||||
deduplication = { path = "../deduplication" }
|
||||
error_printer = { path = "../error_printer" }
|
||||
mdb_shard = { path = "../mdb_shard" }
|
||||
merkledb = { path = "../merkledb" }
|
||||
merklehash = { path = "../merklehash" }
|
||||
mdb_shard = { path = "../mdb_shard" }
|
||||
parutils = { path = "../parutils" }
|
||||
error_printer = { path = "../error_printer" }
|
||||
deduplication = { path = "../deduplication" }
|
||||
progress_tracking = { path = "../progress_tracking" }
|
||||
utils = { path = "../utils" }
|
||||
xet_threadpool = { path = "../xet_threadpool" }
|
||||
@@ -31,7 +31,6 @@ xet_threadpool = { path = "../xet_threadpool" }
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
ctor = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
dirs = { workspace = true }
|
||||
jsonwebtoken = { workspace = true }
|
||||
@@ -74,6 +73,7 @@ sha2 = { workspace = true }
|
||||
[dev-dependencies]
|
||||
serial_test = { workspace = true }
|
||||
tracing-test = { workspace = true }
|
||||
ctor = { workspace = true }
|
||||
|
||||
[features]
|
||||
strict = []
|
||||
|
||||
2
hf_xet/.gitignore
vendored
Normal file
2
hf_xet/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
venv/
|
||||
.venv/
|
||||
102
hf_xet/Cargo.lock
generated
102
hf_xet/Cargo.lock
generated
@@ -151,7 +151,7 @@ checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -274,6 +274,7 @@ dependencies = [
|
||||
"cas_types",
|
||||
"chunk_cache",
|
||||
"deduplication",
|
||||
"derivative",
|
||||
"error_printer",
|
||||
"file_utils",
|
||||
"futures",
|
||||
@@ -426,7 +427,7 @@ dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -617,7 +618,6 @@ dependencies = [
|
||||
"cas_types",
|
||||
"chrono",
|
||||
"clap",
|
||||
"ctor",
|
||||
"deduplication",
|
||||
"dirs",
|
||||
"error_printer",
|
||||
@@ -681,6 +681,17 @@ dependencies = [
|
||||
"powerfmt",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derivative"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.10.7"
|
||||
@@ -720,7 +731,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -776,7 +787,7 @@ checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -925,7 +936,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1218,9 +1229,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.11"
|
||||
version = "0.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2"
|
||||
checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
@@ -1309,9 +1320,9 @@ checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3"
|
||||
|
||||
[[package]]
|
||||
name = "icu_properties"
|
||||
version = "2.0.0"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2549ca8c7241c82f59c80ba2a6f415d931c5b58d24fb8412caa1a1f02c49139a"
|
||||
checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b"
|
||||
dependencies = [
|
||||
"displaydoc",
|
||||
"icu_collections",
|
||||
@@ -1325,9 +1336,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "icu_properties_data"
|
||||
version = "2.0.0"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8197e866e47b68f8f7d95249e172903bec06004b18b2937f1095d40a0c57de04"
|
||||
checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632"
|
||||
|
||||
[[package]]
|
||||
name = "icu_provider"
|
||||
@@ -1704,7 +1715,7 @@ dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1845,7 +1856,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1988,7 +1999,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2145,7 +2156,7 @@ dependencies = [
|
||||
"itertools 0.12.1",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2240,7 +2251,7 @@ dependencies = [
|
||||
"proc-macro2",
|
||||
"pyo3-macros-backend",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2253,7 +2264,7 @@ dependencies = [
|
||||
"proc-macro2",
|
||||
"pyo3-build-config",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2764,7 +2775,7 @@ checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2787,7 +2798,7 @@ checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2947,6 +2958,17 @@ dependencies = [
|
||||
"symbolic-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.109"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.101"
|
||||
@@ -2984,7 +3006,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3059,7 +3081,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3070,7 +3092,7 @@ checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3165,7 +3187,7 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3258,7 +3280,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3494,7 +3516,7 @@ dependencies = [
|
||||
"log",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
|
||||
@@ -3529,7 +3551,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
"wasm-bindgen-backend",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
@@ -3653,15 +3675,15 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.61.1"
|
||||
version = "0.61.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46ec44dc15085cea82cf9c78f85a9114c463a369786585ad2882d1ff0b0acf40"
|
||||
checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
|
||||
dependencies = [
|
||||
"windows-implement",
|
||||
"windows-interface",
|
||||
"windows-link",
|
||||
"windows-result",
|
||||
"windows-strings 0.4.1",
|
||||
"windows-strings 0.4.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3672,7 +3694,7 @@ checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3683,7 +3705,7 @@ checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3705,9 +3727,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "windows-result"
|
||||
version = "0.3.3"
|
||||
version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4b895b5356fc36103d0f64dd1e94dfa7ac5633f1c9dd6e80fe9ec4adef69e09d"
|
||||
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
]
|
||||
@@ -3723,9 +3745,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "windows-strings"
|
||||
version = "0.4.1"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a7ab927b2637c19b3dbe0965e75d8f2d30bdd697a1516191cad2ec4df8fb28a"
|
||||
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
]
|
||||
@@ -3986,7 +4008,7 @@ checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
"synstructure",
|
||||
]
|
||||
|
||||
@@ -4007,7 +4029,7 @@ checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4027,7 +4049,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
"synstructure",
|
||||
]
|
||||
|
||||
@@ -4067,5 +4089,5 @@ checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
@@ -62,7 +62,7 @@ opt-level = "s"
|
||||
lto = true
|
||||
codegen-units = 1
|
||||
|
||||
# on manylinux and macos maturin + split-debuginfo doesn't output debug symbols for .so objects,
|
||||
# on manylinux and macos maturin + split-debuginfo doesn't output debug symbols for .so objects,
|
||||
# so we need a different profile to build. For mac, the below settings will output a .dSYM
|
||||
# file. For Linux, we are stripping them manually using binutils.
|
||||
[profile.release-dbgsymbols]
|
||||
@@ -73,4 +73,4 @@ split-debuginfo = "none"
|
||||
[profile.opt-test]
|
||||
inherits = "dev"
|
||||
debug = true
|
||||
opt-level = 3
|
||||
opt-level = 3
|
||||
Reference in New Issue
Block a user