mirror of
https://github.com/rdkit/rdkit.git
synced 2026-06-03 21:44:30 +08:00
* potential satisfactory solution * fix typo * update testMultithreadedMolSupplier.py * revert first patch * better fix * add test
This commit is contained in:
committed by
GitHub
parent
10e7d50a35
commit
a47a5d91aa
@@ -81,6 +81,7 @@ void MultithreadedMolSupplier::reader() {
|
||||
auto r = std::make_tuple(record, lineNum, index);
|
||||
if (!df_forceStop) {
|
||||
d_inputQueue->push(r);
|
||||
++d_mols_read;
|
||||
}
|
||||
}
|
||||
d_inputQueue->setDone();
|
||||
@@ -106,23 +107,6 @@ void MultithreadedMolSupplier::writer() {
|
||||
d_outputQueue->push(nullValue);
|
||||
}
|
||||
}
|
||||
|
||||
// we need a lock here otherwise two threads
|
||||
// can increment d_threadCounter even though it's
|
||||
// atomic.
|
||||
d_threadCounterMutex.lock();
|
||||
if (d_threadCounter < d_params.numWriterThreads) {
|
||||
++d_threadCounter;
|
||||
d_threadCounterMutex.unlock();
|
||||
} else {
|
||||
// Here we need to unlock the threadCounterMutex before we setDone on the
|
||||
// outputQueue. This causes a notification to the queue which may actually
|
||||
// have elements in it. This notification may unblock the queue which
|
||||
// allows waiting threads to get their last attempt at adding to it
|
||||
// which will end up here and deadlock.
|
||||
d_threadCounterMutex.unlock();
|
||||
d_outputQueue->setDone();
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<RWMol> MultithreadedMolSupplier::next() {
|
||||
@@ -131,7 +115,11 @@ std::unique_ptr<RWMol> MultithreadedMolSupplier::next() {
|
||||
startThreads();
|
||||
}
|
||||
std::tuple<RWMol *, std::string, unsigned int> r;
|
||||
if (!df_forceStop && d_outputQueue->pop(r)) {
|
||||
if (!df_forceStop) {
|
||||
if (!d_outputQueue->pop(r)) {
|
||||
throw FileParseException("something went wrong");
|
||||
}
|
||||
|
||||
d_lastItemText = std::get<1>(r);
|
||||
d_lastRecordId = std::get<2>(r);
|
||||
std::unique_ptr<RWMol> res{std::get<0>(r)};
|
||||
@@ -142,9 +130,10 @@ std::unique_ptr<RWMol> MultithreadedMolSupplier::next() {
|
||||
// Ignore exception and proceed with mol as is.
|
||||
}
|
||||
}
|
||||
++d_mols_returned;
|
||||
return res;
|
||||
}
|
||||
return nullptr;
|
||||
throw FileParseException("stop forced");
|
||||
}
|
||||
|
||||
// this calls joins on the reader and writer threads
|
||||
@@ -164,9 +153,9 @@ void MultithreadedMolSupplier::endThreads() {
|
||||
}
|
||||
|
||||
void MultithreadedMolSupplier::startThreads() {
|
||||
// run the reader function in a seperate thread
|
||||
// run the reader function in a separate thread
|
||||
d_readerThread = std::thread(&MultithreadedMolSupplier::reader, this);
|
||||
// run the writer function in seperate threads
|
||||
// run the writer function in separate threads
|
||||
for (unsigned int i = 0; i < d_params.numWriterThreads; i++) {
|
||||
d_writerThreads.emplace_back(
|
||||
std::thread(&MultithreadedMolSupplier::writer, this));
|
||||
@@ -174,7 +163,7 @@ void MultithreadedMolSupplier::startThreads() {
|
||||
}
|
||||
|
||||
bool MultithreadedMolSupplier::atEnd() {
|
||||
return (d_outputQueue->isEmpty() && d_outputQueue->getDone());
|
||||
return (d_inputQueue->getDone() && d_mols_read == d_mols_returned);
|
||||
}
|
||||
|
||||
unsigned int MultithreadedMolSupplier::getLastRecordId() const {
|
||||
|
||||
@@ -43,10 +43,9 @@ class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
|
||||
|
||||
MultithreadedMolSupplier() {}
|
||||
|
||||
|
||||
// Derived classes MUST have a destructor that calls close
|
||||
// to properly end threads while the instance is alive
|
||||
virtual ~MultithreadedMolSupplier() {close();}
|
||||
virtual ~MultithreadedMolSupplier() { close(); }
|
||||
|
||||
//! shut down the supplier
|
||||
virtual void close() override;
|
||||
@@ -133,10 +132,13 @@ class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
|
||||
virtual RWMol *processMoleculeRecord(const std::string &record,
|
||||
unsigned int lineNum) = 0;
|
||||
|
||||
std::mutex d_threadCounterMutex;
|
||||
std::atomic<unsigned int> d_threadCounter{1}; //!< thread counter
|
||||
std::vector<std::thread> d_writerThreads; //!< vector writer threads
|
||||
std::thread d_readerThread; //!< single reader thread
|
||||
std::vector<std::thread> d_writerThreads; //!< vector writer threads
|
||||
std::thread d_readerThread; //!< single reader thread
|
||||
|
||||
// both of these will only be incremented in unique threads
|
||||
// (the reader and the master thread), so no need for atomics/mutexes
|
||||
unsigned int d_mols_read{0};
|
||||
unsigned int d_mols_returned{0};
|
||||
|
||||
protected:
|
||||
std::atomic<bool> df_started = false;
|
||||
@@ -160,7 +162,6 @@ class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
|
||||
writeCallback = nullptr;
|
||||
std::function<std::string(const std::string &, unsigned int)> readCallback =
|
||||
nullptr;
|
||||
|
||||
};
|
||||
} // namespace FileParsers
|
||||
} // namespace v2
|
||||
|
||||
@@ -168,6 +168,23 @@ class TestCase(unittest.TestCase):
|
||||
|
||||
self.assertRaises(ValueError, helper, sdSup)
|
||||
|
||||
def testGitHubIssue8644(self):
|
||||
fileN = os.path.join(RDConfig.RDBaseDir, 'Code', 'GraphMol', 'FileParsers', 'test_data',
|
||||
'CH.mol')
|
||||
noneMols = 0
|
||||
notNoneMols = 0
|
||||
# this is usually enough to hit the async issue described in the PR
|
||||
# without the test taking forever
|
||||
numIters = 100
|
||||
for i in range(numIters):
|
||||
for m in Chem.MultithreadedSDMolSupplier(fileN, numWriterThreads=100):
|
||||
if m is None:
|
||||
noneMols += 1
|
||||
else:
|
||||
notNoneMols += 1
|
||||
self.assertEqual(notNoneMols, numIters)
|
||||
self.assertEqual(noneMols, 0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user