This commit is contained in:
Hoyt Koepke
2026-03-30 07:50:15 -07:00
parent 9c0cb6e4c8
commit e33b30f076
2 changed files with 38 additions and 19 deletions

View File

@@ -122,11 +122,15 @@ impl DownloadStream {
match self.receiver.blocking_recv() {
Some(SequentialRetrievalItem::Data { receiver, permit }) => {
let data = receiver.blocking_recv().map_err(|_| {
FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
)
})?;
let data = match receiver.blocking_recv() {
Ok(data) => data,
Err(_) => {
self.run_state.check_error()?;
return Err(FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
));
},
};
self.run_state.report_bytes_written(data.len() as u64);
drop(permit);
Ok(Some(data))
@@ -160,11 +164,15 @@ impl DownloadStream {
match item {
Some(SequentialRetrievalItem::Data { receiver, permit }) => {
let data = receiver.await.map_err(|_| {
FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
)
})?;
let data = match receiver.await {
Ok(data) => data,
Err(_) => {
self.run_state.check_error()?;
return Err(FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
));
},
};
self.run_state.report_bytes_written(data.len() as u64);
drop(permit);
Ok(Some(data))

View File

@@ -47,6 +47,7 @@ struct SyncWriterThread {
rx: UnboundedReceiver<SequentialRetrievalItem>,
bytes_written: Arc<AtomicU64>,
progress_updater: Option<Arc<ItemProgressUpdater>>,
run_state: Arc<RunState>,
pending: Option<SequentialRetrievalItem>,
finished: bool,
}
@@ -56,11 +57,13 @@ impl SyncWriterThread {
rx: UnboundedReceiver<SequentialRetrievalItem>,
bytes_written: Arc<AtomicU64>,
progress_updater: Option<Arc<ItemProgressUpdater>>,
run_state: Arc<RunState>,
) -> Self {
Self {
rx,
bytes_written,
progress_updater,
run_state,
pending: None,
finished: false,
}
@@ -88,11 +91,15 @@ impl SyncWriterThread {
match self.pending.take() {
Some(SequentialRetrievalItem::Data { mut receiver, permit }) => {
if should_block {
let data = receiver.blocking_recv().map_err(|_| {
FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
)
})?;
let data = match receiver.blocking_recv() {
Ok(data) => data,
Err(_) => {
self.run_state.check_error()?;
return Err(FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
));
},
};
Ok(Some((data, permit)))
} else {
// Non-blocking: try to receive data.
@@ -103,9 +110,12 @@ impl SyncWriterThread {
self.pending = Some(SequentialRetrievalItem::Data { receiver, permit });
Ok(None)
},
Err(oneshot::error::TryRecvError::Closed) => Err(FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
)),
Err(oneshot::error::TryRecvError::Closed) => {
self.run_state.check_error()?;
Err(FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
))
},
}
}
},
@@ -391,11 +401,12 @@ impl SequentialWriter {
let bytes_written = Arc::new(AtomicU64::new(0));
let run_state_clone = run_state.clone();
let run_state_thread = run_state.clone();
let bytes_written_clone = bytes_written.clone();
let progress_updater = run_state.progress_updater().cloned();
let handle = XetRuntime::current().spawn_blocking(move || {
let writer_thread = SyncWriterThread::new(rx, bytes_written_clone, progress_updater);
let writer_thread = SyncWriterThread::new(rx, bytes_written_clone, progress_updater, run_state_thread);
let result = if use_vectorized {
writer_thread.run_vectorized(writer)
} else {