Source code for lmpy.statistics.pam_stats

"""Module containing base PAM statistic functionality."""
import numpy as np
from lmpy import Matrix


# .............................................................................
# Metric Decorators
# .............................................................................
[docs]class PamStatsMetric: """Base class for PAM statistics metrics. This wrapper is used to classify each statistic so that the arguments it needs can be inferred. """ # ......................... def __init__(self, func): """Constructor. Args: func: The function to call. """ self.func = func self.__doc__ = self.func.__doc__ # .........................
[docs] def __call__(self, *args, **kwargs): """Call the wrapped function. Args: *args: Positional arguments passed to the function. **kwargs: Keyword arguments passed to the function. Returns: object: The output of the wrapped function. """ # Do anything needed before calling the function ret = self.func(*args, **kwargs) # Do anything needed after the function return ret
# .............................................................................
[docs]class CovarianceMatrixMetric(PamStatsMetric): """A metric that produces a covariance matrix."""
# .............................................................................
[docs]class DiversityMetric(PamStatsMetric): """A single-value diversity metric."""
# ............................................................................. class _SiteStatMetric(PamStatsMetric): """A site-base statistic (do not use directly).""" # ............................................................................. class _SpeciesStatMetric(PamStatsMetric): """A species-based statistic (do not use directly).""" # .............................................................................
[docs]class SpeciesMatrixMetric(_SpeciesStatMetric): """A species-based statistic that is generated from a PAM."""
# .............................................................................
[docs]class SiteMatrixMetric(_SiteStatMetric): """A site-based metric that takes the PAM as input."""
# .............................................................................
[docs]class PamDistMatrixMetric(_SiteStatMetric): """A site-based metric computed from a PAM and Tree."""
# .............................................................................
[docs]class TreeMetric(_SiteStatMetric): """A site-based metric that takes a Dendropy tree as an argument."""
# .............................................................................
[docs]class TreeDistanceMatrixMetric(_SiteStatMetric): """A site-based metric that takes a distance matrix as an argument."""
# ............................................................................. # Site-based statistics # ............................................................................. @SiteMatrixMetric
[docs]def alpha(pam): """Calculate alpha diversity, the number of species in each site. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: A column of alpha diversity values for each site in the PAM. """ return pam.sum(axis=1)
# ............................................................................. @SiteMatrixMetric
[docs]def alpha_proportional(pam): """Calculate proportional alpha diversity. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: A column of proportional alpha diversity values for each site in the PAM. """ return pam.sum(axis=1).astype(float) / num_species(pam)
# ............................................................................. @SiteMatrixMetric
[docs]def phi(pam): """Calculate phi, the range size per site. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: A column of the sum of the range sizes for the species present at each site in the PAM. """ return pam.dot(pam.sum(axis=0))
# ............................................................................. @SiteMatrixMetric
[docs]def phi_average_proportional(pam): """Calculate proportional range size per site. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: A column of the proportional value of the sum of the range sizes for the species present at each site in the PAM. """ return pam.dot(omega(pam)).astype(float) / (num_sites(pam) * alpha(pam))
# ............................................................................. # Species metrics # ............................................................................. @SpeciesMatrixMetric
[docs]def omega(pam): """Calculate the range size per species. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: A row of range sizes for each species in the PAM. """ return pam.sum(axis=0)
# ............................................................................. @SpeciesMatrixMetric
[docs]def omega_proportional(pam): """Calculate the mean proportional range size of each species. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: A row of the proportional range sizes for each species in the PAM. """ return pam.sum(axis=0).astype(float) / num_sites(pam)
# ............................................................................. @SpeciesMatrixMetric
[docs]def psi(pam): """Calculate the range richness of each species. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: A row of range richness for the sites that each species is present in. """ return pam.sum(axis=1).dot(pam)
# ............................................................................. @SpeciesMatrixMetric
[docs]def psi_average_proportional(pam): """Calculate the mean proportional species diversity. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: A row of proportional range richness for the sites that each species in the PAM is present. """ return alpha(pam).dot(pam).astype(float) / (num_species(pam) * omega(pam))
# ............................................................................. # Diversity metrics # ............................................................................. @DiversityMetric
[docs]def schluter_species_variance_ratio(pam): """Calculate Schluter's species variance ratio. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: float: The Schluter species variance ratio for the PAM. """ sigma_species_ = sigma_species(pam) return float(sigma_species_.sum() / sigma_species_.trace())
# ............................................................................. @DiversityMetric
[docs]def schluter_site_variance_ratio(pam): """Calculate Schluter's site variance ratio. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: float: The Schluter site variance ratio for the PAM. """ sigma_sites_ = sigma_sites(pam) return float(sigma_sites_.sum() / sigma_sites_.trace())
# ............................................................................. @DiversityMetric
[docs]def num_sites(pam): """Get the number of sites with presences. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: int: The number of sites that have present species. """ return int(np.sum(np.any(pam, axis=1)))
# ............................................................................. @DiversityMetric
[docs]def num_species(pam): """Get the number of species with presences. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: int: The number of species that are present in at least one site. """ return int(np.sum(np.any(pam, axis=0)))
# ............................................................................. @DiversityMetric
[docs]def whittaker(pam): """Calculate Whittaker's beta diversity metric for a PAM. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: float: Whittaker's beta diversity for the PAM. """ return float(num_species(pam) / omega_proportional(pam).sum())
# ............................................................................. @DiversityMetric
[docs]def lande(pam): """Calculate Lande's beta diversity metric for a PAM. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: float: Lande's beta diversity for the PAM. """ return float( num_species(pam) - (pam.sum(axis=0).astype(float) / num_sites(pam)).sum() )
# ............................................................................. @DiversityMetric
[docs]def legendre(pam): """Calculate Legendre's beta diversity metric for a PAM. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: float: Legendre's beta diversity for the PAM. """ return float(omega(pam).sum() - (float((omega(pam) ** 2).sum()) / num_sites(pam)))
# ............................................................................. @DiversityMetric
[docs]def c_score(pam): """Calculate the checker board score for the PAM. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: float: The checkerboard score for the PAM. """ temp = 0.0 # Cache these so we don't recompute omega_ = omega(pam) # Cache so we don't waste computations num_species_ = num_species(pam) for i in range(num_species_): for j in range(i, num_species_): num_shared = len(np.where(np.sum(pam[:, [i, j]], axis=1) == 2)[0]) p_1 = omega_[i] - num_shared p_2 = omega_[j] - num_shared temp += p_1 * p_2 return 2 * temp / (num_species_ * (num_species_ - 1))
# ............................................................................. # Covariance metrics # ............................................................................. @CovarianceMatrixMetric
[docs]def sigma_sites(pam): """Compute the site sigma metric for a PAM. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: Matrix of covariance of composition of sites. """ site_by_site = pam.dot(pam.T).astype(float) alpha_prop = alpha_proportional(pam) return (site_by_site / num_species(pam)) - np.outer(alpha_prop, alpha_prop)
# ............................................................................. @CovarianceMatrixMetric
[docs]def sigma_species(pam): """Compute the species sigma metric for a PAM. Args: pam (Matrix): The presence-absence matrix to use for the computation. Returns: Matrix: Matrix of covariance of composition of species. """ species_by_site = pam.T.dot(pam).astype(float) omega_prop = omega_proportional(pam) return (species_by_site / num_sites(pam)) - np.outer(omega_prop, omega_prop)
# ............................................................................. # Tree distance matrix metrics # ............................................................................. @TreeDistanceMatrixMetric
[docs]def mean_nearest_taxon_distance(phylo_dist_mtx): """Calculates the nearest neighbor distance. Args: phylo_dist_mtx (Matrix): A matrix of distances between the species present at a site. Returns: float: The average distance from each taxa to taxa nearest to it. """ try: nearest_total = np.sum([np.min(row[row > 0.0]) for row in phylo_dist_mtx]) return float(nearest_total / phylo_dist_mtx.shape[0]) except Exception: # pragma: no cover return 0.0
# ............................................................................. @TreeDistanceMatrixMetric
[docs]def mean_pairwise_distance(phylo_dist_mtx): """Calculates mean pairwise distance between species present at a site. Args: phylo_dist_mtx (Matrix): A matrix of distances between the species present at a site. Returns: float: The average distance from each taxa all of the other co-located taxa. """ num_sp = phylo_dist_mtx.shape[0] return float( (phylo_dist_mtx.sum() - phylo_dist_mtx.trace()) / (num_sp * (num_sp - 1)) )
# ............................................................................. @TreeDistanceMatrixMetric
[docs]def sum_pairwise_distance(phylo_dist_mtx): """Calculates the sum pairwise distance for all species present at a site. Args: phylo_dist_mtx (Matrix): A matrix of distances between the species present at a site. Returns: float: The total distance from each taxa all of the other co-located taxa. """ return float((phylo_dist_mtx.sum() - phylo_dist_mtx.trace()) / 2.0)
# ............................................................................. @PamDistMatrixMetric
[docs]def pearson_correlation(pam, phylo_dist_mtx): """Calculates the Pearson correlation coefficient for each site. Args: pam (Matrix): A presence-absence matrix to use for the computation. phylo_dist_mtx (Matrix): A matrix of distance between species. Returns: Matrix: A column of Pearson correlation values for each site in a PAM. """ num_sites_ = pam.shape[0] pearson = np.zeros((num_sites_, 1), dtype=float) for site_idx in range(num_sites_): sp_idxs = np.where(pam[site_idx] == 1)[0] num_sp = len(sp_idxs) num_pairs = num_sp * (num_sp - 1) / 2 if num_pairs >= 2: # Need at least 2 pairs pair_dist = [] pair_sites_shared = [] for i in range(num_sp - 1): for j in range(i + 1, num_sp): pair_dist.append(phylo_dist_mtx[i, j]) pair_sites_shared.append(pam[:, i].dot(pam[:, j])) # X : Pair distance # Y : Pair sites shared x_val = np.array(pair_dist) y_val = np.array(pair_sites_shared) sum_xy = np.sum(x_val * y_val) sum_x = np.sum(x_val) sum_y = np.sum(y_val) sum_x_sq = np.sum(x_val**2) sum_y_sq = np.sum(y_val**2) # Pearson p_num = sum_xy - sum_x * sum_y / num_pairs p_denom = np.sqrt( (sum_x_sq - (sum_x**2 / num_pairs)) * (sum_y_sq - (sum_y**2 / num_pairs)) ) pearson[site_idx, 0] = p_num / p_denom return pearson
# ............................................................................. @TreeMetric
[docs]def phylogenetic_diversity(tree): """Calculate phylogenetic diversity of a tree. Args: tree (TreeWrapper): A phylogenetic tree to compute phylogenetic diversity for. Returns: float: The sum of the edge lengths of the nodes of the provided tree. """ try: return np.sum([node.edge_length for node in tree.nodes()]) except Exception: # pragma: no cover return 0.0
# .............................................................................
[docs]class PamStats: """Class for managing metric computation for PAM statistics."""
[docs] covariance_stats = [('sigma sites', sigma_sites), ('sigma species', sigma_species)]
[docs] diversity_stats = [ ('c-score', c_score), ('lande', lande), ('legendre', legendre), ('num sites', num_sites), ('num species', num_species), ('whittaker', whittaker), ]
[docs] site_matrix_stats = [ ('alpha', alpha), ('alpha proportional', alpha_proportional), ('phi', phi), ('phi average proportional', phi_average_proportional), ]
[docs] site_tree_stats = [('Phylogenetic Diversity', phylogenetic_diversity)]
[docs] site_tree_distance_matrix_stats = [ ('MNTD', mean_nearest_taxon_distance), ('Mean Pairwise Distance', mean_pairwise_distance), ('Sum Pairwise Distance', sum_pairwise_distance), ]
[docs] site_pam_dist_mtx_stats = [('pearson_correlation', pearson_correlation)]
[docs] species_matrix_stats = [ ('omega', omega), ('omega_proportional', omega_proportional), ('psi', psi), ('psi_average_proportional', psi_average_proportional), ]
# ........................... def __init__( self, pam, tree=None, tree_matrix=None, node_heights_matrix=None, tip_lengths_matrix=None, ): """Constructor for PAM stats computations. Args: pam (Matrix): A presence-absence matrix to use for computations. tree (TreeWrapper): A tree to use for phylogenetic distance computations. tree_matrix (Matrix): A matrix of tip / node presence absence values. node_heights_matrix (Matrix): A matrix of node height values. tip_lengths_matrix (Matrix): A matrix of tip length values. """ self.pam = pam self.tree = tree self.tree_matrix = tree_matrix self.node_heights_matrix = node_heights_matrix self.tip_lengths_matrix = tip_lengths_matrix # ...........................
[docs] def calculate_covariance_statistics(self): """Calculate covariance statistics matrices. Returns: list of tuple: A list of metric name, value tuples for covariance stats. """ return [(name, func(self.pam)) for name, func in self.covariance_stats]
# ...........................
[docs] def calculate_diversity_statistics(self): """Calculate diversity statistics. Returns: list of tuple: A list of metric name, value tuples for diversity metrics. """ print([func(self.pam) for _, func in self.diversity_stats]) diversity_matrix = Matrix( np.array([func(self.pam) for _, func in self.diversity_stats]), headers={'0': ['value'], '1': [name for name, _ in self.diversity_stats]}, ) return diversity_matrix
# ...........................
[docs] def calculate_site_statistics(self): """Calculate site-based statistics. Returns: Matrix: A matrix of site-based statistics for the selected metrics. """ # Matrix based print('Start') site_stats_matrix = Matrix( np.zeros((self.pam.shape[0], len(self.site_matrix_stats))), headers={ '0': self.pam.get_row_headers(), '1': [name for name, _ in self.site_matrix_stats], }, ) print('Site matrix stats') for i in range(len(self.site_matrix_stats)): site_stats_matrix[:, i] = self.site_matrix_stats[i][1](self.pam) if self.tree is not None: print('Tree stuff') squid_annotations = self.tree.get_annotations('squid') squid_dict = {squid: label for label, squid in squid_annotations} ordered_labels = [] for squid in self.pam.get_column_headers(): if squid in squid_dict.keys(): v = squid_dict[squid] else: # pragma: no cover v = '' ordered_labels.append(v) site_tree_stats_matrix = Matrix( np.zeros((self.pam.shape[0], len(self.site_tree_stats))), headers={ '0': self.pam.get_row_headers(), '1': [name for name, _ in self.site_tree_stats], }, ) site_tree_dist_mtx_matrix = Matrix( np.zeros( (self.pam.shape[0], len(self.site_tree_distance_matrix_stats)) ), headers={ '0': self.pam.get_row_headers(), '1': [name for name, _ in self.site_tree_distance_matrix_stats], }, ) # PAM / Tree stats print('Get distance matrix') phylo_dist_mtx = self.tree.get_distance_matrix() print('PAM dist mtx stats') site_pam_tree_matrix = None if self.site_pam_dist_mtx_stats: site_pam_tree_matrix = Matrix( Matrix.concatenate( [ func(self.pam, phylo_dist_mtx) for _, func in self.site_pam_dist_mtx_stats ] ), headers={ '0': self.pam.get_row_headers(), '1': [name for name, _ in self.site_pam_dist_mtx_stats], }, ) print('Site by site') # Loop through PAM for site_idx, site_row in enumerate(self.pam): # Get present species present_species = np.where(site_row == 1)[0] # Get sub tree present_labels = list( filter( lambda x: bool(x), [ordered_labels[i] for i in present_species] ) ) present_dist_mtx_idxs = [] for idx, label in enumerate(phylo_dist_mtx.get_column_headers()): if label in present_labels: present_dist_mtx_idxs.append(idx) try: if present_labels: site_tree = self.tree.extract_tree_with_taxa_labels( present_labels ) # Get distance matrix site_dist_mtx = phylo_dist_mtx.slice( present_dist_mtx_idxs, present_dist_mtx_idxs ) # site_dist_mtx = site_tree.get_distance_matrix() site_tree_dist_mtx_matrix[site_idx] = [ func(site_dist_mtx) for (_, func) in self.site_tree_distance_matrix_stats ] site_tree_stats_matrix[site_idx] = [ func(site_tree) for _, func in self.site_tree_stats ] except Exception as err: # pragma: no cover print(err) print(present_labels) print('Site index: {}'.format(site_idx)) all_stat_matrices = [] if site_stats_matrix is not None: all_stat_matrices.append(site_stats_matrix) if site_tree_stats_matrix is not None: all_stat_matrices.append(site_tree_stats_matrix) if site_tree_dist_mtx_matrix is not None: all_stat_matrices.append(site_tree_dist_mtx_matrix) if site_pam_tree_matrix is not None: all_stat_matrices.append(site_pam_tree_matrix) site_stats_matrix = Matrix.concatenate(all_stat_matrices, axis=1) return site_stats_matrix
# ...........................
[docs] def calculate_species_statistics(self): """Calculate species-based statistics. Returns: Matrix: A matrix of species-based statistics for the selected metrics. """ # Matrix based species_stats_matrix = Matrix( np.zeros((self.pam.shape[1], len(self.species_matrix_stats))), headers={ '0': self.pam.get_column_headers(), '1': [name for name, _ in self.species_matrix_stats], }, ) for i in range(len(self.species_matrix_stats)): species_stats_matrix[:, i] = self.species_matrix_stats[i][1](self.pam) return species_stats_matrix
# ...........................
[docs] def register_metric(self, name, metric_function): """Register a new metric. Args: name (str): A name for this metric that will be used for a header. metric_function (function): A decorated metric generating function. Raises: TypeError: Raised if the metric function type cannot be processed. """ if isinstance(metric_function, CovarianceMatrixMetric): self.covariance_stats.append((name, metric_function)) elif isinstance(metric_function, DiversityMetric): self.diversity_stats.append((name, metric_function)) elif isinstance(metric_function, SiteMatrixMetric): self.site_matrix_stats.append((name, metric_function)) elif isinstance(metric_function, TreeMetric): self.site_tree_stats.append((name, metric_function)) elif isinstance(metric_function, TreeDistanceMatrixMetric): self.site_tree_distance_matrix_stats.append((name, metric_function)) elif isinstance(metric_function, PamDistMatrixMetric): self.site_pam_dist_mtx_stats.append((name, metric_function)) elif isinstance(metric_function, SpeciesMatrixMetric): self.species_matrix_stats.append((name, metric_function)) else: raise TypeError('Unknown metric type: {}, {}'.format(name, metric_function))
# ............................................................................. __all__ = [ 'CovarianceMatrixMetric', 'DiversityMetric', 'PamDistMatrixMetric', 'PamStats', 'PamStatsMetric', 'SiteMatrixMetric', 'SpeciesMatrixMetric', 'TreeDistanceMatrixMetric', 'TreeMetric', 'alpha', 'alpha_proportional', 'c_score', 'lande', 'legendre', 'mean_nearest_taxon_distance', 'mean_pairwise_distance', 'num_sites', 'num_species', 'omega', 'omega_proportional', 'pearson_correlation', 'phi', 'phi_average_proportional', 'phylogenetic_diversity', 'psi', 'psi_average_proportional', 'schluter_site_variance_ratio', 'schluter_species_variance_ratio', 'sigma_sites', 'sigma_species', 'sum_pairwise_distance', 'whittaker', ]