"""
Class to store parameters of undirected graphical model of
sequences and perform calculations using the model
(statistical energies, coupling scores).
Authors:
Thomas A. Hopf
"""
from collections import Iterable
from copy import deepcopy
from numba import jit
import numpy as np
import pandas as pd
# Constants
_SLICE = np.s_[:]
HAMILTONIAN_COMPONENTS = [FULL, COUPLINGS, FIELDS] = [0, 1, 2]
NUM_COMPONENTS = len(HAMILTONIAN_COMPONENTS)
# Methods for fast calculations (moved outside of class for numba jit)
@jit(nopython=True)
def _hamiltonians(sequences, J_ij, h_i):
"""
Calculates the Hamiltonian of the global probability distribution P(A_1, ..., A_L)
for a given sequence A_1,...,A_L from J_ij and h_i parameters
Parameters
----------
sequences : np.array
Sequence matrix for which Hamiltonians will be computed
J_ij: np.array
L x L x num_symbols x num_symbols J_ij pair coupling parameter matrix
h_i: np.array
L x num_symbols h_i fields parameter matrix
Returns
-------
np.array
Float matrix of size len(sequences) x 3, where each row corresponds to the
1) total Hamiltonian of sequence and the 2) J_ij and 3) h_i sub-sums
"""
# iterate over sequences
N, L = sequences.shape
H = np.zeros((N, NUM_COMPONENTS))
for s in range(N):
A = sequences[s]
hi_sum = 0.0
Jij_sum = 0.0
for i in range(L):
hi_sum += h_i[i, A[i]]
for j in range(i + 1, L):
Jij_sum += J_ij[i, j, A[i], A[j]]
H[s] = [Jij_sum + hi_sum, Jij_sum, hi_sum]
return H
@jit(nopython=True)
def _single_mutant_hamiltonians(target_seq, J_ij, h_i):
"""
Calculate matrix of all possible single-site substitutions
Parameters
----------
L : int
Length of model
num_symbols : int
Number of states of model
target_seq : np.array(int)
Target sequence for which mutant energy differences will be calculated
J_ij: np.array
L x L x num_symbols x num_symbols J_ij pair coupling parameter matrix
h_i: np.array
L x num_symbols h_i fields parameter matrix
Returns
-------
np.array
Float matrix of size L x num_symbols x 3, where the first two dimensions correspond to
Hamiltonian differences compared to target sequence for all possible substitutions in
all positions, and the third dimension corresponds to the deltas of
1) total Hamiltonian and the 2) J_ij and 3) h_i sub-sums
"""
L, num_symbols = h_i.shape
H = np.empty((L, num_symbols, NUM_COMPONENTS))
# iterate over all positions
for i in range(L):
# iterate over all substitutions
for A_i in range(num_symbols):
# iterate over couplings to all other sites
delta_hi = h_i[i, A_i] - h_i[i, target_seq[i]]
delta_Jij = 0.0
for j in range(L):
if i != j:
delta_Jij += (
J_ij[i, j, A_i, target_seq[j]] -
J_ij[i, j, target_seq[i], target_seq[j]]
)
H[i, A_i] = [delta_Jij + delta_hi, delta_Jij, delta_hi]
return H
@jit(nopython=True)
def _delta_hamiltonian(pos, subs, target_seq, J_ij, h_i):
"""
Parameters
----------
pos : np.array(int)
Vector of substituted positions
subs : np.array(int)
Vector of symbols above positions are substituted to
target_seq : np.array(int)
Target sequence for which mutant energy differences will be calculated
relative to
J_ij: np.array
L x L x num_symbols x num_symbols J_ij pair coupling parameter matrix
h_i: np.array
L x num_symbols h_i fields parameter matrix
Returns
-------
np.array
Vector of length 3, where elements correspond to delta of
1) total Hamiltonian and the 2) J_ij and 3) h_i sub-sums
"""
L, num_symbols = h_i.shape
M = pos.shape[0]
delta_hi = 0.0
delta_Jij = 0.0
# iterate over all changed positions
for m in range(M):
i = pos[m]
A_i = subs[m]
# change in fields
delta_hi += h_i[i, A_i] - h_i[i, target_seq[i]]
# couplings to all other positions in sequence
for j in range(L):
if i != j:
delta_Jij += (
J_ij[i, j, A_i, target_seq[j]] -
J_ij[i, j, target_seq[i], target_seq[j]]
)
# correct couplings between substituted positions:
# 1) do not count coupling twice (remove forward
# and backward coupling)
# 2) adjust background to new sequence
for n in range(m + 1, M):
j = pos[n]
A_j = subs[n]
# remove forward and backward coupling delta
delta_Jij -= J_ij[i, j, A_i, target_seq[j]]
delta_Jij -= J_ij[i, j, target_seq[i], A_j]
delta_Jij += J_ij[i, j, target_seq[i], target_seq[j]]
# the following line cancels out with line further down:
# delta_Jij += J_ij[i, j, target_seq[i], target_seq[j]]
# now add coupling delta once in correct background
delta_Jij += J_ij[i, j, A_i, A_j]
# following line cancels out with line above:
# delta_Jij -= J_ij[i, j, target_seq[i], target_seq[j]]
return np.array([delta_Jij + delta_hi, delta_Jij, delta_hi])
@jit(nopython=True)
def _zero_sum_gauge(J_ij, inplace=False):
"""
Transform coupling matrix into zero-sum gauge
(i.e., row and column sums of each ij submatrix are 0)
Parameters
----------
J_ij : np.array
Coupling matrix of size L x L x num_symbols x num_symbols
that should be transformed into zero-sum gauge
inplace : bool, optional (default: False)
Modify original matrix (True), or return transformed
matrix in a new matrix
Returns
-------
J_ij_0 : np.array
J_ij transformed into zero-sum gauge
"""
L, L2, num_symbols, num_symbols2 = J_ij.shape
assert L == L2 and num_symbols == num_symbols2
if inplace:
J_ij_0 = J_ij
else:
J_ij_0 = np.zeros((L, L, num_symbols, num_symbols))
# go through all pairs of positions
for i in range(L - 1):
for j in range(i + 1, L):
ij_mat = J_ij[i, j]
# calculate matrix, row and column averages
avg_ab = np.mean(ij_mat)
# can't use axis argument of np.mean in numba,
# so have to calculate rows/cols manually
avg_a = np.zeros(num_symbols)
avg_b = np.zeros(num_symbols)
ij_mat_T = ij_mat.T
for k in range(num_symbols):
avg_a[k] = np.mean(ij_mat[k])
avg_b[k] = np.mean(ij_mat_T[k])
# subtract correction terms from each entry
for a in range(num_symbols):
for b in range(num_symbols):
J_ij_0[i, j, a, b] = (
ij_mat[a, b] - avg_a[a] - avg_b[b] + avg_ab
)
J_ij_0[j, i, b, a] = J_ij_0[i, j, a, b]
return J_ij_0
[docs]class CouplingsModel:
"""
Class to store parameters of pairwise undirected graphical model of sequences
and compute evolutionary couplings, sequence statistical energies, etc.
"""
def __init__(self, model_file, precision="float32", file_format="plmc_v2", **kwargs):
"""
Initializes the object with raw values read from binary .Jij file
Parameters
----------
model_file : str or file-like object
Binary Jij file containing model parameters from plmc software,
or open binary file handle. Note: recommended use is to read from file
object, passing a file path will eventually be deprecated.
precision : {"float32", "float64"}, default: "float32"
Sets if input file has single (float32) or double precision (float64)
}
file_format : {"plmc_v2", "plmc_v1"}, default: "plmc_v2"
File format of parameter file.
Note: The use of "plmc_v1" is discouraged and only for backwards
compatibility as this format lacks crucial information about
parameters used by this class. Users are responsible for supplying
the missing values (e.g. regularization strength, alphabet or M_eff)
manually via the respective member variables/properties.
"""
is_file_obj = hasattr(model_file, "read")
if file_format == "plmc_v2":
if is_file_obj:
self.__read_plmc_v2(model_file, precision)
else:
with open(model_file, "rb") as f:
self.__read_plmc_v2(f, precision)
elif file_format == "plmc_v1":
if is_file_obj:
self.__read_plmc_v1(
model_file, precision, kwargs.get("alphabet", None)
)
else:
with open(model_file, "rb") as f:
self.__read_plmc_v1(
f, precision, kwargs.get("alphabet", None)
)
else:
raise ValueError(
"Illegal file format {}, valid options are:"
"plmc_v2, plmc_v1".format(
file_format
)
)
self.alphabet_map = {s: i for i, s in enumerate(self.alphabet)}
# in non-gap mode, focus sequence is still coded with a gap character,
# but gap is not part of model alphabet anymore; so if mapping crashes
# that means there is a non-alphabet character in sequence array
# and therefore there is no focus sequence.
try:
self.target_seq_mapped = np.array([self.alphabet_map[x] for x in self.target_seq])
self.has_target_seq = (np.sum(self.target_seq_mapped) > 0)
except KeyError:
self.target_seq_mapped = np.zeros((self.L), dtype=np.int32)
self.has_target_seq = False
self._reset_precomputed()
def _reset_precomputed(self):
"""
Delete precomputed values (e.g. mutation matrices)
"""
self._single_mut_mat_full = None
self._double_mut_mat = None
self._cn_scores = None
self._fn_scores = None
self._mi_scores_raw = None
self._mi_scores_apc = None
self._ecs = None
def __read_plmc_v2(self, f, precision):
"""
Read updated Jij file format from plmc.
Parameters
----------
f : file-like object
Binary Jij file containing model parameters
precision : {"float32", "float64"}
Sets if input file has single or double precision
"""
# model length, number of symbols, valid/invalid sequences
# and iterations
self.L, self.num_symbols, self.N_valid, self.N_invalid, self.num_iter = (
np.fromfile(f, "int32", 5)
)
# theta, regularization weights, and effective number of samples
self.theta, self.lambda_h, self.lambda_J, self.lambda_group, self.N_eff = (
np.fromfile(f, precision, 5)
)
# Read alphabet (make sure we get proper unicode rather than byte string)
self.alphabet = np.fromfile(
f, "S1", self.num_symbols
).astype("U1")
# weights of individual sequences (after clustering)
self.weights = np.fromfile(
f, precision, self.N_valid + self.N_invalid
)
# target sequence and index mapping, again ensure unicode
self._target_seq = np.fromfile(f, "S1", self.L).astype("U1")
self.index_list = np.fromfile(f, "int32", self.L)
# single site frequencies f_i and fields h_i
self.f_i, = np.fromfile(
f, dtype=(precision, (self.L, self.num_symbols)), count=1
)
self.h_i, = np.fromfile(
f, dtype=(precision, (self.L, self.num_symbols)), count=1
)
# pair frequencies f_ij and pair couplings J_ij / J_ij
self.f_ij = np.zeros(
(self.L, self.L, self.num_symbols, self.num_symbols)
)
self.J_ij = np.zeros(
(self.L, self.L, self.num_symbols, self.num_symbols)
)
# TODO: could read triangle matrix from file in one block
# like in read_params.m, which would result in faster reading
# but also 50% higher memory usage... for now save memory
for i in range(self.L - 1):
for j in range(i + 1, self.L):
self.f_ij[i, j], = np.fromfile(
f, dtype=(precision, (self.num_symbols, self.num_symbols)),
count=1
)
self.f_ij[j, i] = self.f_ij[i, j].T
for i in range(self.L - 1):
for j in range(i + 1, self.L):
self.J_ij[i, j], = np.fromfile(
f, dtype=(precision, (self.num_symbols, self.num_symbols)),
count=1
)
self.J_ij[j, i] = self.J_ij[i, j].T
# if lambda_h is negative, the model was
# inferred using mean-field
if self.lambda_h < 0:
# cast model to mean field model
from evcouplings.couplings.mean_field import MeanFieldCouplingsModel
self.__class__ = MeanFieldCouplingsModel
# handle requirements specific to
# the mean-field couplings model
self.transform_from_plmc_model()
def __read_plmc_v1(self, f, precision, alphabet=None):
"""
Read original eij/Jij file format from plmc. Use of this old format
is discouraged (see constructor documentation for details)
Parameters
----------
f : file-like object
Binary Jij file containing model parameters
precision : {"float32", "float64"}
Sets if input file has single or double precision
alphabet : str, default: None
"""
# local definitions of alphabets for backwards compatibility
# better: use v2 file format which includes this information
GAP = "-"
ALPHABET_PROTEIN_NOGAP = "ACDEFGHIKLMNPQRSTVWY"
ALPHABET_PROTEIN = GAP + ALPHABET_PROTEIN_NOGAP
# model length, number of symbols
self.L, = np.fromfile(f, "int32", 1)
self.num_symbols, = np.fromfile(f, "int32", 1)
# Old format does not have alphabet in file, so need
# to guess it or use user-supplied alphabet.
# if no alphabet is given, try to guess
if alphabet is None:
if self.num_symbols == 21:
alphabet = ALPHABET_PROTEIN
elif self.num_symbols == 20:
alphabet = ALPHABET_PROTEIN_NOGAP
else:
raise ValueError(
"Could not guess default alphabet for "
"{} states, specify alphabet parameter.".format(
self.num_symbols
)
)
else:
# verify if size of given alphabet matches model
if len(alphabet) != self.num_symbols:
raise ValueError(
"Size of alphabet ({}) does not agree with "
"number of states in model ({})".format(
len(alphabet), self.num_symbols
)
)
self.alphabet = np.array(list(alphabet))
# target sequence and index mapping, again ensure unicode
self._target_seq = np.fromfile(f, "S1", self.L).astype("U1")
self.index_list = np.fromfile(f, "int32", self.L)
# set all the information missing in v1 files to None
# valid/invalid sequences, number of iterations
self.N_valid = None
self.N_invalid = None
self.num_iter = None
# theta, regularization weights, and effective number of samples
self.theta = None
self.lambda_h = None
self.lambda_J = None
self.lambda_group = None
self.N_eff = None
# weights of individual sequences (after clustering)
self.weights = None
# single site frequencies f_i and fields h_i
self.f_i, = np.fromfile(
f, dtype=(precision, (self.L, self.num_symbols)), count=1
)
self.h_i, = np.fromfile(
f, dtype=(precision, (self.L, self.num_symbols)), count=1
)
# pair frequencies f_ij and pair couplings J_ij / J_ij
self.f_ij = np.zeros(
(self.L, self.L, self.num_symbols, self.num_symbols)
)
self.J_ij = np.zeros(
(self.L, self.L, self.num_symbols, self.num_symbols)
)
for i in range(self.L - 1):
for j in range(i + 1, self.L):
file_i, file_j = np.fromfile(f, "int32", 2)
if i + 1 != file_i or j + 1 != file_j:
raise ValueError(
"Error: column pair indices inconsistent. "
"Expected: {} {}; File: {} {}".format(i + 1, j + 1, file_i, file_j)
)
self.f_ij[i, j], = np.fromfile(
f, dtype=(precision, (self.num_symbols, self.num_symbols)),
count=1
)
self.f_ij[j, i] = self.f_ij[i, j].T
self.J_ij[i, j], = np.fromfile(
f, dtype=(precision, (self.num_symbols, self.num_symbols)),
count=1
)
self.J_ij[j, i] = self.J_ij[i, j].T
@property
def target_seq(self):
"""
Target/Focus sequence of model used for delta_hamiltonian
calculations (including single and double mutation matrices)
"""
return self._target_seq
@target_seq.setter
def target_seq(self, sequence):
"""
Define a new target sequence
Parameters
----------
sequence : str, or list of chars
Define a new default sequence for relative Hamiltonian
calculations (e.g. energy difference relative to wild-type
sequence). Length of sequence must correspond to model length (self.L)
"""
self._reset_precomputed()
if len(sequence) != self.L:
raise ValueError(
"Sequence length inconsistent with model length: {} {}".format(
len(sequence), self.L
)
)
if isinstance(sequence, str):
sequence = list(sequence)
self._target_seq = np.array(sequence)
self.target_seq_mapped = np.array([self.alphabet_map[x] for x in self.target_seq])
self.has_target_seq = True
@property
def index_list(self):
"""
Target/Focus sequence of model used for delta_hamiltonian
calculations (including single and double mutation matrices)
"""
return self._index_list
@index_list.setter
def index_list(self, mapping):
"""
Define a new number mapping for sequences
Parameters
----------
mapping: list of int
Sequence indices of the positions in the model.
Length of list must correspond to model length (self.L)
"""
if len(mapping) != self.L:
raise ValueError(
"Mapping length inconsistent with model length: {} {}".format(
len(mapping), self.L
)
)
self._index_list = deepcopy(mapping)
self.index_map = {b: a for a, b in enumerate(self.index_list)}
# update ECs, if they were already calculated
if hasattr(self, "_ecs"):
self._calculate_ecs()
[docs] def convert_sequences(self, sequences):
"""
Converts sequences in string format into internal symbol representation
according to alphabet of model
Parameters
----------
sequences : list of str
List of sequences (must have same length and correspond to
model states)
Returns
-------
np.array
Matrix of size len(sequences) x L of sequences converted to
integer symbols
"""
seq_lens = list(set(map(len, sequences)))
if len(seq_lens) != 1:
raise ValueError("Input sequences have different lengths: " + str(seq_lens))
L_seq = seq_lens[0]
if L_seq != self.L:
raise ValueError(
"Sequence lengths do not correspond to model length: {} {}".format(
L_seq, self.L
)
)
S = np.empty((len(sequences), L_seq), dtype=np.int)
try:
for i, s in enumerate(sequences):
S[i] = [self.alphabet_map[x] for x in s]
except KeyError:
raise ValueError(
"Invalid symbol in sequence {}: {}".format(i, s)
)
return S
[docs] def hamiltonians(self, sequences):
"""
Calculates the Hamiltonians of the global probability distribution P(A_1, ..., A_L)
for the given sequences A_1,...,A_L from J_ij and h_i parameters
Parameters
----------
sequences : list of str
List of sequences for which Hamiltonian will be computed,
or converted np.array obtained using convert_sequences method
Returns
-------
np.array
Float matrix of size len(sequences) x 3, where each row corresponds to the
1) total Hamiltonian of sequence and the 2) J_ij and 3) h_i sub-sums
"""
if isinstance(sequences, list):
sequences = self.convert_sequences(sequences)
return _hamiltonians(sequences, self.J_ij, self.h_i)
@property
def single_mut_mat_full(self):
"""
Hamiltonian difference for all possible single-site variants
L x num_symbol x 3 matrix (np.array) containing delta Hamiltonians
for all possible single mutants of target sequence.
Third dimension: 1) full Hamiltonian, 2) J_ij, 3) h_i
"""
if self._single_mut_mat_full is None:
self._single_mut_mat_full = _single_mutant_hamiltonians(
self.target_seq_mapped, self.J_ij, self.h_i
)
return self._single_mut_mat_full
@property
def single_mut_mat(self):
"""
Hamiltonian difference for all possible single-site variants
L x num_symbol matrix (np.array) containing delta Hamiltonians
for all possible single mutants of target sequence.
"""
return self.single_mut_mat_full[:, :, FULL]
[docs] def delta_hamiltonian(self, substitutions, verify_mutants=True):
"""
Calculate difference in statistical energy relative to
self.target_seq by changing sequence according to list of
substitutions
Parameters
----------
substitutions : list of tuple(pos, subs_from, subs_to)
Substitutions to be applied to target sequence
verify_mutants : bool, optional
Test if subs_from is consistent with self.target_seq
Returns
-------
np.array
Vector of length 3 with 1) total delta Hamiltonian,
2) delta J_ij, 3) delta h_i
"""
pos = np.empty(len(substitutions), dtype=np.int)
subs = np.empty(len(substitutions), dtype=np.int)
try:
for i, (subs_pos, subs_from, subs_to) in enumerate(substitutions):
pos[i] = self.index_map[subs_pos]
subs[i] = self.alphabet_map[subs_to]
if verify_mutants and subs_from != self.target_seq[pos[i]]:
raise ValueError(
"Inconsistency with target sequence: pos={} target={} subs={}".format(
subs_pos, self.target_seq[i], subs_from
)
)
except KeyError:
raise ValueError(
"Illegal substitution: {}{}{}\nAlphabet: {}\nPositions: {}".format(
subs_from, subs_pos, subs_to, self.alphabet_map, self.index_list
)
)
return _delta_hamiltonian(pos, subs, self.target_seq_mapped, self.J_ij, self.h_i)
@property
def double_mut_mat(self):
"""
Hamiltonian difference for all possible double mutant variants
L x L x num_symbol x num_symbol matrix containing delta Hamiltonians
for all possible double mutants of target sequence
"""
if self._double_mut_mat is None:
self._double_mut_mat = np.zeros(
(self.L, self.L, self.num_symbols, self.num_symbols)
)
seq = self.target_seq_mapped
for i in range(self.L - 1):
for j in range(i + 1, self.L):
self._double_mut_mat[i, j] = (
np.tile(self.single_mut_mat[i], (self.num_symbols, 1)).T +
np.tile(self.single_mut_mat[j], (self.num_symbols, 1)) +
self.J_ij[i, j] -
np.tile(self.J_ij[i, j, :, seq[j]], (self.num_symbols, 1)).T -
np.tile(self.J_ij[i, j, seq[i], :], (self.num_symbols, 1)) +
# we are only interested in difference to WT, so normalize
# for second couplings subtraction with last term
self.J_ij[i, j, seq[i], seq[j]])
self._double_mut_mat[j, i] = self._double_mut_mat[i, j].T
return self._double_mut_mat
[docs] @classmethod
def apc(cls, matrix):
"""
Apply average product correction (Dunn et al., Bioinformatics, 2008)
to matrix
Parameters
----------
matrix : np.array
Symmetric L x L matrix which should be corrected by APC
Returns
-------
np.array
Symmetric L x L matrix with APC correction applied
"""
L = matrix.shape[0]
if L != matrix.shape[1]:
raise ValueError("Input matrix is not symmetric: {}".format(matrix.shape))
col_means = np.mean(matrix, axis=0) * L / (L - 1)
matrix_mean = np.mean(matrix) * L / (L - 1)
apc = np.dot(
col_means.reshape(L, 1), col_means.reshape(1, L)
) / matrix_mean
# subtract APC and blank diagonal entries
corrected_matrix = matrix - apc
corrected_matrix[np.diag_indices(L)] = 0
return corrected_matrix
def _calculate_ecs(self):
"""
Calculates FN and CN scores as defined in Ekeberg et al., Phys Rev E, 2013,
as well as MI scores.
"""
# calculate Frobenius norm for each pair of sites (i, j)
# also calculate mutual information
self._fn_scores = np.zeros((self.L, self.L))
self._mi_scores_raw = np.zeros((self.L, self.L))
# transform couplings into zero-sum gauge
J_ij_0 = _zero_sum_gauge(self.J_ij)
for i in range(self.L - 1):
for j in range(i + 1, self.L):
self._fn_scores[i, j] = np.linalg.norm(J_ij_0[i, j], "fro")
self._fn_scores[j, i] = self._fn_scores[i, j]
# mutual information
p = self.f_ij[i, j]
m = np.dot(self.f_i[i, np.newaxis].T, self.f_i[j, np.newaxis])
self._mi_scores_raw[i, j] = np.sum(p[p > 0] * np.log(p[p > 0] / m[p > 0]))
self._mi_scores_raw[j, i] = self._mi_scores_raw[i, j]
# apply Average Product Correction (Dunn et al., Bioinformatics, 2008)
# subtract APC and blank diagonal entries
self._cn_scores = self.apc(self._fn_scores)
self._mi_scores_apc = self.apc(self._mi_scores_raw)
# create internal dataframe representation
ecs = []
for i in range(self.L - 1):
for j in range(i + 1, self.L):
# if we have custom indeces, cannot compute sequence distance
# easily, unless we use segment information
try:
seqdist = abs(self.index_list[i] - self.index_list[j])
except TypeError:
seqdist = np.nan
ecs.append((
self.index_list[i], self.target_seq[i],
self.index_list[j], self.target_seq[j],
seqdist,
self._mi_scores_raw[i, j], self._mi_scores_apc[i, j],
self._fn_scores[i, j], self._cn_scores[i, j]
))
self._ecs = pd.DataFrame(
ecs, columns=["i", "A_i", "j", "A_j", "seqdist", "mi_raw", "mi_apc", "fn", "cn"]
).sort_values(by="cn", ascending=False)
@property
def cn_scores(self):
"""
L x L numpy matrix with CN (corrected norm) scores
"""
if self._cn_scores is None:
self._calculate_ecs()
return self._cn_scores
@property
def fn_scores(self):
"""
L x L numpy matrix with FN (Frobenius norm) scores
"""
if self._fn_scores is None:
self._calculate_ecs()
return self._fn_scores
@property
def mi_scores_raw(self):
"""
L x L numpy matrix with MI (mutual information) scores
without APC correction
"""
if self._mi_scores_raw is None:
self._calculate_ecs()
return self._mi_scores_raw
@property
def mi_scores_apc(self):
"""
L x L numpy matrix with MI (mutual information) scores
with APC correction
"""
if self._mi_scores_apc is None:
self._calculate_ecs()
return self._mi_scores_apc
@property
def ecs(self):
"""
DataFrame with evolutionary couplings, sorted by CN score
(all scores: CN, FN, MI)
"""
if self._ecs is None:
self._calculate_ecs()
return self._ecs
[docs] def to_independent_model(self):
"""
Estimate parameters of a single-site model using
Gaussian prior/L2 regularization.
Returns
-------
CouplingsModel
Copy of object turned into independent model
"""
from scipy.optimize import fmin_bfgs
def _log_post(x, *args):
"""
Log posterior of single-site model
"""
(fi, lambda_h, N) = args
logZ = np.log(np.exp(x).sum())
return N * (logZ - (fi * x).sum()) + lambda_h * ((x ** 2).sum())
def _gradient(x, *args):
"""
Gradient of single-site model
"""
(fi, lambda_h, N) = args
Z = np.exp(x).sum()
P = np.exp(x) / Z
return N * (P - fi) + lambda_h * 2 * x
h_i = np.zeros((self.L, self.num_symbols))
for i in range(self.L):
x0 = np.zeros(self.num_symbols)
h_i[i] = fmin_bfgs(
_log_post, x0, _gradient,
args=(self.f_i[i], self.lambda_h, self.N_eff),
disp=False
)
c0 = deepcopy(self)
c0.h_i = h_i
c0.J_ij.fill(0)
c0._reset_precomputed()
return c0
# syntactic sugar to access most important member variables in target numbering space
def __map(self, indices, mapping):
"""
Applies a mapping either to a single index, or to a list of indices
Parameters
----------
indices : Iterable, or single item of any type
Items that should be mapped into new space.
Tuples and strings will not be used as an
Iterable, but as single items.
mapping : dict
Mapping into new space
Returns
-------
Iterable, or single item
Items mapped into new space
"""
is_sequence = (
isinstance(indices, Iterable) and
not isinstance(indices, str) and
not isinstance(indices, tuple)
)
if is_sequence:
return np.array(
[mapping[i] for i in indices]
)
else:
return mapping[indices]
def __4d_access(self, matrix, i=None, j=None, A_i=None, A_j=None):
"""
Provides shortcut access to column pair properties
(e.g. J_ij or f_ij matrices)
Parameters
-----------
i : Iterable(int) or int
Position(s) on first matrix axis
j : Iterable(int) or int
Position(s) on second matrix axis
A_i : Iterable(str) or str
Symbols corresponding to first matrix axis
A_j : Iterable(str) or str
Symbols corresponding to second matrix axis
Returns
-------
np.array
4D matrix "matrix" sliced according to values i, j, A_i and A_j
"""
i = self.__map(i, self.index_map) if i is not None else _SLICE
j = self.__map(j, self.index_map) if j is not None else _SLICE
A_i = self.__map(A_i, self.alphabet_map) if A_i is not None else _SLICE
A_j = self.__map(A_j, self.alphabet_map) if A_j is not None else _SLICE
return matrix[i, j, A_i, A_j]
def __2d_access(self, matrix, i=None, A_i=None):
"""
Provides shortcut access to single-column properties
(e.g. f_i or h_i matrices)
Parameters
-----------
i : Iterable(int) or int
Position(s) on first matrix axis
A_i : Iterable(str) or str
Symbols corresponding to first matrix axis
Returns
-------
np.array
2D matrix "matrix" sliced according to values i and A_i
"""
i = self.__map(i, self.index_map) if i is not None else _SLICE
A_i = self.__map(A_i, self.alphabet_map) if A_i is not None else _SLICE
return matrix[i, A_i]
def __2d_access_score_matrix(self, matrix, i=None, j=None):
"""
Provides shortcut access to quadratic 2D matrices
Parameters
-----------
i : Iterable(int) or int
Position(s) on first matrix axis
j : Iterable(int) or int
Position(s) on first matrix axis
Returns
-------
np.array
2D matrix "matrix" sliced according to values i and j
"""
i = self.__map(i, self.index_map) if i is not None else _SLICE
j = self.__map(j, self.index_map) if j is not None else _SLICE
return matrix[i, j]
[docs] def Jij(self, i=None, j=None, A_i=None, A_j=None):
"""
Quick access to J_ij matrix with automatic index mapping.
See __4d_access for explanation of parameters.
"""
return self.__4d_access(self.J_ij, i, j, A_i, A_j)
[docs] def fij(self, i=None, j=None, A_i=None, A_j=None):
"""
Quick access to f_ij matrix with automatic index mapping.
See __4d_access for explanation of parameters.
"""
return self.__4d_access(self.f_ij, i, j, A_i, A_j)
[docs] def hi(self, i=None, A_i=None):
"""
Quick access to h_i matrix with automatic index mapping.
See __2d_access for explanation of parameters.
"""
return self.__2d_access(self.h_i, i, A_i)
[docs] def fi(self, i=None, A_i=None):
"""
Quick access to f_i matrix with automatic index mapping.
See __2d_access for explanation of parameters.
"""
return self.__2d_access(self.f_i, i, A_i)
[docs] def cn(self, i=None, j=None):
"""
Quick access to cn_scores matrix with automatic index mapping.
See __2d_access_score_matrix for explanation of parameters.
"""
return self.__2d_access_score_matrix(self.cn_scores, i, j)
[docs] def fn(self, i=None, j=None):
"""
Quick access to fn_scores matrix with automatic index mapping.
See __2d_access_score_matrix for explanation of parameters.
"""
return self.__2d_access_score_matrix(self.fn_scores, i, j)
[docs] def mi_apc(self, i=None, j=None):
"""
Quick access to mi_scores_apc matrix with automatic index mapping.
See __2d_access_score_matrix for explanation of parameters.
"""
return self.__2d_access_score_matrix(self.mi_scores_apc, i, j)
[docs] def mi_raw(self, i=None, j=None):
"""
Quick access to mi_scores_raw matrix with automatic index mapping.
See __2d_access_score_matrix for explanation of parameters.
"""
return self.__2d_access_score_matrix(self.mi_scores_raw, i, j)
[docs] def mn(self, i=None):
"""
Map model numbering to internal numbering
Parameters
----------
i : Iterable(int) or int
Position(s) to be mapped from model numbering space
into internal numbering space
Returns
-------
Iterable(int) or int
Remapped position(s)
"""
if i is None:
return np.array(sorted(self.index_map.values()))
else:
return self.__map(i, self.index_map)
[docs] def mui(self, i=None):
"""
Legacy method for backwards compatibility. See self.mn for explanation.
"""
return self.mn(i)
[docs] def sn(self, i=None):
"""
Map internal numbering to sequence numbering
Parameters
----------
i : Iterable(int) or int
Position(s) to be mapped from internal numbering space
into sequence numbering space.
Returns
-------
Iterable(int) or int
Remapped position(s)
"""
if i is None:
return np.array(self.index_list)
else:
return self.__map(i, self.index_list)
[docs] def itu(self, i=None):
"""
Legacy method for backwards compatibility. See self.sn for explanation.
"""
return self.sn(i)
[docs] def seq(self, i=None):
"""
Access target sequence of model
Parameters
----------
i : Iterable(int) or int
Position(s) for which symbol should be retrieved
Returns
-------
Iterable(char) or char
Sequence symbols
"""
if i is None:
return self.target_seq
else:
i = self.__map(i, self.index_map)
return self.__map(i, self.target_seq)
[docs] def smm(self, i=None, A_i=None):
"""
Access delta_Hamiltonian matrix of single mutants of target sequence
Parameters
----------
i : Iterable(int) or int
Position(s) for which energy difference should be retrieved
A_i : Iterable(char) or char
Substitutions for which energy difference should be retrieved
Returns
-------
np.array(float)
2D matrix containing energy differences for slices along both
axes of single mutation matrix (first axis: position, second
axis: substitution).
"""
return self.__2d_access(self.single_mut_mat, i, A_i)
[docs] def dmm(self, i=None, j=None, A_i=None, A_j=None):
"""
Access delta_Hamiltonian matrix of double mutants of target sequence
Parameters
----------
i : Iterable(int) or int
Position(s) of first substitution(s)
j : Iterable(int) or int
Position(s) of second substitution(s)
A_i : Iterable(char) or char
Substitution(s) to first position
A_j : Iterable(char) or char
Substitution(s) to second position
Returns
-------
np.array(float)
4D matrix containing energy differences for slices along both
axes of double mutation matrix (axes 1/2: position, axis 3/4:
substitutions).
"""
return self.__4d_access(self.double_mut_mat, i, j, A_i, A_j)
[docs] def to_file(self, out_file, precision="float32", file_format="plmc_v2"):
"""
Writes the potentially modified model again to binary file
Parameters
----------
out_file: str
A string specifying the path to a file
precision: {"float16", "float32", "float64"}, optional (default: "float32")
Numerical NumPy data type specifying the precision
used to write numerical values to file
file_format : {"plmc_v1", "plmc_v2"}, optional (default: "plmc_v2")
Available file formats
"""
new = file_format.lower() == "plmc_v2"
with open(out_file, "wb") as f:
np.array([self.L, self.num_symbols], dtype="int32").tofile(f)
if new:
np.array([self.N_valid,
self.N_invalid,
self.num_iter],
dtype="int32").tofile(f)
np.array([self.theta,
self.lambda_h,
self.lambda_J,
self.lambda_group,
self.N_eff],
dtype=precision).tofile(f)
self.alphabet.dtype = "S1"
self.alphabet[np.where(self.alphabet != b"")].tofile(f)
self.alphabet.dtype = "U1"
self.weights.astype(precision).tofile(f)
self.target_seq.dtype = "S1"
self.target_seq[np.where(self.target_seq != b"")].tofile(f)
self.target_seq.dtype = "U1"
self.index_list.astype("int32").tofile(f)
self.f_i.astype(precision).tofile(f)
self.h_i.astype(precision).tofile(f)
if not new:
for i in range(self.L - 1):
for j in range(i + 1, self.L):
np.array([i + 1, j + 1], dtype="int32").tofile(f)
self.f_ij[i, j].astype(precision).tofile(f)
self.J_ij[i, j].astype(precision).tofile(f)
else:
for i in range(self.L - 1):
for j in range(i + 1, self.L):
self.f_ij[i, j].astype(precision).tofile(f)
for i in range(self.L - 1):
for j in range(i + 1, self.L):
self.J_ij[i, j].astype(precision).tofile(f)