Switch chunk cache to use async RWlock instead of std::sync mutex. (#306)

Currently, the chunk cache holds a std::sync Mutex around the internal
state. This can block the tokio worker threads from processing other
tasks, and in most cases a read lock is sufficient for the operations.

The PR does some mild rearranging of the functions to accommodate the
different read/write locks and makes the primary trait async, but should
currently involve no other functionality change.
This commit is contained in:
Hoyt Koepke
2025-05-13 09:45:07 -07:00
committed by GitHub
parent 6766488858
commit 0fcc132459
12 changed files with 277 additions and 313 deletions

5
Cargo.lock generated
View File

@@ -313,9 +313,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de"
[[package]]
name = "async-trait"
version = "0.1.87"
version = "0.1.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d556ec1359574147ec0c4fc5eb525f3f23263a592b1a9c07e0a75b427de55c97"
checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5"
dependencies = [
"proc-macro2",
"quote",
@@ -598,6 +598,7 @@ dependencies = [
name = "chunk_cache"
version = "0.1.0"
dependencies = [
"async-trait",
"base64 0.22.1",
"cas_types",
"clap 4.5.28",

View File

@@ -308,7 +308,7 @@ pub(crate) async fn get_one_term(
prefix: PREFIX_DEFAULT.to_string(),
hash: term.hash.into(),
};
if let Ok(Some(cached)) = cache.get(&key, &term.range).log_error("cache error") {
if let Ok(Some(cached)) = cache.get(&key, &term.range).await.log_error("cache error") {
return Ok(cached.data.to_vec());
}
}
@@ -329,7 +329,7 @@ pub(crate) async fn get_one_term(
prefix: PREFIX_DEFAULT.to_string(),
hash: term.hash.into(),
};
if let Err(e) = cache.put(&key, &fetch_term.range, &chunk_byte_indices, &data) {
if let Err(e) = cache.put(&key, &fetch_term.range, &chunk_byte_indices, &data).await {
info!("Writing to local cache failed, continuing. Error: {}", e);
}
}

View File

@@ -27,7 +27,6 @@ use tracing::{debug, info, instrument};
use utils::auth::AuthConfig;
use utils::progress::SimpleProgressUpdater;
use utils::singleflight::Group;
use xet_threadpool::ThreadPool;
use crate::download_utils::*;
use crate::error::{CasClientError, Result};
@@ -349,14 +348,13 @@ impl RemoteClient {
// download tasks are enqueued and spawned with the degree of concurrency equal to `num_concurrent_range_gets`.
// After the above, a task that defines fetching the remainder of the file reconstruction info is enqueued,
// which will execute after the first of the above term download tasks finishes.
let threadpool = ThreadPool::current();
let chunk_cache = self.chunk_cache.clone();
let term_download_client = self.http_client.clone();
let range_download_single_flight = self.range_download_single_flight.clone();
let download_scheduler = DownloadScheduler::new(*NUM_CONCURRENT_RANGE_GETS);
let download_scheduler_clone = download_scheduler.clone();
let queue_dispatcher: JoinHandle<Result<()>> = threadpool.clone().spawn(async move {
let queue_dispatcher: JoinHandle<Result<()>> = tokio::spawn(async move {
let mut remaining_total_len = total_len;
while let Some(item) = task_rx.recv().await {
match item {
@@ -372,7 +370,7 @@ impl RemoteClient {
let permit = download_scheduler_clone.download_permit().await?;
debug!("spawning 1 download task");
let future: JoinHandle<Result<(TermDownloadResult<Vec<u8>>, OwnedSemaphorePermit)>> =
threadpool.spawn(async move {
tokio::spawn(async move {
let data = term_download.run().await?;
Ok((data, permit))
});
@@ -759,6 +757,7 @@ mod tests {
use httpmock::MockServer;
use merkledb::constants::TARGET_CDC_CHUNK_SIZE;
use tracing_test::traced_test;
use xet_threadpool::ThreadPool;
use super::*;
use crate::interface::buffer::BufferProvider;

View File

@@ -18,6 +18,8 @@ clap = { version = "4.5.20", optional = true, features = ["derive"] }
once_cell = "1.20.2"
crc32fast = "1.4.2"
log = "0.4.22"
async-trait = "0.1.88"
tokio = { version = "1.44", features = ["sync"] }
[dev-dependencies]
tokio = { version = "1.44", features = ["full"] }

View File

@@ -3,8 +3,9 @@ use std::fs::{DirEntry, File};
use std::io::{self, Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
use std::mem::size_of;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::Arc;
use async_trait::async_trait;
use base64::engine::general_purpose::URL_SAFE;
use base64::engine::GeneralPurpose;
use base64::Engine;
@@ -12,6 +13,7 @@ use cas_types::{ChunkRange, Key};
use error_printer::ErrorPrinter;
use file_utils::SafeFileCreator;
use merklehash::MerkleHash;
use tokio::sync::RwLock;
use tracing::{debug, error};
use utils::output_bytes;
@@ -46,6 +48,83 @@ impl CacheState {
total_bytes,
}
}
fn find_match(&self, key: &Key, range: &ChunkRange) -> Option<VerificationCell<CacheItem>> {
let items = self.inner.get(key)?;
// attempt to find a matching range in the given key's items using
for item in items.iter() {
if item.range.start <= range.start && range.end <= item.range.end {
return Some(item.clone());
}
}
None
}
/// removed items from the cache (including deleting from file system)
/// until at least to_remove number of bytes have been removed
///
/// removes data from in memory state and returns a list of file paths to delete
/// (so that deletion can occur after the locked state is dropped)
fn evict_to_capacity(
&mut self,
max_total_bytes: u64,
) -> Result<Vec<(Key, VerificationCell<CacheItem>)>, ChunkCacheError> {
let original_total_bytes = self.total_bytes;
let mut ret = Vec::new();
while self.total_bytes > max_total_bytes {
if let Some((key, idx)) = self.random_item() {
let items = self.inner.get_mut(&key).ok_or(ChunkCacheError::Infallible)?;
let cache_item = items.swap_remove(idx);
let len = cache_item.len;
if items.is_empty() {
self.inner.remove(&key);
}
ret.push((key, cache_item));
self.total_bytes -= len;
self.num_items -= 1;
} else {
error!("attempted to evict item, but no item could be found to be evicted");
break;
}
}
debug!(
"cache evicting {} items totaling {}",
ret.len(),
output_bytes(original_total_bytes - self.total_bytes)
);
Ok(ret)
}
/// returns the key and index within that key for a random item
fn random_item(&self) -> Option<(Key, usize)> {
debug_assert_eq!(
self.inner.values().map(|v| v.len()).sum::<usize>(),
self.num_items,
"real num items != stored num items"
);
if self.num_items == 0 {
error!("cache random_item for eviction: no items in cache");
return None;
}
let random_item = rand::random::<usize>() % self.num_items;
let mut count = 0;
for (key, items) in self.inner.iter() {
if random_item < count + items.len() {
return Some((key.clone(), random_item - count));
}
count += items.len();
}
// should never occur
error!("cache random_item for eviction: tried to return random item error not enough items");
None
}
}
/// DiskCache is a ChunkCache implementor that saves data on the file system
@@ -53,14 +132,14 @@ impl CacheState {
pub struct DiskCache {
cache_root: PathBuf,
capacity: u64,
state: Arc<Mutex<CacheState>>,
state: Arc<RwLock<CacheState>>,
}
// helper for analysis binary to print inner state
#[cfg(feature = "analysis")]
impl DiskCache {
pub fn print(&self) {
let state = self.state.lock().unwrap();
pub async fn print(&self) {
let state = self.state.read().await;
let total_num_items = state.num_items;
let total_total_bytes = state.total_bytes;
@@ -91,14 +170,12 @@ impl DiskCache {
}
impl DiskCache {
pub fn num_items(&self) -> Result<usize, ChunkCacheError> {
let state = self.state.lock()?;
Ok(state.num_items)
pub async fn num_items(&self) -> usize {
self.state.read().await.num_items
}
pub fn total_bytes(&self) -> Result<u64, ChunkCacheError> {
let state = self.state.lock()?;
Ok(state.total_bytes)
pub async fn total_bytes(&self) -> u64 {
self.state.read().await.total_bytes
}
/// initialize will create a new DiskCache with the capacity and cache root based on the config
@@ -136,11 +213,12 @@ impl DiskCache {
let capacity = config.cache_size;
let cache_root = config.cache_directory.clone();
// May take a while; don't block the runtime for this.
let state = Self::initialize_state(&cache_root, capacity)?;
Ok(Self {
state: Arc::new(Mutex::new(state)),
cache_root,
state: Arc::new(RwLock::new(state)),
cache_root: config.cache_directory.clone(),
capacity,
})
}
@@ -233,13 +311,13 @@ impl DiskCache {
Ok(CacheState::new(state, num_items, total_bytes))
}
fn get_impl(&self, key: &Key, range: &ChunkRange) -> OptionResult<CacheRange, ChunkCacheError> {
async fn get_impl(&self, key: &Key, range: &ChunkRange) -> OptionResult<CacheRange, ChunkCacheError> {
if range.start >= range.end {
return Err(ChunkCacheError::InvalidArguments);
}
loop {
let Some(cache_item) = self.find_match(key, range)? else {
let Some(cache_item) = self.state.read().await.find_match(key, range) else {
return Ok(None);
};
@@ -249,7 +327,7 @@ impl DiskCache {
Ok(file) => file,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
self.remove_item(key, &cache_item)?;
self.remove_item(key, &cache_item).await?;
continue;
},
_ => return Err(e.into()),
@@ -263,7 +341,7 @@ impl DiskCache {
file.rewind()?;
} else {
debug!("computed checksum {checksum} mismatch on cache item {key}/{cache_item}");
self.remove_item(key, &cache_item)?;
self.remove_item(key, &cache_item).await?;
continue;
}
}
@@ -273,7 +351,7 @@ impl DiskCache {
let Ok(header) = CacheFileHeader::deserialize(&mut file_reader)
.debug_error(format!("failed to deserialize cache file header on path: {path:?}"))
else {
self.remove_item(key, &cache_item)?;
self.remove_item(key, &cache_item).await?;
continue;
};
@@ -283,31 +361,7 @@ impl DiskCache {
}
}
fn find_match(&self, key: &Key, range: &ChunkRange) -> OptionResult<VerificationCell<CacheItem>, ChunkCacheError> {
let state = self.state.lock()?;
self.find_match_with_state(&state, key, range)
}
fn find_match_with_state(
&self,
state: &MutexGuard<'_, CacheState>,
key: &Key,
range: &ChunkRange,
) -> OptionResult<VerificationCell<CacheItem>, ChunkCacheError> {
let Some(items) = state.inner.get(key) else {
return Ok(None);
};
// attempt to find a matching range in the given key's items using
for item in items.iter() {
if item.range.start <= range.start && range.end <= item.range.end {
return Ok(Some(item.clone()));
}
}
Ok(None)
}
fn put_impl(
async fn put_impl(
&self,
key: &Key,
range: &ChunkRange,
@@ -325,8 +379,8 @@ impl DiskCache {
}
// check if we already contain the range
while let Some(cache_item) = self.find_match(key, range)? {
if self.validate_match(key, range, chunk_byte_indices, data, &cache_item)? {
while let Some(cache_item) = self.state.read().await.find_match(key, range) {
if self.validate_match(key, range, chunk_byte_indices, data, &cache_item).await? {
return Ok(());
}
}
@@ -361,13 +415,13 @@ impl DiskCache {
// evict items after ensuring the file write but before committing to cache state
// to avoid removing new item.
let mut state = self.state.lock()?;
let mut state_write = self.state.write().await;
// acquiring lock to state before closing the file
// this will ensure that this thread is the only one writing to the final
// cache file but allowing other threads to modify the state while we write the file
// before committing it.
if self.find_match_with_state(&state, key, range)?.is_some() {
if state_write.find_match(key, range).is_some() {
// another thread already added this item or overlapping item while this thread
// was writing the file
fw.abort()?;
@@ -375,20 +429,21 @@ impl DiskCache {
}
fw.close()?;
// add evicted paths to paths to remove from file system
let evicted_paths = self.maybe_evict(&mut state, cache_item.len)?;
// Evict entries to make sure we have enough room.
let evicted_paths = state_write.evict_to_capacity(self.capacity - cache_item.len)?;
// add the item info in-memory state after evictions are done
state.num_items += 1;
state.total_bytes += cache_item.len;
let item_set = state.inner.entry(key.clone()).or_default();
state_write.num_items += 1;
state_write.total_bytes += cache_item.len;
let item_set = state_write.inner.entry(key.clone()).or_default();
item_set.push(VerificationCell::new_verified(cache_item));
// release lock
drop(state);
drop(state_write);
// remove files after done with modifying in memory state and releasing lock
for path in evicted_paths {
for (key, cache_item) in evicted_paths {
let path = self.item_path(&key, &cache_item)?;
remove_file(&path)?;
// check and try to remove key path if all items evicted for key
let dir_path = path.parent().ok_or(ChunkCacheError::Infallible)?;
@@ -400,7 +455,7 @@ impl DiskCache {
// on a non-error case, returns true if the item is a good match and a new item should not be inserted
// returns false if not a good match and should be removed.
fn validate_match(
async fn validate_match(
&self,
key: &Key,
range: &ChunkRange,
@@ -417,24 +472,24 @@ impl DiskCache {
let path = self.item_path(key, cache_item)?;
let Ok(mut file) = File::open(path) else {
self.remove_item(key, cache_item)?;
self.remove_item(key, cache_item).await?;
return Ok(false);
};
let md = file.metadata()?;
if md.len() != cache_item.len {
self.remove_item(key, cache_item)?;
self.remove_item(key, cache_item).await?;
return Ok(false);
}
let mut buf = Vec::with_capacity(md.len() as usize);
file.read_to_end(&mut buf)?;
let checksum = crc32fast::hash(&buf);
if checksum != cache_item.checksum {
self.remove_item(key, cache_item)?;
self.remove_item(key, cache_item).await?;
return Ok(false);
}
let mut reader = Cursor::new(buf);
let Ok(header) = CacheFileHeader::deserialize(&mut reader) else {
self.remove_item(key, cache_item)?;
self.remove_item(key, cache_item).await?;
return Ok(false);
};
@@ -466,80 +521,17 @@ impl DiskCache {
Ok(true)
}
/// removed items from the cache (including deleting from file system)
/// until at least to_remove number of bytes have been removed
///
/// removes data from in memory state and returns a list of file paths to delete
/// (so that deletion can occur after the locked state is dropped)
fn maybe_evict(
&self,
state: &mut MutexGuard<'_, CacheState>,
expected_add: u64,
) -> Result<Vec<PathBuf>, ChunkCacheError> {
let original_total_bytes = state.total_bytes;
let mut paths = Vec::new();
while state.total_bytes + expected_add > self.capacity {
if let Some((key, idx)) = self.random_item(state) {
let items = state.inner.get_mut(&key).ok_or(ChunkCacheError::Infallible)?;
let cache_item = &items[idx];
let len = cache_item.len;
let path = self.item_path(&key, cache_item)?;
paths.push(path);
items.swap_remove(idx);
if items.is_empty() {
state.inner.remove(&key);
}
state.total_bytes -= len;
state.num_items -= 1;
} else {
error!("attempted to evict item, but no item could be found to be evicted");
break;
}
}
debug!(
"cache evicting {} items totaling {}",
paths.len(),
output_bytes(original_total_bytes - state.total_bytes)
);
Ok(paths)
}
/// returns the key and index within that key for a random item
fn random_item(&self, state: &MutexGuard<'_, CacheState>) -> Option<(Key, usize)> {
debug_assert_eq!(
state.inner.values().map(|v| v.len()).sum::<usize>(),
state.num_items,
"real num items != stored num items"
);
if state.num_items == 0 {
error!("cache random_item for eviction: no items in cache");
return None;
}
let random_item = rand::random::<usize>() % state.num_items;
let mut count = 0;
for (key, items) in state.inner.iter() {
if random_item < count + items.len() {
return Some((key.clone(), random_item - count));
}
count += items.len();
}
// should never occur
error!("cache random_item for eviction: tried to return random item error not enough items");
None
}
/// removes an item from both the in-memory state of the cache and the file system
fn remove_item(&self, key: &Key, cache_item: &VerificationCell<CacheItem>) -> Result<(), ChunkCacheError> {
async fn remove_item(&self, key: &Key, cache_item: &VerificationCell<CacheItem>) -> Result<(), ChunkCacheError> {
{
let mut state = self.state.lock()?;
let mut state = self.state.write().await;
if let Some(items) = state.inner.get_mut(key) {
let idx = match index_of(items, cache_item) {
Some(idx) => idx,
// item is no longer in the state
None => return Ok(()),
};
items.swap_remove(idx);
if items.is_empty() {
state.inner.remove(key);
@@ -802,19 +794,20 @@ fn key_dir(key: &Key) -> PathBuf {
PathBuf::from(dir_str)
}
#[async_trait]
impl ChunkCache for DiskCache {
fn get(&self, key: &Key, range: &ChunkRange) -> Result<Option<CacheRange>, ChunkCacheError> {
self.get_impl(key, range)
async fn get(&self, key: &Key, range: &ChunkRange) -> Result<Option<CacheRange>, ChunkCacheError> {
self.get_impl(key, range).await
}
fn put(
async fn put(
&self,
key: &Key,
range: &ChunkRange,
chunk_byte_indices: &[u32],
data: &[u8],
) -> Result<(), ChunkCacheError> {
self.put_impl(key, range, chunk_byte_indices, data)
self.put_impl(key, range, chunk_byte_indices, data).await
}
}
@@ -835,8 +828,8 @@ mod tests {
const RANDOM_SEED: u64 = 9089 << 20 | 120043;
#[test]
fn test_get_cache_empty() {
#[tokio::test]
async fn test_get_cache_empty() {
let mut rng = StdRng::seed_from_u64(RANDOM_SEED);
let cache_root = TempDir::new("empty").unwrap();
let config = CacheConfig {
@@ -845,11 +838,15 @@ mod tests {
..Default::default()
};
let cache = DiskCache::initialize(&config).unwrap();
assert!(cache.get(&random_key(&mut rng), &random_range(&mut rng)).unwrap().is_none());
assert!(cache
.get(&random_key(&mut rng), &random_range(&mut rng))
.await
.unwrap()
.is_none());
}
#[test]
fn test_put_get_simple() {
#[tokio::test]
async fn test_put_get_simple() {
let mut rng = StdRng::seed_from_u64(RANDOM_SEED);
let cache_root = TempDir::new("put_get_simple").unwrap();
let config = CacheConfig {
@@ -862,13 +859,13 @@ mod tests {
let key = random_key(&mut rng);
let range = ChunkRange::new(0, 4);
let (chunk_byte_indices, data) = random_bytes(&mut rng, &range, RANGE_LEN);
let put_result = cache.put(&key, &range, &chunk_byte_indices, data.as_slice());
let put_result = cache.put(&key, &range, &chunk_byte_indices, data.as_slice()).await;
assert!(put_result.is_ok(), "{put_result:?}");
print_directory_contents(cache_root.as_ref());
// hit
let cache_result = cache.get(&key, &range).unwrap();
let cache_result = cache.get(&key, &range).await.unwrap();
assert!(cache_result.is_some());
let cache_range = cache_result.unwrap();
assert_eq!(cache_range.data.as_ref(), data.as_slice());
@@ -877,11 +874,11 @@ mod tests {
let miss_range = ChunkRange::new(100, 101);
// miss
assert!(cache.get(&key, &miss_range).unwrap().is_none());
assert!(cache.get(&key, &miss_range).await.unwrap().is_none());
}
#[test]
fn test_put_get_subrange() {
#[tokio::test]
async fn test_put_get_subrange() {
let mut rng = StdRng::seed_from_u64(RANDOM_SEED);
let cache_root = TempDir::new("put_get_subrange").unwrap();
let config = CacheConfig {
@@ -895,7 +892,7 @@ mod tests {
// following parts of test assume overall inserted range includes chunk 0
let range = ChunkRange::new(0, 4);
let (chunk_byte_indices, data) = random_bytes(&mut rng, &range, RANGE_LEN);
let put_result = cache.put(&key, &range, &chunk_byte_indices, data.as_slice());
let put_result = cache.put(&key, &range, &chunk_byte_indices, data.as_slice()).await;
assert!(put_result.is_ok(), "{put_result:?}");
print_directory_contents(cache_root.as_ref());
@@ -903,7 +900,7 @@ mod tests {
for start in range.start..range.end {
for end in (start + 1)..=range.end {
let sub_range = ChunkRange::new(start, end);
let get_result = cache.get(&key, &sub_range).unwrap();
let get_result = cache.get(&key, &sub_range).await.unwrap();
assert!(get_result.is_some(), "range: [{start} {end})");
let cache_range = get_result.unwrap();
assert_eq!(cache_range.range, sub_range);
@@ -926,8 +923,8 @@ mod tests {
}
}
#[test]
fn test_puts_eviction() {
#[tokio::test]
async fn test_puts_eviction() {
const MIN_NUM_KEYS: u32 = 12;
const CAP: u64 = (RANGE_LEN * (MIN_NUM_KEYS - 1)) as u64;
let cache_root = TempDir::new("puts_eviction").unwrap();
@@ -942,18 +939,18 @@ mod tests {
// fill the cache to almost capacity
for _ in 0..MIN_NUM_KEYS {
let (key, range, offsets, data) = it.next().unwrap();
assert!(cache.put(&key, &range, &offsets, &data).is_ok());
assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
}
let total_bytes = cache.total_bytes().unwrap();
let total_bytes = cache.total_bytes().await;
assert!(total_bytes <= CAP, "cache size: {} <= {}", output_bytes(total_bytes), output_bytes(CAP));
let (key, range, offsets, data) = it.next().unwrap();
let result = cache.put(&key, &range, &offsets, &data);
let result = cache.put(&key, &range, &offsets, &data).await;
assert!(result.is_ok());
}
#[test]
fn test_same_puts_noop() {
#[tokio::test]
async fn test_same_puts_noop() {
let cache_root = TempDir::new("same_puts_noop").unwrap();
let config = CacheConfig {
cache_directory: cache_root.path().to_path_buf(),
@@ -963,13 +960,13 @@ mod tests {
let cache = DiskCache::initialize(&config).unwrap();
let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_range_len(1000);
let (key, range, offsets, data) = it.next().unwrap();
assert!(cache.put(&key, &range, &offsets, &data).is_ok());
assert!(cache.put(&key, &range, &offsets, &data).is_ok());
assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
}
#[test]
fn test_overlap_range_data_mismatch_fail() {
let setup = || {
#[tokio::test]
async fn test_overlap_range_data_mismatch_fail() {
let setup = || async move {
let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);
let cache_root = TempDir::new("overlap_range_data_mismatch_fail").unwrap();
let config = CacheConfig {
@@ -979,48 +976,49 @@ mod tests {
};
let cache = DiskCache::initialize(&config).unwrap();
let (key, range, offsets, data) = it.next().unwrap();
assert!(cache.put(&key, &range, &offsets, &data).is_ok());
assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
(cache_root, cache, key, range, offsets, data)
};
// bad offsets
// totally random, mismatch len from range
let (_cache_root, cache, key, range, mut offsets, data) = setup();
let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
offsets.remove(1);
assert!(cache.put(&key, &range, &offsets, &data).is_err());
assert!(cache.put(&key, &range, &offsets, &data).await.is_err());
// start isn't 0
let (_cache_root, cache, key, range, mut offsets, data) = setup();
let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
offsets[0] = 100;
assert!(cache.put(&key, &range, &offsets, &data).is_err());
assert!(cache.put(&key, &range, &offsets, &data).await.is_err());
// end isn't data.len()
let (_cache_root, cache, key, range, mut offsets, data) = setup();
let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
*offsets.last_mut().unwrap() = data.len() as u32 + 1;
assert!(cache.put(&key, &range, &offsets, &data).is_err());
assert!(cache.put(&key, &range, &offsets, &data).await.is_err());
// not strictly increasing
let (_cache_root, cache, key, range, mut offsets, data) = setup();
let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
offsets[2] = offsets[1];
assert!(cache.put(&key, &range, &offsets, &data).is_err());
assert!(cache.put(&key, &range, &offsets, &data).await.is_err());
// not matching
let (_cache_root, cache, key, range, mut offsets, data) = setup();
let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
offsets[1] = offsets[1] + 1;
assert!(cache.put(&key, &range, &offsets, &data).is_err());
assert!(cache.put(&key, &range, &offsets, &data).await.is_err());
// bad data
// size mismatch given offsets
let (_cache_root, cache, key, range, offsets, data) = setup();
assert!(cache.put(&key, &range, &offsets, &data[1..]).is_err());
let (_cache_root, cache, key, range, offsets, data) = setup().await;
assert!(cache.put(&key, &range, &offsets, &data[1..]).await.is_err());
// data changed
let (_cache_root, cache, key, range, offsets, mut data) = setup();
let (_cache_root, cache, key, range, offsets, mut data) = setup().await;
data[0] = data[0] + 1;
assert!(cache.put(&key, &range, &offsets, &data).is_err());
assert!(cache.put(&key, &range, &offsets, &data).await.is_err());
}
#[test]
fn test_initialize_non_empty() {
#[tokio::test]
async fn test_initialize_non_empty() {
let cache_root = TempDir::new("initialize_non_empty").unwrap();
let config = CacheConfig {
cache_directory: cache_root.path().to_path_buf(),
@@ -1035,24 +1033,24 @@ mod tests {
for _ in 0..20 {
let (key, range, offsets, data) = it.next().unwrap();
assert!(cache.put(&key, &range, &offsets, &data).is_ok());
assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
keys_and_ranges.push((key, range));
}
let cache2 = DiskCache::initialize(&config).unwrap();
for (i, (key, range)) in keys_and_ranges.iter().enumerate() {
let get_result = cache2.get(&key, &range);
let get_result = cache2.get(&key, &range).await;
assert!(get_result.is_ok(), "{i} {get_result:?}");
assert!(get_result.unwrap().is_some(), "{i}");
}
let cache_keys = cache.state.lock().unwrap().inner.keys().cloned().collect::<BTreeSet<_>>();
let cache2_keys = cache2.state.lock().unwrap().inner.keys().cloned().collect::<BTreeSet<_>>();
let cache_keys = cache.state.read().await.inner.keys().cloned().collect::<BTreeSet<_>>();
let cache2_keys = cache2.state.read().await.inner.keys().cloned().collect::<BTreeSet<_>>();
assert_eq!(cache_keys, cache2_keys);
}
#[test]
fn test_initialize_too_large_file() {
#[tokio::test]
async fn test_initialize_too_large_file() {
const LARGE_FILE: u64 = 1000;
let cache_root = TempDir::new("initialize_too_large_file").unwrap();
let config = CacheConfig {
@@ -1064,7 +1062,7 @@ mod tests {
let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_range_len(LARGE_FILE as u32);
let (key, range, offsets, data) = it.next().unwrap();
cache.put(&key, &range, &offsets, &data).unwrap();
cache.put(&key, &range, &offsets, &data).await.unwrap();
let config = CacheConfig {
cache_directory: cache_root.path().to_path_buf(),
cache_size: LARGE_FILE - 1,
@@ -1072,11 +1070,11 @@ mod tests {
};
let cache2 = DiskCache::initialize(&config).unwrap();
assert_eq!(cache2.total_bytes().unwrap(), 0);
assert_eq!(cache2.total_bytes().await, 0);
}
#[test]
fn test_initialize_stops_loading_early_with_too_many_files() {
#[tokio::test]
async fn test_initialize_stops_loading_early_with_too_many_files() {
const LARGE_FILE: u64 = 1000;
let cache_root = TempDir::new("initialize_stops_loading_early_with_too_many_files").unwrap();
let config = CacheConfig {
@@ -1088,7 +1086,7 @@ mod tests {
let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_range_len(LARGE_FILE as u32);
for _ in 0..10 {
let (key, range, offsets, data) = it.next().unwrap();
cache.put(&key, &range, &offsets, &data).unwrap();
cache.put(&key, &range, &offsets, &data).await.unwrap();
}
let cap2 = LARGE_FILE * 2;
@@ -1099,7 +1097,7 @@ mod tests {
};
let cache2 = DiskCache::initialize(&config).unwrap();
assert!(cache2.total_bytes().unwrap() < cap2 * 3, "{} < {}", cache2.total_bytes().unwrap(), cap2 * 3);
assert!(cache2.total_bytes().await < cap2 * 3, "{} < {}", cache2.total_bytes().await, cap2 * 3);
}
#[test]
@@ -1109,8 +1107,8 @@ mod tests {
assert!(key.is_ok(), "{key:?}")
}
#[test]
fn test_unknown_eviction() {
#[tokio::test]
async fn test_unknown_eviction() {
let cache_root = TempDir::new("initialize_non_empty").unwrap();
let capacity = 12 * RANGE_LEN as u64;
let config = CacheConfig {
@@ -1121,23 +1119,23 @@ mod tests {
let cache = DiskCache::initialize(&config).unwrap();
let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);
let (key, range, chunk_byte_indices, data) = it.next().unwrap();
cache.put(&key, &range, &chunk_byte_indices, &data).unwrap();
cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();
let cache2 = DiskCache::initialize(&config).unwrap();
let get_result = cache2.get(&key, &range);
let get_result = cache2.get(&key, &range).await;
assert!(get_result.is_ok());
assert!(get_result.unwrap().is_some());
let (key2, range2, chunk_byte_indices2, data2) = it.next().unwrap();
assert!(cache2.put(&key2, &range2, &chunk_byte_indices2, &data2).is_ok());
assert!(cache2.put(&key2, &range2, &chunk_byte_indices2, &data2).await.is_ok());
let mut get_result_1 = cache2.get(&key, &range).unwrap();
let mut get_result_1 = cache2.get(&key, &range).await.unwrap();
let mut i = 0;
while get_result_1.is_some() && i < 50 {
i += 1;
let (key2, range2, chunk_byte_indices2, data2) = it.next().unwrap();
cache2.put(&key2, &range2, &chunk_byte_indices2, &data2).unwrap();
get_result_1 = cache2.get(&key, &range).unwrap();
cache2.put(&key2, &range2, &chunk_byte_indices2, &data2).await.unwrap();
get_result_1 = cache2.get(&key, &range).await.unwrap();
}
if get_result_1.is_some() {
// randomness didn't evict the record after 50 tries, don't test this case now
@@ -1145,13 +1143,13 @@ mod tests {
}
// we've evicted the original record from the cache
// note using the original cache handle without updates!
let get_result_post_eviction = cache.get(&key, &range);
let get_result_post_eviction = cache.get(&key, &range).await;
assert!(get_result_post_eviction.is_ok());
assert!(get_result_post_eviction.unwrap().is_none());
}
#[test]
fn put_subrange() {
#[tokio::test]
async fn put_subrange() {
let cache_root = TempDir::new("put_subrange").unwrap();
let config = CacheConfig {
cache_directory: cache_root.path().to_path_buf(),
@@ -1161,23 +1159,26 @@ mod tests {
let cache = DiskCache::initialize(&config).unwrap();
let (key, range, chunk_byte_indices, data) = RandomEntryIterator::std_from_seed(RANDOM_SEED).next().unwrap();
cache.put(&key, &range, &chunk_byte_indices, &data).unwrap();
let total_bytes = cache.total_bytes().unwrap();
cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();
let total_bytes = cache.total_bytes().await;
// left range
let left_range = ChunkRange::new(range.start, range.end - 1);
let left_chunk_byte_indices = &chunk_byte_indices[..chunk_byte_indices.len() - 1];
let left_data = &data[..*left_chunk_byte_indices.last().unwrap() as usize];
assert!(cache.put(&key, &left_range, left_chunk_byte_indices, left_data).is_ok());
assert_eq!(total_bytes, cache.total_bytes().unwrap());
assert!(cache.put(&key, &left_range, left_chunk_byte_indices, left_data).await.is_ok());
assert_eq!(total_bytes, cache.total_bytes().await);
// right range
let right_range = ChunkRange::new(range.start + 1, range.end);
let right_chunk_byte_indices: Vec<u32> =
(&chunk_byte_indices[1..]).iter().map(|v| v - chunk_byte_indices[1]).collect();
let right_data = &data[chunk_byte_indices[1] as usize..];
assert!(cache.put(&key, &right_range, &right_chunk_byte_indices, right_data).is_ok());
assert_eq!(total_bytes, cache.total_bytes().unwrap());
assert!(cache
.put(&key, &right_range, &right_chunk_byte_indices, right_data)
.await
.is_ok());
assert_eq!(total_bytes, cache.total_bytes().await);
// middle range
let middle_range = ChunkRange::new(range.start + 1, range.end - 1);
@@ -1188,12 +1189,15 @@ mod tests {
let middle_data =
&data[chunk_byte_indices[1] as usize..chunk_byte_indices[chunk_byte_indices.len() - 2] as usize];
assert!(cache.put(&key, &middle_range, &middle_chunk_byte_indices, middle_data).is_ok());
assert_eq!(total_bytes, cache.total_bytes().unwrap());
assert!(cache
.put(&key, &middle_range, &middle_chunk_byte_indices, middle_data)
.await
.is_ok());
assert_eq!(total_bytes, cache.total_bytes().await);
}
#[test]
fn test_evictions_with_multiple_range_per_key() {
#[tokio::test]
async fn test_evictions_with_multiple_range_per_key() {
const NUM: u32 = 12;
let cache_root = TempDir::new("multiple_range_per_key").unwrap();
let capacity = (NUM * RANGE_LEN) as u64;
@@ -1212,15 +1216,15 @@ mod tests {
while previously_put.iter().any(|(_, r)| r.start == range.start) {
range.start += 1 % 1000;
}
cache.put(&key, &range, &chunk_byte_indices, &data).unwrap();
cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();
previously_put.push((key.clone(), range.clone()));
cache.put(&key2, &range, &chunk_byte_indices, &data).unwrap();
cache.put(&key2, &range, &chunk_byte_indices, &data).await.unwrap();
previously_put.push((key2, range));
}
let mut num_hits = 0;
for (key, range) in &previously_put {
let result = cache.get(key, range);
let result = cache.get(key, range).await;
assert!(result.is_ok());
let result = result.unwrap();
if result.is_some() {
@@ -1231,10 +1235,7 @@ mod tests {
assert_ne!(num_hits, 0);
// assert that we haven't evicted all keys for key with multiple items
assert!(
cache.state.lock().unwrap().inner.contains_key(&key),
"evicted key that should have remained in cache"
);
assert!(cache.state.read().await.inner.contains_key(&key), "evicted key that should have remained in cache");
}
#[test]
@@ -1279,11 +1280,11 @@ mod concurrency_tests {
let mut kr = Vec::with_capacity(NUM_ITEMS_PER_TASK);
for _ in 0..NUM_ITEMS_PER_TASK {
let (key, range, chunk_byte_indices, data) = it.next().unwrap();
assert!(cache_clone.put(&key, &range, &chunk_byte_indices, &data).is_ok());
assert!(cache_clone.put(&key, &range, &chunk_byte_indices, &data).await.is_ok());
kr.push((key, range));
}
for (key, range) in kr {
assert!(cache_clone.get(&key, &range).is_ok());
assert!(cache_clone.get(&key, &range).await.is_ok());
}
}))
}
@@ -1313,11 +1314,11 @@ mod concurrency_tests {
let mut kr = Vec::with_capacity(NUM_ITEMS_PER_TASK);
for _ in 0..NUM_ITEMS_PER_TASK {
let (key, range, chunk_byte_indices, data) = it.next().unwrap();
assert!(cache_clone.put(&key, &range, &chunk_byte_indices, &data).is_ok());
assert!(cache_clone.put(&key, &range, &chunk_byte_indices, &data).await.is_ok());
kr.push((key, range));
}
for (key, range) in kr {
assert!(cache_clone.get(&key, &range).is_ok());
assert!(cache_clone.get(&key, &range).await.is_ok());
}
}))
}
@@ -1350,7 +1351,7 @@ mod concurrency_tests {
let chunk_byte_indices = chunk_byte_indices.clone();
let data_clone = data.clone();
handles.push(tokio::spawn(async move {
let res = cache_clone.put(&key, &range, &chunk_byte_indices, &data_clone);
let res = cache_clone.put(&key, &range, &chunk_byte_indices, &data_clone).await;
assert!(res.is_ok(), "err: {res:?}");
}))
}
@@ -1360,7 +1361,7 @@ mod concurrency_tests {
}
// check that there is only 1 term in the cache for this data
let state = cache.state.lock().unwrap();
let state = cache.state.read().await;
let items = state.inner.get(&key).unwrap();
let num = items.iter().filter(|item| item.range == range).count();

View File

@@ -4,6 +4,7 @@ use std::str::Utf8Error;
use base64::DecodeError;
use merklehash::DataHashBytesParseError;
use thiserror::Error;
use tokio::task::JoinError;
#[derive(Debug, Error)]
pub enum ChunkCacheError {
@@ -23,6 +24,9 @@ pub enum ChunkCacheError {
LockPoison,
#[error("invalid arguments")]
InvalidArguments,
#[error("RuntimeError")]
RuntimeError(#[from] JoinError),
}
impl ChunkCacheError {

View File

@@ -5,6 +5,7 @@ pub mod error;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
pub use cache_manager::get_cache;
use cas_types::{ChunkRange, Key};
pub use disk::test_utils::*;
@@ -39,6 +40,7 @@ pub struct CacheRange {
/// implementors are allowed to evict data, a get after a put is not required to
/// be a cache hit.
#[automock]
#[async_trait]
pub trait ChunkCache: Sync + Send {
/// get should return an Ok() variant if significant error occurred, check the error
/// variant for issues with IO or parsing contents etc.
@@ -53,7 +55,7 @@ pub trait ChunkCache: Sync + Send {
/// key is required to be a valid CAS Key
/// range is intended to be an index range within the xorb with constraint
/// 0 <= range.start < range.end <= num_chunks_in_xorb(key)
fn get(&self, key: &Key, range: &ChunkRange) -> Result<Option<CacheRange>, ChunkCacheError>;
async fn get(&self, key: &Key, range: &ChunkRange) -> Result<Option<CacheRange>, ChunkCacheError>;
/// put should return Ok(()) if the put succeeded with no error, check the error
/// variant for issues with validating the input, cache state, IO, etc.
@@ -66,7 +68,7 @@ pub trait ChunkCache: Sync + Send {
/// key is required to be a valid CAS Key
/// range is intended to be an index range within the xorb with constraint
/// 0 <= range.start < range.end <= num_chunks_in_xorb(key)
fn put(
async fn put(
&self,
key: &Key,
range: &ChunkRange,

View File

@@ -142,9 +142,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-trait"
version = "0.1.87"
version = "0.1.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d556ec1359574147ec0c4fc5eb525f3f23263a592b1a9c07e0a75b427de55c97"
checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5"
dependencies = [
"proc-macro2",
"quote",
@@ -373,6 +373,7 @@ dependencies = [
name = "chunk_cache"
version = "0.1.0"
dependencies = [
"async-trait",
"base64 0.22.1",
"cas_types",
"crc32fast",
@@ -384,6 +385,7 @@ dependencies = [
"once_cell",
"rand 0.8.5",
"thiserror 2.0.11",
"tokio",
"tracing",
"utils",
]
@@ -392,6 +394,7 @@ dependencies = [
name = "chunk_cache_bench"
version = "0.1.0"
dependencies = [
"async-trait",
"base64 0.22.1",
"cas_types",
"chunk_cache",
@@ -1785,7 +1788,7 @@ dependencies = [
"blake3",
"getrandom 0.3.1",
"heed",
"rand 0.9.0",
"rand 0.8.5",
"safe-transmute",
"serde",
]
@@ -2368,7 +2371,7 @@ version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
dependencies = [
"zerocopy 0.7.35",
"zerocopy",
]
[[package]]
@@ -2531,21 +2534,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha 0.3.1",
"rand_chacha",
"rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
"zerocopy 0.8.23",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
@@ -2556,16 +2548,6 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core 0.9.3",
]
[[package]]
name = "rand_core"
version = "0.3.1"
@@ -2590,15 +2572,6 @@ dependencies = [
"getrandom 0.2.15",
]
[[package]]
name = "rand_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom 0.3.1",
]
[[package]]
name = "rayon"
version = "1.10.0"
@@ -3953,7 +3926,6 @@ dependencies = [
"thiserror 2.0.11",
"tokio",
"tracing",
"xet_threadpool",
]
[[package]]
@@ -4428,15 +4400,6 @@ dependencies = [
"rustix",
]
[[package]]
name = "xet_threadpool"
version = "0.1.0"
dependencies = [
"thiserror 2.0.11",
"tokio",
"tracing",
]
[[package]]
name = "yoke"
version = "0.7.4"
@@ -4468,16 +4431,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive 0.7.35",
]
[[package]]
name = "zerocopy"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6"
dependencies = [
"zerocopy-derive 0.8.23",
"zerocopy-derive",
]
[[package]]
@@ -4491,17 +4445,6 @@ dependencies = [
"syn 2.0.87",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
]
[[package]]
name = "zerofrom"
version = "0.1.4"

View File

@@ -12,6 +12,8 @@ r2d2_postgres = "0.18.1"
r2d2 = "0.8.10"
clap = { version = "4.5.19", features = ["derive"] }
tempdir = "0.3.7"
async-trait = "*"
tokio = { version = "1.36", features = ["full"] }
[[bench]]
@@ -26,4 +28,3 @@ name = "cache_resilience_test"
[dev-dependencies]
rand = "0.8"
criterion = { version = "0.4", features = ["async_tokio"] }
tokio = { version = "1.36", features = ["full"] }

View File

@@ -44,15 +44,16 @@ struct ChildArgs {
capacity: u64,
}
fn main() {
#[tokio::main]
async fn main() {
let args = ResilienceTestArgs::parse();
match args.command {
Commands::Parent(parent) => parent_main(parent),
Commands::Child(child) => child_main(child),
Commands::Parent(parent) => parent_main(parent).await,
Commands::Child(child) => child_main(child).await,
}
}
fn parent_main(args: ParentArgs) {
async fn parent_main(args: ParentArgs) {
let binary = std::env::current_exe().unwrap();
let binary_str = binary.to_str().unwrap();
let cache_root = TempDir::new("resilience").unwrap();
@@ -89,7 +90,7 @@ fn parent_main(args: ParentArgs) {
exit(0);
}
fn child_main(args: ChildArgs) {
async fn child_main(args: ChildArgs) {
let id = std::process::id();
let end_time = SystemTime::now().checked_add(Duration::from_secs(args.seconds)).unwrap();
@@ -99,7 +100,7 @@ fn child_main(args: ChildArgs) {
};
let cache = DiskCache::initialize(&config).unwrap();
eprintln!("initialized id: {id} with {} entries", cache.num_items().unwrap());
eprintln!("initialized id: {id} with {} entries", cache.num_items().await);
let mut saved = (0, Key::default(), ChunkRange::default());
@@ -109,15 +110,15 @@ fn child_main(args: ChildArgs) {
let mut it = RandomEntryIterator::default();
while SystemTime::now() < end_time {
let (key, range, chunk_byte_indices, data) = it.next().unwrap();
cache.put(&key, &range, &chunk_byte_indices, &data).unwrap();
cache.get(&key, &range).unwrap();
cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();
cache.get(&key, &range).await.unwrap();
if i % 1000 == 1 {
saved = (i, key, range);
}
if i != 0 && i % 1000 == 0 {
let (_old_i, key, range) = &saved;
attempts += 1f64;
match cache.get(key, range).unwrap() {
match cache.get(key, range).await.unwrap() {
Some(_) => {
// eprintln!("id: {id} old test got a hit {old_i} @ {i}");
hits += 1f64;

View File

@@ -31,8 +31,13 @@ impl ChunkCacheExt for SCCache {
}
}
#[async_trait::async_trait]
impl ChunkCache for SCCache {
fn get(&self, key: &cas_types::Key, range: &cas_types::ChunkRange) -> Result<Option<CacheRange>, ChunkCacheError> {
async fn get(
&self,
key: &cas_types::Key,
range: &cas_types::ChunkRange,
) -> Result<Option<CacheRange>, ChunkCacheError> {
let cache_key = CacheKey::new(key, range)?;
let mut file = if let Ok(file) = self.cache.lock()?.get(&cache_key) {
file
@@ -49,7 +54,7 @@ impl ChunkCache for SCCache {
}))
}
fn put(
async fn put(
&self,
key: &cas_types::Key,
range: &cas_types::ChunkRange,

View File

@@ -35,8 +35,13 @@ impl ChunkCacheExt for SolidCache {
}
}
#[async_trait::async_trait]
impl ChunkCache for SolidCache {
fn get(&self, key: &cas_types::Key, range: &cas_types::ChunkRange) -> Result<Option<CacheRange>, ChunkCacheError> {
async fn get(
&self,
key: &cas_types::Key,
range: &cas_types::ChunkRange,
) -> Result<Option<CacheRange>, ChunkCacheError> {
let start = range.start as i32;
let end = range.end as i32;
@@ -67,7 +72,7 @@ impl ChunkCache for SolidCache {
}))
}
fn put(
async fn put(
&self,
key: &cas_types::Key,
range: &cas_types::ChunkRange,
@@ -105,39 +110,39 @@ mod tests {
use super::SolidCache;
#[test]
#[tokio::test]
#[ignore = "need a running postgres"]
fn test_postgres() {
async fn test_postgres() {
let cache = SolidCache::new();
let mut it = RandomEntryIterator::new(thread_rng());
let mut kr = Vec::new();
for _ in 0..5 {
let (key, range, chunk_byte_indices, data) = it.next().unwrap();
let result = cache.put(&key, &range, &chunk_byte_indices, &data);
let result = cache.put(&key, &range, &chunk_byte_indices, &data).await;
assert!(result.is_ok(), "{result:?}");
kr.push((key, range));
}
for (key, range) in kr {
let result = cache.get(&key, &range);
let result = cache.get(&key, &range).await;
assert!(result.is_ok(), "{result:?}");
let result = result.unwrap();
assert!(result.is_some(), "{result:?}");
}
let (key, range) = it.next_key_range();
let result = cache.get(&key, &range);
let result = cache.get(&key, &range).await;
assert!(result.is_ok(), "{result:?}");
let result = result.unwrap();
assert!(result.is_none(), "{result:?}");
}
#[test]
#[tokio::test]
#[ignore = "need a running postgres"]
fn test_postgres_get_miss() {
async fn test_postgres_get_miss() {
let cache = SolidCache::new();
let mut it = RandomEntryIterator::new(thread_rng());
let (key, range) = it.next_key_range();
let result = cache.get(&key, &range);
let result = cache.get(&key, &range).await;
assert!(result.is_ok(), "{result:?}");
let result = result.unwrap();
assert!(result.is_none(), "{result:?}");