mirror of
https://github.com/rdkit/rdkit.git
synced 2026-06-03 21:44:30 +08:00
Performance improvement: Implement buffered reading for SDMolSupplier (#9010)
This commit is contained in:
7
.gitignore
vendored
7
.gitignore
vendored
@@ -131,6 +131,8 @@ __pycache__/
|
||||
/Code/GraphMol/test_data/bilastine_trajectory.sdf
|
||||
/Code/GraphMol/test_data/prop.pkl
|
||||
|
||||
/Data/eht_parms.dat
|
||||
|
||||
/Projects/DbCLI/testData/bzr/
|
||||
|
||||
/rdkit/sping/tests/testallps.ps
|
||||
@@ -144,7 +146,10 @@ __pycache__/
|
||||
/rdkit/hetero_atom_mutations.csv
|
||||
/rdkit/not_changed.csv
|
||||
/rdkit/similarityMap1_out.svg
|
||||
/rdkit/similarityMap1_out2.svg
|
||||
/rdkit/similarityMap1_out*.svg
|
||||
/rdkit/github4763.svg
|
||||
|
||||
/Testing/
|
||||
|
||||
# JS distribution or temp files
|
||||
Code/MinimalLib/dist
|
||||
|
||||
@@ -220,6 +220,8 @@ class RDKIT_FILEPARSERS_EXPORT SDMolSupplier : public ForwardSDMolSupplier {
|
||||
|
||||
private:
|
||||
void checkForEnd() override;
|
||||
void peekCheckForEnd(char* bufPtr, char* bufEnd, std::streampos molStartPos);
|
||||
void buildIndexTo(unsigned int targetIdx);
|
||||
int d_len = 0; // total number of mol blocks in the file (initialized to -1)
|
||||
int d_last = 0; // the molecule we are ready to read
|
||||
std::vector<std::streampos> d_molpos;
|
||||
|
||||
@@ -119,6 +119,42 @@ void SDMolSupplier::checkForEnd() {
|
||||
}
|
||||
}
|
||||
|
||||
void SDMolSupplier::peekCheckForEnd(char *bufPtr, char *bufEnd,
|
||||
std::streampos molStartPos) {
|
||||
PRECONDITION(dp_inStream, "no stream");
|
||||
int emptyLines = 0;
|
||||
char *p = bufPtr;
|
||||
|
||||
while (p < bufEnd) {
|
||||
if (!std::isspace(*p)) {
|
||||
return;
|
||||
}
|
||||
if (*p == '\n') {
|
||||
++emptyLines;
|
||||
if (emptyLines >= 4) { // the 4th empty line found
|
||||
this->df_end = true;
|
||||
this->d_len = rdcast<int>(this->d_molpos.size());
|
||||
return;
|
||||
}
|
||||
}
|
||||
++p;
|
||||
}
|
||||
|
||||
// buffer was exhausted without finding 4 empty lines or data. Need to check
|
||||
// the stream.
|
||||
std::streampos saveBlockPos = dp_inStream->tellg();
|
||||
|
||||
dp_inStream->clear();
|
||||
dp_inStream->seekg(molStartPos);
|
||||
|
||||
// run the standard (slow) logic
|
||||
this->checkForEnd();
|
||||
|
||||
// restore the stream position
|
||||
dp_inStream->clear();
|
||||
dp_inStream->seekg(saveBlockPos);
|
||||
}
|
||||
|
||||
void SDMolSupplier::reset() {
|
||||
PRECONDITION(dp_inStream, "no stream");
|
||||
dp_inStream->clear();
|
||||
@@ -192,30 +228,28 @@ void SDMolSupplier::moveTo(unsigned int idx) {
|
||||
if (idx < d_molpos.size()) {
|
||||
dp_inStream->seekg(d_molpos[idx]);
|
||||
d_last = idx;
|
||||
} else {
|
||||
std::string tempStr;
|
||||
dp_inStream->seekg(d_molpos.back());
|
||||
d_last = rdcast<int>(d_molpos.size()) - 1;
|
||||
while (d_last < static_cast<int>(idx) && !dp_inStream->eof() &&
|
||||
!dp_inStream->fail()) {
|
||||
d_line++;
|
||||
tempStr = getLine(dp_inStream);
|
||||
}
|
||||
// actually scan with buffering
|
||||
else {
|
||||
buildIndexTo(idx);
|
||||
|
||||
if (tempStr[0] == '$' && tempStr.substr(0, 4) == "$$$$") {
|
||||
std::streampos posHold = dp_inStream->tellg();
|
||||
this->checkForEnd();
|
||||
if (!this->df_end) {
|
||||
d_molpos.push_back(posHold);
|
||||
d_last++;
|
||||
}
|
||||
}
|
||||
}
|
||||
// if we reached end of file without reaching "idx" we have an index error
|
||||
if (dp_inStream->eof()) {
|
||||
if (idx < d_molpos.size()) {
|
||||
dp_inStream->clear();
|
||||
dp_inStream->seekg(d_molpos[idx]);
|
||||
d_last = idx;
|
||||
} else {
|
||||
/*Unfortunately, the FileParseException is not being catched and thrown on
|
||||
python directly. Instead, we use this df_end flag workaround to indicate
|
||||
that we reached the end of file (and signal the error). There's a comment
|
||||
on MolSupplier.h about problems with Boost exception handling and the full
|
||||
explanation That's the only reason for the following line*/
|
||||
df_end = true;
|
||||
|
||||
// if we reached end of file without reaching "idx" we have an index error
|
||||
d_len = rdcast<int>(d_molpos.size());
|
||||
std::ostringstream errout;
|
||||
errout << "ERROR: Index error (idx = " << idx << ") : "
|
||||
<< " we do no have enough mol blocks";
|
||||
<< " we do not have enough mol blocks";
|
||||
throw FileParseException(errout.str());
|
||||
}
|
||||
}
|
||||
@@ -234,23 +268,12 @@ unsigned int SDMolSupplier::length() {
|
||||
if (d_len > 0 || (df_end && d_len == 0)) {
|
||||
return d_len;
|
||||
} else {
|
||||
std::string tempStr;
|
||||
int old_last = d_last;
|
||||
buildIndexTo(UINT32_MAX);
|
||||
d_len = rdcast<int>(d_molpos.size());
|
||||
dp_inStream->seekg(d_molpos.back());
|
||||
while (!dp_inStream->eof() && !dp_inStream->fail()) {
|
||||
std::getline(*dp_inStream, tempStr);
|
||||
if (tempStr.length() >= 4 && tempStr[0] == '$' && tempStr[1] == '$' &&
|
||||
tempStr[2] == '$' && tempStr[3] == '$') {
|
||||
std::streampos posHold = dp_inStream->tellg();
|
||||
// don't worry about the last molecule:
|
||||
this->checkForEnd();
|
||||
if (!this->df_end) {
|
||||
d_molpos.push_back(posHold);
|
||||
++d_len;
|
||||
}
|
||||
}
|
||||
}
|
||||
// now remember to set the stream to the last position we want to read
|
||||
|
||||
// safeguard to restore the pointer to the last read molecule
|
||||
d_last = old_last;
|
||||
dp_inStream->clear();
|
||||
dp_inStream->seekg(d_molpos[d_last]);
|
||||
df_end = false;
|
||||
@@ -258,6 +281,99 @@ unsigned int SDMolSupplier::length() {
|
||||
}
|
||||
}
|
||||
|
||||
void SDMolSupplier::buildIndexTo(unsigned int targetIdx) {
|
||||
dp_inStream->seekg(d_molpos.back());
|
||||
d_last = rdcast<int>(d_molpos.size()) - 1;
|
||||
const size_t CHUNK_SIZE = 65536;
|
||||
const size_t OVERLAP =
|
||||
4; // to catch "$$$$" at chunk boundaries ("...\n$$ <new chunk> $$...")
|
||||
std::vector<char> buffer(CHUNK_SIZE + OVERLAP);
|
||||
std::fill(buffer.begin(), buffer.begin() + OVERLAP, '\n'); // safe init
|
||||
std::streampos currentStreamPos = dp_inStream->tellg();
|
||||
bool foundTarget = false;
|
||||
|
||||
while (dp_inStream->good() && !foundTarget) {
|
||||
std::streampos chunkStartPos = currentStreamPos;
|
||||
dp_inStream->read(&buffer[OVERLAP], CHUNK_SIZE);
|
||||
std::streamsize bytesRead = dp_inStream->gcount();
|
||||
if (bytesRead == 0) {
|
||||
break; // EOF
|
||||
}
|
||||
|
||||
std::streampos chunkEndPos = dp_inStream->tellg();
|
||||
// check if the stream is "honest" (binary or text mode with 1 byte newlines
|
||||
// (like UNIX), meaning read bytes map 1:1 to disk bytes)
|
||||
bool isBinaryLike = (bytesRead == (chunkEndPos - chunkStartPos));
|
||||
char *bufStart = &buffer[0];
|
||||
char *bufEnd = bufStart + OVERLAP + bytesRead;
|
||||
char *ptr = bufStart + 1;
|
||||
|
||||
while (true) {
|
||||
constexpr char dollarSigns[]{"$$$$"};
|
||||
auto match = std::search(ptr, bufEnd, dollarSigns, dollarSigns + 4);
|
||||
if (match == bufEnd) break;
|
||||
if (*(match - 1) == '\n') { // ensure $$$$ is at start of line
|
||||
char *nlPos = match + 4;
|
||||
while (nlPos < bufEnd && *nlPos != '\n') {
|
||||
++nlPos;
|
||||
}
|
||||
if (nlPos < bufEnd) {
|
||||
++nlPos;
|
||||
}
|
||||
|
||||
std::streampos posHold;
|
||||
if (isBinaryLike) { // fast path, math checks out, no need to seek
|
||||
posHold = chunkStartPos + std::streamoff(nlPos - bufStart - OVERLAP);
|
||||
} else { // slow path, there is byte translation going on, need to seek
|
||||
// and use the std translation magic to find the actual byte
|
||||
// position
|
||||
dp_inStream->clear();
|
||||
dp_inStream->seekg(
|
||||
chunkStartPos); // rollback to the start of the chunk
|
||||
dp_inStream->ignore(
|
||||
nlPos - bufStart -
|
||||
OVERLAP); // advance but with the magic translation in effect now
|
||||
posHold =
|
||||
dp_inStream
|
||||
->tellg(); // this is the physical position on disk we want
|
||||
}
|
||||
|
||||
bool atTrueEOF =
|
||||
(bytesRead < static_cast<std::streamsize>(CHUNK_SIZE)) &&
|
||||
(nlPos >= bufEnd);
|
||||
if (!atTrueEOF) {
|
||||
this->peekCheckForEnd(nlPos, bufEnd,
|
||||
posHold); // the optimized peek version
|
||||
if (!this->df_end) {
|
||||
d_molpos.push_back(posHold);
|
||||
++d_last;
|
||||
if (static_cast<unsigned int>(d_last) ==
|
||||
targetIdx) { // not really needed but this way we only index as
|
||||
// much as needed
|
||||
foundTarget = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ptr = match + 4;
|
||||
}
|
||||
if (foundTarget) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!isBinaryLike) { // need to seek to the end of the chunk again to make
|
||||
// sure next read is from the right position
|
||||
dp_inStream->clear();
|
||||
dp_inStream->seekg(chunkEndPos);
|
||||
}
|
||||
|
||||
if (bytesRead >= static_cast<std::streamsize>(OVERLAP))
|
||||
std::memcpy(&buffer[0], bufEnd - OVERLAP, OVERLAP);
|
||||
currentStreamPos = chunkEndPos;
|
||||
}
|
||||
}
|
||||
|
||||
bool SDMolSupplier::atEnd() {
|
||||
PRECONDITION(dp_inStream, "no stream");
|
||||
return df_end;
|
||||
|
||||
Reference in New Issue
Block a user