Source code for lmpy.randomize.grady

"""Randomize PAMs using CJ's algorithm.

This module contains functions used to randomize a PAM using CJ's algorithm.
This algorithm can run in a parallel fashion and uses a fill-based approach so
as to prevent a bias caused by starting with an initial condition of a
populated matrix.
"""
import numpy as np

from lmpy.matrix import Matrix

# Number of times to look for a match when fixing problems
SEARCH_THRESHOLD = 100


# .............................................................................
[docs]def fill_shuffle_reshape_heuristic(orig_pam): """Create an approximation with the correct number of 1s randomly placed. Creates an array with the total number of ones from the original PAM and shuffles it then it reshapes it to match the shape of the original PAM. Args: orig_pam (Matrix): The observed PAM to randomize. Returns: Matrix: An approximate randomization of the PAM that is uncorrected. """ fill = int(np.sum(orig_pam)) approx = np.zeros((orig_pam.shape), dtype=int) approx[:fill] = 1 np.random.shuffle(approx) return approx.reshape(orig_pam.shape)
# .............................................................................
[docs]def total_fill_percentage_heuristic(orig_pam): """Create an approximation using the total fill percentage of the PAM. Creates an approximation using the total matrix fill of the original PAM as a weight threshold to compare with randomly generated numbers. Args: orig_pam (:obj:`Matrix`): The observed PAM to randomize. Returns: Matrix: An approximate randomization of the PAM that is uncorrected. """ fill = np.sum(orig_pam) fill_percentage = 1.0 * fill / orig_pam.size approx = ( np.random.uniform(low=0.0, high=1.0, size=orig_pam.shape) <= fill_percentage ).astype(int) return approx
# .............................................................................
[docs]def max_col_or_row_heuristic(orig_pam): """Weighting method using max weight between row and column. This method returns a matrix of weights where the weight of each cell is the maximum between the proportions of the row and col Args: orig_pam (:obj:`Matrix`): The observed PAM to randomize. Returns: Matrix: An approximate randomization of the PAM that is uncorrected. """ row_totals = np.sum(orig_pam, axis=1) col_totals = np.sum(orig_pam, axis=0) row_weights = row_totals.astype(float) / row_totals.shape[0] col_weights = col_totals.astype(float) / col_totals.shape[0] row_weights = np.expand_dims(row_totals.astype(float) / row_totals.shape[0], 1) col_weights = np.expand_dims(col_totals.astype(float) / col_totals.shape[0], 0) return ( np.random.uniform(low=0.0, high=1.0, size=orig_pam.shape) <= np.maximum(row_weights, col_weights) ).astype(int)
# .............................................................................
[docs]def min_col_or_row_heuristic(orig_pam): """Weighting method using max weight between row and column. This method returns a matrix of weights where the weight of each cell is the maximum between the proportions of the row and col Args: orig_pam (:obj:`Matrix`): The observed PAM to randomize. Returns: Matrix: An approximate randomization of the PAM that is uncorrected. """ row_totals = np.sum(orig_pam, axis=1, dtype=int) col_totals = np.sum(orig_pam, axis=0, dtype=int) row_weights = np.expand_dims(row_totals.astype(float) / row_totals.shape[0], 1) col_weights = np.expand_dims(col_totals.astype(float) / col_totals.shape[0], 0) return ( np.random.uniform(low=0.0, high=1.0, size=orig_pam.shape) <= np.minimum(row_weights, col_weights, dtype=np.single) ).astype(int)
# .............................................................................
[docs]def all_zeros_heuristic(orig_pam): """Creates a two-dimensional approximation composed of all zeros. Args: orig_pam (:obj:`Matrix`): The observed PAM to randomize. Returns: Matrix: An approximate randomization of the PAM that is uncorrected. """ return np.zeros(orig_pam.shape, dtype=int)
# .............................................................................
[docs]def all_ones_heuristic(orig_pam): """Creates a two-dimensional approximation composed of all ones. Args: orig_pam (:obj:`Matrix`): The observed PAM to randomize. Returns: Matrix: An approximate randomization of the PAM that is uncorrected. """ return np.ones(orig_pam.shape, dtype=int)
# .............................................................................
[docs]def grady_randomize(mtx, approximation_heuristic=total_fill_percentage_heuristic): """Main function for creating a random matrix. Args: mtx (:obj:`Matrix`): A Matrix object representation of a PAM approximation_heuristic (:obj:`Method`): A function that generates an approximation of a final randomized matrix. Returns: Matrix: A matrix of random presence absence values with the same marginal totals as the input matrix 'mtx'. """ # Step 0. Get marginal totals # ........................... num_rows, num_cols = mtx.shape row_totals = np.sum(mtx, axis=1).reshape((num_rows, 1)) col_totals = np.sum(mtx, axis=0).reshape((1, num_cols)) valid_rows = np.sum(mtx, axis=1) != 0 valid_cols = np.sum(mtx, axis=0) != 0 # Step 1. Get Initial random matrix # ........................... # Get approximation rand_mtx_data = approximation_heuristic(mtx) # Step 2: Purge over-filled marginals # ................................... # For each row / column with more ones than the marginal total, remove # extra ones until within limit row_sums = np.sum(rand_mtx_data, axis=1).reshape((num_rows, 1)) for i in np.where(row_sums > row_totals)[0]: row_choices = np.where(rand_mtx_data[i] == 1)[0] change_count = int(row_sums[i] - row_totals[i]) rand_mtx_data[i, np.random.permutation(row_choices)[:change_count]] = False col_sums = np.sum(rand_mtx_data, axis=0).reshape((1, num_cols)) for j in np.where(col_sums > col_totals)[1]: col_choices = np.where(rand_mtx_data[:, j] == 1)[0] change_count = int(col_sums[0, j] - col_totals[0, j]) rand_mtx_data[np.random.permutation(col_choices)[:change_count], j] = False # Step 3: Fill # ........................... problem_rows = [] problem_cols = [] row_sums = np.sum(rand_mtx_data, axis=1) col_sums = np.sum(rand_mtx_data, axis=0) unfilled_cols = np.where(col_sums < col_totals[0, :])[0].tolist() for row_idx in np.random.permutation(np.where(row_sums < row_totals[:, 0])[0]): possible_cols = np.random.permutation( np.intersect1d( np.where(rand_mtx_data[row_idx, :] == 0)[0], unfilled_cols, assume_unique=True, ) ) num_to_fill = int(row_totals[row_idx, 0] - row_sums[row_idx]) if num_to_fill > possible_cols.shape[0]: # Add this row to problem rows because we can't fill it enough problem_rows.append(row_idx) # Just fill what we can num_to_fill = possible_cols.shape[0] # Fill cells rand_mtx_data[row_idx, possible_cols[:num_to_fill]] = True # Add to col_sums col_sums[possible_cols[:num_to_fill]] += 1 # Check if we should remove these columns from the list for c in np.where( col_sums[possible_cols[:num_to_fill]] == col_totals[0, possible_cols[:num_to_fill]] )[0]: unfilled_cols.remove(possible_cols[c]) problem_cols = unfilled_cols # Step 4: Fix problems # ........................... j = 0 while problem_rows: r = np.random.choice(problem_rows) c = np.random.choice(problem_cols) i = 0 cs = np.where((rand_mtx_data[r] == 0) & (valid_cols))[0] rs = np.where((rand_mtx_data[:, c] == 0) & (valid_rows))[0] num_tries = 0 found = False while not found and num_tries < SEARCH_THRESHOLD: r2 = np.random.choice(rs) c2 = np.random.choice(cs) num_tries += 1 if rand_mtx_data[r2, c2] == 1: rand_mtx_data[r, c2] = 1 rand_mtx_data[r2, c2] = 0 rand_mtx_data[r2, c] = 1 found = True if not found: # pragma: no cover # Can't fix row easily so randomly move a one prs = np.where(rand_mtx_data[:, c2] == 1)[0] update_row = np.random.choice(prs) rand_mtx_data[r, c2] = 1 rand_mtx_data[update_row, c2] = 0 # Should we add update row to problem rows or is it already there? if update_row not in problem_rows: problem_rows.append(update_row) r_sum = np.sum(rand_mtx_data[r, :]) c_sum = np.sum(rand_mtx_data[:, c]) if r_sum == int(row_totals[r]): problem_rows.remove(r) if c_sum == int(col_totals[0, c]): problem_cols.remove(c) row_sums = np.sum(rand_mtx_data, axis=1) return Matrix(rand_mtx_data, headers=mtx.get_headers())
# ............................................................................. __all__ = [ 'all_ones_heuristic', 'all_zeros_heuristic', 'fill_shuffle_reshape_heuristic', 'grady_randomize', 'max_col_or_row_heuristic', 'min_col_or_row_heuristic', 'total_fill_percentage_heuristic', ]