Files
rdkit/Code/GraphMol/FileParsers/MultithreadedSmilesMolSupplier.cpp
shrey183 8ea1ac6112 [GSoC-2020] Generalized and Multithreaded File Reader (#3363)
* fixed issue #2965

* added test case for issue #2965

* fixed formatting and added comment.

* update

* General Reader files

* removed dependency on boost filesystems

* removed class

* clang-format

* added-comments

* further-cleanup

* added clang-formatting

* braces-for-if-else

* changed error messages, added option for windows file path

* fixed getFileName function

* cleanup

* option for filename without path

* further-cleanup

* added tests for determineFileFormat

* cleanup, const arguments for validate function

* init

* cleanup

* cleanup

* clang-format does not work for CMake

* added RDK_TEST_MULTITHREADED option

* add-flag

* cleanup

* Delete ConcurrentQueue.h

This PR deals with the Generalized File Reader.

* Delete testConcurrentQueue.cpp

This PR deals with the Generalized File Reader.

* no change

* concurrent queue

* print values

* Single Producer Multiple Consumer works

* cleanup

* Producer Consumer Example

* update queue methods and tests

* cleanup

* test

* fixed tests

* cleanup, updated tests

* Delete ProducerConsumer.h

* Delete testProducerConsumer.cpp

* cleanup

* further cleanup

* changes based on feedback

* make queue non copyable

* pseudocode

* possible implementation

* untested implementation

* change class to typename

* basic-setup

* need to fix segfault

* need to fix blocking

* need to fix blocking

* need to fix blocking

* fix indentation

* one possibility

* without lambda function

* possible fix with some test cases

* performance tests

* added support for record id and item text

* cleanup

* cleanup

* fixed memory leak and added methods with tests for getting last id and item text

* cleanup

* added more test cases with different smi files

* cleanup

* SD mol supplier

* modified the parsing for SDMolSupplier

* cleanup

* cleanup

* new file for testing

* added support for reading molecule properties with tests

* thread-safe logging and exception handling

* cleanup

* without thread safe logging

* cleanup

* cleanup, modified MultithreadedSmilesMolSupplier

* cleanup, made reader and writer functions private

* move O2.sdf

* basic python wrapper with tests

* cleanup, added new methods for python wrappers

* made changes suggested by Andrew

* file and compression formats are case-insensitive

* cannot open files with gzstream

* cleanup

* possible fix for opening compressed streams (SMILES)

* removed seekg() and tellg() methods from multithreaded suppliers

* cleanup

* test cases for python wrappers

* some wrapper cleanup

* cleanup, removed unused functions

* update the MT tests so that they actually do some work
also includes some cleanup here

* cleanup

* remove iterator_next header include

* added support for multithreaded readers

* use getNumThreadsToUse for multithreaded suppliers

* fixed documentation for multithreaded python wrappers

* commented performance test

* first draft of final evaluation report

* removed inline variables

* first draft getting started in python

* fixed typos in getting started in python

* fixed typos

* fix documentation tests

* fixed documentation tests

* added links to important files and PR

* added performance results

* first version of wrappers with compressed streams

* getting rid of streambuf stream method

* modified General File Reader

* make this work when building in non-threads mode

* rename a test

* rename a function in the python API

* rearrange the python test a bit

* disable the stream-based constructors in Python

* mark the multithreaded classes as experimental

Co-authored-by: greg landrum <greg.landrum@gmail.com>
2020-10-09 04:31:05 +02:00

221 lines
6.8 KiB
C++

#ifdef RDK_THREADSAFE_SSS
//
// Copyright (C) 2020 Shrey Aryan
//
// @@ All Rights Reserved @@
// This file is part of the RDKit.
// The contents are covered by the terms of the BSD license
// which is included in the file license.txt, found at the root
// of the RDKit source tree.
//
#include "MultithreadedSmilesMolSupplier.h"
namespace RDKit {
// Builds a supplier that reads its records from the file `fileName`.
//
// The stream returned by openAndCheckStream() is owned by this object (the
// ownership flag passed to initFromSettings() is always true here), so the
// destructor deletes it. The reader/writer threads are started before the
// constructor returns.
MultithreadedSmilesMolSupplier::MultithreadedSmilesMolSupplier(
    const std::string &fileName, const std::string &delimiter, int smilesColumn,
    int nameColumn, bool titleLine, bool sanitize,
    unsigned int numWriterThreads, size_t sizeInputQueue,
    size_t sizeOutputQueue) {
  dp_inStream = openAndCheckStream(fileName);
  CHECK_INVARIANT(dp_inStream, "bad instream");
  CHECK_INVARIANT(!(dp_inStream->eof()), "early EOF");

  // we opened the stream ourselves, so we are responsible for deleting it
  const bool ownsStream = true;
  initFromSettings(ownsStream, delimiter, smilesColumn, nameColumn, titleLine,
                   sanitize, numWriterThreads, sizeInputQueue,
                   sizeOutputQueue);
  startThreads();
  POSTCONDITION(dp_inStream, "bad instream");
}
// Builds a supplier on top of a caller-provided stream.
//
// `inStream` must be non-null and must not already be at EOF. When
// `takeOwnership` is true the destructor deletes the stream; otherwise the
// caller keeps responsibility for it. The reader/writer threads are started
// before the constructor returns.
MultithreadedSmilesMolSupplier::MultithreadedSmilesMolSupplier(
    std::istream *inStream, bool takeOwnership, const std::string &delimiter,
    int smilesColumn, int nameColumn, bool titleLine, bool sanitize,
    unsigned int numWriterThreads, size_t sizeInputQueue,
    size_t sizeOutputQueue) {
  // validate the incoming stream before we adopt it
  CHECK_INVARIANT(inStream, "bad instream");
  CHECK_INVARIANT(!(inStream->eof()), "early EOF");
  dp_inStream = inStream;

  initFromSettings(takeOwnership, delimiter, smilesColumn, nameColumn,
                   titleLine, sanitize, numWriterThreads, sizeInputQueue,
                   sizeOutputQueue);
  startThreads();
  POSTCONDITION(dp_inStream, "bad instream");
}
// Default constructor: no input stream is attached. The settings below are
// the defaults forwarded to initFromSettings(); the threads are started
// exactly as in the other constructors.
MultithreadedSmilesMolSupplier::MultithreadedSmilesMolSupplier() {
  dp_inStream = nullptr;

  const bool takeOwnership = true;
  const int smilesColumn = 0;
  const int nameColumn = 1;
  const bool titleLine = true;
  const bool sanitize = true;
  const unsigned int numWriterThreads = 1;
  const size_t sizeInputQueue = 5;
  const size_t sizeOutputQueue = 5;
  initFromSettings(takeOwnership, "", smilesColumn, nameColumn, titleLine,
                   sanitize, numWriterThreads, sizeInputQueue,
                   sizeOutputQueue);
  startThreads();
}
// Releases the input stream, but only when this object owns it and a stream
// is actually attached.
MultithreadedSmilesMolSupplier::~MultithreadedSmilesMolSupplier() {
  if (!df_owner || !dp_inStream) {
    // borrowed stream or nothing attached: leave everything untouched
    return;
  }
  delete dp_inStream;
  dp_inStream = nullptr;
  df_owner = false;
}
void MultithreadedSmilesMolSupplier::initFromSettings(
bool takeOwnership, const std::string &delimiter, int smilesColumn,
int nameColumn, bool titleLine, bool sanitize,
unsigned int numWriterThreads, size_t sizeInputQueue,
size_t sizeOutputQueue) {
df_owner = takeOwnership;
d_delim = delimiter;
d_smi = smilesColumn;
d_name = nameColumn;
df_title = titleLine;
df_sanitize = sanitize;
d_numWriterThreads = getNumThreadsToUse(numWriterThreads);
d_sizeInputQueue = sizeInputQueue;
d_sizeOutputQueue = sizeOutputQueue;
d_inputQueue =
new ConcurrentQueue<std::tuple<std::string, unsigned int, unsigned int>>(
d_sizeInputQueue);
d_outputQueue =
new ConcurrentQueue<std::tuple<ROMol *, std::string, unsigned int>>(
d_sizeOutputQueue);
df_end = false;
d_line = -1;
}
// Returns the end-of-data flag (df_end), which extractNextRecord() sets once
// it finds the stream at EOF. Requires an attached stream.
bool MultithreadedSmilesMolSupplier::getEnd() const {
  PRECONDITION(dp_inStream, "no stream");
  const bool reachedEnd = df_end;
  return reachedEnd;
}
// --------------------------------------------------
//
// Reads and processes the title line
//
void MultithreadedSmilesMolSupplier::processTitleLine() {
PRECONDITION(dp_inStream, "bad stream");
std::string tempStr = getLine(dp_inStream);
// loop until we get a valid line
while (!dp_inStream->eof() && !dp_inStream->fail() &&
((tempStr[0] == '#') || (strip(tempStr).size() == 0))) {
tempStr = getLine(dp_inStream);
}
boost::char_separator<char> sep(d_delim.c_str(), "",
boost::keep_empty_tokens);
tokenizer tokens(tempStr, sep);
for (tokenizer::iterator tokIter = tokens.begin(); tokIter != tokens.end();
++tokIter) {
std::string pname = strip(*tokIter);
d_props.push_back(pname);
}
}
bool MultithreadedSmilesMolSupplier::extractNextRecord(std::string &record,
unsigned int &lineNum,
unsigned int &index) {
PRECONDITION(dp_inStream, "bad stream");
if (dp_inStream->eof()) {
df_end = true;
return false;
}
// need to process title line
// if we have not called next yet and the current record id = 1
// then we are seeking the first record
if (d_lastRecordId == 0 && d_currentRecordId == 1) {
if (df_title) {
this->processTitleLine();
}
}
std::string tempStr = getLine(dp_inStream);
record = "";
while (!dp_inStream->eof() && !dp_inStream->fail() &&
((tempStr[0] == '#') || (strip(tempStr).size() == 0))) {
tempStr = getLine(dp_inStream);
}
record = tempStr;
lineNum = d_line;
index = d_currentRecordId;
++d_currentRecordId;
return true;
}
// --------------------------------------------------
//
//  Converts one text record into a molecule
//
//  \param record   a single delimited line; it is split on d_delim and each
//                  token is stripped
//  \param lineNum  line number associated with the record; used in
//                  diagnostics and as the molecule name when d_name == -1
//
//  \return a newly allocated molecule (never nullptr); the caller owns it
//
//  \throws FileParseException    if the record has no column at index d_smi
//  \throws SmilesParseException  if SmilesToMol() returns nullptr for the
//                                SMILES column
//
//  Every column other than the SMILES and name columns is stored as a string
//  property, named after the matching entry in d_props when there is one and
//  "Column_<index>" otherwise.
//
ROMol *MultithreadedSmilesMolSupplier::processMoleculeRecord(
    const std::string &record, unsigned int lineNum) {
  // -----------
  // tokenize the input line:
  // -----------
  boost::char_separator<char> sep(d_delim.c_str(), "",
                                  boost::keep_empty_tokens);
  tokenizer tokens(record, sep);
  STR_VECT recs;
  for (const auto &token : tokens) {
    recs.push_back(strip(token));
  }
  // a negative d_smi becomes a huge unsigned value here and is therefore
  // rejected as well
  if (recs.size() <= static_cast<unsigned int>(d_smi)) {
    std::ostringstream errout;
    errout << "ERROR: line #" << lineNum
           << " does not contain enough tokens\n";
    throw FileParseException(errout.str());
  }

  // -----------
  // get the smiles and create a molecule
  // -----------
  SmilesParserParams params;
  params.sanitize = df_sanitize;
  params.allowCXSMILES = false;
  params.parseName = false;
  ROMol *res = SmilesToMol(recs[d_smi], params);
  if (!res) {
    std::stringstream errout;
    errout << "Cannot create molecule from : '" << recs[d_smi] << "'";
    throw SmilesParseException(errout.str());
  }

  // -----------
  // get the name (if there's a name column)
  // -----------
  if (d_name == -1) {
    // no name column: fall back to the line number the record came from
    res->setProp(common_properties::_Name, std::to_string(lineNum));
  } else if (d_name >= static_cast<int>(recs.size())) {
    BOOST_LOG(rdWarningLog)
        << "WARNING: no name column found on line " << lineNum << std::endl;
  } else {
    res->setProp(common_properties::_Name, recs[d_name]);
  }

  // -----------
  // read in the properties
  // -----------
  for (unsigned int col = 0; col < recs.size(); ++col) {
    if (static_cast<int>(col) == d_smi || static_cast<int>(col) == d_name) {
      continue;
    }
    // use the title-line name when we have one for this column
    const std::string pname =
        (d_props.size() > col) ? d_props[col] : "Column_" + std::to_string(col);
    res->setProp(pname, recs[col]);
  }
  return res;
}
} // namespace RDKit
#endif