mirror of
https://github.com/rdkit/rdkit.git
synced 2026-06-04 21:54:27 +08:00
174 lines
6.0 KiB
Python
174 lines
6.0 KiB
Python
import doctest
|
|
import gzip
|
|
import os
|
|
import sys
|
|
import unittest
|
|
|
|
from rdkit import Chem, RDConfig, __version__, rdBase
|
|
|
|
|
|
class TestCase(unittest.TestCase):
|
|
|
|
def testMultiSmiMolSupplier(self):
|
|
fileN = os.path.join(RDConfig.RDBaseDir, 'Code', 'GraphMol', 'FileParsers', 'test_data',
|
|
'first_200.tpsa.csv')
|
|
# fileN = "../FileParsers/test_data/first_200.tpsa.csv"
|
|
smiSup = Chem.MultithreadedSmilesMolSupplier(fileN, ",", 0, -1)
|
|
i = 0
|
|
while not smiSup.atEnd():
|
|
mol = next(smiSup)
|
|
if (mol):
|
|
i += 1
|
|
self.assertTrue(i == 200)
|
|
fileN = os.path.join(RDConfig.RDBaseDir, 'Code', 'GraphMol', 'FileParsers', 'test_data',
|
|
'fewSmi.csv')
|
|
# fileN = "../FileParsers/test_data/fewSmi.csv"
|
|
smiSup = Chem.MultithreadedSmilesMolSupplier(fileN, delimiter=",", smilesColumn=1, nameColumn=0,
|
|
titleLine=0)
|
|
names = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]
|
|
props = [
|
|
"34.14", "25.78", "106.51", "82.78", "60.16", "87.74", "37.38", "77.28", "65.18", "0.00"
|
|
]
|
|
confusedNames = []
|
|
confusedProps = []
|
|
i = 0
|
|
for mol in smiSup:
|
|
if mol is not None:
|
|
self.assertTrue(mol.HasProp("_Name"))
|
|
self.assertTrue(mol.HasProp("Column_2"))
|
|
prop = mol.GetProp("Column_2")
|
|
name = mol.GetProp("_Name")
|
|
confusedProps.append(prop)
|
|
confusedNames.append(name)
|
|
i += 1
|
|
self.assertTrue(i == 10)
|
|
self.assertTrue(sorted(confusedNames) == sorted(names))
|
|
self.assertTrue(sorted(confusedProps) == sorted(props))
|
|
|
|
# context manager
|
|
confusedNames = []
|
|
confusedProps = []
|
|
i = 0
|
|
with Chem.MultithreadedSmilesMolSupplier(fileN, delimiter=",", smilesColumn=1, nameColumn=0,
|
|
titleLine=0) as smiSup:
|
|
for mol in smiSup:
|
|
if mol is not None:
|
|
self.assertTrue(mol.HasProp("_Name"))
|
|
self.assertTrue(mol.HasProp("Column_2"))
|
|
prop = mol.GetProp("Column_2")
|
|
name = mol.GetProp("_Name")
|
|
confusedProps.append(prop)
|
|
confusedNames.append(name)
|
|
i += 1
|
|
self.assertTrue(i == 10)
|
|
self.assertTrue(sorted(confusedNames) == sorted(names))
|
|
self.assertTrue(sorted(confusedProps) == sorted(props))
|
|
|
|
def testMultiSDMolSupplier(self):
|
|
fileN = os.path.join(RDConfig.RDBaseDir, 'Code', 'GraphMol', 'FileParsers', 'test_data',
|
|
'NCI_aids_few.sdf')
|
|
# fileN = "../FileParsers/test_data/NCI_aids_few.sdf"
|
|
sdSup = Chem.MultithreadedSDMolSupplier(fileN)
|
|
molNames = [
|
|
"48", "78", "128", "163", "164", "170", "180", "186", "192", "203", "210", "211", "213",
|
|
"220", "229", "256"
|
|
]
|
|
confusedMolNames = []
|
|
i = 0
|
|
for mol in sdSup:
|
|
if mol is not None:
|
|
confusedMolNames.append(mol.GetProp("_Name"))
|
|
i += 1
|
|
self.assertTrue(len(molNames) == i)
|
|
self.assertTrue(sorted(confusedMolNames) == sorted(molNames))
|
|
|
|
# context manager
|
|
confusedMolNames = []
|
|
i = 0
|
|
with Chem.MultithreadedSDMolSupplier(fileN) as sdSup:
|
|
for mol in sdSup:
|
|
if mol is not None:
|
|
confusedMolNames.append(mol.GetProp("_Name"))
|
|
i += 1
|
|
self.assertTrue(len(molNames) == i)
|
|
self.assertTrue(sorted(confusedMolNames) == sorted(molNames))
|
|
|
|
# NOTE these are disabled until we rewrite the code to construct a
|
|
# MultithreadedSDMolSupplier from a python stream
|
|
@unittest.skip("Skipping construction from stream")
|
|
def testMultiSDMolSupplierFromStream(self):
|
|
fileN = os.path.join(RDConfig.RDBaseDir, 'Code', 'GraphMol', 'FileParsers', 'test_data',
|
|
'NCI_aids_few.sdf')
|
|
molNames = [
|
|
"48", "78", "128", "163", "164", "170", "180", "186", "192", "203", "210", "211", "213",
|
|
"220", "229", "256"
|
|
]
|
|
# try opening with streambuf
|
|
inf = open(fileN, 'rb')
|
|
if (inf):
|
|
gSup = Chem.SDMolSupplierFromStream(inf)
|
|
confusedMolNames = []
|
|
i = 0
|
|
for mol in gSup:
|
|
# print("!!",i,file=sys.stderr);sys.stderr.flush()
|
|
if (mol):
|
|
confusedMolNames.append(mol.GetProp("_Name"))
|
|
i += 1
|
|
self.assertTrue(len(molNames) == i)
|
|
self.assertTrue(sorted(confusedMolNames) == sorted(molNames))
|
|
# print("done!",file=sys.stderr);sys.stderr.flush()
|
|
# try opening with streambuf
|
|
fileN = os.path.join(RDConfig.RDBaseDir, 'Code', 'GraphMol', 'FileParsers', 'test_data',
|
|
'NCI_aids_few.sdf.gz')
|
|
# try opening with gzip
|
|
inf = gzip.open(fileN)
|
|
if (inf):
|
|
gSup = Chem.SDMolSupplierFromStream(inf)
|
|
confusedMolNames = []
|
|
i = 0
|
|
for mol in gSup:
|
|
# print("!",i,file=sys.stderr);sys.stderr.flush()
|
|
if (mol):
|
|
confusedMolNames.append(mol.GetProp("_Name"))
|
|
i += 1
|
|
self.assertTrue(len(molNames) == i)
|
|
self.assertTrue(sorted(confusedMolNames) == sorted(molNames))
|
|
|
|
def testMultiSmiMolSupplierThrow(self):
|
|
fileN = os.path.join(RDConfig.RDBaseDir, 'Code', 'GraphMol', 'FileParsers', 'test_data',
|
|
'first_200.tpsa.csv')
|
|
# fileN = "../FileParsers/test_data/first_200.tpsa.csv"
|
|
smiSup = Chem.MultithreadedSmilesMolSupplier(fileN, ",", 0, -1)
|
|
|
|
def helper(smiSup):
|
|
i = 0
|
|
while not smiSup.atEnd():
|
|
mol = next(smiSup)
|
|
if (mol):
|
|
i += 1
|
|
if i >= 10:
|
|
raise ValueError('hi')
|
|
|
|
self.assertRaises(ValueError, helper, smiSup)
|
|
|
|
def testMultiSDMolSupplierThrow(self):
|
|
fileN = os.path.join(RDConfig.RDBaseDir, 'Code', 'GraphMol', 'FileParsers', 'test_data',
|
|
'NCI_aids_few.sdf')
|
|
# fileN = "../FileParsers/test_data/NCI_aids_few.sdf"
|
|
sdSup = Chem.MultithreadedSDMolSupplier(fileN)
|
|
|
|
def helper(sdSup):
|
|
i = 0
|
|
while not sdSup.atEnd():
|
|
mol = next(sdSup)
|
|
if (mol):
|
|
i += 1
|
|
if i >= 10:
|
|
raise ValueError('hi')
|
|
|
|
self.assertRaises(ValueError, helper, sdSup)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|