mirror of
https://github.com/rdkit/rdkit.git
synced 2026-06-03 21:44:30 +08:00
* fixed issue #2965 * added test case for issue #2965 * fixed formatting and added comment. * update * General Reader files * removed dependency on boost filesystems * removed class * clang-format * added-comments * further-cleanup * added clang-formatting * braces-for-if-else * changed error messages, added option for windows file path * fixed getFileName function * cleanup * option for filename without path * further-cleanup * added tests for determineFileFormat * cleanup, const arguments for validate function * init * cleanup * cleanup * clang-format does not work for CMake * added RDK_TEST_MULTITHREADED option * add-flag * cleanup * Delete ConcurrentQueue.h This PR deals with the Generalized File Reader. * Delete testConcurrentQueue.cpp This PR deals with the Generalized File Reader. * no change * concurrent queue * print values * Single Producer Multiple Consumer works * cleanup * Producer Consumer Example * update queue methods and tests * cleanup * test * fixed tests * cleanup, updated tests * Delete ProducerConsumer.h * Delete testProducerConsumer.cpp * cleanup * futher cleanup * changes based on feedback * make queue non copyable * psuedocode * possible implementation * untested implementation * change class to typename * basic-setup * need to fix segfault * need to fix blocking * need to fix blocking * need to fix blocking * fix indentation * one possibility * without lambda function * possible fix with some test cases * performance tests * added support for record id and item text * cleanup * cleanup * fixed memory leak and added methods with tests for getting last id and item text * cleanup * added more test cases with different smi files * cleanup * SD mol supplier * modified the parsing for SDMolSupplier * cleanup * cleanup * new file for testing * added support for reading molecule properties with tests * thread-safe logging and exception handling * cleanup * without thread safe logging * cleanup * cleanup, modified MultithreadedSmilesMolSupplier * cleanup, made reader and writer functions private * move O2.sdf * basic python wrapper with tests * cleanup, added new methods for python wrappers * made changes suggested by Andrew * file and compression formats are case-insensitive * cannot open files with gzstream * cleanup * possible fix for opening compressed streams (SMILES) * removed seekg() and tellg() methods from multithreadeded suppliers * cleanup * test cases for python wrappers * some wrapper cleanup * cleanup, removed unused functions * update the MT tests so that they actually do some work also includes some cleanup here * cleanup * remove iterator_next header include * added support for multithreaded readers * use getNumThreadsToUse for multithreaded suppliers * fixed documentation for multithreaded python wrappers * commented performance test * first draft of final evaluation report * removed inline variables * first draft getting started in python * fixed typos in getting started in python * fixed typos * fix documentation tests * fixed documentation tests * added links to important files and PR * added perfomance results * first version of wrappers with compressed streams * getting rid of streambuf stream method * modified General File Reader * make this work when building in non-threads mode * rename a test * rename a function in the python API * rearrange the python test a bit * disable the stream-based constructors in Python * mark the multithreaded classes as experimental Co-authored-by: greg landrum <greg.landrum@gmail.com>
133 lines
3.4 KiB
C++
133 lines
3.4 KiB
C++
//
|
|
// Copyright (C) 2020 Shrey Aryan
|
|
//
|
|
// @@ All Rights Reserved @@
|
|
// This file is part of the RDKit.
|
|
// The contents are covered by the terms of the BSD license
|
|
// which is included in the file license.txt, found at the root
|
|
// of the RDKit source tree.
|
|
//
|
|
#ifdef RDK_THREADSAFE_SSS
|
|
#ifndef CONCURRENT_QUEUE
|
|
#define CONCURRENT_QUEUE
|
|
#include <condition_variable>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
namespace RDKit {
|
|
template <typename E>
|
|
class ConcurrentQueue {
|
|
private:
|
|
unsigned int d_capacity;
|
|
bool d_done;
|
|
std::vector<E> d_elements;
|
|
unsigned int d_head, d_tail;
|
|
mutable std::mutex d_lock;
|
|
std::condition_variable d_notEmpty, d_notFull;
|
|
|
|
private:
|
|
ConcurrentQueue<E>(const ConcurrentQueue<E>&);
|
|
ConcurrentQueue<E>& operator=(const ConcurrentQueue<E>&);
|
|
|
|
public:
|
|
ConcurrentQueue<E>(unsigned int capacity)
|
|
: d_capacity(capacity), d_done(false), d_head(0), d_tail(0) {
|
|
std::vector<E> elements(capacity);
|
|
d_elements = elements;
|
|
}
|
|
|
|
//! tries to push an element into the queue if it is not full without
|
|
//! modifying the variable element, if the queue is full then pushing an
|
|
//! element will result in blocking
|
|
void push(const E& element);
|
|
|
|
//! tries to pop an element from the queue if it is not empty and not done
|
|
//! the boolean value indicates the whether popping is successful, if the
|
|
//! queue is empty and not done then popping an element will result in
|
|
//! blocking
|
|
bool pop(E& element);
|
|
|
|
//! checks whether the ConcurrentQueue is empty
|
|
bool isEmpty() const;
|
|
|
|
//! returns the value of the variable done
|
|
bool getDone() const;
|
|
|
|
//! sets the variable d_done = true
|
|
void setDone();
|
|
|
|
//! clears the vector
|
|
void clear();
|
|
};
|
|
|
|
template <typename E>
|
|
void ConcurrentQueue<E>::push(const E& element) {
|
|
std::unique_lock<std::mutex> lk(d_lock);
|
|
//! concurrent queue is full so we wait until
|
|
//! it is not full
|
|
while (d_head + d_capacity == d_tail) {
|
|
d_notFull.wait(lk);
|
|
}
|
|
bool wasEmpty = (d_head == d_tail);
|
|
d_elements.at(d_tail % d_capacity) = element;
|
|
d_tail++;
|
|
//! if the concurrent queue was empty before
|
|
//! then it is not any more since we have "pushed" an element
|
|
//! thus we notify all the consumer threads
|
|
if (wasEmpty) {
|
|
d_notEmpty.notify_all();
|
|
}
|
|
}
|
|
|
|
template <typename E>
|
|
bool ConcurrentQueue<E>::pop(E& element) {
|
|
std::unique_lock<std::mutex> lk(d_lock);
|
|
//! concurrent queue is empty so we wait until
|
|
//! it is not empty
|
|
while (d_head == d_tail) {
|
|
if (d_done) {
|
|
return false;
|
|
}
|
|
d_notEmpty.wait(lk);
|
|
}
|
|
bool wasFull = (d_head + d_capacity == d_tail);
|
|
element = d_elements.at(d_head % d_capacity);
|
|
d_head++;
|
|
//! if the concurrent queue was full before
|
|
//! then it is not any more since we have "popped" an element
|
|
//! thus we notify all producer threads
|
|
if (wasFull) {
|
|
d_notFull.notify_all();
|
|
}
|
|
return true;
|
|
}
|
|
|
|
template <typename E>
|
|
bool ConcurrentQueue<E>::isEmpty() const {
|
|
std::unique_lock<std::mutex> lk(d_lock);
|
|
return (d_head == d_tail);
|
|
}
|
|
|
|
template <typename E>
|
|
bool ConcurrentQueue<E>::getDone() const {
|
|
std::unique_lock<std::mutex> lk(d_lock);
|
|
return d_done;
|
|
}
|
|
|
|
template <typename E>
|
|
void ConcurrentQueue<E>::setDone() {
|
|
std::unique_lock<std::mutex> lk(d_lock);
|
|
d_done = true;
|
|
d_notEmpty.notify_all();
|
|
}
|
|
|
|
template <typename E>
|
|
void ConcurrentQueue<E>::clear() {
|
|
std::unique_lock<std::mutex> lk(d_lock);
|
|
d_elements.clear();
|
|
}
|
|
|
|
} // namespace RDKit
|
|
#endif
|
|
#endif
|