Source code for lmpy.statistics.mcpa

"""Module for performing a MetaCommunity Phylogenetics Analysis.

See:
    Leibold, M.A., E.P. Economo and P.R. Peres-Neto. 2010. Metacommunity
         phylogenetics: separating the roles of environmental filters and
         historical biogeography. Ecology letters 13: 1290-1299.
"""
from concurrent.futures import ThreadPoolExecutor as ExecutorClass
from functools import partial
import threading

import numpy as np

from lmpy import Matrix

# Note: This lock is used when calculating beta to keep memory usage down
[docs]lock = threading.Lock()
# Note: This will depend on the executor class used and is subject to tuning. # Threads should have a higher level of concurrency than processes.
[docs]CONCURRENCY_FACTOR = 5
# .............................................................................
[docs]def _beta_helper(mtx1, mtx2, weights): """This helper function avoids creating large temporary matrices. Args: mtx1 (Matrix): A (n [sites] by i [predictors]) standardized matrix. mtx2 (Matrix): A (n [sites] by i [predictors]) standardized matrix. weights (Matrix): A (n [sites]) array of site weights. Returns: Matrix: The beta matrix created by the helper. """ _, num_predictors = mtx1.shape _, num_k = mtx2.shape out_mtx = np.empty((num_predictors, num_k)) for i in range(num_predictors): for j in range(i, num_k): out_mtx[i, j] = np.sum(mtx1[:, i] * weights * mtx2[:, j]) return out_mtx
# .............................................................................
[docs]def _calculate_beta(pred_std, weights, phylo_std, use_lock=True): """Calculates the regression model (beta) for the provided inputs. Args: pred_std (Matrix): A standardized predictor matrix (n [sites] by i [predictors]). weights (Matrix): A matrix of site weights (n by n). phylo_std (Matrix): A standardized phylo matrix (n by k [nodes]). use_lock (boolean): If true, use a lock when performing the computation to save on overall memory usage. This is useful for large, or full, predictor matrices but can be skipped for single columns to improve performance. Note: * The computation is:: beta = (M_T.W.M)^-1.M_T.W.P * M is the predictor matrix * M_T is the transverse of the predictor matrix * W is the weights column * P is the phylo matrix * "^-1" is the inverse of the matrix * Locking is available to prevent many threads / subprocesses from performing memory intensive computations concurrently and overwhelming the system. Todo: * Update documentation so that note shows symbols in equation Returns: Matrix: An (i by k) numpy ndarray, where i is the number of predictors in pred_std and k is the number of nodes in phylo_std. """ if use_lock: # pragma: no cover lock.acquire() temp1 = _beta_helper(pred_std, pred_std, weights) tmp1_inv = np.linalg.inv(temp1) temp2 = _beta_helper(pred_std, phylo_std, weights) beta = tmp1_inv.dot(temp2) if len(beta.shape) == 1: beta = beta.reshape((beta.shape[0], 1)) # pragma: no cover if use_lock: # pragma: no cover lock.release() return beta
# .............................................................................
[docs]def _calculate_r_squared(y_hat, phylo_std): """Calculates the R-squared value for the inputs. Args: y_hat (Matrix): A predicted value for a regression (n [sites] by k [nodes]). phylo_std (Matrix): A standardized phylo matrix (n [sites] by k [nodes]). Note: * R^2 = trace(y_hat . y_hat) / trace(P . P_T) * y_hat_T is the transverse of y_hat * P_T is the transverse of phylo_std Returns: Matrix: R squared value for the inputs. """ # t1 = np.trace(y_hat.dot(y_hat.T)) t_1 = _trace_mtx_by_transverse(y_hat) # t2 = np.trace(phylo_std.dot(phylo_std.T)) t_2 = _trace_mtx_by_transverse(phylo_std) r_2 = t_1 / t_2 # r2 = np.trace( # y_hat.T.dot(y_hat)) / np.trace(phylo_std.T.dot(phylo_std.T)) return r_2
# .............................................................................
[docs]def _calculate_y_hat(pred_std, beta): """Calculates the predicted value for a regression (Y hat). Args: pred_std (Matrix): A standardized predictor matrix (n [sites] by i [predictors]). beta (Matrix): The regression model associated with this prediction (i by k [nodes]). Returns: Matrix: An (n [sites] by k [nodes]) numpy ndarray """ return pred_std.dot(beta)
# .............................................................................
[docs]def _mcpa_for_node(incidence_mtx, env_mtx, bg_mtx, phylo_col, use_locks=False): """Runs MCPA computations for a single tree node. Args: incidence_mtx (Matrix): An incidence matrix (PAM). env_mtx (Matrix): An environmental matrix (GRIM). bg_mtx (Matrix): A matrix of encoded Biogeographic hypotheses. phylo_col (Matrix): A column from the phylo matrix for a single node. use_locks (boolean): Indicator if locks are needed for larger computations. This is probably only true for parallel runs. Returns: tuple: Tuple of observed Matrix, f-values Matrix """ species_present_at_node = np.where(phylo_col != 0)[0] phylo_col = phylo_col[species_present_at_node, :] # Purge incidence matrix to only those species present for this node incidence_temp = incidence_mtx[:, species_present_at_node] # Get the sites present for this node and purge the other matrices sites_present = np.where(incidence_temp.sum(axis=1) > 0) incidence = incidence_temp[sites_present] env_predictors = env_mtx[sites_present] bg_predictors = bg_mtx[sites_present] # Get the number of sites and check that all matrices have been purged num_sites = sites_present[0].size num_env_predictors = env_predictors.shape[1] num_bg_predictors = bg_predictors.shape[1] obs_values = np.zeros((1, num_env_predictors + num_bg_predictors + 2)) f_values = np.zeros((1, num_env_predictors + num_bg_predictors + 2)) try: if len(sites_present) > 0: # Standardize matrices site_weights = np.sum(incidence, axis=1) species_weights = np.sum(incidence, axis=0) e_std = _standardize_matrix(env_predictors, site_weights) b_std = _standardize_matrix(bg_predictors, site_weights) all_std = np.concatenate([b_std, e_std], axis=1) p_std = _standardize_matrix(phylo_col, species_weights) p_sigma_std = np.dot(incidence, p_std) # Get Beta, Y(hat), Rho, R-squared, F-pseudo beta_env_all = _calculate_beta( e_std, site_weights, p_sigma_std, use_lock=use_locks ) y_hat_env_all = _calculate_y_hat(e_std, beta_env_all) beta_bg_all = _calculate_beta( all_std, site_weights, p_sigma_std, use_lock=use_locks ) y_hat_bg_all = _calculate_y_hat(all_std, beta_bg_all) env_r2 = _calculate_r_squared(y_hat_env_all, p_sigma_std) bg_r2 = _calculate_r_squared(y_hat_bg_all, p_sigma_std) try: env_adj_r2 = 1.0 - ( (num_sites - 1.0) / (num_sites - num_env_predictors - 1.0) ) * (1.0 - env_r2) except ZeroDivisionError: # pragma: no cover env_adj_r2 = 0.0 try: bg_adj_r2 = 1.0 - ( (num_sites - 1.0) / (num_sites - num_bg_predictors - 1.0) ) * (1.0 - bg_r2) except ZeroDivisionError: # pragma: no cover bg_adj_r2 = 0.0 # env_f_pseudo_numerator = np.trace( # y_hat_env_all.T.dot(y_hat_env_all)) env_f_pseudo_numerator = _trace_mtx_by_transverse(y_hat_env_all.T) # bg_f_pseudo_numerator = np.trace( # y_hat_bg_all.T.dot(y_hat_bg_all)) bg_f_pseudo_numerator = _trace_mtx_by_transverse(y_hat_bg_all.T) # Pre-calculate the denominator for the F-pseudo stats env_denom_temp = p_sigma_std.T - y_hat_env_all.T # env_f_pseudo_denominator = np.trace( # env_denom_temp.T.dot(env_denom_temp)) env_f_pseudo_denominator = _trace_mtx_by_transverse(env_denom_temp.T) bg_denom_temp = p_sigma_std.T - y_hat_bg_all.T # bg_f_pseudo_denominator = np.trace( # bg_denom_temp.T.dot(bg_denom_temp)) bg_f_pseudo_denominator = _trace_mtx_by_transverse(bg_denom_temp.T) idx = 0 # Environment # For each environmental predictor, compute the semi-partial # correlation and the F-pseudo value for i in range(num_env_predictors): wo_predictor = np.delete(e_std, i, axis=1) predictor = e_std[:, [i]] # Semi-partial correlation beta_wo_pred = _calculate_beta( wo_predictor, site_weights, p_sigma_std, use_lock=use_locks ) y_hat_wo_pred = _calculate_y_hat(wo_predictor, beta_wo_pred) beta_j_i = _calculate_beta( predictor, site_weights, p_sigma_std, use_lock=False ) r2_j_i = _calculate_r_squared(y_hat_wo_pred, p_sigma_std) semi_partial = beta_j_i * np.sqrt(env_r2 - r2_j_i) / np.abs(beta_j_i) f_pseudo_env_i = (env_r2 - r2_j_i) / env_f_pseudo_denominator obs_values[0, idx] = semi_partial f_values[0, idx] = f_pseudo_env_i idx += 1 # Add Environment adjusted R squared obs_values[0, idx] = env_adj_r2 f_values[0, idx] = env_f_pseudo_numerator / env_f_pseudo_denominator idx += 1 # Biogeography for i in range(num_bg_predictors): wo_predictor = np.delete(all_std, i, axis=1) predictor = all_std[:, [i]] # Semi-partial correlation beta_wo_pred = _calculate_beta( wo_predictor, site_weights, p_sigma_std, use_lock=use_locks ) y_hat_wo_pred = _calculate_y_hat(wo_predictor, beta_wo_pred) beta_j_i = _calculate_beta( predictor, site_weights, p_sigma_std, use_lock=False ) r2_j_i = _calculate_r_squared(y_hat_wo_pred, p_sigma_std) semi_partial = beta_j_i * np.sqrt(bg_r2 - r2_j_i) / np.abs(beta_j_i) f_pseudo_bg_i = (bg_r2 - r2_j_i) / bg_f_pseudo_denominator obs_values[0, idx] = semi_partial f_values[0, idx] = f_pseudo_bg_i idx += 1 # Add Biogeography adjusted R squared obs_values[0, idx] = bg_adj_r2 f_values[0, idx] = bg_f_pseudo_numerator / bg_f_pseudo_denominator except np.linalg.linalg.LinAlgError: # Singular matrix that does not have inverse pass return (obs_values, f_values)
# .............................................................................
[docs]def _standardize_matrix(mtx, weights): """Standardizes a phylogenetic or predictor matrix. Args: mtx (Matrix): The matrix to standardize weights (Matrix): A one-dimensional array of sums to use for standardization. Note: * Formula for standardization :: Mstd = M - 1c.1r.W.M(1/trace(W)) ./ 1c(1r.W(M*M) - ((1r.W.M)*(1r.W.M)(1/trace(W))(1/trace(W)-1))^0.5 * M - Matrix to be standardized * W - A k by k diagonal matrix of weights, where each non-zero value is the column or row sum (depending on the M) for an incidence matrix. * 1r - A row of k ones * 1c - A column of k ones * trace - Returns the sum of the input matrix * "./" indicates Hadamard division * "*" indicates Hadamard multiplication * Code adopted from supplemental material MATLAB code See: Literature supplemental materials Returns: Matrix: Standardized matrix. """ # Create a row of ones, we'll transpose for a column ones = np.ones((1, weights.shape[0]), dtype=float) # This maps to trace(W) total_sum = np.sum(weights) # s1 = 1r.W.M s_1 = (ones * weights).dot(mtx) # s2 = 1r.W.(M*M) s_2 = (ones * weights).dot(mtx * mtx) mean_weighted = s_1 / total_sum std_dev_weighted = ((s_2 - (s_1**2.0 / total_sum)) / (total_sum)) ** 0.5 # Fixes any invalid values created previously tmp = np.nan_to_num(ones.T.dot(std_dev_weighted) ** -1.0) std_mtx = tmp * (mtx - ones.T.dot(mean_weighted)) return std_mtx
# .............................................................................
[docs]def _trace_mtx_by_transverse(mtx): """Prevent a memory bomb by performing trace(mtx . mtx.T) in a smarter way. This method takes advantage of the fact that we are really only interested in the diagonal matrix created by performing a dot product of a matrix and its transverse. Because of that, we can perform the dot product of each row by its transverse and then sum the results. Args: mtx (Matrix): A matrix to use to calculate the trace(M . M_T). Note: * There was an error in the formula that called for a dot product of each matrix transverse with itself. This was a typo as that would only work with square matrices. Returns: Matrix: Trace matrix dot matrix transverse. """ return np.sum([row.dot(row.T) for row in mtx])
# .............................................................................
[docs]def get_p_values(observed_value, test_values, num_permutations=None): """Gets an array of P-Values. Gets an (1 or 2 dimension) array of P values where the P value for an array location is determined by finding the number of test values at corresponding locations are greater than or equal to that value and then dividing that number by the number of permutations Args: observed_value (Matrix): An array of observed values to use as a reference. test_values (Matrix): A list of arrays generated from randomizations that will be compared to the observed num_permutations: (optional) The total number of randomizations performed. Divide the P-values by this if provided. Todo: Deprecate this in favor of method in running stats Returns: Matrix: Calculated p-values. """ if num_permutations is None: num_permutations = 1.0 # Create the P-Values matrix p_vals = np.zeros(observed_value.shape, dtype=float) # For each matrix in test values for test_mtx in test_values: # Add 1 where every value in the test matrix is greater than or equal # to the value in the observed value. Numpy comparisons will create # a matrix of boolean values for each cell, which when added to the # p_vals matrix will be treated as 1 for True and 0 for False # If this is a stack if test_mtx.ndim == 3: for i in range(test_mtx.shape[2]): p_vals += np.abs(np.round(test_mtx[:, :, i], 5)) >= np.abs( np.round(observed_value, 5) ) else: p_vals += np.abs(np.round(test_mtx, 5)) >= np.abs( np.round(observed_value, 5) ) # Reshape and adding depth header if len(p_vals.shape) == 2: p_vals = np.expand_dims(p_vals, axis=2) headers = observed_value.headers headers['2'] = ['P-values'] # Scale and return the p-values matrix ret_data_tmp = np.nan_to_num(p_vals / num_permutations) return Matrix(np.clip(ret_data_tmp, -1.0, 1.0), headers=headers)
# .............................................................................
[docs]def mcpa(incidence_matrix, phylo_mtx, env_mtx, bg_mtx): """Runs MCPA for a set of matrices. Args: incidence_matrix (Matrix): A binary Lifemapper Matrix object representing the incidence of each species for each site by coding them as ones. This is the same thing as a Lifemapper Presence Absence Matrix, or PAM (n [sites] by k+1 [species]). phylo_mtx (Matrix): A matrix encoding of a phylogenetic tree where each cell represents the relative contribution of each tip to each inner tree node (k+1 [species] by k [nodes]). env_mtx (Matrix): A matrix encoding of the environment for each site (n [sites] by ei [environmental predictors]). bg_mtx (Matirx): A matrix of Helmert contrasts (-1, 0, 1) for Biogeographic hypotheses (n [sites] by bi [biogeographic predictors]). Returns: tuple: Tuple of Matrix of observed values and Matrix of F-pseudo values. """ site_present = np.any(incidence_matrix, axis=1) empty_sites = np.where(site_present == 0)[0] # Initial purge of empty sites init_incidence = np.delete(incidence_matrix, empty_sites, axis=0) env_predictors = np.delete(env_mtx, empty_sites, axis=0) bg_predictors = np.delete(bg_mtx, empty_sites, axis=0) num_nodes = phylo_mtx.shape[1] num_predictors = env_predictors.shape[1] + bg_predictors.shape[1] obs_results = np.empty((num_nodes, num_predictors + 2)) f_results = np.empty((num_nodes, num_predictors + 2)) for i in range(num_nodes): # print('Node {} of {}'.format(i+1, num_nodes)) obs, f_vals = _mcpa_for_node( init_incidence, env_predictors, bg_predictors, phylo_mtx[:, [i]] ) obs_results[i] = obs f_results[i] = f_vals print('Processed mcpa for {} of {} nodes'.format(i + 1, num_nodes)) # Correct any nans and add depth obs_results = np.clip(np.expand_dims(np.nan_to_num(obs_results), axis=2), -1.0, 1.0) f_results = np.clip(np.expand_dims(np.nan_to_num(f_results), axis=2), -1.0, 1.0) column_headers = env_mtx.get_column_headers() column_headers.append('Env - Adjusted R-squared') column_headers.extend(bg_mtx.get_column_headers()) column_headers.append('BG - Adjusted R-squared') obs_headers = { '0': phylo_mtx.get_column_headers(), '1': column_headers, '2': ['Observed'], } f_headers = { '0': phylo_mtx.get_column_headers(), '1': column_headers, '2': ['F-values'], } obs_mtx = Matrix(obs_results, headers=obs_headers) f_mtx = Matrix(f_results, headers=f_headers) return (obs_mtx, f_mtx)
# .............................................................................
[docs]def mcpa_parallel(incidence_matrix, phylo_mtx, env_mtx, bg_mtx): # pragma: no cover """Run MCPA for a set of matrices using parallelism. Performs MCPA across each of the tree nodes in parallel. Args: incidence_matrix (Matrix): A binary Lifemapper Matrix object representing the incidence of each species for each site by coding them as ones. This is the same thing as a Lifemapper Presence Absence Matrix, or PAM (n [sites] by k+1 [species]). phylo_mtx (Matrix): A matrix encoding of a phylogenetic tree where each cell represents the relative contribution of each tip to each inner tree node (k+1 [species] by k [nodes]). env_mtx (Matrix): A matrix encoding of the environment for each site (n [sites] by ei [environmental predictors]). bg_mtx (Matirx): A matrix of Helmert contrasts (-1, 0, 1) for Biogeographic hypotheses (n [sites] by bi [biogeographic predictors]). Returns: tuple: Tuple of Matrix of observed values and Matrix of F-pseudo values. """ site_present = np.any(incidence_matrix, axis=1) empty_sites = np.where(site_present == 0)[0] # Initial purge of empty sites init_incidence = np.delete(incidence_matrix, empty_sites, axis=0) env_predictors = np.delete(env_mtx, empty_sites, axis=0) bg_predictors = np.delete(bg_mtx, empty_sites, axis=0) num_nodes = phylo_mtx.shape[1] num_predictors = env_predictors.shape[1] + bg_predictors.shape[1] obs_results = np.empty((num_nodes, num_predictors + 2)) f_results = np.empty((num_nodes, num_predictors + 2)) func = partial( _mcpa_for_node, init_incidence, env_predictors, bg_predictors, use_locks=True ) # Use an Executor to parallelize the computations over each tree node # Note: The executor class is determined at the module level, so see top of # module for more information about executor class and concurrency with ExecutorClass(CONCURRENCY_FACTOR) as executor: comp_run = executor.map(func, [phylo_mtx[:, [i]] for i in range(num_nodes)]) i = 0 for obs, f_val in comp_run: obs_results[i] = obs f_results[i] = f_val # Correct any nans obs_results = np.clip(np.nan_to_num(obs_results), -1.0, 1.0) f_results = np.clip(np.nan_to_num(f_results), -1.0, 1.0) column_headers = env_mtx.get_column_headers() column_headers.append('Env - Adjusted R-squared') column_headers.extend(bg_mtx.get_column_headers()) column_headers.append('BG - Adjusted R-squared') obs_headers = { '0': phylo_mtx.get_column_headers(), '1': column_headers, '2': ['Observed'], } f_headers = { '0': phylo_mtx.get_column_headers(), '1': column_headers, '2': ['F-values'], } obs_mtx = Matrix(obs_results, headers=obs_headers) f_mtx = Matrix(f_results, headers=f_headers) return (obs_mtx, f_mtx)