Files
rdkit/Code/GraphMol/FileParsers/MultithreadedMolSupplier.cpp
shrey183 8ea1ac6112 [GSoC-2020] Generalized and Multithreaded File Reader (#3363)
* fixed issue #2965

* added test case for issue #2965

* fixed formatting and added comment.

* update

* General Reader files

* removed dependency on boost filesystems

* removed class

* clang-format

* added-comments

* further-cleanup

* added clang-formatting

* braces-for-if-else

* changed error messages, added option for windows file path

* fixed getFileName function

* cleanup

* option for filename without path

* further-cleanup

* added tests for determineFileFormat

* cleanup, const arguments for validate function

* init

* cleanup

* cleanup

* clang-format does not work for CMake

* added RDK_TEST_MULTITHREADED option

* add-flag

* cleanup

* Delete ConcurrentQueue.h

This PR deals with the Generalized File Reader.

* Delete testConcurrentQueue.cpp

This PR deals with the Generalized File Reader.

* no change

* concurrent queue

* print values

* Single Producer Multiple Consumer works

* cleanup

* Producer Consumer Example

* update queue methods and tests

* cleanup

* test

* fixed tests

* cleanup, updated tests

* Delete ProducerConsumer.h

* Delete testProducerConsumer.cpp

* cleanup

* further cleanup

* changes based on feedback

* make queue non copyable

* pseudocode

* possible implementation

* untested implementation

* change class to typename

* basic-setup

* need to fix segfault

* need to fix blocking

* need to fix blocking

* need to fix blocking

* fix indentation

* one possibility

* without lambda function

* possible fix with some test cases

* performance tests

* added support for record id and item text

* cleanup

* cleanup

* fixed memory leak and added methods with tests for getting last id and item text

* cleanup

* added more test cases with different smi files

* cleanup

* SD mol supplier

* modified the parsing for SDMolSupplier

* cleanup

* cleanup

* new file for testing

* added support for reading molecule properties with tests

* thread-safe logging and exception handling

* cleanup

* without thread safe logging

* cleanup

* cleanup, modified MultithreadedSmilesMolSupplier

* cleanup, made reader and writer functions private

* move O2.sdf

* basic python wrapper with tests

* cleanup, added new methods for python wrappers

* made changes suggested by Andrew

* file and compression formats are case-insensitive

* cannot open files with gzstream

* cleanup

* possible fix for opening compressed streams (SMILES)

* removed seekg() and tellg() methods from multithreaded suppliers

* cleanup

* test cases for python wrappers

* some wrapper cleanup

* cleanup, removed unused functions

* update the MT tests so that they actually do some work
also includes some cleanup here

* cleanup

* remove iterator_next header include

* added support for multithreaded readers

* use getNumThreadsToUse for multithreaded suppliers

* fixed documentation for multithreaded python wrappers

* commented performance test

* first draft of final evaluation report

* removed inline variables

* first draft getting started in python

* fixed typos in getting started in python

* fixed typos

* fix documentation tests

* fixed documentation tests

* added links to important files and PR

* added performance results

* first version of wrappers with compressed streams

* getting rid of streambuf stream method

* modified General File Reader

* make this work when building in non-threads mode

* rename a test

* rename a function in the python API

* rearrange the python test a bit

* disable the stream-based constructors in Python

* mark the multithreaded classes as experimental

Co-authored-by: greg landrum <greg.landrum@gmail.com>
2020-10-09 04:31:05 +02:00

109 lines
3.0 KiB
C++

#ifdef RDK_THREADSAFE_SSS
//
// Copyright (C) 2020 Shrey Aryan
//
// @@ All Rights Reserved @@
// This file is part of the RDKit.
// The contents are covered by the terms of the BSD license
// which is included in the file license.txt, found at the root
// of the RDKit source tree.
//
#include "MultithreadedMolSupplier.h"
namespace RDKit {
MultithreadedMolSupplier::~MultithreadedMolSupplier() {
  std::tuple<ROMol*, std::string, unsigned int> r;
  // Drain the output queue *before* joining the worker threads. If the caller
  // stopped calling next() early, writers may still be pushing results.
  // NOTE(review): assuming ConcurrentQueue is bounded and push() blocks while
  // it is full (not visible here -- TODO confirm), joining first would wait
  // forever on a writer stuck in push(). Popping here keeps records flowing
  // until the last writer marks the output queue done. The join below would
  // have waited for that same remaining work anyway.
  // The joinable() guard skips the drain when startThreads() never ran; then no
  // writer would ever call setDone() and pop() presumably would not return.
  if (d_readerThread.joinable()) {
    while (d_outputQueue->pop(r)) {
      // molecules that were never handed out by next() are still owned here
      delete std::get<0>(r);
    }
  }
  endThreads();
  // destroy all objects in the input queue
  d_inputQueue->clear();
  // delete the pointer to the input queue
  delete d_inputQueue;
  // free anything still left in the output queue (normally already empty)
  while (d_outputQueue->pop(r)) {
    delete std::get<0>(r);
  }
  // destroy all objects in the output queue
  d_outputQueue->clear();
  // delete the pointer to the output queue
  delete d_outputQueue;
}
void MultithreadedMolSupplier::reader() {
  // Producer side: pull raw records out of the input one at a time and hand
  // them to the writer threads through the input queue. extractNextRecord()
  // returning false ends the loop.
  std::string record;
  unsigned int lineNum = 0;
  unsigned int index = 0;
  while (extractNextRecord(record, lineNum, index)) {
    // each entry carries the record text, its line number and its index
    auto entry = std::make_tuple(record, lineNum, index);
    d_inputQueue->push(entry);
  }
  // signal that no further records will be pushed
  d_inputQueue->setDone();
}
void MultithreadedMolSupplier::writer() {
  // Consumer side: turn queued records into molecules and forward them (with
  // the record text and index) to the output queue.
  std::tuple<std::string, unsigned int, unsigned int> r;
  while (d_inputQueue->pop(r)) {
    try {
      ROMol* mol = processMoleculeRecord(std::get<0>(r), std::get<1>(r));
      auto temp = std::tuple<ROMol*, std::string, unsigned int>{
          mol, std::get<0>(r), std::get<2>(r)};
      d_outputQueue->push(temp);
    } catch (...) {
      // A record that fails to process is still reported, as a null molecule
      // with its text and record index, so the consumer sees an entry for it.
      auto nullValue = std::tuple<ROMol*, std::string, unsigned int>{
          nullptr, std::get<0>(r), std::get<2>(r)};
      d_outputQueue->push(nullValue);
    }
  }
  // Exactly one writer, the last to finish, must mark the output queue done.
  // The check and the increment have to be a single atomic step. As separate
  // operations, two writers finishing together could both see "not last" and
  // both increment past d_numWriterThreads, so setDone() would never be called
  // and consumers of the output queue would block forever.
  // Post-increment returns the value *before* the increment, so each writer
  // sees a distinct value. Presumably d_threadCounter is a std::atomic starting
  // at 1 (implied by the original condition); TODO confirm in the header.
  if (d_threadCounter++ == d_numWriterThreads) {
    d_outputQueue->setDone();
  }
}
ROMol* MultithreadedMolSupplier::next() {
  // Takes the next processed entry from the output queue. On success it
  // records the entry's text and record id for getLastItemText() /
  // getLastRecordId() and returns the molecule. That molecule is itself null
  // when the record could not be processed.
  std::tuple<ROMol*, std::string, unsigned int> item;
  if (!d_outputQueue->pop(item)) {
    // the output queue has nothing more to give
    return nullptr;
  }
  d_lastItemText = std::get<1>(item);
  d_lastRecordId = std::get<2>(item);
  return std::get<0>(item);
}
void MultithreadedMolSupplier::endThreads() {
  // Wait for the reader first, then for each writer in turn.
  d_readerThread.join();
  for (std::thread& worker : d_writerThreads) {
    worker.join();
  }
}
void MultithreadedMolSupplier::startThreads() {
// run the reader function in a seperate thread
d_readerThread = std::thread(&MultithreadedMolSupplier::reader, this);
// run the writer function in seperate threads
for (unsigned int i = 0; i < d_numWriterThreads; i++) {
d_writerThreads.emplace_back(
std::thread(&MultithreadedMolSupplier::writer, this));
}
}
bool MultithreadedMolSupplier::atEnd() {
  // True once the writers are finished and every produced entry has been
  // consumed.
  // Check "done" before "empty". setDone() is called only after the writers'
  // final pushes, so once done is observed no further entries can arrive and an
  // empty queue really is the end. In the opposite order, the last writer could
  // push its final molecule and call setDone() between the two checks, and
  // that molecule would be reported as being past the end.
  return d_outputQueue->getDone() && d_outputQueue->isEmpty();
}
unsigned int MultithreadedMolSupplier::getLastRecordId() const {
return d_lastRecordId;
}
std::string MultithreadedMolSupplier::getLastItemText() const {
return d_lastItemText;
}
void MultithreadedMolSupplier::reset() {
  // Rewinding is not implemented for the multithreaded supplier; this always
  // reports the operation as unsupported via UNDER_CONSTRUCTION.
  UNDER_CONSTRUCTION("reset() not supported for MultithreadedMolSupplier();");
}
} // namespace RDKit
#endif