[GraphBolt][io_uring] Use the interop pool for io_uring workers. (#7575)

Co-authored-by: Rhett Ying <85214957+Rhett-Ying@users.noreply.github.com>
This commit is contained in:
Muhammed Fatih BALIN
2024-07-23 23:23:45 -04:00
committed by GitHub
parent dd02406c99
commit 849f1ce4b2
6 changed files with 90 additions and 39 deletions

View File

@@ -7,12 +7,15 @@
#include "./cnumpy.h"
#include "./io_uring.h"
#ifdef HAVE_LIBRARY_LIBURING
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#endif
#include <graphbolt/async.h>
#include <torch/torch.h>
#include <atomic>
@@ -21,6 +24,7 @@
#include <memory>
#include <numeric>
#include <stdexcept>
#include <vector>
#include "./circular_queue.h"
#include "./utils.h"
@@ -56,21 +60,28 @@ OnDiskNpyArray::OnDiskNpyArray(
// The minimum page size to contain one feature.
aligned_length_ = (feature_size_ + block_size_ - 1) & ~(block_size_ - 1);
// Get system max thread number.
num_thread_ = torch::get_num_threads();
if (num_threads.has_value() && num_thread_ > *num_threads) {
num_thread_ = *num_threads;
}
TORCH_CHECK(num_thread_ > 0, "A positive # threads is required.");
io_uring_queue_ = std::make_unique<io_uring[]>(num_thread_);
std::call_once(call_once_flag_, [&] {
// Get system max interop thread count.
num_queues_ = torch::get_num_interop_threads();
TORCH_CHECK(num_queues_ > 0, "A positive # queues is required.");
io_uring_queue_ = std::unique_ptr<::io_uring[], io_uring_queue_destroyer>(
new ::io_uring[num_queues_], io_uring_queue_destroyer{num_queues_});
mtx_ = std::make_unique<std::mutex[]>(num_queues_);
// Init io_uring queue.
for (int64_t t = 0; t < num_thread_; t++) {
TORCH_CHECK(
::io_uring_queue_init(2 * kGroupSize, &io_uring_queue_[t], 0) == 0);
// We have allocated 2 * kGroupSize submission queue entries and
// 4 * kGroupSize completion queue entries after this call.
}
// Init io_uring queue.
for (int64_t t = 0; t < num_queues_; t++) {
TORCH_CHECK(
::io_uring_queue_init(2 * kGroupSize, &io_uring_queue_[t], 0) == 0);
// We have allocated 2 * kGroupSize submission queue entries and
// 4 * kGroupSize completion queue entries after this call.
}
});
num_thread_ = std::min(
static_cast<int64_t>(num_queues_),
num_threads.value_or(
io_uring::num_threads.value_or((torch::get_num_threads() + 1) / 2)));
TORCH_CHECK(num_thread_ > 0, "A positive # threads is required.");
read_tensor_ = torch::empty(
ReadBufferSizePerThread() * num_thread_ + block_size_ - 1,
@@ -88,10 +99,6 @@ c10::intrusive_ptr<OnDiskNpyArray> OnDiskNpyArray::Create(
OnDiskNpyArray::~OnDiskNpyArray() {
#ifdef HAVE_LIBRARY_LIBURING
// IO queue exit.
for (int64_t t = 0; t < num_thread_; t++) {
::io_uring_queue_exit(&io_uring_queue_[t]);
}
TORCH_CHECK(::close(file_description_) == 0);
#endif // HAVE_LIBRARY_LIBURING
}
@@ -160,10 +167,7 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
// Indicator for index error.
std::atomic<int> error_flag{};
std::atomic<int64_t> work_queue{};
std::lock_guard lock(mtx_);
torch::parallel_for(0, num_thread_, 1, [&](int64_t begin, int64_t end) {
if (begin >= end) return;
const auto thread_id = begin;
auto worker_fn = [&](const int thread_id) {
auto &io_uring_queue = io_uring_queue_[thread_id];
auto my_read_buffer = ReadBuffer(thread_id);
// The completion queue might contain 4 * kGroupSize while we may submit
@@ -186,6 +190,8 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
read_queue.PopN(submitted);
}
};
// Make sure we have sole control of this thread's queue.
std::lock_guard io_uring_queue_lock(mtx_[thread_id]);
for (int64_t read_buffer_slot = 0; true;) {
auto request_read_buffer = [&]() {
return my_read_buffer + (aligned_length_ + block_size_) *
@@ -199,7 +205,7 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
// The condition not to overflow the submission queue.
kGroupSize - read_queue.Size()),
int64_t{});
begin =
const auto begin =
work_queue.fetch_add(num_requested_items, std::memory_order_relaxed);
if ((begin >= index.numel() && read_queue.IsEmpty() &&
num_completed >= num_submitted) ||
@@ -208,7 +214,7 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
// complete to avoid the instability due to incompleted reads.
error_flag.load(std::memory_order_relaxed) > 1)
break;
end = std::min(begin + num_requested_items, index.numel());
const auto end = std::min(begin + num_requested_items, index.numel());
AT_DISPATCH_INDEX_TYPES(
index.scalar_type(), "IndexSelectIOUring", ([&] {
auto index_data = index.data_ptr<index_t>();
@@ -285,8 +291,13 @@ torch::Tensor OnDiskNpyArray::IndexSelectIOUringImpl(torch::Tensor index) {
io_uring_cq_advance(&io_uring_queue, num_cqes_seen);
num_completed += num_cqes_seen;
}
});
};
std::vector<c10::intrusive_ptr<Future<void>>> futures;
for (int t = 0; t < num_thread_; t++) {
futures.emplace_back(async([&worker_fn, t] { worker_fn(t); }));
}
// Wait for the launched work to finish.
for (auto &future : futures) future->Wait();
switch (error_flag.load(std::memory_order_relaxed)) {
case 0: // Successful.
return result;

View File

@@ -27,6 +27,22 @@
namespace graphbolt {
namespace storage {
namespace {
#ifdef HAVE_LIBRARY_LIBURING
struct io_uring_queue_destroyer {
int num_thread_;
void operator()(::io_uring* queues) {
if (!queues) return;
for (int t = 0; t < num_thread_; t++) {
// IO queue exit.
::io_uring_queue_exit(&queues[t]);
}
delete[] queues;
}
};
#endif // HAVE_LIBRARY_LIBURING
} // namespace
/**
* @brief Disk Numpy Fetecher class.
*/
@@ -114,11 +130,17 @@ class OnDiskNpyArray : public torch::CustomClassHolder {
int64_t aligned_length_; // Aligned feature_size.
int num_thread_; // Default thread number.
torch::Tensor read_tensor_; // Provides temporary read buffer.
std::mutex mtx_;
#ifdef HAVE_LIBRARY_LIBURING
std::unique_ptr<io_uring[]> io_uring_queue_; // io_uring queue.
#endif // HAVE_LIBRARY_LIBURING
static inline std::once_flag
call_once_flag_; // Protect initialization of below.
static inline int num_queues_; // Number of queues.
static inline std::unique_ptr<::io_uring[], io_uring_queue_destroyer>
io_uring_queue_; // io_uring queue.
static inline std::unique_ptr<std::mutex[]> mtx_; // io_uring_queue mutexes.
#endif // HAVE_LIBRARY_LIBURING
};
} // namespace storage

View File

@@ -14,15 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file detect_io_uring.cc
* @brief Check whether io_uring is available on the system.
* @file io_uring.cc
* @brief io_uring related functions.
*/
#include "./io_uring.h"
#ifdef HAVE_LIBRARY_LIBURING
#include "./detect_io_uring.h"
#include <errno.h>
#include <liburing.h>
#include <linux/io_uring.h>
#include <liburing/io_uring.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <unistd.h>
@@ -75,5 +76,7 @@ bool IsAvailable() {
#endif
}
void SetNumThreads(int64_t count) { num_threads = count; }
} // namespace io_uring
} // namespace graphbolt

View File

@@ -14,18 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file detect_io_uring.h
* @brief Check whether io_uring is available on the system.
* @file io_uring.h
* @brief io_uring related functions.
*/
#ifndef GRAPHBOLT_DETECT_IO_URING_H_
#define GRAPHBOLT_DETECT_IO_URING_H_
#ifndef GRAPHBOLT_IO_URING_H_
#define GRAPHBOLT_IO_URING_H_
#include <cstdint>
#include <optional>
namespace graphbolt {
namespace io_uring {
bool IsAvailable();
/** @brief Set a limit on # background io_uring threads. */
inline std::optional<int64_t> num_threads;
/**
* @brief Set the number of background io_uring threads.
*/
void SetNumThreads(int64_t count);
} // namespace io_uring
} // namespace graphbolt
#endif // GRAPHBOLT_DETECT_IO_URING_H_
#endif // GRAPHBOLT_IO_URING_H_

View File

@@ -13,10 +13,10 @@
#include "./cuda/max_uva_threads.h"
#endif
#include "./cnumpy.h"
#include "./detect_io_uring.h"
#include "./expand_indptr.h"
#include "./feature_cache.h"
#include "./index_select.h"
#include "./io_uring.h"
#include "./partitioned_cache_policy.h"
#include "./random.h"
#include "./utils.h"
@@ -146,6 +146,7 @@ TORCH_LIBRARY(graphbolt, m) {
m.def("index_select_csc_batched", &ops::IndexSelectCSCBatched);
m.def("ondisk_npy_array", &storage::OnDiskNpyArray::Create);
m.def("detect_io_uring", &io_uring::IsAvailable);
m.def("set_num_io_uring_threads", &io_uring::SetNumThreads);
m.def("set_worker_id", &utils::SetWorkerId);
m.def("set_seed", &RandomEngine::SetManualSeed);
#ifdef GRAPHBOLT_USE_CUDA

View File

@@ -65,3 +65,6 @@ if torch.cuda.is_available() and not built_with_cuda():
"is installed. Consider reinstalling GraphBolt with CUDA support, see "
"installation instructions at https://www.dgl.ai/pages/start.html"
)
torch.ops.graphbolt.set_num_io_uring_threads(
min((torch.get_num_threads() + 1) // 2, 8)
)