diff --git a/Code/GraphMol/FileParsers/MultithreadedMolSupplier.cpp b/Code/GraphMol/FileParsers/MultithreadedMolSupplier.cpp index b33842ca4..63b56858c 100644 --- a/Code/GraphMol/FileParsers/MultithreadedMolSupplier.cpp +++ b/Code/GraphMol/FileParsers/MultithreadedMolSupplier.cpp @@ -81,6 +81,7 @@ void MultithreadedMolSupplier::reader() { auto r = std::make_tuple(record, lineNum, index); if (!df_forceStop) { d_inputQueue->push(r); + ++d_mols_read; } } d_inputQueue->setDone(); @@ -106,23 +107,6 @@ void MultithreadedMolSupplier::writer() { d_outputQueue->push(nullValue); } } - - // we need a lock here otherwise two threads - // can increment d_threadCounter even though it's - // atomic. - d_threadCounterMutex.lock(); - if (d_threadCounter < d_params.numWriterThreads) { - ++d_threadCounter; - d_threadCounterMutex.unlock(); - } else { - // Here we need to unlock the threadCounterMutex before we setDone on the - // outputQueue. This causes a notification to the queue which may actually - // have elements in it. This notification may unblock the queue which - // allows waiting threads to get their last attempt at adding to it - // which will end up here and deadlock. - d_threadCounterMutex.unlock(); - d_outputQueue->setDone(); - } } std::unique_ptr MultithreadedMolSupplier::next() { @@ -131,7 +115,11 @@ std::unique_ptr MultithreadedMolSupplier::next() { startThreads(); } std::tuple r; - if (!df_forceStop && d_outputQueue->pop(r)) { + if (!df_forceStop) { + if (!d_outputQueue->pop(r)) { + throw FileParseException("something went wrong"); + } + d_lastItemText = std::get<1>(r); d_lastRecordId = std::get<2>(r); std::unique_ptr res{std::get<0>(r)}; @@ -142,9 +130,10 @@ std::unique_ptr MultithreadedMolSupplier::next() { // Ignore exception and proceed with mol as is. } } + ++d_mols_returned; return res; } - return nullptr; + throw FileParseException("stop forced"); } // this calls joins on the reader and writer threads @@ -164,9 +153,9 @@ void MultithreadedMolSupplier::endThreads() { } void MultithreadedMolSupplier::startThreads() { - // run the reader function in a seperate thread + // run the reader function in a separate thread d_readerThread = std::thread(&MultithreadedMolSupplier::reader, this); - // run the writer function in seperate threads + // run the writer function in separate threads for (unsigned int i = 0; i < d_params.numWriterThreads; i++) { d_writerThreads.emplace_back( std::thread(&MultithreadedMolSupplier::writer, this)); @@ -174,7 +163,7 @@ void MultithreadedMolSupplier::startThreads() { } bool MultithreadedMolSupplier::atEnd() { - return (d_outputQueue->isEmpty() && d_outputQueue->getDone()); + return (d_inputQueue->getDone() && d_mols_read == d_mols_returned); } unsigned int MultithreadedMolSupplier::getLastRecordId() const { diff --git a/Code/GraphMol/FileParsers/MultithreadedMolSupplier.h b/Code/GraphMol/FileParsers/MultithreadedMolSupplier.h index 4e91e0cf4..8bf09c51e 100644 --- a/Code/GraphMol/FileParsers/MultithreadedMolSupplier.h +++ b/Code/GraphMol/FileParsers/MultithreadedMolSupplier.h @@ -43,10 +43,9 @@ class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier { MultithreadedMolSupplier() {} - // Derived classes MUST have a destructor that calls close // to properly end threads while the instance is alive - virtual ~MultithreadedMolSupplier() {close();} + virtual ~MultithreadedMolSupplier() { close(); } //! shut down the supplier virtual void close() override; @@ -133,10 +132,13 @@ class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier { virtual RWMol *processMoleculeRecord(const std::string &record, unsigned int lineNum) = 0; - std::mutex d_threadCounterMutex; - std::atomic d_threadCounter{1}; //!< thread counter - std::vector d_writerThreads; //!< vector writer threads - std::thread d_readerThread; //!< single reader thread + std::vector d_writerThreads; //!< vector writer threads + std::thread d_readerThread; //!< single reader thread + + // both of these will only be incremented in unique threads + // (the reader and the master thread), so no need for atomics/mutexes + unsigned int d_mols_read{0}; + unsigned int d_mols_returned{0}; protected: std::atomic df_started = false; @@ -160,7 +162,6 @@ class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier { writeCallback = nullptr; std::function readCallback = nullptr; - }; } // namespace FileParsers } // namespace v2 diff --git a/Code/GraphMol/Wrap/testMultithreadedMolSupplier.py b/Code/GraphMol/Wrap/testMultithreadedMolSupplier.py index cb72dd37c..46689b930 100644 --- a/Code/GraphMol/Wrap/testMultithreadedMolSupplier.py +++ b/Code/GraphMol/Wrap/testMultithreadedMolSupplier.py @@ -168,6 +168,23 @@ class TestCase(unittest.TestCase): self.assertRaises(ValueError, helper, sdSup) + def testGitHubIssue8644(self): + fileN = os.path.join(RDConfig.RDBaseDir, 'Code', 'GraphMol', 'FileParsers', 'test_data', + 'CH.mol') + noneMols = 0 + notNoneMols = 0 + # this is usually enough to hit the async issue described in the PR + # without the test taking forever + numIters = 100 + for i in range(numIters): + for m in Chem.MultithreadedSDMolSupplier(fileN, numWriterThreads=100): + if m is None: + noneMols += 1 + else: + notNoneMols += 1 + self.assertEqual(notNoneMols, numIters) + self.assertEqual(noneMols, 0) + if __name__ == '__main__': unittest.main()