#ifdef RDK_BUILD_THREADSAFE_SSS
//
//  Copyright (C) 2020 Shrey Aryan
//
//   @@ All Rights Reserved @@
//  This file is part of the RDKit.
//  The contents are covered by the terms of the BSD license
//  which is included in the file license.txt, found at the root
//  of the RDKit source tree.
//
#include "MultithreadedMolSupplier.h"
// NOTE(review): the target of this include was lost in the reviewed copy;
// RDLog.h is assumed because BOOST_LOG(rdErrorLog) is used below - confirm.
#include <RDGeneral/RDLog.h>

#include <exception>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>

namespace RDKit {
namespace v2 {
namespace FileParsers {

namespace {
// Item carried by the input queue, as built by reader():
// (record text, line number, record index).
using InputRecord = std::tuple<std::string, unsigned int, unsigned int>;

// Item carried by the output queue, as built by writer():
// (owning molecule pointer, record text, record index).
// NOTE(review): the pointee type was lost in the reviewed copy; RWMol is
// assumed - confirm against the queue declaration in the header.
using OutputRecord = std::tuple<RWMol *, std::string, unsigned int>;
}  // namespace

// Forces the reader and writer threads to stop, joins them, frees everything
// still sitting in the queues and finally closes the underlying streams.
void MultithreadedMolSupplier::close() {
  // Pops every item left in the output queue and frees the molecule it owns.
  // A missing queue is tolerated so that every use of d_outputQueue in this
  // function is guarded the same way.
  const auto drainOutputQueue = [this]() {
    if (!d_outputQueue) {
      return;
    }
    OutputRecord item;
    while (d_outputQueue->pop(item)) {
      delete std::get<0>(item);
    }
  };

  df_forceStop = true;
  if (d_outputQueue) {
    d_outputQueue->setDone();
  }
  if (df_started) {
    // Clear the queues until they are empty
    //  d_inputQueue->clear is not thread-safe
    InputRecord pending;
    while (d_inputQueue->pop(pending)) {
    }
    // clear the output queue, it might be full
    //  and blocking the writer threads, note
    //  that while ending threads the writers may
    //  put a few more items back in the queue
    drainOutputQueue();
  }
  endThreads();
  // notify the queue again that it is done in case
  //  anyone is waiting on it
  if (d_outputQueue) {
    d_outputQueue->setDone();
  }
  // destroy all objects in the input and output queues,
  //  including anything that was put in the queues while
  //  the threads were ending
  if (df_started) {
    d_inputQueue->clear();
  }
  drainOutputQueue();
  // close external streams if any
  // destructors are called child to parent, however the threads
  //  need to be ended before shutting down streams, so override this
  //  in the child class.
  closeStreams();
  df_started = false;
}

// Reader thread body: extracts records one at a time, optionally passes them
// through readCallback, and pushes them onto the input queue. Marks the input
// queue as done when there are no more records or a stop was requested.
void MultithreadedMolSupplier::reader() {
  std::string record;
  unsigned int lineNum = 0;
  unsigned int index = 0;
  while (!df_forceStop && extractNextRecord(record, lineNum, index)) {
    if (readCallback) {
      // A failing callback must not take down the reader thread: the record
      // is kept unmodified and processing continues.
      try {
        record = readCallback(record, index);
      } catch (const std::exception &e) {
        BOOST_LOG(rdErrorLog)
            << "Read callback exception: " << e.what() << std::endl;
      } catch (...) {
        // An exception escaping a thread entry point terminates the process,
        // so anything not derived from std::exception is swallowed here too.
        BOOST_LOG(rdErrorLog)
            << "Read callback exception: unknown exception" << std::endl;
      }
    }
    if (!df_forceStop) {
      InputRecord item{record, lineNum, index};
      d_inputQueue->push(item);
    }
  }
  d_inputQueue->setDone();
}

// Writer thread body: pops records from the input queue, converts each one to
// a molecule, optionally runs writeCallback on it, and pushes the result onto
// the output queue. If anything throws, a null molecule is pushed for that
// record instead.
void MultithreadedMolSupplier::writer() {
  InputRecord r;
  while (!df_forceStop && d_inputQueue->pop(r)) {
    try {
      std::unique_ptr<RWMol> mol(
          processMoleculeRecord(std::get<0>(r), std::get<1>(r)));
      if (!df_forceStop && mol && writeCallback) {
        writeCallback(*mol, std::get<0>(r), std::get<2>(r));
      }
      // ownership of the molecule moves into the queue item
      auto temp = OutputRecord{mol.release(), std::get<0>(r), std::get<2>(r)};
      d_outputQueue->push(temp);
    } catch (...) {
      // fill the queue with a null value
      auto nullValue = OutputRecord{nullptr, std::get<0>(r), std::get<2>(r)};
      d_outputQueue->push(nullValue);
    }
  }

  // We need a lock here, otherwise two threads can both pass the comparison
  // before either increments d_threadCounter, even though it is atomic.
  bool lastWriter = false;
  {
    const std::lock_guard<decltype(d_threadCounterMutex)> guard(
        d_threadCounterMutex);
    if (d_threadCounter < d_params.numWriterThreads) {
      ++d_threadCounter;
    } else {
      lastWriter = true;
    }
  }
  // The mutex must be released before setDone() is called on the output
  // queue: setDone() notifies the queue, which may still hold elements. That
  // notification can unblock waiting writer threads, letting them finish
  // their last push and arrive at the lock above - which would deadlock if
  // it were still held here.
  if (lastWriter) {
    d_outputQueue->setDone();
  }
}

// Returns the next molecule from the output queue, starting the worker threads
// on first use. Returns nullptr when a stop was requested or the queue yields
// nothing more; a null result is also returned for records that failed in
// writer(). The text and id of the returned record are remembered for
// getLastItemText() / getLastRecordId().
std::unique_ptr<RWMol> MultithreadedMolSupplier::next() {
  if (!df_started) {
    df_started = true;
    startThreads();
  }
  OutputRecord r;
  if (df_forceStop || !d_outputQueue->pop(r)) {
    return nullptr;
  }
  d_lastItemText = std::get<1>(r);
  d_lastRecordId = std::get<2>(r);
  std::unique_ptr<RWMol> res{std::get<0>(r)};
  if (res && nextCallback) {
    try {
      nextCallback(*res, *this);
    } catch (...) {
      // Ignore exception and proceed with mol as is.
    }
  }
  return res;
}

// this calls joins on the reader and writer threads
//  and waits until completion.  To actually force a stop
//  call close which handles the input and output queues
void MultithreadedMolSupplier::endThreads() {
  if (!df_started) {
    return;
  }
  // stop the writers before stopping the readers
  //  otherwise there might be a deadlock
  for (auto &thread : d_writerThreads) {
    if (thread.joinable()) {
      thread.join();
    }
  }
  // Drop the finished thread objects: startThreads() appends to this
  // container, so leaving them here would make a later endThreads() call
  // join() them a second time, which throws std::system_error.
  d_writerThreads.clear();
  if (d_readerThread.joinable()) {
    d_readerThread.join();
  }
}

// Launches the single reader thread and d_params.numWriterThreads writer
// threads.
void MultithreadedMolSupplier::startThreads() {
  // run the reader function in a separate thread
  d_readerThread = std::thread(&MultithreadedMolSupplier::reader, this);
  // run the writer function in separate threads
  for (unsigned int i = 0; i < d_params.numWriterThreads; ++i) {
    d_writerThreads.emplace_back(&MultithreadedMolSupplier::writer, this);
  }
}

// True once the output queue is both empty and marked as done.
bool MultithreadedMolSupplier::atEnd() {
  return (d_outputQueue->isEmpty() && d_outputQueue->getDone());
}

// Record id of the item most recently returned by next().
unsigned int MultithreadedMolSupplier::getLastRecordId() const {
  return d_lastRecordId;
}

// Record text of the item most recently returned by next().
std::string MultithreadedMolSupplier::getLastItemText() const {
  return d_lastItemText;
}

// Not supported for this supplier.
void MultithreadedMolSupplier::reset() {
  UNDER_CONSTRUCTION("reset() not supported for MultithreadedMolSupplier();");
}

}  // namespace FileParsers
}  // namespace v2
}  // namespace RDKit
#endif