# inference with persistence diagrams
from tdads.distance import *
from tdads.PH_utils import enclosing_radius
from multiprocessing import cpu_count, Pool
from numpy import ones, ndarray, percentile, array, float64, log, exp, Inf, where, concatenate, reshape, delete, inf, intersect1d
from numpy.testing import assert_almost_equal
from itertools import repeat, combinations
from random import sample, choices
from inspect import getfullargspec
from scipy.special import digamma
import warnings
# permutation test
[docs]
class perm_test:
def __init__(self, iterations:int = 20, dims:list = [0], p:float = 2.0, q:float = 2.0, paired:bool = False, n_cores:int = cpu_count() - 1):
'''Group difference testing for persistence diagrams. Based on the paper Robinson and Turner 2017, with
extra functionality detailed in Abdallah 2021.
Parameters
----------
`iterations` : int, default 20
The number of permutation iterations to carry out. The smallest p-value that can
be returned by this test is 1/(`iterations` + 1).
`dims` : list of int, default [0]
The list of homological dimensions in which to carry out the test.
`p` : float, default 2.0
The power parameter for the Wasserstein metric.
`q` : float, default 2.0
The loss function exponential parameter, must be at least 1. See Robinson and Turner 2017
for details.
`paired` : bool, default `False`
Whether or not there is a second-order pairing between each group, i.e. if only
permutations which shuffle the groups first, second, third, etc elements should
be used. See Abdallah 2021 for details.
Attributes
----------
`iterations` : int
The input `iterations` parameter.
`dims` : list of int
The input `dims` parameter.
`p` : float
The input `p` parameter.
`q` : float
The input `q` parameter.
`paired` : bool
The input `paired` parameter.
`distances` : list of `tdads.distance.distance` objects
The distance functions in each desired dimension.
`group_sizes` : `None` initially, then list of int once `test` is called
The integer sizes of the diagram groups provided for testing.
`dist_mats` : `None` initially, then a list of ndarray's
A list, with one element for each dimension in `dim`, of partially computed distance matrices between all diagrams in the groups.
Once two diagrams have there distances calculated in each dimension, the corresponding entries in all elements of `dist_mats` will be updated from -1 to the
actual distance value in that dimension.
Citations
---------
Robinson T, Turner K (2017). "Hypothesis testing for topological data analysis." https://link.springer.com/article/10.1007/s41468-017-0008-7.
Abdallah H et al. (2021). "Statistical Inference for Persistent Homology applied to fMRI." https://github.com/hassan-abdallah/Statistical_Inference_PH_fMRI/blob/main/Abdallah_et_al_Statistical_Inference_PH_fMRI.pdf.
'''
if isinstance(iterations, type(1)) == False:
raise Exception('iterations must be an integer.')
if iterations < 1:
raise Exception('iterations must be at least 1.')
self.iterations = iterations
if not isinstance(dims, type([0,1])):
raise Exception('dims must be a list.')
if set([type(d) for d in dims]) != set([type(1)]):
raise Exception('Each dimension in dims must be an integer.')
if min(dims) < 0:
raise Exception('Each dimension in dims must be non-negative.')
self.dims = dims
distances = [distance(dim = d, p = p) for d in dims]
self.distances = distances
if not isinstance(q,type(2)) and not isinstance(q,type(2.0)):
raise Exception('q must be a number.')
if q < 1:
raise Exception('q must be at least 1.')
self.q = q
if isinstance(n_cores, type(1)) == False:
raise Exception('n_cores must be an integer.')
if n_cores < 1:
raise Exception('n_cores must be at least 1.')
self.group_sizes = None
self.dist_mats = None
if isinstance(paired, type(True)) == False:
raise Exception('paired must be True or False.')
self.paired = paired
[docs]
def __str__(self):
'''Describe a permutation test procedure based on the number of permutation iterations
and whether the groups are paired or unpaired.'''
if self.paired == True:
start_str = 'Paired permutation test with '
else:
start_str = 'Non-paired permutation test with '
s = start_str + str(self.iterations) + ' iterations.'
return s
[docs]
def compute_loss(self, diagram_groups):
'''Internal method to compute the loss function from Robinson and Turner 2017.
This function should not be called directly.'''
combs = concatenate([concatenate([g*ones((int(len(diagram_groups[g])*(len(diagram_groups[g]) - 1)/2),1)), concatenate([array(x).reshape(1,2) for x in combinations(range(len(diagram_groups[g])), 2)])], axis = 1) for g in range(len(self.group_sizes))]).astype(int)
def get_distance(dim_ind, combination): # make sure this updates!!
v = self.dist_mats[dim_ind][diagram_groups[combination[0]][combination[1]]['ind'], diagram_groups[combination[0]][combination[2]]['ind']]
if v == -1:
v = self.distances[dim_ind].compute(diagram_groups[combination[0]][combination[1]]['diagram'], diagram_groups[combination[0]][combination[2]]['diagram'])
self.dist_mats[dim_ind][diagram_groups[combination[0]][combination[1]]['ind'], diagram_groups[combination[0]][combination[2]]['ind']] = v
return v
statistics = []
for d_i in range(len(self.dims)):
d_tots = [get_distance(d_i, comb) for comb in combs]
# with Pool(processes=self.n_cores) as pool:
# d_tots = pool.starmap(get_distance, zip(repeat(dim), combs)) # each row, need to do this per dim
for i in range(len(combs)):
self.dist_mats[d_i][diagram_groups[combs[i, 0]][combs[i, 1]]['ind'], diagram_groups[combs[i, 0]][combs[i, 2]]['ind']] = d_tots[i]
statistics.append(sum([sum([d_tots[x]**self.q for x in range(len(combs)) if combs[x,0] == g])/(len(diagram_groups[g])*(len(diagram_groups[g]) - 1)) for g in range(len(diagram_groups))]))
return statistics
[docs]
def test(self, diagram_groups):
'''Run the permutation test.
Parameters
----------
`diagram_groups` : list of lists
The groups of persistence diagrams to be analyzed.
Returns
-------
Dict
Keys are 'test_statistics' for the test statistic in each dimension,
'permvals' for the null distribution in each dimension and 'p_values' for the
p-values in each dimension. For example, `output['p_values']['1']` would give the
p-value for the second homological dimension in `self.dims`.
Examples
--------
>>> # create two groups of persistence diagrams
>>> from ripser import ripser
>>> import numpy as np
>>> data1 = np.random((100,2))
>>> data2 = np.random((100,2))
>>> D1 = ripser(data1)
>>> D2 = ripser(data2)
>>> group1 = [D1, D2]
>>> group2 = [D1, D2]
>>> # create perm test object in dimensions 0 and 1
>>> from tdads.inference import permutation_test
>>> pt = permutation_test(dims = [0, 1], n_cores = 2)
>>> # run test
>>> res = pt.test([g1, g2])
>>> # get p-values
>>> res['p_values']
Citations
---------
Robinson T, Turner K (2017). "Hypothesis testing for topological data analysis." https://link.springer.com/article/10.1007/s41468-017-0008-7.
Abdallah H et al. (2021). "Statistical Inference for Persistent Homology applied to fMRI." https://github.com/hassan-abdallah/Statistical_Inference_PH_fMRI/blob/main/Abdallah_et_al_Statistical_Inference_PH_fMRI.pdf.
'''
# test the diagram_groups and preprocess
diagram_groups, csum_group_sizes = preprocess_diagram_groups_for_inference(diagram_groups)
# update group_sizes
self.group_sizes = [len(g) for g in diagram_groups]
# more checks
if min(self.group_sizes) < 2:
raise Exception('Each group of diagrams must have at least 2 diagrams.')
if self.paired == True and len(set(self.group_sizes)) > 1:
raise Exception('When paired is True each group of diagrams must have the same number of elements.')
# set up to store distance calculations
n = sum(self.group_sizes)
self.dist_mats = [-1*ones((n, n)) for d in self.dims]
# compute test statistics
test_statistics = self.compute_loss(diagram_groups)
# generate permutations
perm_values = [[] for d in self.dims]
# iterate
for iteration in range(self.iterations):
if not self.paired:
permuted_groups = []
samples = [i for i in range(n)]
for i in range(len(diagram_groups)):
ss = sample(samples, len(diagram_groups[i]))
gs = [max([i for i in range(len(csum_group_sizes)) if csum_group_sizes[i] <= X]) if len([i for i in range(len(csum_group_sizes)) if csum_group_sizes[i] <= X]) > 0 else 0 for X in ss]
permuted_groups.append([diagram_groups[gs[j]][ss[j] - csum_group_sizes[gs[j]]] for j in range(len(ss))])
samples = [x for x in samples if x not in ss]
else:
perm = [sample(range(len(diagram_groups)), len(diagram_groups)) for x in range(self.group_sizes[0])]
permuted_groups = [[diagram_groups[perm[p][i]][p] for p in range(len(perm))] for i in range(len(diagram_groups))]
permuted_statistics = self.compute_loss(permuted_groups)
for i in range(len(self.dims)):
perm_values[i].append(permuted_statistics[i])
# gather all data in results
perm_values_ret = {}
p_values_ret = {}
test_statistics_ret = {}
for i in range(len(self.dims)):
perm_values_ret[str(self.dims[i])] = perm_values[i]
test_statistics_ret[str(self.dims[i])] = test_statistics[i]
p_values_ret[str(self.dims[i])] = (sum(perm_values[i] <= test_statistics[i]) + 1)/(self.iterations + 1)
return {'permvals':perm_values_ret, 'test_statistics':test_statistics_ret, 'p_values':p_values_ret}
[docs]
class diagram_bootstrap:
def __init__(self, diag_fun, dims:list = [0], num_samples:int = 20, distance_mat:bool = False, alpha:float = 0.05):
'''Compute confidence sets for (the topological features within) persistence diagrams.
Parameters
----------
`diag_fun` : function of two variables `X` and `thresh`
The persistent homology algorithm (Vietoris Rips) for the dataset `X` up to
radius `thresh`. The maximum homological dimension should be the same as the
maximum value of `dims`.
`dims` : list of int, default [0]
The list of homological dimensions in which to compute confidence sets.
`num_samples` : int, default 20
The number of bootstrap resamplings to carry out.
`distance_mat` : bool, default False
Whether the input dataset will be a distance matrix or not.
`alpha` : float, default 0.05
The type 1 error for determining significant topological features.
Attributes
----------
`diag_fun` : function
The input `diag_fun` parameter.
`dims` : list
The input `dims` parameter.
`num_samples` : int
The input `num_samples` parameter.
`distance_mat` : bool, default False
The input `distance_mat` parameter.
`alpha` : float
The input `alpha` parameter.
Examples
--------
>>> from tdads.inference import diagram_bootstrap
>>> from ripser import ripser
>>> from numpy.random import uniform
>>> from numpy import array, cos, sin
>>> # build circle dataset
>>> theta = uniform(low = 0, high = 2*pi, size = 100)
>>> data = array([[cos(theta[i]), sin(theta[i])] for i in range(100)])
>>> # define persistent homology function
>>> def diag_fun(X, thresh):
>>> return ripser(X = X, thresh = thresh)
>>> # create bootstrap object
>>> boot = diagram_bootstrap(diag_fun = diag_fun)
Citations
---------
Chazal F et al (2017). "Robust Topological Inference: Distance to a Measure and Kernel Distance." https://www.jmlr.org/papers/volume18/15-484/15-484.pdf.
'''
def check_fun(a, b):
return 1
if not isinstance(diag_fun, type(check_fun)):
raise Exception('diag_fun must be a function.')
if getfullargspec(diag_fun)[0] != ['X', 'thresh']:
raise Exception('diag_fun must be a function of two parameters, X and thresh.')
if not isinstance(dims, type([0,1])):
raise Exception('dims must be a list.')
if set([type(d) for d in dims]) != set([type(1)]):
raise Exception('Each dimension in dims must be an integer.')
if min(dims) < 0:
raise Exception('Each dimension in dims must be non-negative.')
self.dims = dims
if isinstance(distance_mat, type(True)) == False:
raise Exception('distance_mat must be True or False.')
self.distance_mat = distance_mat
diamond = array([[0,0],[1,1],[-1,1],[0,2]])
dist_diamond = array([[0, sqrt(2), sqrt(2), 2], [sqrt(2), 0, 2, sqrt(2)], [sqrt(2), 2, 0, sqrt(2)], [2, sqrt(2), sqrt(2), 0]])
diamond_diag = [array([[0, sqrt(2)],[0, sqrt(2)],[0, sqrt(2)],[0, float('inf')]]),array([[sqrt(2), 2]])]
diamond_diag = [diamond_diag[x] if x < len(diamond_diag) else array([]).reshape(0,2).astype(float64) for x in self.dims]
if not distance_mat:
X = diamond
else:
X = dist_diamond
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
sample_res = diag_fun(X, float('inf'))
sample_res = preprocess_diagram(sample_res, ret = True)
except Exception as ex:
raise Exception('diag_fun doesn\'t seem to be computing persistence diagrams correctly.')
for i in self.dims:
try:
assert_almost_equal(sample_res[i], diamond_diag[self.dims.index(i)], 7)
except:
raise Exception('diag_fun doesn\'t seem to be computing persistence diagrams correctly.')
self.diag_fun = diag_fun
if isinstance(num_samples, type(1)) == False:
raise Exception('num_samples must be an integer.')
if num_samples < 1:
raise Exception('num_samples must be at least 1.')
self.num_samples = num_samples
if not isinstance(alpha,type(0.05)):
raise Exception('alpha must be a float.')
if alpha <= 0 or alpha >= 1:
raise Exception('alpha must be between 0 and 1 (non-inclusive).')
self.alpha = alpha
[docs]
def __str__(self):
'''Describe a bootstrap procedure based on the number of bootstrap samples,
whether or not the input will be a distance matrix and the Type 1 error rate (alpha).'''
dms = ''
if not self.distance_mat:
dms = 'non-'
s = 'Bootstrap confidence intervals with ' + str(self.num_samples) + ' many samples, ' + '' + f'{dms}distance-matrix input, and a Type 1 error of ' + str(self.alpha) + '.'
return s
[docs]
def compute(self, X:ndarray, thresh:float):
'''Carry out the bootstrap procedure.
Parameters
----------
`X` : numpy.ndarray (2D)
The input dataset - either raw tabular data or a distance matrix of samples.
`thresh` : float
The maximum filtration radius for Vietoris-Rips persistent homology.
Returns
-------
Dict
Entries are 'diagram' (the computed persistence diagram), 'thresholds' (a Dict of the computed
persistence thresholds for each desired dimension) and 'subsetted_diagram' (the persistence diagram
thresholded by the threshold values in each dimension).
Examples
--------
>>> from tdads.inference import diagram_bootstrap
>>> from ripser import ripser
>>> from numpy.random import uniform
>>> # build circle dataset
>>> theta = uniform(low = 0, high = 2*pi, size = 100)
>>> data = array([[cos(theta[i]), sin(theta[i])] for i in range(100)])
>>> # define persistent homology function
>>> def diag_fun(X, thresh):
>>> return ripser(X = X, thresh = thresh)
>>> # create bootstrap object and compute significant features
>>> boot = diagram_bootstrap(diag_fun = diag_fun)
>>> res = boot.compute(data, 2)
>>> # print subsetted diagram
>>> res['subsetted_diagram']
Citations
---------
Chazal F et al (2017). "Robust Topological Inference: Distance to a Measure and Kernel Distance." https://www.jmlr.org/papers/volume18/15-484/15-484.pdf.
'''
if not isinstance(X, type(array([0,1]))):
raise Exception('X must be a numpy array.')
if len(X.shape) != 2 or X.shape[0] < 2 or X.shape[1] < 1:
raise Exception('X must be a 2-dimensional array with at least two rows and one column.')
if self.distance_mat and X.shape[0] != X.shape[1]:
raise Exception('When distance_mat is True X must have the same number of rows and columns (as a distance matrix).')
if not (isinstance(thresh, type(1)) or isinstance(thresh, type(0.1))):
raise Exception('thresh must be a number.')
if thresh <= 0:
raise Exception('thresh must be positive.')
# try to calculate the full persistence diagram
try:
diagram = self.diag_fun(X, thresh)
except Exception as ex:
raise Exception('An error occured when diag_fun tried to compute the persistence diagram of X. The error was: ' + str(ex))
# error check the persistence diagram
try:
diagram = preprocess_diagram(D = diagram, ret = True)
except Exception as ex:
raise Exception('The output of diagam_fun(X, thresh) was not in the correct format for a persistence diagram.')
# create bottleneck distance objects in each dimension
distances = [distance(dim = d, p = float('inf'), n_cores = 2) for d in self.dims]
# function to bootstrap
def get_distances():
# generate sample (unique row indices)
s = choices(population = range(len(X)), k = len(X))
s = list(set(s))
# subset X
if self.distance_mat:
X_sample = X[s,:][:,s]
else:
X_sample = X[s, :]
# compute new persistence diagram
try:
diagram_sample = self.diag_fun(X_sample, thresh)
except Exception as ex:
raise Exception('An output of diag_fun was not in the correct format for a persistence diagram.')
# compute distances in each desired dimension
res = [dist.compute(diagram, diagram_sample) for dist in distances]
return res
# perform bootstrap
bootstrap_values = [get_distances() for i in range(self.num_samples)]
# filter by dimension
bootstrap_values = [array([bv[i] for bv in bootstrap_values]) for i in range(len(self.dims))]
# compute thresholds
thresholds = [2*percentile(bv, 1 - self.alpha) for bv in bootstrap_values]
# subset diagram by thresholds
subsetted_diagram = [diagram[i][diagram[i][:,1] - diagram[i][:,0] > thresholds[self.dims.index(i)]] if i in self.dims else empty((0, 2)) for i in range(max(self.dims) + 1)]
# set up return dict
ret = {'diagram':diagram, 'thresholds':thresholds, 'subsetted_diagram':subsetted_diagram}
return ret
[docs]
class universal_null:
def __init__(self, diag_fun, dims:list = [1], distance_mat:bool = False, alpha:float = 0.05, infinite_cycle_inference:bool = False):
'''A universal null distribution for (the topological features in) persistence diagrams.
Parameters
----------
`diag_fun` : function of two variables, `X` and `thresh`
The persistent homology algorithm (Vietoris Rips) for the dataset `X` up to
radius `thresh`. The maximum homological dimension should be the same as the
maximum value of `dims`.
`dims` : list of int, default [1]
The list of homological dimensions in which to compute confidence sets.
`distance_mat` : bool, default False
Whether the input dataset will be a distance matrix or not.
`alpha` : float, default 0.05
The type 1 error for determining significant topological features with the
Bonferroni correction.
`infinite_cycle_inference` : bool, default `FALSE`
Whether or not to perform inference for features with infinite death values.
Attributes
----------
`diag_fun` : function
The input `diag_fun` parameter.
`dims` : list
The input `dims` parameter.
`distance_mat` : bool, default False
The input `distance_mat` parameter.
`alpha` : float
The input `alpha` parameter.
`infinite_cycle_inference` : bool
The input `infinite_cycle_inference` parameter.
Examples
--------
>>> from tdads.inference import universal_null
>>> from tdads.PH_utils import enclosing_radius
>>> from ripser import ripser
>>> # define the persistent homology function
>>> def diag_fun(X):
>>> return ripser(X = X, thresh = enclosing_radius(X))
>>> # create universal null object
>>> univ_null = universal_null(diag_fun = diag_fun)
Citations
---------
Bobrowski O, Skraba P (2023). "A universal null-distribution for topological data analysis." https://www.nature.com/articles/s41598-023-37842-2.
'''
def check_fun(a):
return 1
if not isinstance(diag_fun, type(check_fun)):
raise Exception('diag_fun must be a function.')
if getfullargspec(diag_fun)[0] != ['X', 'thresh']:
raise Exception('diag_fun must be a function of two parameters, X and thresh.')
if not isinstance(dims, type([0,1])):
raise Exception('dims must be a list.')
if set([type(d) for d in dims]) != set([type(1)]):
raise Exception('Each dimension in dims must be an integer.')
if min(dims) < 1:
raise Exception('Each dimension in dims must be at least 1.')
self.dims = dims
if isinstance(distance_mat, type(True)) == False:
raise Exception('distance_mat must be True or False.')
self.distance_mat = distance_mat
if not isinstance(alpha,type(0.05)):
raise Exception('alpha must be a float.')
if alpha <= 0 or alpha >= 1:
raise Exception('alpha must be between 0 and 1 (non-inclusive).')
self.alpha = alpha
diamond = array([[0,0],[1,1],[-1,1],[0,2]])
dist_diamond = array([[0, sqrt(2), sqrt(2), 2], [sqrt(2), 0, 2, sqrt(2)], [sqrt(2), 2, 0, sqrt(2)], [2, sqrt(2), sqrt(2), 0]])
diamond_diag = [array([[0, sqrt(2)],[0, sqrt(2)],[0, sqrt(2)],[0, float('inf')]]),array([[sqrt(2), 2]])]
diamond_diag = [diamond_diag[x] if x < len(diamond_diag) else array([]).reshape(0,2).astype(float64) for x in self.dims]
if not distance_mat:
X = diamond
else:
X = dist_diamond
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
sample_res = diag_fun(X, float('inf'))
sample_res = preprocess_diagram(sample_res, ret = True)
except Exception as ex:
raise Exception('diag_fun doesn\'t seem to be computing persistence diagrams correctly.')
for i in self.dims:
try:
assert_almost_equal(sample_res[i], diamond_diag[self.dims.index(i)], 7)
except:
raise Exception('diag_fun doesn\'t seem to be computing persistence diagrams correctly.')
self.diag_fun = diag_fun
if isinstance(infinite_cycle_inference, type(True)) == False:
raise Exception('infinite_cycle_inference must be True or False.')
self.infinite_cycle_inference = infinite_cycle_inference
[docs]
def __str__(self):
'''Describe a universal null procedure based on the dimensions being analyzed,
whether or not the input will be a distance matrix, the Type 1 error rate (alpha) and
whether or not infinite cycle inference will be carried out.'''
dms = ''
if not self.distance_mat:
dms = 'non-'
if len(self.dims) == 1:
dim_str = 'dimension ' + str(self.dims[0])
else:
dim_str = ', '.join([str(d) for d in self.dims[::-1]]) + ' and ' + str(self.dims[-1])
if self.distance_mat:
distmat_str = 'distance-matrix'
else:
distmat_str = 'point-cloud'
if self.infinite_cycle_inference:
ici_str = ''
else:
ici_str = 'no '
s = f'Universal null procedure for {dim_str}, {distmat_str} input, a Type 1 error rate of {str(self.alpha)} and {ici_str}infinite cycle inference.'
return s
[docs]
def compute(self, X:ndarray, thresh):
'''Carry out the universal null inference procedure.
Parameters
----------
`X` : numpy.ndarray
The input dataset - either raw tabular data or a distance matrix of samples.
`thresh` : float or 'enclosing'
The maximum filtration radius for persistent homology. If 'enclosing' then the
enclosing radius of `X` will be computed and used, otherwise `thresh` must be a
set number.
Returns
-------
Dict
The entries are 'subsetted_diagram' - the list of subsetted persistence diagrams in each dimension (numpy ndarrays),
and 'p_values' - a list of lists for the p-values of each remaining topological feature.
Examples
--------
>>> from tdads.inference import universal_null
>>> from ripser import ripser
>>> from numpy.random import uniform, normal
>>> # build circle dataset and add noise
>>> theta = uniform(low = 0, high = 2*pi, size = 100)
>>> data = array([[cos(theta[i]), sin(theta[i])] for i in range(100)])
>>> data = data + normal(scale = 0.2, size = (100, 2))
>>> # define the persistent homology function
>>> def diag_fun(X, thresh):
>>> return ripser(X = X, thresh = thresh)
>>> # create universal null object
>>> univ_null = universal_null(diag_fun = diag_fun)
>>> # carry out the inference procedure
>>> res = univ_null.compute(data)
Citations
---------
Bobrowski O, Skraba P (2023). "A universal null-distribution for topological data analysis." https://www.nature.com/articles/s41598-023-37842-2.
'''
# check X and thresh parameters
if not isinstance(X, type(array([0,1]))):
raise Exception('X must be a numpy array.')
if len(X.shape) != 2 or X.shape[0] < 2 or X.shape[1] < 1:
raise Exception('X must be a 2-dimensional array with at least two rows and one column.')
if self.distance_mat and X.shape[0] != X.shape[1]:
raise Exception('When distance_mat is True X must have the same number of rows and columns (as a distance matrix).')
if not (isinstance(thresh, type(1)) or isinstance(thresh, type(0.1)) or thresh == 'enclosing'):
raise Exception('thresh must be a number or the string \"enclosing\".')
if thresh != 'enclosing':
if thresh <= 0:
raise Exception('thresh must be positive.')
else:
# compute the enclosing radius
thresh = enclosing_radius(X)
# try to calculate the full persistence diagram
try:
diagram = self.diag_fun(X, thresh)
except Exception as ex:
raise Exception('An error occured when diag_fun tried to compute the persistence diagram of X. The error was: ' + str(ex))
# error check the persistence diagram
try:
diagram = preprocess_diagram(D = diagram, ret = True)
except Exception as ex:
raise Exception('The output of diagam_fun(X, thresh) was not in the correct format for a persistence diagram.')
# set up return dict
ret = {}
# check if there is any work to be done
if len(diagram) > 1:
# subset for diagrams above dimension 0
diag_highdim = diagram[1::]
# replace inf values with thresh
diag_highdim = [where(d == inf,concatenate([reshape(d[:,0], (len(d), 1)), reshape(array([thresh for _ in range(len(d))]), (len(d), 1))], axis = 1),d) for d in diag_highdim]
# compute test statistics and p-values
A = 1 # for VR persistent homology
lambd = -1*digamma(1)
pi = [diag_sub[:,1]/diag_sub[:,0] for diag_sub in diag_highdim] # ratio of death to birth
loglog_pi = [log(log(pi_sub)) for pi_sub in pi]
Lbar = [loglog_pi_sub.mean() for loglog_pi_sub in loglog_pi]
B = -1*lambd - A*Lbar
test_statistics = [A*loglog_pi[i] + B[i] for i in range(len(loglog_pi))]
p_values = [exp(-1*exp(test_statistics_sub)) for test_statistics_sub in test_statistics]
# determine Bonferroni thresholds in each dimension
alpha_thresh = [self.alpha/len(diag_sub) if len(diag_sub) > 0 else Inf for diag_sub in diag_highdim]
# subset the diagram
subsetted_diagram = [concatenate([reshape(diag_highdim[i][j,:], (1,2)) if p_values[i][j] < alpha_thresh[i] else empty(shape=(0, 2)) for j in range(len(diag_highdim[i]))], axis=0) for i in range(len(diag_highdim))]
# switch thresh values back to inf
subsetted_diagram = [where(d == thresh,concatenate([reshape(d[:,0], (len(d), 1)), reshape(array([inf for _ in range(len(d))]), (len(d), 1))], axis = 1),d) for d in subsetted_diagram]
# perform infinite cycle inference if desired
if self.infinite_cycle_inference:
infinite_inds = [where((diag_highdim[i][:,1] == thresh)*(p_values[i] >= alpha_thresh[i]))[0] for i in range(len(diag_highdim))]
# subset p-values
p_values = [array([p_values[i][x] for x in where(p_values[i] < alpha_thresh[i])]) for i in range(len(diag_highdim))]
if sum([len(x) for x in infinite_inds]) > 0:
infinite_cycle_inference = [empty(shape = (0,2)) for _ in range(len(infinite_inds))]
diag_infinite = [diag_highdim[i][infinite_inds[i],:] for i in range(len(infinite_inds))]
alpha_cutoffs = [exp(exp((log(log(1/alpha_thresh[i])) - B[i])/A)) for i in range(len(alpha_thresh))]
while sum([len(x) for x in diag_infinite]) > 0:
# get minimum birth value of any point
min_birth_vals = [min(d[:,0]) for d in diag_infinite]
min_birth_val = min(min_birth_vals)
min_birth_dim = min_birth_vals.index(min_birth_val)
min_birth_ind = where(diag_infinite[min_birth_dim][:,0] == min_birth_val)[0].item()
pt = diag_infinite[min_birth_dim][min_birth_ind]
diag_infinite[min_birth_dim] = delete(diag_infinite[min_birth_dim],min_birth_ind, axis = 0)
# compute new death value
new_death = alpha_cutoffs[min_birth_dim]*pt[0]
# calculate new diagram
new_diag = self.diag_fun(X = X, thresh = new_death)
new_diag = preprocess_diagram(new_diag, ret = True)
new_diag = new_diag[1::]
# check if the point is still there
if len(intersect1d(where(new_diag[min_birth_dim][:,0] == pt[0]), where(new_diag[min_birth_dim][:,1] == inf))) > 0:
pt[1] = inf
pt = reshape(pt, (1,2))
infinite_cycle_inference[min_birth_dim] = concatenate([infinite_cycle_inference[min_birth_dim], pt])
# update results
subsetted_diagram = [concatenate([subsetted_diagram[i], infinite_cycle_inference[i]]) for i in range(len(infinite_cycle_inference))]
p_values = [p_values[i] + [None for _ in range(len(infinite_cycle_inference[i]))] for i in range(len(infinite_cycle_inference))]
else:
# subset p-values
p_values = [array([p_values[i][x] for x in where(p_values[i] < alpha_thresh[i])[0]]) for i in range(len(diag_highdim))]
else:
p_values = [array([p_values[i][x] for x in where(p_values[i] < alpha_thresh[i])[0]]) for i in range(len(diag_highdim))]
else:
subsetted_diagram = [empty(shape = (0,2)) for _ in range(len(diagram))]
p_values = [[] for _ in range(len(diagram))]
ret['subsetted_diagram'] = subsetted_diagram
ret['p_values'] = p_values
return ret