rdkit/Code/RDGeneral/ConcurrentQueue.h
shrey183 8ea1ac6112 [GSoC-2020] Generalized and Multithreaded File Reader (#3363)
* fixed issue #2965

* added test case for issue #2965

* fixed formatting and added comment.

* update

* General Reader files

* removed dependency on boost filesystems

* removed class

* clang-format

* added-comments

* further-cleanup

* added clang-formatting

* braces-for-if-else

* changed error messages, added option for windows file path

* fixed getFileName function

* cleanup

* option for filename without path

* further-cleanup

* added tests for determineFileFormat

* cleanup, const arguments for validate function

* init

* cleanup

* cleanup

* clang-format does not work for CMake

* added RDK_TEST_MULTITHREADED option

* add-flag

* cleanup

* Delete ConcurrentQueue.h

This PR deals with the Generalized File Reader.

* Delete testConcurrentQueue.cpp

This PR deals with the Generalized File Reader.

* no change

* concurrent queue

* print values

* Single Producer Multiple Consumer works

* cleanup

* Producer Consumer Example

* update queue methods and tests

* cleanup

* test

* fixed tests

* cleanup, updated tests

* Delete ProducerConsumer.h

* Delete testProducerConsumer.cpp

* cleanup

* further cleanup

* changes based on feedback

* make queue non copyable

* pseudocode

* possible implementation

* untested implementation

* change class to typename

* basic-setup

* need to fix segfault

* need to fix blocking

* need to fix blocking

* need to fix blocking

* fix indentation

* one possibility

* without lambda function

* possible fix with some test cases

* performance tests

* added support for record id and item text

* cleanup

* cleanup

* fixed memory leak and added methods with tests for getting last id and item text

* cleanup

* added more test cases with different smi files

* cleanup

* SD mol supplier

* modified the parsing for SDMolSupplier

* cleanup

* cleanup

* new file for testing

* added support for reading molecule properties with tests

* thread-safe logging and exception handling

* cleanup

* without thread safe logging

* cleanup

* cleanup, modified MultithreadedSmilesMolSupplier

* cleanup, made reader and writer functions private

* move O2.sdf

* basic python wrapper with tests

* cleanup, added new methods for python wrappers

* made changes suggested by Andrew

* file and compression formats are case-insensitive

* cannot open files with gzstream

* cleanup

* possible fix for opening compressed streams (SMILES)

* removed seekg() and tellg() methods from multithreaded suppliers

* cleanup

* test cases for python wrappers

* some wrapper cleanup

* cleanup, removed unused functions

* update the MT tests so that they actually do some work
also includes some cleanup here

* cleanup

* remove iterator_next header include

* added support for multithreaded readers

* use getNumThreadsToUse for multithreaded suppliers

* fixed documentation for multithreaded python wrappers

* commented performance test

* first draft of final evaluation report

* removed inline variables

* first draft getting started in python

* fixed typos in getting started in python

* fixed typos

* fix documentation tests

* fixed documentation tests

* added links to important files and PR

* added performance results

* first version of wrappers with compressed streams

* getting rid of streambuf stream method

* modified General File Reader

* make this work when building in non-threads mode

* rename a test

* rename a function in the python API

* rearrange the python test a bit

* disable the stream-based constructors in Python

* mark the multithreaded classes as experimental

Co-authored-by: greg landrum <greg.landrum@gmail.com>
2020-10-09 04:31:05 +02:00

133 lines
3.4 KiB
C++

//
// Copyright (C) 2020 Shrey Aryan
//
// @@ All Rights Reserved @@
// This file is part of the RDKit.
// The contents are covered by the terms of the BSD license
// which is included in the file license.txt, found at the root
// of the RDKit source tree.
//
#ifdef RDK_THREADSAFE_SSS
#ifndef CONCURRENT_QUEUE
#define CONCURRENT_QUEUE
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
namespace RDKit {
template <typename E>
class ConcurrentQueue {
 private:
  unsigned int d_capacity;
  bool d_done;
  std::vector<E> d_elements;
  unsigned int d_head, d_tail;
  mutable std::mutex d_lock;
  std::condition_variable d_notEmpty, d_notFull;

 private:
  //! the queue is non-copyable
  ConcurrentQueue(const ConcurrentQueue<E>&) = delete;
  ConcurrentQueue& operator=(const ConcurrentQueue<E>&) = delete;

 public:
  ConcurrentQueue(unsigned int capacity)
      : d_capacity(capacity),
        d_done(false),
        d_elements(capacity),
        d_head(0),
        d_tail(0) {}

  //! pushes a copy of element into the queue without modifying element;
  //! if the queue is full, the call blocks until space becomes available
  void push(const E& element);

  //! pops an element from the queue into element; the return value
  //! indicates whether popping was successful. If the queue is empty and
  //! not done, the call blocks; if it is empty and done, it returns false
  bool pop(E& element);

  //! checks whether the ConcurrentQueue is empty
  bool isEmpty() const;

  //! returns the value of the variable d_done
  bool getDone() const;

  //! sets the variable d_done = true and wakes up all waiting consumers
  void setDone();

  //! clears the vector
  void clear();
};
template <typename E>
void ConcurrentQueue<E>::push(const E& element) {
  std::unique_lock<std::mutex> lk(d_lock);
  //! concurrent queue is full so we wait until
  //! it is not full
  while (d_head + d_capacity == d_tail) {
    d_notFull.wait(lk);
  }
  bool wasEmpty = (d_head == d_tail);
  d_elements.at(d_tail % d_capacity) = element;
  d_tail++;
  //! if the concurrent queue was empty before
  //! then it is not any more since we have "pushed" an element
  //! thus we notify all the consumer threads
  if (wasEmpty) {
    d_notEmpty.notify_all();
  }
}
template <typename E>
bool ConcurrentQueue<E>::pop(E& element) {
  std::unique_lock<std::mutex> lk(d_lock);
  //! concurrent queue is empty so we wait until
  //! it is not empty
  while (d_head == d_tail) {
    if (d_done) {
      return false;
    }
    d_notEmpty.wait(lk);
  }
  bool wasFull = (d_head + d_capacity == d_tail);
  element = d_elements.at(d_head % d_capacity);
  d_head++;
  //! if the concurrent queue was full before
  //! then it is not any more since we have "popped" an element
  //! thus we notify all producer threads
  if (wasFull) {
    d_notFull.notify_all();
  }
  return true;
}
template <typename E>
bool ConcurrentQueue<E>::isEmpty() const {
  std::unique_lock<std::mutex> lk(d_lock);
  return (d_head == d_tail);
}

template <typename E>
bool ConcurrentQueue<E>::getDone() const {
  std::unique_lock<std::mutex> lk(d_lock);
  return d_done;
}

template <typename E>
void ConcurrentQueue<E>::setDone() {
  std::unique_lock<std::mutex> lk(d_lock);
  d_done = true;
  d_notEmpty.notify_all();
}
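template <typename E>
void ConcurrentQueue<E>::clear() {
  std::unique_lock<std::mutex> lk(d_lock);
  //! note: this empties the underlying vector but does not reset d_head
  //! and d_tail, so a later push() or pop() that reaches d_elements.at()
  //! throws std::out_of_range
  d_elements.clear();
}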
} // namespace RDKit
#endif
#endif
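
Usage sketch (not part of the header): the commit log mentions a "Single Producer Multiple Consumer" setup, and the following is one way such a setup might look. To keep the example compilable on its own, it uses a hypothetical stand-in class, BoundedQueue, that mirrors the push/pop/setDone logic of ConcurrentQueue above. The names BoundedQueue and sumThroughQueue, and the capacity check in the constructor, are assumptions made for illustration and do not appear in RDKit.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in that mirrors the locking logic of the header above.
template <typename E>
class BoundedQueue {
  unsigned int d_capacity;
  bool d_done = false;
  std::vector<E> d_elements;
  unsigned int d_head = 0, d_tail = 0;
  std::mutex d_lock;
  std::condition_variable d_notEmpty, d_notFull;

 public:
  explicit BoundedQueue(unsigned int capacity)
      : d_capacity(capacity), d_elements(capacity) {
    // assumption: a zero-capacity queue would make every push() block forever
    assert(capacity > 0);
  }
  BoundedQueue(const BoundedQueue&) = delete;
  BoundedQueue& operator=(const BoundedQueue&) = delete;

  void push(const E& element) {
    std::unique_lock<std::mutex> lk(d_lock);
    while (d_head + d_capacity == d_tail) {  // full: wait for a pop
      d_notFull.wait(lk);
    }
    bool wasEmpty = (d_head == d_tail);
    d_elements.at(d_tail % d_capacity) = element;
    ++d_tail;
    if (wasEmpty) {
      d_notEmpty.notify_all();
    }
  }

  bool pop(E& element) {
    std::unique_lock<std::mutex> lk(d_lock);
    while (d_head == d_tail) {  // empty: give up only once the producer is done
      if (d_done) {
        return false;
      }
      d_notEmpty.wait(lk);
    }
    bool wasFull = (d_head + d_capacity == d_tail);
    element = d_elements.at(d_head % d_capacity);
    ++d_head;
    if (wasFull) {
      d_notFull.notify_all();
    }
    return true;
  }

  void setDone() {
    std::unique_lock<std::mutex> lk(d_lock);
    d_done = true;
    d_notEmpty.notify_all();
  }
};

// One producer (the calling thread) pushes 1..n while numConsumers threads
// pop values and accumulate them; returns the sum of everything consumed.
long sumThroughQueue(int n, unsigned int numConsumers, unsigned int capacity) {
  BoundedQueue<int> q(capacity);
  std::vector<long> partial(numConsumers, 0);
  std::vector<std::thread> consumers;
  for (unsigned int i = 0; i < numConsumers; ++i) {
    consumers.emplace_back([&q, &partial, i]() {
      int value;
      // pop() returns false only when the queue is empty and setDone() was called
      while (q.pop(value)) {
        partial[i] += value;
      }
    });
  }
  for (int v = 1; v <= n; ++v) {
    q.push(v);  // blocks whenever the consumers fall behind
  }
  q.setDone();  // wakes consumers that are waiting on an empty queue
  for (auto& t : consumers) {
    t.join();
  }
  long total = 0;
  for (long p : partial) {
    total += p;
  }
  return total;
}
```

Because pop() only returns false once the queue is both empty and marked done, every pushed value is consumed exactly once. A call such as sumThroughQueue(100, 4, 8) should therefore return 5050 regardless of how the work is split across the four consumers. Calling setDone() only after the last push() is what lets the consumers drain the remaining elements before exiting.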