Files
rdkit/Code/RDGeneral/testConcurrentQueue.cpp
shrey183 8ea1ac6112 [GSoC-2020] Generalized and Multithreaded File Reader (#3363)
* fixed issue #2965

* added test case for issue #2965

* fixed formatting and added comment.

* update

* General Reader files

* removed dependency on boost filesystems

* removed class

* clang-format

* added-comments

* further-cleanup

* added clang-formatting

* braces-for-if-else

* changed error messages, added option for windows file path

* fixed getFileName function

* cleanup

* option for filename without path

* further-cleanup

* added tests for determineFileFormat

* cleanup, const arguments for validate function

* init

* cleanup

* cleanup

* clang-format does not work for CMake

* added RDK_TEST_MULTITHREADED option

* add-flag

* cleanup

* Delete ConcurrentQueue.h

This PR deals with the Generalized File Reader.

* Delete testConcurrentQueue.cpp

This PR deals with the Generalized File Reader.

* no change

* concurrent queue

* print values

* Single Producer Multiple Consumer works

* cleanup

* Producer Consumer Example

* update queue methods and tests

* cleanup

* test

* fixed tests

* cleanup, updated tests

* Delete ProducerConsumer.h

* Delete testProducerConsumer.cpp

* cleanup

* further cleanup

* changes based on feedback

* make queue non copyable

* pseudocode

* possible implementation

* untested implementation

* change class to typename

* basic-setup

* need to fix segfault

* need to fix blocking

* need to fix blocking

* need to fix blocking

* fix indentation

* one possibility

* without lambda function

* possible fix with some test cases

* performance tests

* added support for record id and item text

* cleanup

* cleanup

* fixed memory leak and added methods with tests for getting last id and item text

* cleanup

* added more test cases with different smi files

* cleanup

* SD mol supplier

* modified the parsing for SDMolSupplier

* cleanup

* cleanup

* new file for testing

* added support for reading molecule properties with tests

* thread-safe logging and exception handling

* cleanup

* without thread safe logging

* cleanup

* cleanup, modified MultithreadedSmilesMolSupplier

* cleanup, made reader and writer functions private

* move O2.sdf

* basic python wrapper with tests

* cleanup, added new methods for python wrappers

* made changes suggested by Andrew

* file and compression formats are case-insensitive

* cannot open files with gzstream

* cleanup

* possible fix for opening compressed streams (SMILES)

* removed seekg() and tellg() methods from multithreaded suppliers

* cleanup

* test cases for python wrappers

* some wrapper cleanup

* cleanup, removed unused functions

* update the MT tests so that they actually do some work
also includes some cleanup here

* cleanup

* remove iterator_next header include

* added support for multithreaded readers

* use getNumThreadsToUse for multithreaded suppliers

* fixed documentation for multithreaded python wrappers

* commented performance test

* first draft of final evaluation report

* removed inline variables

* first draft getting started in python

* fixed typos in getting started in python

* fixed typos

* fix documentation tests

* fixed documentation tests

* added links to important files and PR

* added performance results

* first version of wrappers with compressed streams

* getting rid of streambuf stream method

* modified General File Reader

* make this work when building in non-threads mode

* rename a test

* rename a function in the python API

* rearrange the python test a bit

* disable the stream-based constructors in Python

* mark the multithreaded classes as experimental

Co-authored-by: greg landrum <greg.landrum@gmail.com>
2020-10-09 04:31:05 +02:00

140 lines
3.4 KiB
C++

#ifdef RDK_THREADSAFE_SSS
#include <RDGeneral/Invariant.h>
#include <RDGeneral/RDLog.h>
#include <algorithm>
#include <functional>
#include <iomanip>
#include <sstream>
#include <thread>
#include <vector>
#include "ConcurrentQueue.h"
using namespace RDKit;
//! method for testing basic ConcurrentQueue operations
/*!
  Pushes 1, 2, 3 into a queue of capacity 4. It checks that they come back
  out of pop() in the same (FIFO) order, and that isEmpty() reports the
  queue state correctly before, during and after.
*/
void testPushAndPop() {
  // Automatic storage: the queue is destroyed however this function exits
  // (e.g. if a failing TEST_ASSERT throws). The previous new/delete pair
  // did not guarantee that.
  ConcurrentQueue<int> q(4);
  int e1 = 0;
  int e2 = 0;
  int e3 = 0;
  TEST_ASSERT(q.isEmpty());
  q.push(1);
  q.push(2);
  q.push(3);
  TEST_ASSERT(!q.isEmpty());
  TEST_ASSERT(q.pop(e1));
  TEST_ASSERT(q.pop(e2));
  TEST_ASSERT(q.pop(e3));
  // values must come back in insertion order
  TEST_ASSERT(e1 == 1);
  TEST_ASSERT(e2 == 2);
  TEST_ASSERT(e3 == 3);
  TEST_ASSERT(q.isEmpty());
}
//! Producer body: pushes the values 0, 1, ..., numToProduce - 1 into \c q,
//! in increasing order. Pushes nothing if numToProduce <= 0.
void produce(ConcurrentQueue<int>& q, const int numToProduce) {
  int value = 0;
  while (value < numToProduce) {
    q.push(value++);
  }
}
//! Consumer body: pops items from \c q and appends each one to \c result
//! until pop() returns false.
//! NOTE(review): pop() presumably returns false only once the queue is empty
//! and setDone() has been called; confirm against ConcurrentQueue.h.
void consume(ConcurrentQueue<int>& q, std::vector<int>& result) {
  for (int item; q.pop(item);) {
    result.push_back(item);
  }
}
//! multithreaded testing for ConcurrentQueue
/*!
  Runs \c numProducerThreads producers against \c numConsumerThreads
  consumers that share one queue of capacity 5. Each producer pushes the
  values 0 .. numToProduce-1. Each consumer collects whatever it pops into
  its own result vector.

  \param numProducerThreads number of producer threads to start
  \param numConsumerThreads number of consumer threads to start
  \param numToProduce       number of values each producer pushes
                            (defaults to 10, the previously hard-coded value)

  \return true if every value in [0, numToProduce) was consumed exactly
          numProducerThreads times and no out-of-range value was consumed;
          false otherwise.
*/
bool testProducerConsumer(const int numProducerThreads,
                          const int numConsumerThreads,
                          const int numToProduce = 10) {
  ConcurrentQueue<int> q(5);
  TEST_ASSERT(q.isEmpty());
  std::vector<std::thread> producers(numProducerThreads);
  std::vector<std::thread> consumers(numConsumerThreads);
  // one result vector per consumer, so consumers never share a container
  std::vector<std::vector<int>> results(numConsumerThreads);
  //! start producer threads
  for (int i = 0; i < numProducerThreads; ++i) {
    producers[i] = std::thread(produce, std::ref(q), numToProduce);
  }
  //! start consumer threads
  for (int i = 0; i < numConsumerThreads; ++i) {
    consumers[i] = std::thread(consume, std::ref(q), std::ref(results[i]));
  }
  for (auto& producer : producers) {
    producer.join();
  }
  //! all producers are done producing. setDone() presumably lets consumers'
  //! pop() return false once the queue drains (see ConcurrentQueue.h).
  q.setDone();
  for (auto& consumer : consumers) {
    consumer.join();
  }
  TEST_ASSERT(q.isEmpty());

  // Tally how often each value was consumed. A value outside the produced
  // range would index past the end of `frequency` (undefined behavior), so
  // report it as a test failure instead.
  std::vector<int> frequency(numToProduce, 0);
  for (const auto& result : results) {
    for (const int element : result) {
      if (element < 0 || element >= numToProduce) {
        return false;
      }
      ++frequency[element];
    }
  }
  // every value must have been delivered once per producer
  for (const int freq : frequency) {
    if (freq != numProducerThreads) {
      return false;
    }
  }
  return true;
}
//! Stress test: repeats testProducerConsumer() many times for each of several
//! producer/consumer thread-count combinations. The combinations run in order:
//! single/single, single/multiple, multiple/single, multiple/multiple.
void testMultipleTimes() {
  const int trials = 10000;
  struct ThreadCounts {
    int producers;
    int consumers;
  };
  const ThreadCounts configurations[] = {
      {1, 1},  // Single Producer, Single Consumer
      {1, 5},  // Single Producer, Multiple Consumer
      {5, 1},  // Multiple Producer, Single Consumer
      {2, 4},  // Multiple Producer, Multiple Consumer
  };
  for (const auto& config : configurations) {
    for (int trial = 0; trial < trials; ++trial) {
      bool result = testProducerConsumer(config.producers, config.consumers);
      TEST_ASSERT(result);
    }
  }
}
//! Test driver: always runs the basic push/pop test. It runs the
//! multithreaded stress test only when RDK_TEST_MULTITHREADED is defined.
int main() {
  RDLog::InitLogs();
  const char* const separator = "\n-----------------------------------------\n";

  BOOST_LOG(rdErrorLog) << separator;
  testPushAndPop();
  BOOST_LOG(rdErrorLog) << "Finished: testPushAndPop() \n";
  BOOST_LOG(rdErrorLog) << separator;

#ifdef RDK_TEST_MULTITHREADED
  BOOST_LOG(rdErrorLog) << separator;
  testMultipleTimes();
  BOOST_LOG(rdErrorLog) << "Finished: testMultipleTimes() \n";
  BOOST_LOG(rdErrorLog) << separator;
#endif

  return 0;
}
#endif