From e33b30f07650bae84870ff371187072025b69290 Mon Sep 17 00:00:00 2001 From: Hoyt Koepke Date: Mon, 30 Mar 2026 07:50:15 -0700 Subject: [PATCH] Update. --- .../data_writer/download_stream.rs | 28 +++++++++++------- .../data_writer/sequential_writer.rs | 29 +++++++++++++------ 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/xet_data/src/file_reconstruction/data_writer/download_stream.rs b/xet_data/src/file_reconstruction/data_writer/download_stream.rs index dfe0dcac..c1e4ca48 100644 --- a/xet_data/src/file_reconstruction/data_writer/download_stream.rs +++ b/xet_data/src/file_reconstruction/data_writer/download_stream.rs @@ -122,11 +122,15 @@ impl DownloadStream { match self.receiver.blocking_recv() { Some(SequentialRetrievalItem::Data { receiver, permit }) => { - let data = receiver.blocking_recv().map_err(|_| { - FileReconstructionError::InternalWriterError( - "Data sender was dropped before sending data.".to_string(), - ) - })?; + let data = match receiver.blocking_recv() { + Ok(data) => data, + Err(_) => { + self.run_state.check_error()?; + return Err(FileReconstructionError::InternalWriterError( + "Data sender was dropped before sending data.".to_string(), + )); + }, + }; self.run_state.report_bytes_written(data.len() as u64); drop(permit); Ok(Some(data)) @@ -160,11 +164,15 @@ impl DownloadStream { match item { Some(SequentialRetrievalItem::Data { receiver, permit }) => { - let data = receiver.await.map_err(|_| { - FileReconstructionError::InternalWriterError( - "Data sender was dropped before sending data.".to_string(), - ) - })?; + let data = match receiver.await { + Ok(data) => data, + Err(_) => { + self.run_state.check_error()?; + return Err(FileReconstructionError::InternalWriterError( + "Data sender was dropped before sending data.".to_string(), + )); + }, + }; self.run_state.report_bytes_written(data.len() as u64); drop(permit); Ok(Some(data)) diff --git a/xet_data/src/file_reconstruction/data_writer/sequential_writer.rs b/xet_data/src/file_reconstruction/data_writer/sequential_writer.rs index a627e823..a493981e 100644 --- a/xet_data/src/file_reconstruction/data_writer/sequential_writer.rs +++ b/xet_data/src/file_reconstruction/data_writer/sequential_writer.rs @@ -47,6 +47,7 @@ struct SyncWriterThread { rx: UnboundedReceiver, bytes_written: Arc, progress_updater: Option>, + run_state: Arc, pending: Option, finished: bool, } @@ -56,11 +57,13 @@ impl SyncWriterThread { rx: UnboundedReceiver, bytes_written: Arc, progress_updater: Option>, + run_state: Arc, ) -> Self { Self { rx, bytes_written, progress_updater, + run_state, pending: None, finished: false, } @@ -88,11 +91,15 @@ impl SyncWriterThread { match self.pending.take() { Some(SequentialRetrievalItem::Data { mut receiver, permit }) => { if should_block { - let data = receiver.blocking_recv().map_err(|_| { - FileReconstructionError::InternalWriterError( - "Data sender was dropped before sending data.".to_string(), - ) - })?; + let data = match receiver.blocking_recv() { + Ok(data) => data, + Err(_) => { + self.run_state.check_error()?; + return Err(FileReconstructionError::InternalWriterError( + "Data sender was dropped before sending data.".to_string(), + )); + }, + }; Ok(Some((data, permit))) } else { // Non-blocking: try to receive data. @@ -103,9 +110,12 @@ impl SyncWriterThread { self.pending = Some(SequentialRetrievalItem::Data { receiver, permit }); Ok(None) }, - Err(oneshot::error::TryRecvError::Closed) => Err(FileReconstructionError::InternalWriterError( - "Data sender was dropped before sending data.".to_string(), - )), + Err(oneshot::error::TryRecvError::Closed) => { + self.run_state.check_error()?; + Err(FileReconstructionError::InternalWriterError( + "Data sender was dropped before sending data.".to_string(), + )) + }, } } }, @@ -391,11 +401,12 @@ impl SequentialWriter { let bytes_written = Arc::new(AtomicU64::new(0)); let run_state_clone = run_state.clone(); + let run_state_thread = run_state.clone(); let bytes_written_clone = bytes_written.clone(); let progress_updater = run_state.progress_updater().cloned(); let handle = XetRuntime::current().spawn_blocking(move || { - let writer_thread = SyncWriterThread::new(rx, bytes_written_clone, progress_updater); + let writer_thread = SyncWriterThread::new(rx, bytes_written_clone, progress_updater, run_state_thread); let result = if use_vectorized { writer_thread.run_vectorized(writer) } else {