mirror of
https://github.com/KosinskiLab/AlphaPulldown.git
synced 2026-06-04 14:14:24 +08:00
Fix #575
This commit is contained in:
@@ -17,7 +17,7 @@ import zipfile
|
||||
import glob
|
||||
import ast
|
||||
|
||||
from Bio.PDB import PDBParser, PPBuilder
|
||||
from Bio.PDB import PDBParser, MMCIFParser, PPBuilder
|
||||
from Bio.PDB.Structure import Structure as BioStructure
|
||||
from absl import app, flags, logging
|
||||
import numpy as np
|
||||
@@ -792,6 +792,7 @@ def _get_feature_metadata(
|
||||
modelcif_json: dict,
|
||||
cmplx_name: str,
|
||||
out_dir: list,
|
||||
fallback_structure_path: str | None = None,
|
||||
) -> Tuple[List[str], List[str]]:
|
||||
"""Read metadata from a feature JSON file."""
|
||||
if "__meta__" not in modelcif_json:
|
||||
@@ -813,12 +814,49 @@ def _get_feature_metadata(
|
||||
modelcif_json["__meta__"][mnmr][
|
||||
"software"
|
||||
] = _get_software_with_parameters(jdata["software"], jdata["other"])
|
||||
fp = jdata["other"]["fasta_paths"]
|
||||
fp = ast.literal_eval(fp)
|
||||
for curr_seq, curr_desc in iter_seqs(fp):
|
||||
new_entry = {'description': curr_desc, 'sequence': curr_seq}
|
||||
if new_entry not in fasta_dicts:
|
||||
fasta_dicts.append(new_entry)
|
||||
fp = jdata["other"].get("fasta_paths")
|
||||
if fp is not None:
|
||||
fp = ast.literal_eval(fp)
|
||||
existing_fp = []
|
||||
missing_fp = []
|
||||
for p in fp:
|
||||
if os.path.isfile(p):
|
||||
existing_fp.append(p)
|
||||
else:
|
||||
# Also try relative-to-output-dir paths, as users
|
||||
# often run the converter from a different cwd.
|
||||
relp = os.path.join(out_dir, p)
|
||||
if os.path.isfile(relp):
|
||||
existing_fp.append(relp)
|
||||
else:
|
||||
missing_fp.append(p)
|
||||
if missing_fp:
|
||||
logging.warning(
|
||||
"FASTA file(s) referenced in feature metadata not found: "
|
||||
+ ", ".join(missing_fp)
|
||||
)
|
||||
for curr_seq, curr_desc in iter_seqs(existing_fp):
|
||||
new_entry = {"description": curr_desc, "sequence": curr_seq}
|
||||
if new_entry not in fasta_dicts:
|
||||
fasta_dicts.append(new_entry)
|
||||
|
||||
# Fallback: if FASTA input is missing, we still want a usable sequence
|
||||
# mapping (primarily for descriptions) by extracting polymer sequences from
|
||||
# the model structure file (PDB or mmCIF).
|
||||
if not fasta_dicts and fallback_structure_path:
|
||||
try:
|
||||
fasta_dicts = _get_fasta_dicts_from_structure_file(
|
||||
fallback_structure_path, cmplx_name
|
||||
)
|
||||
logging.warning(
|
||||
"No FASTA sequences available; extracted sequences from structure file "
|
||||
+ f"'{fallback_structure_path}'."
|
||||
)
|
||||
except Exception as exc:
|
||||
logging.warning(
|
||||
"No FASTA sequences available and failed to extract sequences from "
|
||||
+ f"'{fallback_structure_path}': {exc}"
|
||||
)
|
||||
|
||||
return cmplx_name, fasta_dicts
|
||||
|
||||
@@ -852,8 +890,8 @@ def _get_entities(
|
||||
# Using MD5 sums for comparing sequences
|
||||
sequences[hashlib.md5(seq.encode()).hexdigest()] = description
|
||||
|
||||
# gather molecular entities from PDB file
|
||||
structure = PDBParser().get_structure(cmplx_name, pdb_file)
|
||||
# gather molecular entities from structure file (PDB or mmCIF)
|
||||
structure = _load_structure(cmplx_name, pdb_file)
|
||||
cif_json["target_entities"] = []
|
||||
already_seen = []
|
||||
for seq in PPBuilder(radius=999999999).build_peptides(structure, aa_only=False):
|
||||
@@ -880,6 +918,41 @@ def _get_entities(
|
||||
return structure
|
||||
|
||||
|
||||
def _load_structure(structure_id: str, structure_path: str) -> BioStructure:
|
||||
"""Load a structure from PDB or mmCIF path."""
|
||||
lower = structure_path.lower()
|
||||
if lower.endswith(".cif") or lower.endswith(".mmcif"):
|
||||
# QUIET to suppress warnings on common mmCIF oddities
|
||||
return MMCIFParser(QUIET=True).get_structure(structure_id, structure_path)
|
||||
return PDBParser(QUIET=True).get_structure(structure_id, structure_path)
|
||||
|
||||
|
||||
def _get_fasta_dicts_from_structure_file(
|
||||
structure_path: str, structure_id: str
|
||||
) -> List[dict]:
|
||||
"""Extract polymer sequences from a PDB/mmCIF structure file.
|
||||
|
||||
Returns a list of dicts with keys: 'description', 'sequence'.
|
||||
"""
|
||||
structure = _load_structure(structure_id, structure_path)
|
||||
fasta_dicts: List[dict] = []
|
||||
seen = set()
|
||||
polypeptides = PPBuilder(radius=999999999).build_peptides(structure, aa_only=False)
|
||||
for pp in polypeptides:
|
||||
if not pp:
|
||||
continue
|
||||
chain_id = pp[0].parent.id
|
||||
seq = str(pp.get_sequence())
|
||||
if not seq:
|
||||
continue
|
||||
# De-duplicate identical sequences across chains
|
||||
if seq in seen:
|
||||
continue
|
||||
seen.add(seq)
|
||||
fasta_dicts.append({"description": f"chain_{chain_id}", "sequence": seq})
|
||||
return fasta_dicts
|
||||
|
||||
|
||||
def _get_scores(cif_json: dict, scr_file: str) -> None:
|
||||
"""Add scores to JSON data."""
|
||||
# Read from jsons instead
|
||||
@@ -1233,7 +1306,7 @@ def alphapulldown_model_to_modelcif(
|
||||
modelcif_json = {}
|
||||
# fetch metadata
|
||||
cmplx_name, fasta_dicts = _get_feature_metadata(
|
||||
modelcif_json, cmplx_name, out_dir
|
||||
modelcif_json, cmplx_name, out_dir, fallback_structure_path=mdl[0]
|
||||
)
|
||||
# fetch/ assemble more data about the modelling experiment
|
||||
_get_model_info(
|
||||
|
||||
@@ -6,6 +6,8 @@ import shutil
|
||||
import tempfile
|
||||
from os.path import join, dirname, abspath
|
||||
import zipfile
|
||||
import json
|
||||
import glob
|
||||
|
||||
"""
|
||||
Test conversion of PDB to CIF for monomers and multimers
|
||||
@@ -119,3 +121,34 @@ class TestConvertPDB2CIF(parameterized.TestCase):
|
||||
command.extend(["--model_selected", str(model_selected)])
|
||||
|
||||
return command
|
||||
|
||||
def test_missing_fasta_falls_back_to_structure_sequence(self):
|
||||
"""If FASTA path in feature metadata is missing, parse sequence from structure."""
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
test_output_dir = join(temp_dir, "output")
|
||||
shutil.copytree(join(self.input_dir, "TEST"), test_output_dir)
|
||||
|
||||
# Break the FASTA reference in feature metadata.
|
||||
md_files = glob.glob(join(test_output_dir, "*_feature_metadata_*.json"))
|
||||
self.assertTrue(md_files, "No feature metadata JSON found in test output dir")
|
||||
for md_file in md_files:
|
||||
with open(md_file, "r", encoding="ascii") as fh:
|
||||
data = json.load(fh)
|
||||
data["other"]["fasta_paths"] = "['/this/path/does/not/exist.fasta']"
|
||||
with open(md_file, "w", encoding="ascii") as fh:
|
||||
json.dump(data, fh, indent=2)
|
||||
|
||||
command = self.build_command(
|
||||
test_output_dir, add_associated=False, compress=False, model_selected=0
|
||||
)
|
||||
subprocess.run(command, check=True, capture_output=True, text=True)
|
||||
|
||||
out_cif = join(test_output_dir, "ranked_0.cif")
|
||||
self.assertTrue(os.path.exists(out_cif), "ModelCIF output was not created")
|
||||
|
||||
# Sequence should still be present in the output even without FASTA.
|
||||
# The TEST sequence starts with "MESAIA..." in test FASTA and in the
|
||||
# structure-derived sequence.
|
||||
with open(out_cif, "r", encoding="ascii") as fh:
|
||||
cif_txt = fh.read()
|
||||
self.assertIn("MESAIA", cif_txt)
|
||||
|
||||
Reference in New Issue
Block a user