[GraphBolt][io_uring] Document QueueAndBufferAcquirer. (#7713)

This commit is contained in:
Muhammed Fatih BALIN
2024-08-20 02:46:07 -04:00
committed by GitHub
parent 2ce0ea0d0d
commit 0b2d538c01
2 changed files with 26 additions and 6 deletions

View File

@@ -179,10 +179,10 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
CircularQueue<ReadRequest> read_queue(8 * kGroupSize);
int64_t num_submitted = 0;
int64_t num_completed = 0;
auto [acquired_queue_handle, my_read_buffer2] = queue_source.get();
auto [acquired_queue_handle, read_buffer_source2] = queue_source.get();
auto &io_uring_queue = acquired_queue_handle.get();
// Capturing structured binding is available only in C++20, so we rename.
auto my_read_buffer = my_read_buffer2;
auto read_buffer_source = read_buffer_source2;
auto submit_fn = [&](int64_t submission_minimum_batch_size) {
if (read_queue.Size() < submission_minimum_batch_size) return;
TORCH_CHECK( // Check for sqe overflow.
@@ -200,8 +200,8 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
};
for (int64_t read_buffer_slot = 0; true;) {
auto request_read_buffer = [&]() {
return my_read_buffer + (aligned_length_ + block_size_) *
(read_buffer_slot++ % (8 * kGroupSize));
return read_buffer_source + (aligned_length_ + block_size_) *
(read_buffer_slot++ % (8 * kGroupSize));
};
const auto num_requested_items = std::max(
std::min(

View File

@@ -146,12 +146,21 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
static inline std::mutex available_queues_mtx_; // available_queues_ mutex.
static inline std::vector<int> available_queues_;
struct QueueAndBufferAcquirer {
struct UniqueQueue {
/**
* @brief This class is meant to distribute the available read buffers and the
* statically declared io_uring queues among the worker threads.
*/
class QueueAndBufferAcquirer {
public:
class UniqueQueue {
public:
UniqueQueue(int thread_id) : thread_id_(thread_id) {}
UniqueQueue(const UniqueQueue&) = delete;
UniqueQueue& operator=(const UniqueQueue&) = delete;
/**
* @brief Returns the queue back to the pool.
*/
~UniqueQueue() {
{
// We give back the slot we used.
@@ -161,6 +170,9 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
semaphore_.release();
}
/**
* @brief Returns the raw io_uring queue.
*/
::io_uring& get() const { return io_uring_queue_[thread_id_]; }
private:
@@ -179,6 +191,14 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
}
}
/**
* @brief Returns the secured io_uring queue and the read buffer as a pair.
* The raw io_uring queue can be accessed by calling `.get()` on the
* returned UniqueQueue object.
*
* @note The returned UniqueQueue object manages the lifetime of the
* io_uring queue. Its destructor returns the queue back to the pool.
*/
std::pair<UniqueQueue, char*> get() {
// We consume a slot from the semaphore to use a queue.
if (entering_first_.test_and_set(std::memory_order_relaxed)) {