From 0b2d538c01f49b6ef88cb452c4a641ed33dc9156 Mon Sep 17 00:00:00 2001 From: Muhammed Fatih BALIN Date: Tue, 20 Aug 2024 02:46:07 -0400 Subject: [PATCH] [GraphBolt][io_uring] Document `QueueAndBufferAcquirer`. (#7713) --- graphbolt/src/cnumpy.cc | 8 ++++---- graphbolt/src/cnumpy.h | 24 ++++++++++++++++++++++-- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/graphbolt/src/cnumpy.cc b/graphbolt/src/cnumpy.cc index 26db0ff1a8..37365e5fc7 100644 --- a/graphbolt/src/cnumpy.cc +++ b/graphbolt/src/cnumpy.cc @@ -179,10 +179,10 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) { CircularQueue read_queue(8 * kGroupSize); int64_t num_submitted = 0; int64_t num_completed = 0; - auto [acquired_queue_handle, my_read_buffer2] = queue_source.get(); + auto [acquired_queue_handle, read_buffer_source2] = queue_source.get(); auto &io_uring_queue = acquired_queue_handle.get(); // Capturing structured binding is available only in C++20, so we rename. - auto my_read_buffer = my_read_buffer2; + auto read_buffer_source = read_buffer_source2; auto submit_fn = [&](int64_t submission_minimum_batch_size) { if (read_queue.Size() < submission_minimum_batch_size) return; TORCH_CHECK( // Check for sqe overflow. @@ -200,8 +200,8 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) { }; for (int64_t read_buffer_slot = 0; true;) { auto request_read_buffer = [&]() { - return my_read_buffer + (aligned_length_ + block_size_) * - (read_buffer_slot++ % (8 * kGroupSize)); + return read_buffer_source + (aligned_length_ + block_size_) * + (read_buffer_slot++ % (8 * kGroupSize)); }; const auto num_requested_items = std::max( std::min( diff --git a/graphbolt/src/cnumpy.h b/graphbolt/src/cnumpy.h index f853ab70d9..7931165809 100644 --- a/graphbolt/src/cnumpy.h +++ b/graphbolt/src/cnumpy.h @@ -146,12 +146,21 @@ class OnDiskNpyArray : public torch::CustomClassHolder { static inline std::mutex available_queues_mtx_; // available_queues_ mutex. static inline std::vector available_queues_; - struct QueueAndBufferAcquirer { - struct UniqueQueue { + /** + * @brief This class is meant to distribute the available read buffers and the + * statically declared io_uring queues among the worker threads. + */ + class QueueAndBufferAcquirer { + public: + class UniqueQueue { + public: UniqueQueue(int thread_id) : thread_id_(thread_id) {} UniqueQueue(const UniqueQueue&) = delete; UniqueQueue& operator=(const UniqueQueue&) = delete; + /** + * @brief Returns the queue back to the pool. + */ ~UniqueQueue() { { // We give back the slot we used. @@ -161,6 +170,9 @@ class OnDiskNpyArray : public torch::CustomClassHolder { semaphore_.release(); } + /** + * @brief Returns the raw io_uring queue. + */ ::io_uring& get() const { return io_uring_queue_[thread_id_]; } private: @@ -179,6 +191,14 @@ class OnDiskNpyArray : public torch::CustomClassHolder { } } + /** + * @brief Returns the secured io_uring queue and the read buffer as a pair. + * The raw io_uring queue can be accessed by calling `.get()` on the + * returned UniqueQueue object. + * + * @note The returned UniqueQueue object manages the lifetime of the + * io_uring queue. Its destructor returns the queue back to the pool. + */ std::pair get() { // We consume a slot from the semaphore to use a queue. if (entering_first_.test_and_set(std::memory_order_relaxed)) {