Source code for timecorr.helpers

# coding: utf-8
from __future__ import division

import math
import warnings

import hypertools as hyp
import numpy as np
import pandas as pd
import scipy.spatial.distance as sd
from matplotlib import pyplot as plt
from scipy.linalg import toeplitz
from scipy.optimize import minimize
from scipy.special import gamma
from scipy.stats import ttest_1samp as ttest

gaussian_params = {"var": 100}
laplace_params = {"scale": 100}
eye_params = {}
t_params = {"df": 100}
mexican_hat_params = {"sigma": 10}
uniform_params = {}
boxcar_params = {"width": 10}


[docs] def gaussian_weights(T, params=gaussian_params): """ Generate Gaussian weighting function for dynamic correlations. This function creates a time-varying weighting matrix where each timepoint receives weights according to a Gaussian distribution centered at that timepoint. Useful for computing smooth dynamic correlations. Parameters ---------- T : int Number of timepoints in the timeseries params : dict, optional Dictionary containing Gaussian parameters. Default: {'var': 100} - 'var' : float, variance of the Gaussian kernel Returns ------- numpy.ndarray T x T matrix of Gaussian weights, where weights[i,j] represents the weight given to timepoint j when computing correlations at timepoint i Examples -------- >>> import timecorr as tc >>> weights = tc.gaussian_weights(50, {'var': 10}) >>> print(weights.shape) # (50, 50) """ if params is None: params = gaussian_params c1 = np.divide(1, np.sqrt(2 * math.pi * params["var"])) c2 = np.divide(-1, 2 * params["var"]) sqdiffs = toeplitz(np.arange(T) ** 2) return c1 * np.exp(c2 * sqdiffs)
[docs] def laplace_weights(T, params=laplace_params): """ Generate Laplace (exponential) weighting function for dynamic correlations. This function creates a time-varying weighting matrix where each timepoint receives weights according to a Laplace distribution centered at that timepoint. Provides sharper temporal localization compared to Gaussian weights. Parameters ---------- T : int Number of timepoints in the timeseries params : dict, optional Dictionary containing Laplace parameters. Default: {'scale': 100} - 'scale' : float, scale parameter of the Laplace distribution Returns ------- numpy.ndarray T x T matrix of Laplace weights, where weights[i,j] represents the weight given to timepoint j when computing correlations at timepoint i Examples -------- >>> import timecorr as tc >>> weights = tc.laplace_weights(50, {'scale': 5}) >>> print(weights.shape) # (50, 50) """ if params is None: params = laplace_params absdiffs = toeplitz(np.arange(T)) return np.multiply( np.divide(1, 2 * params["scale"]), np.exp(-np.divide(absdiffs, params["scale"])) ) # scale by a factor of 2.5 to prevent near-zero rounding issues
[docs] def eye_weights(T, params=eye_params): """ Generate identity (delta) weighting function for dynamic correlations. This function creates an identity matrix for weighting, where each timepoint only receives weight from itself. Useful for computing instantaneous correlations without temporal smoothing. Parameters ---------- T : int Number of timepoints in the timeseries params : dict, optional Empty dictionary (no parameters needed). Default: {} Returns ------- numpy.ndarray T x T identity matrix where weights[i,j] = 1 if i==j, else 0 Examples -------- >>> import timecorr as tc >>> weights = tc.eye_weights(50, {}) >>> print(weights.shape) # (50, 50) >>> print(weights[0, 0]) # 1.0 >>> print(weights[0, 1]) # 0.0 """ return np.eye(T)
[docs] def uniform_weights(T, params=uniform_params): return np.ones([T, T])
[docs] def t_weights(T, params=t_params): if params is None: params = t_params c1 = np.divide( gamma((params["df"] + 1) / 2), np.sqrt(params["df"] * math.pi) * gamma(params["df"] / 2), ) c2 = np.divide(-params["df"] + 1, 2) sqdiffs = toeplitz(np.arange(T) ** 2) return np.multiply(c1, np.power(1 + np.divide(sqdiffs, params["df"]), c2))
[docs] def mexican_hat_weights(T, params=mexican_hat_params): """ Generate Mexican Hat (Ricker) weighting function for dynamic correlations. This function creates a time-varying weighting matrix where each timepoint receives weights according to a Mexican Hat wavelet centered at that timepoint. Useful for capturing temporal dynamics and transitions in correlations. Parameters ---------- T : int Number of timepoints in the timeseries params : dict, optional Dictionary containing Mexican Hat parameters. Default: {'sigma': 10} - 'sigma' : float, scale parameter of the Mexican Hat wavelet Returns ------- numpy.ndarray T x T matrix of Mexican Hat weights, where weights[i,j] represents the weight given to timepoint j when computing correlations at timepoint i Examples -------- >>> import timecorr as tc >>> weights = tc.mexican_hat_weights(50, {'sigma': 5}) >>> print(weights.shape) # (50, 50) """ if params is None: params = mexican_hat_params absdiffs = toeplitz(np.arange(T)) sqdiffs = toeplitz(np.arange(T) ** 2) a = np.divide(2, np.sqrt(3 * params["sigma"]) * np.power(math.pi, 0.25)) b = 1 - np.power(np.divide(absdiffs, params["sigma"]), 2) c = np.exp(-np.divide(sqdiffs, 2 * np.power(params["sigma"], 2))) return np.multiply(a, np.multiply(b, c))
[docs] def boxcar_weights(T, params=boxcar_params): if params is None: params = boxcar_params return np.multiply(toeplitz(np.arange(T)) < params["width"] / 2.0, 1.0)
def format_data(data): def zero_nans(x): x[np.isnan(x)] = 0 return x x = hyp.tools.format_data( data, ppca=False, ) return list(map(zero_nans, x)) def _is_empty(dict): if not bool(dict): return True return False
[docs] def wcorr(a, b, weights): """ Compute moment-by-moment correlations between sets of observations :param a: a number-of-timepoints by number-of-features observations matrix :param b: a number-of-timepoints by number-of-features observations matrix :param weights: a number-of-timepoints by number-of-timepoints weights matrix specifying the per-timepoint weights to be considered (for each timepoint) :return: a a.shape[1] by b.shape[1] by weights.shape[0] array of per-timepoint correlation matrices. """ def weighted_var_diffs(x, w): w[np.isnan(w)] = 0 if np.sum(np.abs(w)) == 0: weights_tiled = np.ones(x.shape) else: weights_tiled = np.tile(w[:, np.newaxis], [1, x.shape[1]]) mx = np.sum(np.multiply(weights_tiled, x), axis=0)[:, np.newaxis].T diffs = x - np.tile(mx, [x.shape[0], 1]) varx = np.sum(diffs**2, axis=0)[:, np.newaxis].T return varx, diffs autocorrelation = np.isclose(a, b).all() corrs = np.zeros([a.shape[1], b.shape[1], weights.shape[1]]) for t in np.arange(weights.shape[1]): vara, diffs_a = weighted_var_diffs(a, weights[:, t]) if autocorrelation: varb = vara diffs_b = diffs_a else: varb, diffs_b = weighted_var_diffs(b, weights[:, t]) alpha = np.dot(diffs_a.T, diffs_b) beta = np.sqrt(np.dot(vara.T, varb)) corrs[:, :, t] = np.divide(alpha, beta) return corrs
[docs] def wisfc(data, timepoint_weights, subject_weights=None): """ Compute moment-by-moment correlations between sets of observations :data: a list of number-of-timepoints by V matrices :timepoint weights: a number-of-timepoints by number-of-timepoints weights matrix specifying the per-timepoint weights to be considered (for each timepoint) :subject weights: number-of-subjects by number-of-subjects weights matrix :return: a list of number-of-timepoints by (V^2 - V)/2 + V correlation matrices """ if type(data) != list: return wisfc([data], timepoint_weights, subject_weights=subject_weights)[0] if subject_weights is None: K = data[0].shape[1] connectomes = np.zeros([len(data), int((K**2 - K) / 2)]) for s in np.arange(len(data)): connectomes[s, :] = 1 - sd.pdist(data[s].T, metric="correlation") subject_weights = 1 - sd.squareform(sd.pdist(connectomes, metric="correlation")) np.fill_diagonal(subject_weights, 0) elif np.isscalar(subject_weights): subject_weights = subject_weights * np.ones([len(data), len(data)]) np.fill_diagonal(subject_weights, 0) corrs = [] for s, a in enumerate(data): b = weighted_mean(np.stack(data, axis=2), axis=2, weights=subject_weights[s, :]) wc = wcorr(a, b, timepoint_weights) wc[np.isnan(wc)] = 0 wc[np.isinf(wc)] = 1 try: corrs.append(mat2vec(wc)) except: print("mystery!") return corrs
[docs] def isfc(data, timepoint_weights): """ Compute Inter-Subject Functional Connectivity (ISFC). ISFC computes correlations between each subject's data and the average of all other subjects, providing a measure of shared neural patterns across participants while avoiding self-correlation artifacts. Parameters ---------- data : list of numpy.ndarray or numpy.ndarray List of timeseries data matrices, each of shape (timepoints, features). If a single array is provided, it will be converted to a list. timepoint_weights : numpy.ndarray T x T matrix of temporal weights for dynamic correlations Returns ------- list of numpy.ndarray List of correlation matrices (one per subject), each showing correlations between that subject and the average of all others Examples -------- >>> import timecorr as tc >>> import numpy as np >>> data = [np.random.randn(100, 10) for _ in range(5)] # 5 subjects >>> weights = tc.gaussian_weights(100, {'var': 10}) >>> isfc_results = tc.isfc(data, weights) >>> len(isfc_results) # 5 """ if type(data) != list: return isfc([data], timepoint_weights)[0] return wisfc(data, timepoint_weights, subject_weights=1 - np.eye(len(data)))
[docs] def autofc(data, timepoint_weights): """ Compute auto-correlations within each subject's data. This function computes correlations within each subject's own data, equivalent to standard within-subject dynamic correlations. Parameters ---------- data : list of numpy.ndarray or numpy.ndarray List of timeseries data matrices, each of shape (timepoints, features). If a single array is provided, it will be converted to a list. timepoint_weights : numpy.ndarray T x T matrix of temporal weights for dynamic correlations Returns ------- list of numpy.ndarray List of correlation matrices (one per subject), each showing within-subject correlations Examples -------- >>> import timecorr as tc >>> import numpy as np >>> data = [np.random.randn(100, 10) for _ in range(3)] # 3 subjects >>> weights = tc.gaussian_weights(100, {'var': 10}) >>> auto_results = tc.autofc(data, weights) >>> len(auto_results) # 3 """ if type(data) != list: return autofc([data], timepoint_weights)[0] return wisfc(data, timepoint_weights, subject_weights=np.eye(len(data)))
def apply_by_row(corrs, f): """ apply the function f to the correlation matrix specified in each row, and return a matrix of the concatenated results :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a list of such matrices :param f: a function to apply to each vectorized correlation matrix :return: a matrix of function outputs (for each row of the given matrices), or a list of such matrices """ if type(corrs) is list: return list(map(lambda x: apply_by_row(x, f), corrs)) corrs = vec2mat(corrs) return np.stack( list(map(lambda x: f(np.squeeze(x)), np.split(corrs, corrs.shape[2], axis=2))), axis=0, )
[docs] def corrmean_combine(corrs): """ Compute the mean element-wise correlation across each matrix in a list. :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a list of such matrices :return: a mean vectorized correlation matrix """ if not (type(corrs) == list): return corrs elif np.shape(corrs)[0] == 1: return corrs else: return z2r(np.mean(r2z(np.stack(corrs, axis=2)), axis=2))
[docs] def mean_combine(vals): """ Compute the element-wise mean across each matrix in a list. :param vals: a matrix, or a list of matrices :return: a mean matrix """ if not (type(vals) == list): return vals else: return np.mean(np.stack(vals, axis=2), axis=2)
def tstat_combine(corrs, return_pvals=False): """ Compute element-wise t-tests (comparing distribution means to 0) across each correlation matrix in a list. :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a list of such matrices :param return_pvals: Boolean (default: False). If True, return a second matrix (or list) of the corresponding t-tests' p-values :return: a matrix of t-statistics of the same shape as a matrix of vectorized correlation matrices """ if not (type(corrs) == list): ts = corrs ps = np.nan * np.zeros_like(corrs) else: ts, ps = ttest(r2z(np.stack(corrs, axis=2)), popmean=0, axis=2) if return_pvals: return ts, ps else: return ts def null_combine(corrs): """ Placeholder function that returns the input :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a list of such matrices :return: the input """ return corrs
[docs] def reduce(corrs, rfun=None): """ :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a list of such matrices :param rfun: function to use for dimensionality reduction. All hypertools and scikit-learn functions are supported: PCA, IncrementalPCA, SparsePCA, MiniBatchSparsePCA, KernelPCA, FastICA, FactorAnalysis, TruncatedSVD, DictionaryLearning, MiniBatchDictionaryLearning, TSNE, Isomap, SpectralEmbedding, LocallyLinearEmbedding, MDS, and UMAP. Can be passed as a string, but for finer control of the model parameters, pass as a dictionary, e.g. reduction={‘model’ : ‘PCA’, ‘params’ : {‘whiten’ : True}}. See scikit-learn specific model docs for details on parameters supported for each model. Another option is to use graph theoretic measures computed for each node. The following measures are supported: eigenvector_centrality, pagerank_centrality, and strength. (Each of these must be specified as a string; dictionaries not supported.) Default: None (no dimensionality reduction) :return: dimensionality-reduced (or original) correlation matrices """ # Define graph measures directly instead of using brainconn def eigenvector_centrality_und(W): """ Compute eigenvector centrality for undirected graph. Parameters ---------- W : numpy.ndarray Undirected weighted/binary connection matrix Returns ------- numpy.ndarray Eigenvector centrality """ from scipy.linalg import eigh # Ensure matrix is symmetric W = (W + W.T) / 2 # Compute eigenvalues and eigenvectors eigenvalues, eigenvectors = eigh(W) # Get the eigenvector corresponding to the largest eigenvalue max_idx = np.argmax(eigenvalues) centrality = np.abs(eigenvectors[:, max_idx]) return centrality def pagerank_centrality(W, d=0.85, falff=None): """ Compute PageRank centrality using the same algorithm as brainconn. Parameters ---------- W : numpy.ndarray Weighted/binary connection matrix (adjacency matrix) d : float Damping factor (default: 0.85) falff : numpy.ndarray or None Initial page rank probability, non-negative values. Default value is None. If not specified, a naive bayesian prior is used. Returns ------- numpy.ndarray PageRank centrality values """ from scipy import linalg N = len(W) if falff is None: norm_falff = np.ones((N,)) / N else: norm_falff = falff / np.sum(falff) deg = np.sum(W, axis=0) deg[deg == 0] = 1 D1 = np.diag(1 / deg) B = np.eye(N) - d * np.dot(W, D1) b = (1 - d) * norm_falff r = linalg.solve(B, b) r /= np.sum(r) return r def strengths_und(W): """ Compute node strength for undirected graph. Parameters ---------- W : numpy.ndarray Undirected weighted connection matrix Returns ------- numpy.ndarray Node strengths """ # For undirected graphs, strength is the sum of weights return np.sum(W, axis=1) # Set up graph measures graph_measures = { "eigenvector_centrality": eigenvector_centrality_und, "pagerank_centrality": lambda x: pagerank_centrality(x, d=0.85), "strength": strengths_und, } if rfun is None: return corrs get_V = lambda x: int(np.divide(np.sqrt(8 * x + 1) - 1, 2)) if type(corrs) is list: V = get_V(corrs[0].shape[1]) else: V = get_V(corrs.shape[1]) if rfun in graph_measures.keys(): return apply_by_row(corrs, graph_measures[rfun]) else: # Limit V to the number of samples to avoid PCA error if type(corrs) is list: n_samples = corrs[0].shape[0] else: n_samples = corrs.shape[0] ndims = min(V, n_samples) red_corrs = hyp.reduce(corrs, reduce=rfun, ndims=ndims) D = np.shape(red_corrs)[-1] if D < V: # Pad with zeros to match expected output size if isinstance(red_corrs, list): # Handle list of arrays red_corrs = [ np.hstack((arr, np.zeros((arr.shape[0], V - D)))) for arr in red_corrs ] elif red_corrs.ndim == 2: red_corrs = np.hstack( (red_corrs, np.zeros((red_corrs.shape[0], V - D))) ) elif red_corrs.ndim == 3: red_corrs = np.hstack( ( red_corrs, np.zeros((red_corrs.shape[0], red_corrs.shape[1], V - D)), ) ) else: # Handle other cases or just return as is pass return red_corrs
[docs] def smooth(w, windowsize=10, kernel_fun=laplace_weights, kernel_params=laplace_params): if type(w) is list: return list( map( lambda x: smooth( x, windowsize=windowsize, kernel_fun=kernel_fun, kernel_params=kernel_params, ), w, ) ) assert type(windowsize) == int, "smoothing kernel must have integer width" k = kernel_fun(windowsize, params=kernel_params) if iseven(windowsize): kernel = np.divide( k[int(np.floor(windowsize / 2) - 1), :] + k[int(np.ceil(windowsize / 2) - 1), :], 2, ) else: kernel = k[int(np.floor(windowsize / 2)), :] kernel /= kernel.sum() x = np.zeros_like(w) for i in range(0, w.shape[1]): x[:, i] = np.convolve(kernel, w[:, i], mode="same") return x
[docs] def timepoint_decoder( data, mu=None, nfolds=2, level=0, cfun=isfc, weights_fun=laplace_weights, weights_params=laplace_params, combine=mean_combine, rfun=None, ): """ :param data: a list of number-of-observations by number-of-features matrices :param mu: list of floats sum to one for mixing proportions vector :param nfolds: number of cross-validation folds (train using out-of-fold data; test using in-fold data) :param level: integer or list of integers for levels to be evaluated (default:0) :param cfun: function for transforming the group data (default: isfc) :param weights_fun: used to compute per-timepoint weights for cfun; default: laplace_weights :param weights_params: parameters passed to weights_fun; default: laplace_params :params combine: function for combining data within each group, or a list of such functions (default: mean_combine) :param rfun: function for reducing output (default: None) :return: results dictionary with the following keys: 'rank': mean percentile rank (across all timepoints and folds) in the decoding distribution of the true timepoint 'accuracy': mean percent accuracy (across all timepoints and folds) 'error': mean estimation error (across all timepoints and folds) between the decoded and actual window numbers, expressed as a percentage of the total number of windows """ assert ( len(np.unique(list(map(lambda x: x.shape[0], data)))) == 1 ), "all data matrices must have the same number of timepoints" assert ( len(np.unique(list(map(lambda x: x.shape[1], data)))) == 1 ), "all data matrices must have the same number of features" group_assignments = get_xval_assignments(len(data), nfolds) orig_level = level orig_level = np.ravel(orig_level) if type(level) is int: level = np.arange(level + 1) level = np.ravel(level) assert type(level) is np.ndarray, "level needs be an integer, list, or np.ndarray" assert not np.any(level < 0), "level cannot contain negative numbers" if mu: orig_level = level.max() orig_level = np.ravel(orig_level) assert np.sum(mu) == 1, "weights must sum to one" assert ( np.shape(mu)[0] == level.max() + 1 ), "weights lengths need to be the same as number of levels" if not np.all(np.arange(level.max() + 1) == level): level = np.arange(level.max() + 1) if callable(combine): combine = [combine] * np.shape(level)[0] combine = np.ravel(combine) assert ( type(combine) is np.ndarray and type(combine[0]) is not np.str_ ), "combine needs to be a function, list of functions, or np.ndarray of functions" assert len(level) == len( combine ), "combine length need to be the same as level if input is type np.ndarray or list" if callable(cfun): cfun = [cfun] * np.shape(level)[0] cfun = np.ravel(cfun) assert ( type(cfun) is np.ndarray and type(cfun[0]) is not np.str_ ), "combine needs be a function, list of functions, or np.ndarray of functions" assert len(level) == len( cfun ), "cfun length need to be the same as level if input is type np.ndarray or list" if type(rfun) not in [list, np.ndarray]: rfun = [rfun] * np.shape(level)[0] p_rfun = [None] * np.shape(level)[0] assert len(level) == len(rfun), ( "parameter lengths need to be the same as level if input is " "type np.ndarray or list" ) results_pd = pd.DataFrame() corrs = 0 for i in range(0, nfolds): in_raw = [] out_raw = [] for v in level: if v == 0: in_data = [x for x in data[group_assignments == i]] out_data = [x for x in data[group_assignments != i]] in_smooth, out_smooth, in_raw, out_raw = reduce_wrapper( folding_levels( in_data, out_data, level=v, cfun=None, rfun=p_rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ), level=v, rfun=rfun, ) else: in_smooth, out_smooth, in_raw, out_raw = reduce_wrapper( folding_levels( in_raw, out_raw, level=v, cfun=cfun, rfun=p_rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ), level=v, rfun=rfun, ) if mu: next_corrs = 1 - sd.cdist(in_smooth, out_smooth, "correlation") corrs += mu[v] * z2r(next_corrs) else: corrs = 1 - sd.cdist(in_smooth, out_smooth, "correlation") if v in orig_level: if mu: corrs = r2z(corrs) next_results_pd = decoder(corrs) next_results_pd["level"] = v next_results_pd["folds"] = i results_pd = pd.concat([results_pd, next_results_pd]) return results_pd
[docs] def weighted_timepoint_decoder( data, nfolds=2, level=0, optimize_levels=None, cfun=isfc, weights_fun=laplace_weights, weights_params=laplace_params, combine=mean_combine, rfun=None, opt_init=None, ): """ :param data: a list of number-of-observations by number-of-features matrices :param nfolds: number of cross-validation folds (train using out-of-fold data; test using in-fold data) :param level: integer or list of integers for levels to be evaluated (default:0) :param cfun: function for transforming the group data (default: isfc) :param weights_fun: used to compute per-timepoint weights for cfun; default: laplace_weights :param weights_params: parameters passed to weights_fun; default: laplace_params :params combine: function for combining data within each group, or a list of such functions (default: mean_combine) :param rfun: function for reducing output (default: None) :return: results dictionary with the following keys: 'rank': mean percentile rank (across all timepoints and folds) in the decoding distribution of the true timepoint 'accuracy': mean percent accuracy (across all timepoints and folds) 'error': mean estimation error (across all timepoints and folds) between the decoded and actual window numbers, expressed as a percentage of the total number of windows """ assert ( len(np.unique(list(map(lambda x: x.shape[0], data)))) == 1 ), "all data matrices must have the same number of timepoints" assert ( len(np.unique(list(map(lambda x: x.shape[1], data)))) == 1 ), "all data matrices must have the same number of features" if nfolds == 1: sub_nfolds = 1 nfolds = 2 warnings.warn("When nfolds is set to one, the analysis will be circular.") else: sub_nfolds = 1 group_assignments = get_xval_assignments(len(data), nfolds) orig_level = level orig_level = np.ravel(orig_level) if type(level) is int: level = np.arange(level + 1) level = np.ravel(level) assert type(level) is np.ndarray, "level needs be an integer, list, or np.ndarray" assert not np.any(level < 0), "level cannot contain negative numbers" if not np.all(np.arange(level.max() + 1) == level): level = np.arange(level.max() + 1) if callable(combine): combine = [combine] * np.shape(level)[0] combine = np.ravel(combine) assert type(combine) is np.ndarray and type(combine[0]) is not np.str_, ( "combine needs to be a function, list of " "functions, or np.ndarray of functions" ) assert len(level) == len( combine ), "combine length need to be the same as level if input is type np.ndarray or list" if callable(cfun): cfun = [cfun] * np.shape(level)[0] cfun = np.ravel(cfun) assert type(cfun) is np.ndarray and type(cfun[0]) is not np.str_, ( "combine needs be a function, list of functions, " "or np.ndarray of functions" ) assert len(level) == len( cfun ), "cfun length need to be the same as level if input is type np.ndarray or list" if type(rfun) not in [list, np.ndarray]: rfun = [rfun] * np.shape(level)[0] p_rfun = [None] * np.shape(level)[0] assert len(level) == len(rfun), ( "parameter lengths need to be the same as level if input is " "type np.ndarray or list" ) results_pd = pd.DataFrame() for i in range(0, nfolds): in_raw = [] out_raw = [] sub_in_raw = [] sub_out_raw = [] sub_corrs = [] corrs = [] subgroup_assignments = get_xval_assignments( len(data[group_assignments == i]), nfolds ) in_data = [x for x in data[group_assignments == i]] out_data = [x for x in data[group_assignments != i]] for v in level: if v == 0: in_smooth, out_smooth, in_raw, out_raw = folding_levels( in_data, out_data, level=v, cfun=None, rfun=p_rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) next_corrs = 1 - sd.cdist( mean_combine([x for x in in_raw]), mean_combine([x for x in out_raw]), "correlation", ) # next_corrs = (1 - sd.cdist(mean_combine(in_smooth), mean_combine(out_smooth), # 'correlation')) corrs.append(next_corrs) for s in range(0, 1): sub_in_data = [ x for x in data[group_assignments == i][subgroup_assignments == s] ] sub_out_data = [ x for x in data[group_assignments == i][subgroup_assignments != s] ] sub_in_smooth, sub_out_smooth, sub_in_raw, sub_out_raw = ( folding_levels( sub_in_data, sub_out_data, level=v, cfun=None, rfun=p_rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) ) next_subcorrs = 1 - sd.cdist( mean_combine([x for x in sub_in_raw]), mean_combine([x for x in sub_out_raw]), "correlation", ) # next_subcorrs = (1 - sd.cdist(mean_combine(sub_in_smooth), # mean_combine(sub_out_smooth), 'correlation')) sub_corrs.append(next_subcorrs) else: in_smooth, out_smooth, in_raw, out_raw = folding_levels( in_raw, out_raw, level=v, cfun=cfun, rfun=rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) next_corrs = 1 - sd.cdist(in_smooth, out_smooth, "correlation") corrs.append(next_corrs) print("corrs " + str(v)) for s in range(0, 1): sub_in_smooth, sub_out_smooth, sub_in_raw, sub_out_raw = ( folding_levels( sub_in_raw, sub_out_raw, level=v, cfun=cfun, rfun=rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) ) print("sub corrs " + str(v) + str(s)) next_subcorrs = 1 - sd.cdist( sub_in_smooth, sub_out_smooth, "correlation" ) sub_corrs.append(next_subcorrs) sub_corrs = np.array(sub_corrs) corrs = np.array(corrs) if sub_nfolds == 1: sub_corrs = corrs if not optimize_levels: optimize_levels = range(v + 1) opt_over = [] for lev in optimize_levels: opt_over.append(lev) sub_out_corrs = sub_corrs[opt_over, :, :] out_corrs = corrs[opt_over, :, :] mu = optimize_weights(sub_out_corrs, opt_init) w_corrs = weight_corrs(out_corrs, mu) next_results_pd = decoder(w_corrs) print(next_results_pd) next_results_pd["level"] = lev next_results_pd["folds"] = i mu_pd = pd.DataFrame() for c in opt_over: mu_pd["level_" + str(c)] = [0] mu_pd += mu next_results_pd = pd.concat( [next_results_pd, mu_pd], axis=1, join_axes=[next_results_pd.index] ) results_pd = pd.concat([results_pd, next_results_pd]) return results_pd
def folding_levels( infold_data, outfold_data, level=0, cfun=None, weights_fun=None, weights_params=None, combine=None, rfun=None, ): from .timecorr import timecorr if rfun is None: rfun = [None] * np.shape(level)[0] p_cfun = eval("autofc") if level == 0: in_fold_smooth = np.asarray( timecorr( [x for x in infold_data], cfun=None, rfun=rfun[level], combine=combine[level], weights_function=weights_fun, weights_params=weights_params, ) ) out_fold_smooth = np.asarray( timecorr( [x for x in outfold_data], cfun=None, rfun=rfun[level], combine=combine[level], weights_function=weights_fun, weights_params=weights_params, ) ) in_fold_raw = infold_data out_fold_raw = outfold_data else: in_fold_smooth = np.asarray( timecorr( list(infold_data), cfun=cfun[level], rfun=rfun[level], combine=combine[level], weights_function=weights_fun, weights_params=weights_params, ) ) out_fold_smooth = np.asarray( timecorr( list(outfold_data), cfun=cfun[level], rfun=rfun[level], combine=combine[level], weights_function=weights_fun, weights_params=weights_params, ) ) in_fold_raw = np.asarray( timecorr( list(infold_data), cfun=p_cfun, rfun=rfun[level], combine=null_combine, weights_function=eye_weights, weights_params=eye_params, ) ) out_fold_raw = np.asarray( timecorr( list(outfold_data), cfun=p_cfun, rfun=rfun[level], combine=null_combine, weights_function=eye_weights, weights_params=eye_params, ) ) return in_fold_smooth, out_fold_smooth, in_fold_raw, out_fold_raw def weighted_timepoint_decoder_ec( data, nfolds=2, level=0, optimize_levels=None, cfun=isfc, weights_fun=laplace_weights, weights_params=laplace_params, combine=mean_combine, rfun=None, opt_init=None, ): """ :param data: a list of number-of-observations by number-of-features matrices :param nfolds: number of cross-validation folds (train using out-of-fold data; test using in-fold data) :param level: integer or list of integers for levels to be evaluated (default:0) :param cfun: function for transforming the group data (default: isfc) :param weights_fun: used to compute per-timepoint weights for cfun; default: laplace_weights :param weights_params: parameters passed to weights_fun; default: laplace_params :params combine: function for combining data within each group, or a list of such functions (default: mean_combine) :param rfun: function for reducing output (default: None) :return: results dictionary with the following keys: 'rank': mean percentile rank (across all timepoints and folds) in the decoding distribution of the true timepoint 'accuracy': mean percent accuracy (across all timepoints and folds) 'error': mean estimation error (across all timepoints and folds) between the decoded and actual window numbers, expressed as a percentage of the total number of windows """ if nfolds == 1: sub_nfolds = 1 nfolds = 2 warnings.warn("When nfolds is set to one, the analysis will be circular.") else: sub_nfolds = 1 group_assignments = get_xval_assignments(data.shape[1], nfolds) orig_level = level orig_level = np.ravel(orig_level) if type(level) is int: level = np.arange(level + 1) level = np.ravel(level) assert type(level) is np.ndarray, "level needs be an integer, list, or np.ndarray" assert not np.any(level < 0), "level cannot contain negative numbers" if not np.all(np.arange(level.max() + 1) == level): level = np.arange(level.max() + 1) if callable(combine): combine = [combine] * np.shape(level)[0] combine = np.ravel(combine) assert type(combine) is np.ndarray and type(combine[0]) is not np.str_, ( "combine needs to be a function, list of " "functions, or np.ndarray of functions" ) assert len(level) == len( combine ), "combine length need to be the same as level if input is type np.ndarray or list" if callable(cfun): cfun = [cfun] * np.shape(level)[0] cfun = np.ravel(cfun) assert type(cfun) is np.ndarray and type(cfun[0]) is not np.str_, ( "combine needs be a function, list of functions, " "or np.ndarray of functions" ) assert len(level) == len( cfun ), "cfun length need to be the same as level if input is type np.ndarray or list" if type(rfun) not in [list, np.ndarray]: rfun = [rfun] * np.shape(level)[0] p_rfun = [None] * np.shape(level)[0] assert len(level) == len(rfun), ( "parameter lengths need to be the same as level if input is " "type np.ndarray or list" ) results_pd = pd.DataFrame() for i in range(0, nfolds): sub_corrs = [] corrs = [] subgroup_assignments = get_xval_assignments( len(data[0][group_assignments == i]), nfolds ) in_data = [x for x in data[0][group_assignments == i]] out_data = [x for x in data[0][group_assignments != i]] for v in level: if v == 0: in_smooth, out_smooth, in_raw, out_raw = folding_levels_ec( in_data, out_data, level=v, cfun=None, rfun=p_rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) # next_corrs = (1 - sd.cdist(mean_combine(in_smooth), mean_combine(out_smooth), # 'correlation')) next_corrs = 1 - sd.cdist( mean_combine(in_raw), mean_combine(out_raw), "correlation" ) corrs.append(next_corrs) for s in range(0, 1): sub_in_data = [ x for x in data[0][group_assignments == i][ subgroup_assignments == s ] ] sub_out_data = [ x for x in data[0][group_assignments == i][ subgroup_assignments != s ] ] sub_in_smooth, sub_out_smooth, sub_in_raw, sub_out_raw = ( folding_levels_ec( sub_in_data, sub_out_data, level=v, cfun=None, rfun=p_rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) ) # next_subcorrs = (1 - sd.cdist(mean_combine(sub_in_smooth), # mean_combine(sub_out_smooth), 'correlation')) next_subcorrs = 1 - sd.cdist( mean_combine(sub_in_raw), mean_combine(sub_out_raw), "correlation", ) sub_corrs.append(next_subcorrs) elif v == 1: in_smooth, out_smooth, in_raw, out_raw = folding_levels_ec( in_raw, out_raw, level=v, cfun=cfun, rfun=rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) next_corrs = 1 - sd.cdist( mean_combine(in_smooth), mean_combine(out_smooth), "correlation" ) corrs.append(next_corrs) for s in range(0, 1): sub_in_smooth, sub_out_smooth, sub_in_raw, sub_out_raw = ( folding_levels_ec( sub_in_raw, sub_out_raw, level=v, cfun=cfun, rfun=rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) ) next_subcorrs = 1 - sd.cdist( mean_combine(sub_in_smooth), mean_combine(sub_out_smooth), "correlation", ) sub_corrs.append(next_subcorrs) else: in_raw = [x for x in data[v - 1][group_assignments == i]] out_raw = [x for x in data[v - 1][group_assignments != i]] in_smooth, out_smooth, in_raw, out_raw = folding_levels_ec( in_raw, out_raw, level=v, cfun=cfun, rfun=rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) next_corrs = 1 - sd.cdist(in_smooth, out_smooth, "correlation") corrs.append(next_corrs) print("corrs " + str(v)) for s in range(0, 1): sub_in_raw = [ x for x in data[v - 1][group_assignments == i][ subgroup_assignments == s ] ] sub_out_raw = [ x for x in data[v - 1][group_assignments == i][ subgroup_assignments != s ] ] sub_in_smooth, sub_out_smooth, sub_in_raw, sub_out_raw = ( folding_levels_ec( sub_in_raw, sub_out_raw, level=v, cfun=cfun, rfun=rfun, combine=combine, weights_fun=weights_fun, weights_params=weights_params, ) ) print("sub corrs " + str(v) + str(s)) next_subcorrs = 1 - sd.cdist( sub_in_smooth, sub_out_smooth, "correlation" ) sub_corrs.append(next_subcorrs) sub_corrs = np.array(sub_corrs) corrs = np.array(corrs) if sub_nfolds == 1: sub_corrs = corrs if not optimize_levels: optimize_levels = range(v + 1) opt_over = [] for lev in optimize_levels: opt_over.append(lev) sub_out_corrs = sub_corrs[opt_over, :, :] out_corrs = corrs[opt_over, :, :] mu = optimize_weights(sub_out_corrs, opt_init) w_corrs = weight_corrs(out_corrs, mu) next_results_pd = decoder(w_corrs) print(next_results_pd) next_results_pd["level"] = lev next_results_pd["folds"] = i mu_pd = pd.DataFrame() for c in opt_over: mu_pd["level_" + str(c)] = [0] mu_pd += mu next_results_pd = pd.concat( [next_results_pd, mu_pd], axis=1, join_axes=[next_results_pd.index] ) results_pd = pd.concat([results_pd, next_results_pd]) return results_pd def folding_levels_ec( infold_data, outfold_data, level=0, cfun=None, weights_fun=None, weights_params=None, combine=None, rfun=None, ): from .timecorr import timecorr if rfun is None: rfun = [None] * np.shape(level)[0] p_cfun = eval("autofc") if level == 0: in_fold_smooth = np.asarray( timecorr( [x for x in infold_data], cfun=None, rfun=rfun[level], combine=combine[level], weights_function=weights_fun, weights_params=weights_params, ) ) out_fold_smooth = np.asarray( timecorr( [x for x in outfold_data], cfun=None, rfun=rfun[level], combine=combine[level], weights_function=weights_fun, weights_params=weights_params, ) ) in_fold_raw = infold_data out_fold_raw = outfold_data else: raw_rfun = [None] * (level + 1) in_fold_smooth = np.asarray( timecorr( list(infold_data), cfun=cfun[level], rfun=rfun[level], combine=combine[level], weights_function=weights_fun, weights_params=weights_params, ) ) out_fold_smooth = np.asarray( timecorr( list(outfold_data), cfun=cfun[level], rfun=rfun[level], combine=combine[level], weights_function=weights_fun, weights_params=weights_params, ) ) in_fold_raw = infold_data out_fold_raw = outfold_data return in_fold_smooth, out_fold_smooth, in_fold_raw, out_fold_raw def pca_decoder( data, nfolds=2, dims=10, cfun=isfc, weights_fun=laplace_weights, weights_params=laplace_params, combine=mean_combine, rfun=None, ): """ :param data: a list of number-of-observations by number-of-features matrices :param nfolds: number of cross-validation folds (train using out-of-fold data; test using in-fold data) :param cfun: function for transforming the group data (default: isfc) :param weights_fun: used to compute per-timepoint weights for cfun; default: laplace_weights :param weights_params: parameters passed to weights_fun; default: laplace_params :params combine: function for combining data within each group, or a list of such functions (default: mean_combine) :param rfun: function for reducing output (default: None) :return: results dictionary with the following keys: 'rank': mean percentile rank (across all timepoints and folds) in the decoding distribution of the true timepoint 'accuracy': mean percent accuracy (across all timepoints and folds) 'error': mean estimation error (across all timepoints and folds) between the decoded and actual window numbers, expressed as a percentage of the total number of windows """ assert ( len(np.unique(list(map(lambda x: x.shape[0], data)))) == 1 ), "all data matrices must have the same number of timepoints" assert ( len(np.unique(list(map(lambda x: x.shape[1], data)))) == 1 ), "all data matrices must have the same number of features" pca_data = np.asarray(hyp.reduce(list(data), ndims=dims)) group_assignments = get_xval_assignments(len(pca_data), nfolds) results_pd = pd.DataFrame() for i in range(0, nfolds): for d in range(1, dims + 1): in_data = np.asarray([x for x in pca_data[group_assignments == i]])[ :, :, :d ] out_data = np.asarray([x for x in pca_data[group_assignments != i]])[ :, :, :d ] in_smooth, out_smooth, in_raw, out_raw = folding_levels( in_data, out_data, level=0, cfun=isfc, rfun=[None], combine=[mean_combine], weights_fun=weights_fun, weights_params=weights_params, ) if d < 3: in_smooth = np.hstack( (in_smooth, np.zeros((in_smooth.shape[0], 3 - in_smooth.shape[1]))) ) out_smooth = np.hstack( ( out_smooth, np.zeros((out_smooth.shape[0], 3 - out_smooth.shape[1])), ) ) corrs = 1 - sd.cdist(in_smooth, out_smooth, "correlation") corrs = np.array(corrs) next_results_pd = decoder(corrs) next_results_pd["dims"] = d next_results_pd["folds"] = i results_pd = pd.concat([results_pd, next_results_pd]) return results_pd def reduce_wrapper(data, dims=10, level=0, rfun=None): if not level == 0: all_smooth = list(data[0][np.newaxis, :, :]) + list(data[1][np.newaxis, :, :]) all_raw = list(data[2]) + list(data[3]) all_smooth_reduced = reduce(all_smooth, rfun=rfun[level]) all_raw_reduced = reduce(all_raw, rfun=rfun[level]) return ( all_smooth_reduced[0], all_smooth_reduced[1], all_raw_reduced[0], all_raw_reduced[1], ) else: return data[0], data[1], data[2], data[3] def optimize_weights(corrs, opt_init=None): b = (0, 1) bns = (b,) * np.shape(corrs)[0] con1 = {"type": "eq", "fun": lambda x: 1 - np.sum(x)} if opt_init == "random": x0 = sum_to_x(np.shape(corrs)[0], 1) elif opt_init == "last": x0 = np.repeat(1 / np.shape(corrs)[0], np.shape(corrs)[0]) x0[-1] = 1 else: x0 = np.repeat(1 / np.shape(corrs)[0], np.shape(corrs)[0]) min_mu = minimize( calculate_error, x0, args=corrs, bounds=bns, constraints=con1, options={"disp": True, "eps": 1e-1}, ) return min_mu.x def sum_to_x(n, x): values = [0.0, x] + list(np.random.uniform(low=0.0, high=x, size=n - 1)) values.sort() return np.asarray([values[i + 1] - values[i] for i in range(n)]) def calculate_error(mu, corrs, metric="error", sign=1): results = decoder(weight_corrs(corrs, mu)) return sign * results[metric].values def weight_corrs(corrs, mu): assert np.shape(mu)[0] == len(corrs) weighted_corrs = 0 for i in np.arange(np.shape(corrs)[0]): weighted_corrs += mu[i] * r2z(corrs[i]) return z2r(weighted_corrs) def decoder(corrs): next_results_pd = pd.DataFrame({"rank": [0], "accuracy": [0], "error": [0]}) for t in np.arange(corrs.shape[0]): decoded_inds = np.argmax(corrs[t, :]) next_results_pd["error"] += ( np.mean(np.abs(decoded_inds - np.array(t))) / corrs.shape[0] ) next_results_pd["accuracy"] += np.mean(decoded_inds == np.array(t)) next_results_pd["rank"] += np.mean( list(map((lambda x: int(x)), (corrs[t, :] <= corrs[t, t]))) ) next_results_pd["error"] = next_results_pd["error"].values / corrs.shape[0] next_results_pd["accuracy"] = next_results_pd["accuracy"].values / corrs.shape[0] next_results_pd["rank"] = next_results_pd["rank"].values / corrs.shape[0] return next_results_pd def weighted_mean(x, axis=None, weights=None, tol=1e-5): if axis is None: axis = len(x.shape) - 1 if weights is None: weights = np.ones([1, x.shape[axis]]) # remove nans and force weights to sum to 1 weights[np.isnan(weights)] = 0 if np.sum(weights) == 0: return np.mean(x, axis=axis) # get rid of 0 weights to avoid unnecessary computations good_inds = np.abs(weights) > tol weights[good_inds] /= np.sum(weights[good_inds]) weighted_sum = np.zeros(np.take(x, 0, axis=axis).shape) for i in np.where(good_inds)[0]: weighted_sum += weights[i] * np.take(x, i, axis=axis) return weighted_sum def rmdiag(m): return m - np.diag(np.diag(m)) def r2z(r): """ Function that calculates the Fisher z-transformation Parameters ---------- r : int or ndarray Correlation value Returns ------- result : int or ndarray Fishers z transformed correlation value """ return 0.5 * (np.log(1 + r) - np.log(1 - r)) def z2r(z): """ Function that calculates the inverse Fisher z-transformation Parameters ---------- z : int or ndarray Fishers z transformed correlation value Returns ------- result : int or ndarray Correlation value """ r = np.divide((np.exp(2 * z) - 1), (np.exp(2 * z) + 1)) r[np.isnan(r)] = 0 r[np.isinf(r)] = np.sign(r)[np.isinf(r)] return r def isodd(x): return np.remainder(x, 2) == 1 def iseven(x): return np.remainder(x, 2) == 0
[docs] def mat2vec(m): """ Function that converts correlation matrix to a vector Parameters ---------- m : ndarray Correlation matix Returns ------- result : ndarray Vector """ K = m.shape[0] V = int((((K**2) - K) / 2) + K) if m.ndim == 2: y = np.zeros(V) y[0:K] = np.diag(m) # force m to by symmetric m = np.triu(rmdiag(m)) m[np.isnan(m)] = 0 m += m.T y[K:] = sd.squareform(rmdiag(m)) elif m.ndim == 3: T = m.shape[2] y = np.zeros([T, V]) for t in np.arange(T): y[t, :] = mat2vec(np.squeeze(m[:, :, t])) else: raise ValueError("Input must be a 2 or 3 dimensional Numpy array") return y
[docs] def vec2mat(v): """ Function that converts vector back to correlation matrix Parameters ---------- result : ndarray Vector Returns ------- m : ndarray Correlation matix """ if (v.ndim == 1) or (v.shape[0] == 1): x = int(0.5 * (np.sqrt(8 * len(v) + 1) - 1)) return sd.squareform(v[x:]) + np.diag(v[0:x]) elif v.ndim == 2: a = vec2mat(v[0, :]) y = np.zeros([a.shape[0], a.shape[1], v.shape[0]]) y[:, :, 0] = a for t in np.arange(1, v.shape[0]): y[:, :, t] = vec2mat(v[t, :]) else: raise ValueError("Input must be a 1 or 2 dimensional Numpy array") return y
def symmetric(m): return np.isclose(m, m.T).all() def get_xval_assignments(ndata, nfolds): group_assignments = np.zeros(ndata) groupsize = int(np.ceil(ndata / nfolds)) for i in range(1, nfolds): inds = np.arange(i * groupsize, np.min([(i + 1) * groupsize, ndata])) group_assignments[inds] = i np.random.shuffle(group_assignments) return group_assignments SMALL_SIZE = 18 MEDIUM_SIZE = 21 BIGGER_SIZE = 24 plt.rc("font", size=SMALL_SIZE) # controls default text sizes plt.rc("axes", titlesize=SMALL_SIZE) # fontsize of the axes title plt.rc("axes", labelsize=MEDIUM_SIZE) # fontsize of the x and y labels plt.rc("xtick", labelsize=SMALL_SIZE) # fontsize of the tick labels plt.rc("ytick", labelsize=SMALL_SIZE) # fontsize of the tick labels plt.rc("legend", fontsize=SMALL_SIZE) # legend fontsize plt.rc("figure", titlesize=BIGGER_SIZE) # fontsize of the figure title def plot_weights( weights, t=None, color="k", ax=None, xlab="Time (samples)", ylab="Weights", title=None, outfile=None, ): """ Plot weights Parameters ---------- weights : int or float or None Weights to plot t : int or float Time Returns ------- results : png Plot of weights """ T = weights.shape[0] if t is None: t = np.round(T / 2) ts = np.arange(1, T + 1) if ax is None: ax = plt.gca() ax.plot(ts, weights[int(t), :], color=color) plt.xlim([1, T]) if not (xlab is None): plt.xlabel(xlab) if not (ylab is None): plt.ylabel(ylab) if not (title is None): plt.title(title) plt.tight_layout() if outfile: plt.savefig(outfile) # Don't call plt.show() automatically to avoid hanging in tests/CI # Users can call plt.show() manually if needed # Only show if not in CI environment and no outfile specified import os if not outfile and not (os.environ.get("CI") or os.environ.get("GITHUB_ACTIONS")): plt.show()