mirror of
https://github.com/microsoft/foldingdiff.git
synced 2026-06-04 13:30:33 +08:00
117 lines
3.4 KiB
Python
117 lines
3.4 KiB
Python
"""
|
|
Wrapper to run FASPR side chain packing on a pdb file.
|
|
|
|
https://github.com/tommyhuangthu/FASPR
|
|
"""
|
|
|
|
import os
|
|
import argparse
|
|
from glob import glob
|
|
from collections import defaultdict
|
|
import itertools
|
|
import tempfile
|
|
import subprocess
|
|
import shlex
|
|
import logging
|
|
import multiprocessing as mp
|
|
from typing import *
|
|
|
|
from foldingdiff.tmalign import match_files
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
FASPR_BIN = os.path.expanduser("~/software/FASPR/FASPR")
|
|
assert os.path.isfile(FASPR_BIN)
|
|
|
|
|
|
def read_fasta(fasta_fname: str) -> Dict[str, str]:
|
|
"""Read the fasta file as a dict mapping identifiers to sequences."""
|
|
retval = defaultdict(str)
|
|
with open(fasta_fname, "r") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line.startswith(">"):
|
|
identifier = line[1:]
|
|
else:
|
|
retval[identifier] += line
|
|
return retval
|
|
|
|
|
|
def run_faspr(input_pdb: str, input_fasta: str, output_pdb: str) -> str:
|
|
"""Run FASPR and write structure to output_pdb."""
|
|
# sequence should be written a single line of one-letter alphabet; we read
|
|
# in the fasta file and rewrite the sequences to adhere to this format
|
|
fasta_seqs = read_fasta(input_fasta)
|
|
assert len(fasta_seqs) == 1
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Write the fasta file
|
|
seq_fname = os.path.join(tmpdir, "seq.txt")
|
|
with open(seq_fname, "w") as sink:
|
|
sink.write(fasta_seqs.popitem()[1] + "\n")
|
|
|
|
# Run FASPR
|
|
cmd = f"{FASPR_BIN} -i {input_pdb} -s {seq_fname} -o {output_pdb}"
|
|
subprocess.check_call(
|
|
shlex.split(cmd), stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL
|
|
)
|
|
return output_pdb
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
"""Build a basic CLI parser."""
|
|
parser = argparse.ArgumentParser(
|
|
usage="Wrapper for running FASPR",
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
|
)
|
|
parser.add_argument(
|
|
"input_pdb_dir", type=str, help="Input dir containing .pdb files"
|
|
)
|
|
parser.add_argument(
|
|
"input_fasta_dir", type=str, help="Input dir containing .fasta files"
|
|
)
|
|
parser.add_argument("output_pdb_dir", type=str, help="Output dir for .pdb files")
|
|
parser.add_argument(
|
|
"--threads", "-t", type=int, default=mp.cpu_count(), help="Num threads to use"
|
|
)
|
|
return parser
|
|
|
|
|
|
def main():
|
|
"""Run script."""
|
|
args = build_parser().parse_args()
|
|
|
|
query2fasta = match_files(
|
|
glob(os.path.join(args.input_pdb_dir, "*.pdb")),
|
|
glob(os.path.join(args.input_fasta_dir, "*.fasta")),
|
|
strategy="prefix",
|
|
)
|
|
logging.info(
|
|
f"Matched {len(query2fasta)} pdb files to {len(list(itertools.chain.from_iterable(query2fasta.values())))} fasta files"
|
|
)
|
|
|
|
# Make the output directory
|
|
os.makedirs(args.output_pdb_dir, exist_ok=True)
|
|
|
|
# Create arg tuples (pdb, fasta, out_pdb)
|
|
arg_tuples = []
|
|
for query_pdb, fasta_files in query2fasta.items():
|
|
for fasta_file in fasta_files:
|
|
arg_tuples.append(
|
|
(
|
|
query_pdb,
|
|
fasta_file,
|
|
os.path.join(
|
|
args.output_pdb_dir,
|
|
os.path.basename(fasta_file).replace(".fasta", ".pdb"),
|
|
),
|
|
)
|
|
)
|
|
|
|
with mp.Pool(args.threads) as pool:
|
|
pool.starmap(run_faspr, arg_tuples, chunksize=10)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|