mirror of
https://github.com/rdkit/rdkit.git
synced 2026-06-03 21:44:30 +08:00
robustify w.r.t. sqlalchemy
This commit is contained in:
@@ -452,19 +452,26 @@ if __name__=='__main__':
|
||||
if molsOut and ids:
|
||||
molDbName = os.path.join(options.dbDir,options.molDbName)
|
||||
conn = DbConnect(molDbName)
|
||||
cns = conn.GetColumnNames('molecules')
|
||||
cns = list(conn.GetColumnNames('molecules'))
|
||||
if cns[0]=='guid':
|
||||
# from sqlalchemy, ditch it:
|
||||
del cns[0]
|
||||
if cns[-1]!='molpkl':
|
||||
cns.remove('molpkl')
|
||||
cns.append('molpkl')
|
||||
curs = conn.GetCursor()
|
||||
ids = [(x,) for x in ids]
|
||||
curs.execute('create temporary table _tmpTbl (%(idName)s varchar)'%locals())
|
||||
curs.executemany('insert into _tmpTbl values (?)',ids)
|
||||
curs.execute('select * from molecules join _tmpTbl using (%(idName)s)'%locals())
|
||||
cnText=','.join(cns)
|
||||
curs.execute('select %(cnText)s from molecules join _tmpTbl using (%(idName)s)'%locals())
|
||||
row=curs.fetchone()
|
||||
while row:
|
||||
m = Chem.Mol(str(row[-1]))
|
||||
if sdfOut:
|
||||
m.SetProp('_Name',str(row[0]))
|
||||
print >>sdfOut,Chem.MolToMolBlock(m)
|
||||
for i in range(1,len(cns)-2):
|
||||
for i in range(1,len(cns)-1):
|
||||
pn = cns[i]
|
||||
pv = str(row[i])
|
||||
print >>sdfOut,'> <%s>\n%s\n'%(pn,pv)
|
||||
|
||||
@@ -279,92 +279,6 @@ class TestCase(unittest.TestCase):
|
||||
self.failUnless(len(lines)==25)
|
||||
os.unlink('testData/bzr/search.out')
|
||||
|
||||
def test3_1SDSearch(self):
|
||||
self.failUnless(os.path.exists('testData/bzr/Compounds.sqlt'))
|
||||
|
||||
p = subprocess.Popen(('python', 'SDSearch.py','--dbName=testData/bzr/Compounds.sqlt',
|
||||
'--nameOut=testData/bzr/search.out'))
|
||||
res=p.wait()
|
||||
self.failIf(res)
|
||||
p=None
|
||||
|
||||
self.failUnless(os.path.exists('testData/bzr/search.out'))
|
||||
inF = file('testData/bzr/search.out','r')
|
||||
lines=inF.readlines()
|
||||
inF=None
|
||||
self.failUnless(len(lines)==163)
|
||||
os.unlink('testData/bzr/search.out')
|
||||
|
||||
def test3_2SDSearch(self):
|
||||
self.failUnless(os.path.exists('testData/bzr/Compounds.sqlt'))
|
||||
|
||||
p = subprocess.Popen(('python', 'SDSearch.py','--dbName=testData/bzr/Compounds.sqlt',
|
||||
'--nameOut=testData/bzr/search.out', '-q activity<6.0'))
|
||||
res=p.wait()
|
||||
self.failIf(res)
|
||||
p=None
|
||||
|
||||
self.failUnless(os.path.exists('testData/bzr/search.out'))
|
||||
inF = file('testData/bzr/search.out','r')
|
||||
lines=inF.readlines()
|
||||
inF=None
|
||||
self.failUnless(len(lines)==17)
|
||||
os.unlink('testData/bzr/search.out')
|
||||
|
||||
def test3_3SDSearch(self):
|
||||
from rdkit import Chem
|
||||
self.failUnless(os.path.exists('testData/bzr/Compounds.sqlt'))
|
||||
|
||||
p = subprocess.Popen(('python', 'SDSearch.py','--dbName=testData/bzr/Compounds.sqlt',
|
||||
'--nameOut=testData/bzr/search.out', '--smarts=cncncc',
|
||||
'--sdOut=testData/bzr/search.out.sdf'))
|
||||
res=p.wait()
|
||||
self.failIf(res)
|
||||
p=None
|
||||
|
||||
self.failUnless(os.path.exists('testData/bzr/search.out'))
|
||||
inF = file('testData/bzr/search.out','r')
|
||||
lines=inF.readlines()
|
||||
inF=None
|
||||
self.failUnless(len(lines)==49)
|
||||
|
||||
suppl=Chem.SDMolSupplier('testData/bzr/search.out.sdf')
|
||||
ms = [x for x in suppl]
|
||||
suppl=None
|
||||
self.failUnless(len(ms)==49)
|
||||
for i,m in enumerate(ms):
|
||||
self.failUnless(m.GetProp('_Name')==lines[i].strip())
|
||||
os.unlink('testData/bzr/search.out')
|
||||
os.unlink('testData/bzr/search.out.sdf')
|
||||
|
||||
def test3_4SDSearch(self):
|
||||
from rdkit import Chem
|
||||
self.failUnless(os.path.exists('testData/bzr/Compounds.sqlt'))
|
||||
|
||||
p = subprocess.Popen(('python', 'SDSearch.py','--dbName=testData/bzr/Compounds.sqlt',
|
||||
'--nameOut=testData/bzr/search.out', '--smarts=cncncc',
|
||||
'-q activity<6.0',
|
||||
'--sdOut=testData/bzr/search.out.sdf'))
|
||||
res=p.wait()
|
||||
self.failIf(res)
|
||||
p=None
|
||||
|
||||
self.failUnless(os.path.exists('testData/bzr/search.out'))
|
||||
inF = file('testData/bzr/search.out','r')
|
||||
lines=inF.readlines()
|
||||
inF=None
|
||||
self.failUnless(len(lines)==5)
|
||||
|
||||
suppl=Chem.SDMolSupplier('testData/bzr/search.out.sdf')
|
||||
ms = [x for x in suppl]
|
||||
suppl=None
|
||||
self.failUnless(len(ms)==5)
|
||||
for i,m in enumerate(ms):
|
||||
self.failUnless(m.GetProp('_Name')==lines[i].strip())
|
||||
|
||||
os.unlink('testData/bzr/search.out')
|
||||
os.unlink('testData/bzr/search.out.sdf')
|
||||
|
||||
|
||||
def test4CreateOptions(self):
|
||||
if os.path.exists('testData/bzr/Compounds.sqlt'):
|
||||
@@ -390,10 +304,12 @@ class TestCase(unittest.TestCase):
|
||||
|
||||
conn = DbConnect('testData/bzr/Compounds.sqlt')
|
||||
d = conn.GetData('molecules',fields='count(*)')
|
||||
self.failUnless(d[0][0]==10)
|
||||
self.failUnlessEqual(d[0][0],10)
|
||||
d = conn.GetData('molecules',fields='*')
|
||||
self.failUnless(len(d)==10)
|
||||
self.failUnless(len(d[0])==2)
|
||||
self.failUnlessEqual(len(d),10)
|
||||
cns = [x.lower() for x in d.GetColumnNames()]
|
||||
self.failIf('smiles' in cns)
|
||||
|
||||
conn=None
|
||||
d=None
|
||||
|
||||
@@ -423,9 +339,9 @@ class TestCase(unittest.TestCase):
|
||||
self.failUnless(d[0][0]==10)
|
||||
d = conn.GetData('molecules',fields='*')
|
||||
self.failUnless(len(d)==10)
|
||||
self.failUnless(len(d[0])==2)
|
||||
cns = [x.lower() for x in d.GetColumnNames()]
|
||||
self.failIf('smiles' in cns)
|
||||
|
||||
|
||||
p = subprocess.Popen(('python', 'CreateDb.py','--dbDir=testData/bzr','--molFormat=smiles',
|
||||
'--noProps','--noFingerprints','--noPairs','--noDescriptors',
|
||||
'testData/bzr.smi'))
|
||||
@@ -440,10 +356,11 @@ class TestCase(unittest.TestCase):
|
||||
|
||||
conn = DbConnect('testData/bzr/Compounds.sqlt')
|
||||
d = conn.GetData('molecules',fields='count(*)')
|
||||
self.failUnless(d[0][0]==10)
|
||||
self.failUnlessEqual(d[0][0],10)
|
||||
d = conn.GetData('molecules',fields='*')
|
||||
self.failUnless(len(d)==10)
|
||||
self.failUnless(len(d[0])==3)
|
||||
self.failUnlessEqual(len(d),10)
|
||||
cns = [x.lower() for x in d.GetColumnNames()]
|
||||
self.failUnless('smiles' in cns)
|
||||
|
||||
p = subprocess.Popen(('python', 'CreateDb.py','--dbDir=testData/bzr','--molFormat=smiles',
|
||||
'--noFingerprints','--noPairs','--noDescriptors',
|
||||
@@ -460,10 +377,11 @@ class TestCase(unittest.TestCase):
|
||||
|
||||
conn = DbConnect('testData/bzr/Compounds.sqlt')
|
||||
d = conn.GetData('molecules',fields='count(*)')
|
||||
self.failUnless(d[0][0]==10)
|
||||
self.failUnlessEqual(d[0][0],10)
|
||||
d = conn.GetData('molecules',fields='*')
|
||||
self.failUnless(len(d)==10)
|
||||
self.failUnless(len(d[0])==3)
|
||||
self.failUnlessEqual(len(d),10)
|
||||
cns = [x.lower() for x in d.GetColumnNames()]
|
||||
self.failUnless('smiles' in cns)
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user