Source code for evcouplings.fold.tools

"""
Wrappers for tools for 3D structure prediction
from evolutionary couplings

Authors:
  Thomas A. Hopf
"""
from copy import deepcopy
import os
from os import path
from collections import defaultdict
import re

import pandas as pd

from evcouplings.utils.config import InvalidParameterError
from evcouplings.utils.system import (
    run, makedirs, temp, verify_resources
)


def run_cns(inp_script=None, inp_file=None, log_file=None, binary="cns"):
    """
    Run CNSsolve 1.21 (without worrying about environment setup)

    Note that the user is responsible for verifying the output products
    of CNS, since their paths are determined by .inp scripts and hard
    to check automatically and in a general way.

    Either inp_script or inp_file has to be specified.

    Parameters
    ----------
    inp_script : str, optional (default: None)
        CNS ".inp" input script (actual commands, not a file path)
    inp_file : str, optional (default: None)
        Path to .inp input script file. Will override
        inp_script if also specified.
    log_file : str, optional (default: None)
        Save CNS stdout output to this file
    binary : str, optional (default: "cns")
        Absolute path of CNS binary

    Raises
    ------
    ExternalToolError
        If call to CNS fails
    InvalidParameterError
        If no input script (file or string) given
    """
    # make sure we have an absolute path
    binary = path.abspath(binary)

    # extract main installation directory
    # (the binary lives three levels below it)
    cns_main_dir = binary
    for _ in range(3):
        cns_main_dir = path.dirname(cns_main_dir)

    # create environment for the CNS process
    env = deepcopy(os.environ)
    library_dir = path.join(cns_main_dir, "libraries")
    module_dir = path.join(cns_main_dir, "modules")

    env["CNS_SOLVE"] = cns_main_dir
    env["CNS_LIB"] = library_dir
    env["CNS_MODULE"] = module_dir
    env["CNS_HELPLIB"] = path.join(cns_main_dir, "helplib")

    for var, subdir in [
        ("CNS_TOPPAR", "toppar"),
        ("CNS_CONFDB", "confdb"),
        ("CNS_XTALLIB", "xtal"),
        ("CNS_NMRLIB", "nmr"),
        ("CNS_XRAYLIB", "xray"),
    ]:
        env[var] = path.join(library_dir, subdir)

    for var, subdir in [
        ("CNS_XTALMODULE", "xtal"),
        ("CNS_NMRMODULE", "nmr"),
    ]:
        env[var] = path.join(module_dir, subdir)

    if inp_script is None and inp_file is None:
        raise InvalidParameterError(
            "Must specify either inp_script or inp_file"
        )

    # read input script, which is fed into CNS using stdin
    if inp_file is not None:
        with open(inp_file) as f:
            inp_script = f.read()

    # run CNS with the constructed environment and store output
    return_code, stdout, stderr = run(
        binary, stdin=inp_script, env=env
    )

    # write stdout output to log file
    if log_file is not None:
        with open(log_file, "w") as f:
            f.write(stdout)
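
# Usage sketch (not part of the original module; the installation path and
# .inp file name below are hypothetical placeholders). Output products are
# written wherever the .inp script directs them, so the caller must check
# them afterwards.
def _example_run_cns():
    run_cns(
        inp_file="generate_restraints.inp",
        log_file="cns.log",
        binary="/opt/cns_solve_1.21/intel-x86_64bit-linux/bin/cns",
    )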

def run_cns_13(inp_script=None, inp_file=None, log_file=None,
               source_script=None, binary="cns"):
    """
    Run CNSsolve 1.3

    Note that the user is responsible for verifying the output products
    of CNS, since their paths are determined by .inp scripts and hard
    to check automatically and in a general way.

    Either inp_script or inp_file has to be specified.

    Parameters
    ----------
    inp_script : str, optional (default: None)
        CNS ".inp" input script (actual commands, not a file path)
    inp_file : str, optional (default: None)
        Path to .inp input script file. Will override
        inp_script if also specified.
    log_file : str, optional (default: None)
        Save CNS stdout output to this file
    source_script : str, optional (default: None)
        Script to set CNS environment variables.
        This should typically point to .cns_solve_env_sh in the
        CNS installation main directory (the shell script itself
        needs to be edited to contain the path of the installation)
    binary : str, optional (default: "cns")
        Name of CNS binary

    Raises
    ------
    ExternalToolError
        If call to CNS fails
    InvalidParameterError
        If no input script (file or string) given
    """
    # usually need to source a script to set up the environment for CNS
    if source_script is not None:
        cmd = "source {};".format(source_script)
    else:
        cmd = ""

    cmd += binary

    if inp_script is None and inp_file is None:
        raise InvalidParameterError(
            "Must specify either inp_script or inp_file"
        )

    # read input script, which is fed into CNS using stdin
    if inp_file is not None:
        with open(inp_file) as f:
            inp_script = f.read()

    # run and store output (shell=True so that "source" works)
    return_code, stdout, stderr = run(
        cmd, stdin=inp_script, shell=True
    )

    # write stdout output to log file
    if log_file is not None:
        with open(log_file, "w") as f:
            f.write(stdout)
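
# Usage sketch (not part of the original module; the installation path is a
# hypothetical placeholder). Unlike run_cns(), the environment is set up by
# sourcing the edited .cns_solve_env_sh script of the CNS 1.3 installation.
def _example_run_cns_13():
    run_cns_13(
        inp_file="generate_restraints.inp",
        log_file="cns13.log",
        source_script="/opt/cns_solve_1.3/.cns_solve_env_sh",
    )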

def run_psipred(fasta_file, output_dir, binary="runpsipred"):
    """
    Run psipred secondary structure prediction

    psipred output file convention: runpsipred creates output files
    <rootname>.ss2 and <rootname>.horiz in the current working
    directory, where <rootname> is extracted from the basename of
    the input file (e.g. /home/test/<rootname>.fa)

    Parameters
    ----------
    fasta_file : str
        Input sequence file in FASTA format
    output_dir : str
        Directory in which output will be saved
    binary : str, optional (default: "runpsipred")
        Path of psipred executable (runpsipred)

    Returns
    -------
    ss2_file : str
        Absolute path to prediction output in "VFORMAT"
    horiz_file : str
        Absolute path to prediction output in "HFORMAT"

    Raises
    ------
    ExternalToolError
        If call to psipred fails
    """
    # make sure we have absolute paths
    binary = path.abspath(binary)
    fasta_file = path.abspath(fasta_file)
    output_dir = path.abspath(output_dir)

    # make sure input file is valid
    verify_resources("Input FASTA file is invalid", fasta_file)

    # make sure output directory exists
    makedirs(output_dir)

    # execute psipred; we have to start it from the output directory
    # so result files end up there (this is hardcoded in runpsipred)
    return_code, stdout, stderr = run(
        [binary, fasta_file], working_dir=output_dir,
    )

    # determine where psipred will store output based
    # on logic from the runpsipred script
    rootname, _ = path.splitext(path.basename(fasta_file))
    output_prefix = path.join(output_dir, rootname)

    # construct paths to output files in vertical and horizontal formats
    ss2_file = output_prefix + ".ss2"
    horiz_file = output_prefix + ".horiz"

    # make sure we actually predicted something
    verify_resources(
        "psipred output is invalid", ss2_file, horiz_file
    )

    return ss2_file, horiz_file
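
# Usage sketch (not part of the original module; file names are
# hypothetical). Both output paths are returned as absolute paths and are
# verified to exist before being handed back.
def _example_run_psipred():
    ss2_file, horiz_file = run_psipred("query.fa", "psipred_output")
    return ss2_file, horiz_file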

def read_psipred_prediction(filename, first_index=1):
    """
    Read a psipred secondary structure prediction file
    in horizontal or vertical format (auto-detected).

    Parameters
    ----------
    filename : str
        Path to prediction output file
    first_index : int, optional (default: 1)
        Index of first position in predicted sequence

    Returns
    -------
    pred : pandas.DataFrame
        Table containing secondary structure prediction,
        with the following columns:

        * i: position
        * A_i: amino acid
        * sec_struct_3state: prediction (H, E, C)

        If reading vformat, also contains columns with the individual
        class scores (score_coil, score_helix, score_strand).
        If reading hformat, also contains a confidence score
        between 1 and 9 (sec_struct_conf).
    """
    # detect file format
    file_format = None
    with open(filename) as f:
        for line in f:
            if line.startswith("# PSIPRED HFORMAT"):
                file_format = "hformat"
            elif line.startswith("# PSIPRED VFORMAT"):
                file_format = "vformat"

    if file_format == "vformat":
        # read in output file
        pred = pd.read_csv(
            filename, skip_blank_lines=True, comment="#",
            delim_whitespace=True,
            names=[
                "i", "A_i", "sec_struct_3state",
                "score_coil", "score_helix", "score_strand"
            ],
        )
    elif file_format == "hformat":
        content = defaultdict(str)
        with open(filename) as f:
            # go through file and assemble Conf, Pred, and AA lines
            # into single strings
            for line in f:
                line = line.rstrip().replace(" ", "")
                if ":" in line:
                    key, _, value = line.partition(":")
                    content[key] += value

        pred = pd.DataFrame({
            "A_i": list(content["AA"]),
            "sec_struct_3state": list(content["Pred"]),
            "sec_struct_conf": list(map(int, content["Conf"])),
        })
        pred.loc[:, "i"] = list(range(1, len(pred) + 1))
    else:
        raise InvalidParameterError(
            "Input file is not a valid psipred prediction file"
        )

    # shift indices if first_index != 1
    pred.loc[:, "i"] += (first_index - 1)

    return pred
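
# Usage sketch (not part of the original module; the file name and offset
# are hypothetical). first_index renumbers positions, e.g. to match the
# numbering of a domain cut out of a longer sequence.
def _example_read_psipred_prediction():
    pred = read_psipred_prediction("query.ss2", first_index=17)
    # select positions predicted to be in a beta strand
    return pred.loc[pred.sec_struct_3state == "E"]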

def parse_maxcluster_comparison(comparison_output):
    """
    Parse maxcluster comparison output into a DataFrame

    Parameters
    ----------
    comparison_output : str
        stdout output from maxcluster after comparison

    Returns
    -------
    pandas.DataFrame
        Parsed result table (columns: filename, num_pairs, rmsd,
        maxsub, tm, msi); refer to the maxcluster documentation
        for an explanation of the score fields.
    """
    # compile regular expression to extract output fields
    m = re.compile(
        r"vs\. (.+?)\s+Pairs=\s*(\d+), RMSD=\s*(\d+\.\d+), "
        r"MaxSub=\s*(\d+\.\d+), TM=\s*(\d+\.\d+), MSI=\s*(\d+\.\d+)"
    )

    # extract scores for each structure (one per line)
    res = []
    for line in comparison_output.splitlines():
        match = m.search(line)
        if match:
            res.append(match.groups())

    # create dataframe of results
    df = pd.DataFrame(
        res, columns=[
            "filename", "num_pairs", "rmsd",
            "maxsub", "tm", "msi"
        ]
    )

    # convert score columns to numerical values
    for c in df.columns:
        if c != "filename":
            df.loc[:, c] = pd.to_numeric(df.loc[:, c])

    df.loc[:, "num_pairs"] = df.loc[:, "num_pairs"].astype(int)

    return df
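
# Sketch of the input this parser expects (not part of the original
# module; the line below is a hypothetical example shaped to match the
# regular expression above, not verbatim maxcluster output):
def _example_parse_maxcluster_comparison():
    stdout = (
        "INFO  : experiment.pdb vs. model_001.pdb  Pairs= 120, "
        "RMSD= 2.310, MaxSub=0.512, TM= 0.601, MSI=0.498\n"
    )
    df = parse_maxcluster_comparison(stdout)
    # -> single row with filename="model_001.pdb", num_pairs=120, ...
    return df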

def run_maxcluster_compare(predictions, experiment,
                           normalization_length=None,
                           distance_cutoff=None, binary="maxcluster"):
    """
    Compare a set of predicted structures to an experimental
    structure using maxcluster.

    For clustering functionality, use the run_maxcluster_cluster()
    function. For a high-level wrapper around this function that
    removes problematic atoms and compares multiple models, please
    look at evcouplings.fold.protocol.compare_models_maxcluster().

    Parameters
    ----------
    predictions : list(str)
        List of PDB files that should be compared against experiment
    experiment : str
        Path of experimental structure PDB file. Note that the
        numbering and residues in this file must agree with the
        predicted structures, and that the structure may not contain
        duplicate atoms (multiple models, or alternative locations
        for the same atom).
    normalization_length : int, optional (default: None)
        Use this length to normalize the Template Modeling (TM)
        score (-N option of maxcluster). If None, will normalize
        by length of experiment.
    distance_cutoff : float, optional (default: None)
        Distance cutoff for MaxSub search (-d option of maxcluster).
        If None, will use maxcluster auto-calibration.
    binary : str, optional (default: "maxcluster")
        Path to maxcluster binary

    Returns
    -------
    pandas.DataFrame
        Comparison result table (see parse_maxcluster_comparison
        for a more detailed explanation)
    """
    # create a list of files for input to maxcluster
    list_file = temp()
    with open(list_file, "w") as f:
        for pred_file in predictions:
            f.write(pred_file + "\n")

    cmd = [binary, "-l", list_file, "-e", experiment]

    # normalization length for TM score calculation
    if normalization_length is not None:
        cmd += ["-N", str(normalization_length)]

    # distance cutoff for MaxSub search
    if distance_cutoff is not None:
        cmd += ["-d", str(distance_cutoff)]

    return_code, stdout, stderr = run(cmd)

    return parse_maxcluster_comparison(stdout)
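
# Usage sketch (not part of the original module; file names and the
# normalization length are hypothetical). Sorting by TM score ranks the
# candidate models from best to worst agreement with the experiment.
def _example_run_maxcluster_compare():
    comparison = run_maxcluster_compare(
        ["model_001.pdb", "model_002.pdb"],
        "experiment.pdb",
        normalization_length=250,
    )
    return comparison.sort_values("tm", ascending=False)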

def parse_maxcluster_clustering(clustering_output):
    """
    Parse maxcluster clustering output into a DataFrame

    Parameters
    ----------
    clustering_output : str
        stdout output from maxcluster after clustering

    Returns
    -------
    pandas.DataFrame
        Parsed result table
        (columns: filename, cluster, cluster_size)
    """
    # compile regular expression to extract output fields
    m = re.compile(r"INFO\s*:\s*(\d+)\s*:\s*(\d+)\s+(.+)")

    # extract cluster assignment for each structure (one per line)
    res = []
    read = False
    for line in clustering_output.splitlines():
        # only parse section where cluster for each structure is output
        if "Clusters @ Threshold" in line:
            read = True

        if "Centroids" in line:
            read = False

        match = m.search(line)
        if read and match:
            res.append(match.groups())

    # turn results into table
    df = pd.DataFrame(res, columns=["item", "cluster", "filename"])

    # add column containing the size of each cluster
    cluster_sizes = df.groupby(
        "cluster"
    ).size().to_frame("cluster_size").reset_index()

    df = df.merge(cluster_sizes, on="cluster")

    return df.loc[:, ["filename", "cluster", "cluster_size"]]
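
# Sketch of the input this parser expects (not part of the original
# module; the lines below are a hypothetical fragment shaped to match the
# "Clusters @ Threshold" section and the regular expression above):
def _example_parse_maxcluster_clustering():
    stdout = "\n".join([
        "INFO  : Clusters @ Threshold  0.800 (0.05)",
        "INFO  :      1 :        2   model_001.pdb",
        "INFO  :      2 :        2   model_002.pdb",
        "INFO  :      3 :        1   model_003.pdb",
        "INFO  : Centroids",
    ])
    df = parse_maxcluster_clustering(stdout)
    # -> models 1 and 2 end up in cluster "2" with cluster_size 2
    return df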

def run_maxcluster_cluster(predictions, method="average", rmsd=True,
                           clustering_threshold=None, binary="maxcluster"):
    """
    Cluster a set of predicted structures using maxcluster.

    For comparing predictions against an experimental structure,
    use the run_maxcluster_compare() function.

    Parameters
    ----------
    predictions : list(str)
        List of PDB files that should be clustered
    method : {"single", "average", "maximum", "pairs_min", "pairs_abs"},
             optional (default: "average")
        Clustering method (single / average / maximum linkage,
        or min / absolute size neighbour pairs)
    rmsd : bool, optional (default: True)
        Use RMSD-based clustering (faster)
    clustering_threshold : float, optional (default: None)
        Initial clustering threshold (maxcluster -T option)
    binary : str, optional (default: "maxcluster")
        Path to maxcluster binary

    Returns
    -------
    pandas.DataFrame
        Clustering result table (see parse_maxcluster_clustering
        for a more detailed explanation)
    """
    # create a list of files for input to maxcluster
    list_file = temp()
    with open(list_file, "w") as f:
        for pred_file in predictions:
            f.write(pred_file + "\n")

    # map clustering method names to maxcluster -C option values
    method_map = {
        "single": 1,
        "average": 2,
        "maximum": 3,
        "pairs_min": 4,
        "pairs_abs": 5,
    }

    if method not in method_map:
        raise InvalidParameterError(
            "Method must be one of the following: " +
            ", ".join(method_map.keys())
        )

    cmd = [binary, "-l", list_file, "-C", str(method_map[method])]

    if rmsd:
        cmd += ["-rmsd"]

    if clustering_threshold is not None:
        cmd += ["-T", str(clustering_threshold)]

    return_code, stdout, stderr = run(cmd)

    return parse_maxcluster_clustering(stdout)
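
# Usage sketch (not part of the original module; file names are
# hypothetical). Picks a member of the largest cluster, a common way to
# select a consensus model from a set of candidates.
def _example_run_maxcluster_cluster():
    clustering = run_maxcluster_cluster(
        ["model_001.pdb", "model_002.pdb", "model_003.pdb"],
        method="average", rmsd=True,
    )
    largest = clustering.sort_values("cluster_size", ascending=False)
    return largest.iloc[0].filename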