mirror of
https://github.com/dmlc/dgl.git
synced 2026-06-03 19:34:33 +08:00
[GraphBolt][io_uring] Use RAII to ensure queues are returned. (#7680)
This commit is contained in:
committed by
GitHub
parent
90c26be268
commit
c86776d898
@@ -171,27 +171,19 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
|
||||
// Indicator for index error.
|
||||
std::atomic<int> error_flag{};
|
||||
std::atomic<int64_t> work_queue{};
|
||||
std::atomic_flag exiting_first = ATOMIC_FLAG_INIT;
|
||||
// Consume a slot so that parallel_for is called only if there are available
|
||||
// queues.
|
||||
semaphore_.acquire();
|
||||
std::atomic<int> num_semaphore_acquisitions = 1;
|
||||
graphbolt::parallel_for_each_interop(0, num_thread_, 1, [&](int thread_id) {
|
||||
// Construct a QueueAndBufferAcquirer object so that the worker threads can
|
||||
// share the available queues and buffers.
|
||||
QueueAndBufferAcquirer queue_source(this);
|
||||
graphbolt::parallel_for_each_interop(0, num_thread_, 1, [&](int) {
|
||||
// The completion queue might contain 4 * kGroupSize while we may submit
|
||||
// 4 * kGroupSize more. No harm in overallocation here.
|
||||
CircularQueue<ReadRequest> read_queue(8 * kGroupSize);
|
||||
int64_t num_submitted = 0;
|
||||
int64_t num_completed = 0;
|
||||
{
|
||||
// We consume a slot from the semaphore to use a queue.
|
||||
semaphore_.acquire();
|
||||
num_semaphore_acquisitions.fetch_add(1, std::memory_order_relaxed);
|
||||
std::lock_guard lock(available_queues_mtx_);
|
||||
TORCH_CHECK(!available_queues_.empty());
|
||||
thread_id = available_queues_.back();
|
||||
available_queues_.pop_back();
|
||||
}
|
||||
auto &io_uring_queue = io_uring_queue_[thread_id];
|
||||
auto [acquired_queue_handle, my_read_buffer2] = queue_source.get();
|
||||
auto &io_uring_queue = acquired_queue_handle.get();
|
||||
// Capturing structured binding is available only in C++20, so we rename.
|
||||
auto my_read_buffer = my_read_buffer2;
|
||||
auto submit_fn = [&](int64_t submission_minimum_batch_size) {
|
||||
if (read_queue.Size() < submission_minimum_batch_size) return;
|
||||
TORCH_CHECK( // Check for sqe overflow.
|
||||
@@ -207,7 +199,6 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
|
||||
read_queue.PopN(submitted);
|
||||
}
|
||||
};
|
||||
auto my_read_buffer = ReadBuffer(thread_id);
|
||||
for (int64_t read_buffer_slot = 0; true;) {
|
||||
auto request_read_buffer = [&]() {
|
||||
return my_read_buffer + (aligned_length_ + block_size_) *
|
||||
@@ -307,21 +298,7 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
|
||||
io_uring_cq_advance(&io_uring_queue, num_cqes_seen);
|
||||
num_completed += num_cqes_seen;
|
||||
}
|
||||
{
|
||||
// We give back the slot we used.
|
||||
std::lock_guard lock(available_queues_mtx_);
|
||||
available_queues_.push_back(thread_id);
|
||||
}
|
||||
// If this is the first thread exiting, release the master thread's ticket
|
||||
// as well by releasing 2 slots. Otherwise, release 1 slot.
|
||||
const auto releasing = exiting_first.test_and_set() ? 1 : 2;
|
||||
semaphore_.release(releasing);
|
||||
num_semaphore_acquisitions.fetch_add(-releasing, std::memory_order_relaxed);
|
||||
});
|
||||
// If any of the worker threads exit early without being able to release the
|
||||
// semaphore, we make sure to release it for them in the main thread.
|
||||
semaphore_.release(
|
||||
num_semaphore_acquisitions.load(std::memory_order_relaxed));
|
||||
const auto ret_val = error_flag.load(std::memory_order_relaxed);
|
||||
switch (ret_val) {
|
||||
case 0: // Successful.
|
||||
|
||||
@@ -18,11 +18,10 @@
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <cuda/std/semaphore>
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace graphbolt {
|
||||
@@ -147,6 +146,68 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
|
||||
static inline std::mutex available_queues_mtx_; // available_queues_ mutex.
|
||||
static inline std::vector<int> available_queues_;
|
||||
|
||||
struct QueueAndBufferAcquirer {
|
||||
struct UniqueQueue {
|
||||
UniqueQueue(QueueAndBufferAcquirer* acquirer, int thread_id)
|
||||
: acquirer_(acquirer), thread_id_(thread_id) {}
|
||||
UniqueQueue(const UniqueQueue&) = delete;
|
||||
UniqueQueue& operator=(const UniqueQueue&) = delete;
|
||||
|
||||
~UniqueQueue() {
|
||||
{
|
||||
// We give back the slot we used.
|
||||
std::lock_guard lock(available_queues_mtx_);
|
||||
available_queues_.push_back(thread_id_);
|
||||
}
|
||||
// If this is the first thread exiting, release the master thread's
|
||||
// ticket as well by releasing 2 slots. Otherwise, release 1 slot.
|
||||
const auto releasing = acquirer_->exiting_first_.test_and_set() ? 1 : 2;
|
||||
semaphore_.release(releasing);
|
||||
acquirer_->num_acquisitions_.fetch_add(
|
||||
-releasing, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
::io_uring& get() const { return io_uring_queue_[thread_id_]; }
|
||||
|
||||
private:
|
||||
QueueAndBufferAcquirer* acquirer_;
|
||||
int thread_id_;
|
||||
};
|
||||
|
||||
QueueAndBufferAcquirer(OnDiskNpyArray* array) : array_(array) {
|
||||
semaphore_.acquire();
|
||||
}
|
||||
|
||||
~QueueAndBufferAcquirer() {
|
||||
// If any of the worker threads exit early without being able to release
|
||||
// the semaphore, we make sure to release it for them in the main thread.
|
||||
const auto releasing = num_acquisitions_.load(std::memory_order_relaxed);
|
||||
semaphore_.release(releasing);
|
||||
TORCH_CHECK(releasing == 0, "An io_uring worker thread didn't not exit.");
|
||||
}
|
||||
|
||||
std::pair<UniqueQueue, char*> get() {
|
||||
// We consume a slot from the semaphore to use a queue.
|
||||
semaphore_.acquire();
|
||||
num_acquisitions_.fetch_add(1, std::memory_order_relaxed);
|
||||
const auto thread_id = [&] {
|
||||
std::lock_guard lock(available_queues_mtx_);
|
||||
TORCH_CHECK(!available_queues_.empty());
|
||||
const auto thread_id = available_queues_.back();
|
||||
available_queues_.pop_back();
|
||||
return thread_id;
|
||||
}();
|
||||
return {
|
||||
std::piecewise_construct, std::make_tuple(this, thread_id),
|
||||
std::make_tuple(array_->ReadBuffer(thread_id))};
|
||||
}
|
||||
|
||||
private:
|
||||
const OnDiskNpyArray* array_;
|
||||
std::atomic_flag exiting_first_ = ATOMIC_FLAG_INIT;
|
||||
std::atomic<int> num_acquisitions_ = 1;
|
||||
};
|
||||
|
||||
#endif // HAVE_LIBRARY_LIBURING
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user