Source code for timecorr.helpers

# coding: utf-8
from __future__ import division
import numpy as np
import scipy.spatial.distance as sd
from scipy.special import gamma
from scipy.linalg import toeplitz
from scipy.optimize import minimize
from scipy.stats import ttest_1samp as ttest
import hypertools as hyp
import brainconn as bc
import pandas as pd
import warnings
from matplotlib import pyplot as plt

graph_measures = {'eigenvector_centrality': bc.centrality.eigenvector_centrality_und,
                  'pagerank_centrality': lambda x: bc.centrality.pagerank_centrality(x, d=0.85),
                  'strength': bc.degree.strengths_und}

gaussian_params = {'var': 100}
laplace_params = {'scale': 100}
eye_params = {}
t_params = {'df': 100}
mexican_hat_params = {'sigma': 10}
uniform_params = {}
boxcar_params = {'width': 10}


def gaussian_weights(T, params=gaussian_params):
    if params is None:
        params = gaussian_params

    c1 = np.divide(1, np.sqrt(2 * np.pi * params['var']))
    c2 = np.divide(-1, 2 * params['var'])
    sqdiffs = toeplitz(np.arange(T) ** 2)
    return c1 * np.exp(c2 * sqdiffs)

def laplace_weights(T, params=laplace_params):
    if params is None:
        params = laplace_params

    absdiffs = toeplitz(np.arange(T))
    return np.multiply(np.divide(1, 2 * params['scale']), np.exp(-np.divide(absdiffs, params['scale']))) #scale by a factor of 2.5 to prevent near-zero rounding issues


def eye_weights(T, params=eye_params):
    return np.eye(T)

def uniform_weights(T, params=uniform_params):
    return np.ones([T, T])

def t_weights(T, params=t_params):
    if params is None:
        params = t_params

    c1 = np.divide(gamma((params['df'] + 1) / 2), np.sqrt(params['df'] * np.pi) * gamma(params['df'] / 2))
    c2 = np.divide(-(params['df'] + 1), 2)  # exponent of the Student's t density: -(df + 1) / 2

    sqdiffs = toeplitz(np.arange(T) ** 2)
    return np.multiply(c1, np.power(1 + np.divide(sqdiffs, params['df']), c2))

def mexican_hat_weights(T, params=mexican_hat_params):
    if params is None:
        params = mexican_hat_params

    absdiffs = toeplitz(np.arange(T))
    sqdiffs = toeplitz(np.arange(T) ** 2)

    a = np.divide(2, np.sqrt(3 * params['sigma']) * np.power(np.pi, 0.25))
    b = 1 - np.power(np.divide(absdiffs, params['sigma']), 2)
    c = np.exp(-np.divide(sqdiffs, 2 * np.power(params['sigma'], 2)))

    return np.multiply(a, np.multiply(b, c))

def boxcar_weights(T, params=boxcar_params):
    if params is None:
        params = boxcar_params

    return np.multiply(toeplitz(np.arange(T)) < params['width']/2., 1.)
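

# --- Editor's illustrative sketch (not part of the original module) -------------------
# A minimal usage example for the kernel constructors above, assuming the default
# parameter dictionaries defined earlier in this file.  The `_demo_kernel_weights`
# helper name is hypothetical.  Each constructor returns a T-by-T matrix whose row t
# holds the per-timepoint weights centered on timepoint t.
def _demo_kernel_weights():
    T = 50
    gw = gaussian_weights(T, params={'var': 100})
    lw = laplace_weights(T, params={'scale': 100})
    bw = boxcar_weights(T, params={'width': 10})

    # all kernels share the same T-by-T layout, so they can be swapped interchangeably
    assert gw.shape == lw.shape == bw.shape == (T, T)
    return gw, lw, bw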



def format_data(data):
    def zero_nans(x):
        x[np.isnan(x)] = 0
        return x
    
    x = hyp.tools.format_data(data, ppca=False)
    return list(map(zero_nans, x))
    

def _is_empty(d):
    return not bool(d)



def wcorr(a, b, weights):
    '''
    Compute moment-by-moment correlations between sets of observations

    :param a: a number-of-timepoints by number-of-features observations matrix
    :param b: a number-of-timepoints by number-of-features observations matrix
    :param weights: a number-of-timepoints by number-of-timepoints weights matrix
        specifying the per-timepoint weights to be considered (for each timepoint)
    :return: an a.shape[1] by b.shape[1] by weights.shape[0] array of per-timepoint
        correlation matrices.
    '''

    def weighted_var_diffs(x, w):
        w[np.isnan(w)] = 0
        if np.sum(np.abs(w)) == 0:
            weights_tiled = np.ones(x.shape)
        else:
            weights_tiled = np.tile(w[:, np.newaxis], [1, x.shape[1]])

        mx = np.sum(np.multiply(weights_tiled, x), axis=0)[:, np.newaxis].T
        diffs = x - np.tile(mx, [x.shape[0], 1])
        varx = np.sum(diffs ** 2, axis=0)[:, np.newaxis].T

        return varx, diffs

    autocorrelation = np.isclose(a, b).all()
    corrs = np.zeros([a.shape[1], b.shape[1], weights.shape[1]])

    for t in np.arange(weights.shape[1]):
        vara, diffs_a = weighted_var_diffs(a, weights[:, t])
        if autocorrelation:
            varb = vara
            diffs_b = diffs_a
        else:
            varb, diffs_b = weighted_var_diffs(b, weights[:, t])

        alpha = np.dot(diffs_a.T, diffs_b)
        beta = np.sqrt(np.dot(vara.T, varb))
        corrs[:, :, t] = np.divide(alpha, beta)

    return corrs
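
# --- Editor's illustrative sketch (not part of the original module) -------------------
# A minimal usage example for wcorr, assuming random observation matrices and the
# Gaussian kernel defined above.  The `_demo_wcorr` helper name is hypothetical.
def _demo_wcorr():
    T, K = 30, 5
    a = np.random.randn(T, K)
    b = np.random.randn(T, K)

    # one K-by-K weighted correlation matrix is returned per timepoint
    corrs = wcorr(a, b, gaussian_weights(T))
    assert corrs.shape == (K, K, T)
    return corrs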
def wisfc(data, timepoint_weights, subject_weights=None):
    '''
    Compute moment-by-moment correlations between sets of observations

    :param data: a list of number-of-timepoints by V matrices
    :param timepoint_weights: a number-of-timepoints by number-of-timepoints weights matrix
        specifying the per-timepoint weights to be considered (for each timepoint)
    :param subject_weights: number-of-subjects by number-of-subjects weights matrix
    :return: a list of number-of-timepoints by (V^2 - V)/2 + V correlation matrices
    '''
    if type(data) != list:
        return wisfc([data], timepoint_weights, subject_weights=subject_weights)[0]

    if subject_weights is None:
        K = data[0].shape[1]
        connectomes = np.zeros([len(data), int((K ** 2 - K) / 2)])
        for s in np.arange(len(data)):
            connectomes[s, :] = 1 - sd.pdist(data[s].T, metric='correlation')
        subject_weights = 1 - sd.squareform(sd.pdist(connectomes, metric='correlation'))
        np.fill_diagonal(subject_weights, 0)
    elif np.isscalar(subject_weights):
        subject_weights = subject_weights * np.ones([len(data), len(data)])
        np.fill_diagonal(subject_weights, 0)

    corrs = []
    for s, a in enumerate(data):
        b = weighted_mean(np.stack(data, axis=2), axis=2, weights=subject_weights[s, :])
        wc = wcorr(a, b, timepoint_weights)
        wc[np.isnan(wc)] = 0
        wc[np.isinf(wc)] = 1
        try:
            corrs.append(mat2vec(wc))
        except:
            print('mystery!')

    return corrs


def isfc(data, timepoint_weights):
    if type(data) != list:
        return isfc([data], timepoint_weights)[0]

    return wisfc(data, timepoint_weights, subject_weights=1 - np.eye(len(data)))


def autofc(data, timepoint_weights):
    if type(data) != list:
        return autofc([data], timepoint_weights)[0]

    return wisfc(data, timepoint_weights, subject_weights=np.eye(len(data)))


def apply_by_row(corrs, f):
    '''
    Apply the function f to the correlation matrix specified in each row, and return a
    matrix of the concatenated results

    :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a
        list of such matrices
    :param f: a function to apply to each vectorized correlation matrix
    :return: a matrix of function outputs (for each row of the given matrices), or a
        list of such matrices
    '''
    if type(corrs) is list:
        return list(map(lambda x: apply_by_row(x, f), corrs))

    corrs = vec2mat(corrs)
    return np.stack(list(map(lambda x: f(np.squeeze(x)), np.split(corrs, corrs.shape[2], axis=2))), axis=0)


def corrmean_combine(corrs):
    '''
    Compute the mean element-wise correlation across each matrix in a list.

    :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a
        list of such matrices
    :return: a mean vectorized correlation matrix
    '''
    if not (type(corrs) == list):
        return corrs
    elif np.shape(corrs)[0] == 1:
        return corrs
    else:
        return z2r(np.mean(r2z(np.stack(corrs, axis=2)), axis=2))


def mean_combine(vals):
    '''
    Compute the element-wise mean across each matrix in a list.

    :param vals: a matrix, or a list of matrices
    :return: a mean matrix
    '''
    if not (type(vals) == list):
        return vals
    else:
        return np.mean(np.stack(vals, axis=2), axis=2)


def tstat_combine(corrs, return_pvals=False):
    '''
    Compute element-wise t-tests (comparing distribution means to 0) across each
    correlation matrix in a list.

    :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a
        list of such matrices
    :param return_pvals: Boolean (default: False).  If True, return a second matrix (or
        list) of the corresponding t-tests' p-values
    :return: a matrix of t-statistics of the same shape as a matrix of vectorized
        correlation matrices
    '''
    if not (type(corrs) == list):
        ts = corrs
        ps = np.nan * np.zeros_like(corrs)
    else:
        ts, ps = ttest(r2z(np.stack(corrs, axis=2)), popmean=0, axis=2)

    if return_pvals:
        return ts, ps
    else:
        return ts


def null_combine(corrs):
    '''
    Placeholder function that returns the input

    :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a
        list of such matrices
    :return: the input
    '''
    return corrs


def reduce(corrs, rfun=None):
    '''
    :param corrs: a matrix of vectorized correlation matrices (output of mat2vec), or a
        list of such matrices
    :param rfun: function to use for dimensionality reduction.  All hypertools and
        scikit-learn functions are supported: PCA, IncrementalPCA, SparsePCA,
        MiniBatchSparsePCA, KernelPCA, FastICA, FactorAnalysis, TruncatedSVD,
        DictionaryLearning, MiniBatchDictionaryLearning, TSNE, Isomap, SpectralEmbedding,
        LocallyLinearEmbedding, MDS, and UMAP.

        Can be passed as a string, but for finer control of the model parameters, pass
        as a dictionary, e.g. reduction={'model': 'PCA', 'params': {'whiten': True}}.
        See scikit-learn specific model docs for details on parameters supported for
        each model.

        Another option is to use graph theoretic measures computed for each node.
        The following measures are supported (via the brainconn toolbox):
        eigenvector_centrality, pagerank_centrality, and strength.  (Each of these must
        be specified as a string; dictionaries are not supported.)

        Default: None (no dimensionality reduction)
    :return: dimensionality-reduced (or original) correlation matrices
    '''
    if rfun is None:
        return corrs

    get_V = lambda x: int(np.divide(np.sqrt(8 * x + 1) - 1, 2))

    if type(corrs) is list:
        V = get_V(corrs[0].shape[1])
    else:
        V = get_V(corrs.shape[1])

    if rfun in graph_measures.keys():
        return apply_by_row(corrs, graph_measures[rfun])
    else:
        red_corrs = hyp.reduce(corrs, reduce=rfun, ndims=V)
        D = np.shape(red_corrs)[-1]
        if D < V:
            red_corrs = np.hstack((red_corrs, np.zeros((D, V - D))))
        return red_corrs


def smooth(w, windowsize=10, kernel_fun=laplace_weights, kernel_params=laplace_params):
    if type(w) is list:
        return list(map(lambda x: smooth(x, windowsize=windowsize, kernel_fun=kernel_fun,
                                         kernel_params=kernel_params), w))

    assert type(windowsize) == int, 'smoothing kernel must have integer width'
    k = kernel_fun(windowsize, params=kernel_params)
    if iseven(windowsize):
        kernel = np.divide(k[int(np.floor(windowsize / 2) - 1), :] +
                           k[int(np.ceil(windowsize / 2) - 1), :], 2)
    else:
        kernel = k[int(np.floor(windowsize / 2)), :]

    kernel /= kernel.sum()
    x = np.zeros_like(w)
    for i in range(0, w.shape[1]):
        x[:, i] = np.convolve(kernel, w[:, i], mode='same')
    return x


def timepoint_decoder(data, mu=None, nfolds=2, level=0, cfun=isfc, weights_fun=laplace_weights,
                      weights_params=laplace_params, combine=mean_combine, rfun=None):
    """
    :param data: a list of number-of-observations by number-of-features matrices
    :param mu: list of floats (summing to one) specifying the mixing proportions for each level
    :param nfolds: number of cross-validation folds (train using out-of-fold data; test
        using in-fold data)
    :param level: integer or list of integers for levels to be evaluated (default: 0)
    :param cfun: function for transforming the group data (default: isfc)
    :param weights_fun: used to compute per-timepoint weights for cfun; default: laplace_weights
    :param weights_params: parameters passed to weights_fun; default: laplace_params
    :param combine: function for combining data within each group, or a list of such
        functions (default: mean_combine)
    :param rfun: function for reducing output (default: None)
    :return: results dictionary with the following keys:
       'rank': mean percentile rank (across all timepoints and folds) in the decoding
           distribution of the true timepoint
       'accuracy': mean percent accuracy (across all timepoints and folds)
       'error': mean estimation error (across all timepoints and folds) between the
           decoded and actual window numbers, expressed as a percentage of the total
           number of windows
    """
    assert len(np.unique(
        list(map(lambda x: x.shape[0], data)))) == 1, 'all data matrices must have the same number of timepoints'
    assert len(np.unique(
        list(map(lambda x: x.shape[1], data)))) == 1, 'all data matrices must have the same number of features'

    group_assignments = get_xval_assignments(len(data), nfolds)

    orig_level = level
    orig_level = np.ravel(orig_level)

    if type(level) is int:
        level = np.arange(level + 1)
    level = np.ravel(level)

    assert type(level) is np.ndarray, 'level needs to be an integer, list, or np.ndarray'
    assert not np.any(level < 0), 'level cannot contain negative numbers'

    if mu:
        orig_level = level.max()
        orig_level = np.ravel(orig_level)
        assert np.sum(mu) == 1, 'weights must sum to one'
        assert np.shape(mu)[0] == level.max() + 1, 'weights length needs to be the same as the number of levels'

    if not np.all(np.arange(level.max() + 1) == level):
        level = np.arange(level.max() + 1)

    if callable(combine):
        combine = [combine] * np.shape(level)[0]

    combine = np.ravel(combine)

    assert type(combine) is np.ndarray and type(combine[0]) is not np.str_, \
        'combine needs to be a function, list of functions, or np.ndarray of functions'
    assert len(level) == len(combine), 'combine length needs to be the same as level if input is type np.ndarray or list'

    if callable(cfun):
        cfun = [cfun] * np.shape(level)[0]

    cfun = np.ravel(cfun)

    assert type(cfun) is np.ndarray and type(cfun[0]) is not np.str_, \
        'cfun needs to be a function, list of functions, or np.ndarray of functions'
    assert len(level) == len(cfun), 'cfun length needs to be the same as level if input is type np.ndarray or list'

    if type(rfun) not in [list, np.ndarray]:
        rfun = [rfun] * np.shape(level)[0]

    p_rfun = [None] * np.shape(level)[0]

    assert len(level) == len(rfun), 'parameter lengths need to be the same as level if input is type np.ndarray or list'

    results_pd = pd.DataFrame()

    corrs = 0
    for i in range(0, nfolds):

        in_raw = []
        out_raw = []

        for v in level:

            if v == 0:
                in_data = [x for x in data[group_assignments == i]]
                out_data = [x for x in data[group_assignments != i]]

                in_smooth, out_smooth, in_raw, out_raw = reduce_wrapper(
                    folding_levels(in_data, out_data, level=v, cfun=None, rfun=p_rfun,
                                   combine=combine, weights_fun=weights_fun,
                                   weights_params=weights_params), level=v, rfun=rfun)
            else:
                in_smooth, out_smooth, in_raw, out_raw = reduce_wrapper(
                    folding_levels(in_raw, out_raw, level=v, cfun=cfun, rfun=p_rfun,
                                   combine=combine, weights_fun=weights_fun,
                                   weights_params=weights_params), level=v, rfun=rfun)

            if mu:
                next_corrs = (1 - sd.cdist(in_smooth, out_smooth, 'correlation'))
                corrs += mu[v] * z2r(next_corrs)
            else:
                corrs = (1 - sd.cdist(in_smooth, out_smooth, 'correlation'))

            if v in orig_level:
                if mu:
                    corrs = r2z(corrs)
                next_results_pd = decoder(corrs)
                next_results_pd['level'] = v
                next_results_pd['folds'] = i
                results_pd = pd.concat([results_pd, next_results_pd])

    return results_pd


def weighted_timepoint_decoder(data, nfolds=2, level=0, optimize_levels=None, cfun=isfc,
                               weights_fun=laplace_weights, weights_params=laplace_params,
                               combine=mean_combine, rfun=None, opt_init=None):
    """
    :param data: a list of number-of-observations by number-of-features matrices
    :param nfolds: number of cross-validation folds (train using out-of-fold data; test
        using in-fold data)
    :param level: integer or list of integers for levels to be evaluated (default: 0)
    :param cfun: function for transforming the group data (default: isfc)
    :param weights_fun: used to compute per-timepoint weights for cfun; default: laplace_weights
    :param weights_params: parameters passed to weights_fun; default: laplace_params
    :param combine: function for combining data within each group, or a list of such
        functions (default: mean_combine)
    :param rfun: function for reducing output (default: None)
    :return: results dictionary with the following keys:
       'rank': mean percentile rank (across all timepoints and folds) in the decoding
           distribution of the true timepoint
       'accuracy': mean percent accuracy (across all timepoints and folds)
       'error': mean estimation error (across all timepoints and folds) between the
           decoded and actual window numbers, expressed as a percentage of the total
           number of windows
    """
    assert len(np.unique(
        list(map(lambda x: x.shape[0], data)))) == 1, 'all data matrices must have the same number of timepoints'
    assert len(np.unique(
        list(map(lambda x: x.shape[1], data)))) == 1, 'all data matrices must have the same number of features'

    if nfolds == 1:
        sub_nfolds = 1
        nfolds = 2
        warnings.warn('When nfolds is set to one, the analysis will be circular.')
    else:
        sub_nfolds = nfolds

    group_assignments = get_xval_assignments(len(data), nfolds)

    orig_level = level
    orig_level = np.ravel(orig_level)

    if type(level) is int:
        level = np.arange(level + 1)
    level = np.ravel(level)

    assert type(level) is np.ndarray, 'level needs to be an integer, list, or np.ndarray'
    assert not np.any(level < 0), 'level cannot contain negative numbers'

    if not np.all(np.arange(level.max() + 1) == level):
        level = np.arange(level.max() + 1)

    if callable(combine):
        combine = [combine] * np.shape(level)[0]

    combine = np.ravel(combine)

    assert type(combine) is np.ndarray and type(combine[0]) is not np.str_, \
        'combine needs to be a function, list of functions, or np.ndarray of functions'
    assert len(level) == len(combine), 'combine length needs to be the same as level if input is type np.ndarray or list'

    if callable(cfun):
        cfun = [cfun] * np.shape(level)[0]

    cfun = np.ravel(cfun)

    assert type(cfun) is np.ndarray and type(cfun[0]) is not np.str_, \
        'cfun needs to be a function, list of functions, or np.ndarray of functions'
    assert len(level) == len(cfun), 'cfun length needs to be the same as level if input is type np.ndarray or list'

    if type(rfun) not in [list, np.ndarray]:
        rfun = [rfun] * np.shape(level)[0]

    p_rfun = [None] * np.shape(level)[0]

    assert len(level) == len(rfun), 'parameter lengths need to be the same as level if input is type np.ndarray or list'

    results_pd = pd.DataFrame()

    for i in range(0, nfolds):

        in_raw = []
        out_raw = []
        sub_in_raw = []
        sub_out_raw = []
        sub_corrs = []
        corrs = []

        subgroup_assignments = get_xval_assignments(len(data[group_assignments == i]), nfolds)

        for v in level:

            if v == 0:
                in_data = [x for x in data[group_assignments == i]]
                out_data = [x for x in data[group_assignments != i]]

                in_smooth, out_smooth, in_raw, out_raw = reduce_wrapper(
                    folding_levels(in_data, out_data, level=v, cfun=None, rfun=p_rfun,
                                   combine=combine, weights_fun=weights_fun,
                                   weights_params=weights_params), level=v, rfun=rfun)

                for s in range(0, nfolds):
                    sub_in_data = [x for x in data[group_assignments == i][subgroup_assignments == s]]
                    sub_out_data = [x for x in data[group_assignments == i][subgroup_assignments != s]]

                    sub_in_smooth, sub_out_smooth, sub_in_raw, sub_out_raw = reduce_wrapper(
                        folding_levels(sub_in_data, sub_out_data, level=v, cfun=None,
                                       rfun=p_rfun, combine=combine, weights_fun=weights_fun,
                                       weights_params=weights_params), level=v, rfun=rfun)

                next_corrs = (1 - sd.cdist(in_raw, out_raw, 'correlation'))
                next_subcorrs = (1 - sd.cdist(sub_in_raw, sub_out_raw, 'correlation'))
                corrs.append(next_corrs)
                sub_corrs.append(next_subcorrs)

            else:
                in_smooth, out_smooth, in_raw, out_raw = reduce_wrapper(
                    folding_levels(in_raw, out_raw, level=v, cfun=cfun, rfun=p_rfun,
                                   combine=combine, weights_fun=weights_fun,
                                   weights_params=weights_params), level=v, rfun=rfun)

                for s in range(0, nfolds):
                    sub_in_smooth, sub_out_smooth, sub_in_raw, sub_out_raw = reduce_wrapper(
                        folding_levels(sub_in_raw, sub_out_raw, level=v, cfun=cfun,
                                       rfun=p_rfun, combine=combine, weights_fun=weights_fun,
                                       weights_params=weights_params), level=v, rfun=rfun)

                next_corrs = (1 - sd.cdist(in_smooth, out_smooth, 'correlation'))
                next_subcorrs = (1 - sd.cdist(sub_in_smooth, sub_out_smooth, 'correlation'))
                corrs.append(next_corrs)
                sub_corrs.append(next_subcorrs)

        sub_corrs = np.array(sub_corrs)
        corrs = np.array(corrs)

        if sub_nfolds == 1:
            sub_corrs = corrs

        if not optimize_levels:
            optimize_levels = range(v + 1)

        opt_over = []
        for lev in optimize_levels:
            opt_over.append(lev)

            sub_out_corrs = sub_corrs[opt_over, :, :]
            out_corrs = corrs[opt_over, :, :]

            mu = optimize_weights(sub_out_corrs, opt_init)
            w_corrs = weight_corrs(out_corrs, mu)

            next_results_pd = decoder(w_corrs)
            print(next_results_pd)
            next_results_pd['level'] = lev
            next_results_pd['folds'] = i

            mu_pd = pd.DataFrame()
            for c in opt_over:
                mu_pd['level_' + str(c)] = [0]
            mu_pd += mu

            # note: join_axes was removed in pandas 1.0, so this call requires an older pandas
            next_results_pd = pd.concat([next_results_pd, mu_pd], axis=1,
                                        join_axes=[next_results_pd.index])
            results_pd = pd.concat([results_pd, next_results_pd])

    return results_pd


def pca_decoder(data, nfolds=2, dims=10, cfun=isfc, weights_fun=laplace_weights,
                weights_params=laplace_params, combine=mean_combine, rfun=None):
    """
    :param data: a list of number-of-observations by number-of-features matrices
    :param nfolds: number of cross-validation folds (train using out-of-fold data; test
        using in-fold data)
    :param cfun: function for transforming the group data (default: isfc)
    :param weights_fun: used to compute per-timepoint weights for cfun; default: laplace_weights
    :param weights_params: parameters passed to weights_fun; default: laplace_params
    :param combine: function for combining data within each group, or a list of such
        functions (default: mean_combine)
    :param rfun: function for reducing output (default: None)
    :return: results dictionary with the following keys:
       'rank': mean percentile rank (across all timepoints and folds) in the decoding
           distribution of the true timepoint
       'accuracy': mean percent accuracy (across all timepoints and folds)
       'error': mean estimation error (across all timepoints and folds) between the
           decoded and actual window numbers, expressed as a percentage of the total
           number of windows
    """
    assert len(np.unique(
        list(map(lambda x: x.shape[0], data)))) == 1, 'all data matrices must have the same number of timepoints'
    assert len(np.unique(
        list(map(lambda x: x.shape[1], data)))) == 1, 'all data matrices must have the same number of features'

    pca_data = np.asarray(hyp.reduce(list(data), ndims=dims))

    group_assignments = get_xval_assignments(len(pca_data), nfolds)
    results_pd = pd.DataFrame()

    for i in range(0, nfolds):
        for d in range(1, dims + 1):
            in_data = np.asarray([x for x in pca_data[group_assignments == i]])[:, :, :d]
            out_data = np.asarray([x for x in pca_data[group_assignments != i]])[:, :, :d]

            in_smooth, out_smooth, in_raw, out_raw = folding_levels(in_data, out_data, level=0,
                                                                    cfun=isfc, rfun=[None],
                                                                    combine=[mean_combine],
                                                                    weights_fun=weights_fun,
                                                                    weights_params=weights_params)

            if d < 3:
                in_smooth = np.hstack((in_smooth, np.zeros((in_smooth.shape[0], 3 - in_smooth.shape[1]))))
                out_smooth = np.hstack((out_smooth, np.zeros((out_smooth.shape[0], 3 - out_smooth.shape[1]))))

            corrs = (1 - sd.cdist(in_smooth, out_smooth, 'correlation'))
            corrs = np.array(corrs)

            next_results_pd = decoder(corrs)
            next_results_pd['dims'] = d
            next_results_pd['folds'] = i
            results_pd = pd.concat([results_pd, next_results_pd])

    return results_pd


def folding_levels(infold_data, outfold_data, level=0, cfun=None, weights_fun=None,
                   weights_params=None, combine=None, rfun=None):

    from .timecorr import timecorr

    if rfun is None:
        rfun = [None] * np.shape(level)[0]

    if level == 0:
        in_fold_smooth = np.asarray(timecorr([x for x in infold_data], cfun=None,
                                             rfun=rfun[level], combine=combine[level],
                                             weights_function=weights_fun,
                                             weights_params=weights_params))
        out_fold_smooth = np.asarray(timecorr([x for x in outfold_data], cfun=None,
                                              rfun=rfun[level], combine=combine[level],
                                              weights_function=weights_fun,
                                              weights_params=weights_params))
        in_fold_raw = mean_combine([x for x in infold_data])
        out_fold_raw = mean_combine([x for x in outfold_data])
    else:
        in_fold_smooth = np.asarray(timecorr(infold_data, cfun=cfun[level], rfun=rfun[level],
                                             combine=combine[level], weights_function=weights_fun,
                                             weights_params=weights_params))
        out_fold_smooth = np.asarray(timecorr(outfold_data, cfun=cfun[level], rfun=rfun[level],
                                              combine=combine[level], weights_function=weights_fun,
                                              weights_params=weights_params))
        in_fold_raw = np.asarray(timecorr(infold_data, cfun=cfun[level], rfun=rfun[level],
                                          combine=null_combine, weights_function=eye_weights,
                                          weights_params=eye_params))
        out_fold_raw = np.asarray(timecorr(outfold_data, cfun=cfun[level], rfun=rfun[level],
                                           combine=null_combine, weights_function=eye_weights,
                                           weights_params=eye_params))

    return in_fold_smooth, out_fold_smooth, in_fold_raw, out_fold_raw


def reduce_wrapper(data, dims=10, level=0, rfun=None):

    if not level == 0:
        all_smooth = list(data[0][np.newaxis, :, :]) + list(data[1][np.newaxis, :, :])
        all_raw = list(data[2][np.newaxis, :, :]) + list(data[3][np.newaxis, :, :])

        all_smooth_reduced = reduce(all_smooth, rfun=rfun[level])
        all_raw_reduced = reduce(all_raw, rfun=rfun[level])

        return all_smooth_reduced[0], all_smooth_reduced[1], all_raw_reduced[0], all_raw_reduced[1]
    else:
        return data[0], data[1], data[2], data[3]


def optimize_weights(corrs, opt_init=None):

    b = (0, 1)
    bns = (b,) * np.shape(corrs)[0]
    con1 = {'type': 'eq', 'fun': lambda x: 1 - np.sum(x)}

    if opt_init == 'random':
        x0 = sum_to_x(np.shape(corrs)[0], 1)
    elif opt_init == 'last':
        x0 = np.repeat(1 / np.shape(corrs)[0], np.shape(corrs)[0])
        x0[-1] = 1
    else:
        x0 = np.repeat(1 / np.shape(corrs)[0], np.shape(corrs)[0])

    min_mu = minimize(calculate_error, x0, args=corrs, bounds=bns, constraints=con1,
                      options={'disp': True, 'eps': 1e-1})

    return min_mu.x


def sum_to_x(n, x):
    values = [0.0, x] + list(np.random.uniform(low=0.0, high=x, size=n - 1))
    values.sort()
    return np.asarray([values[i + 1] - values[i] for i in range(n)])


def calculate_error(mu, corrs, metric='error', sign=1):
    results = decoder(weight_corrs(corrs, mu))
    return sign * results[metric].values


def weight_corrs(corrs, mu):

    assert np.shape(mu)[0] == len(corrs)
    weighted_corrs = 0
    for i in np.arange(np.shape(corrs)[0]):
        weighted_corrs += mu[i] * r2z(corrs[i])

    return z2r(weighted_corrs)


def decoder(corrs):

    next_results_pd = pd.DataFrame({'rank': [0], 'accuracy': [0], 'error': [0]})
    for t in np.arange(corrs.shape[0]):
        decoded_inds = np.argmax(corrs[t, :])
        next_results_pd['error'] += np.mean(np.abs(decoded_inds - np.array(t))) / corrs.shape[0]
        next_results_pd['accuracy'] += np.mean(decoded_inds == np.array(t))
        next_results_pd['rank'] += np.mean(list(map((lambda x: int(x)), (corrs[t, :] <= corrs[t, t]))))

    next_results_pd['error'] = next_results_pd['error'].values / corrs.shape[0]
    next_results_pd['accuracy'] = next_results_pd['accuracy'].values / corrs.shape[0]
    next_results_pd['rank'] = next_results_pd['rank'].values / corrs.shape[0]

    return next_results_pd


def weighted_mean(x, axis=None, weights=None, tol=1e-5):
    if axis is None:
        axis = len(x.shape) - 1
    if weights is None:
        weights = np.ones([1, x.shape[axis]])

    # remove nans and force weights to sum to 1
    weights[np.isnan(weights)] = 0
    if np.sum(weights) == 0:
        return np.mean(x, axis=axis)

    # get rid of 0 weights to avoid unnecessary computations
    good_inds = np.abs(weights) > tol
    weights[good_inds] /= np.sum(weights[good_inds])

    weighted_sum = np.zeros(np.take(x, 0, axis=axis).shape)
    for i in np.where(good_inds)[0]:
        weighted_sum += weights[i] * np.take(x, i, axis=axis)

    return weighted_sum


def rmdiag(m):
    return m - np.diag(np.diag(m))


def r2z(r):
    """
    Function that calculates the Fisher z-transformation

    Parameters
    ----------
    r : int or ndarray
        Correlation value

    Returns
    ----------
    result : int or ndarray
        Fisher's z-transformed correlation value
    """
    return 0.5 * (np.log(1 + r) - np.log(1 - r))


def z2r(z):
    """
    Function that calculates the inverse Fisher z-transformation

    Parameters
    ----------
    z : int or ndarray
        Fisher's z-transformed correlation value

    Returns
    ----------
    result : int or ndarray
        Correlation value
    """
    r = np.divide((np.exp(2 * z) - 1), (np.exp(2 * z) + 1))
    r[np.isnan(r)] = 0
    r[np.isinf(r)] = np.sign(r)[np.isinf(r)]
    return r


def isodd(x):
    return np.remainder(x, 2) == 1


def iseven(x):
    return np.remainder(x, 2) == 0
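
# --- Editor's illustrative sketch (not part of the original module) -------------------
# A minimal example of averaging correlations in Fisher z-space with r2z/z2r, mirroring
# what corrmean_combine does for vectorized correlation matrices.  The
# `_demo_fisher_average` helper name is hypothetical.  Note that z2r expects an array
# (it assigns into its output), so the scalar mean is wrapped with np.atleast_1d.
def _demo_fisher_average():
    rs = np.array([0.1, 0.5, 0.9])
    avg = z2r(np.atleast_1d(np.mean(r2z(rs))))[0]
    return avg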
def mat2vec(m):
    """
    Function that converts a correlation matrix to a vector

    Parameters
    ----------
    m : ndarray
        Correlation matrix

    Returns
    ----------
    result : ndarray
        Vector
    """
    K = m.shape[0]
    V = int((((K ** 2) - K) / 2) + K)

    if m.ndim == 2:
        y = np.zeros(V)
        y[0:K] = np.diag(m)

        # force m to be symmetric
        m = np.triu(rmdiag(m))
        m[np.isnan(m)] = 0
        m += m.T

        y[K:] = sd.squareform(rmdiag(m))
    elif m.ndim == 3:
        T = m.shape[2]
        y = np.zeros([T, V])
        for t in np.arange(T):
            y[t, :] = mat2vec(np.squeeze(m[:, :, t]))
    else:
        raise ValueError('Input must be a 2 or 3 dimensional Numpy array')

    return y
def vec2mat(v):
    """
    Function that converts a vector back to a correlation matrix

    Parameters
    ----------
    v : ndarray
        Vector

    Returns
    ----------
    m : ndarray
        Correlation matrix
    """
    if (v.ndim == 1) or (v.shape[0] == 1):
        x = int(0.5 * (np.sqrt(8 * len(v) + 1) - 1))
        return sd.squareform(v[x:]) + np.diag(v[0:x])
    elif v.ndim == 2:
        a = vec2mat(v[0, :])
        y = np.zeros([a.shape[0], a.shape[1], v.shape[0]])
        y[:, :, 0] = a
        for t in np.arange(1, v.shape[0]):
            y[:, :, t] = vec2mat(v[t, :])
    else:
        raise ValueError('Input must be a 1 or 2 dimensional Numpy array')
    return y
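
# --- Editor's illustrative sketch (not part of the original module) -------------------
# A minimal round-trip example for mat2vec/vec2mat, assuming a symmetric correlation
# matrix from np.corrcoef.  The `_demo_mat2vec_roundtrip` helper name is hypothetical.
def _demo_mat2vec_roundtrip():
    K = 4
    x = np.random.randn(100, K)
    m = np.corrcoef(x.T)           # K-by-K symmetric correlation matrix

    v = mat2vec(m)                 # length K + (K**2 - K) / 2: diagonal, then upper triangle
    assert np.allclose(vec2mat(v), m)
    return v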
def symmetric(m):
    return np.isclose(m, m.T).all()


def get_xval_assignments(ndata, nfolds):
    group_assignments = np.zeros(ndata)
    groupsize = int(np.ceil(ndata / nfolds))

    for i in range(1, nfolds):
        inds = np.arange(i * groupsize, np.min([(i + 1) * groupsize, ndata]))
        group_assignments[inds] = i
    np.random.shuffle(group_assignments)
    return group_assignments


SMALL_SIZE = 18
MEDIUM_SIZE = 21
BIGGER_SIZE = 24

plt.rc('font', size=SMALL_SIZE)          # controls default text sizes
plt.rc('axes', titlesize=SMALL_SIZE)     # fontsize of the axes title
plt.rc('axes', labelsize=MEDIUM_SIZE)    # fontsize of the x and y labels
plt.rc('xtick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
plt.rc('ytick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
plt.rc('legend', fontsize=SMALL_SIZE)    # legend fontsize
plt.rc('figure', titlesize=BIGGER_SIZE)  # fontsize of the figure title


def plot_weights(weights, t=None, color='k', ax=None, xlab='Time (samples)', ylab='Weights',
                 title=None, outfile=None):
    """
    Plot weights

    Parameters
    ----------
    weights : ndarray
        Weights matrix to plot (one row of per-timepoint weights per timepoint)

    t : int or float
        Timepoint whose weights are plotted (default: the middle timepoint)

    Returns
    ----------
    results : png
        Plot of weights
    """
    T = weights.shape[0]
    if t is None:
        t = np.round(T / 2)

    ts = np.arange(1, T + 1)
    if ax is None:
        ax = plt.gca()

    ax.plot(ts, weights[int(t), :], color=color)
    plt.xlim([1, T])

    if not (xlab is None):
        plt.xlabel(xlab)

    if not (ylab is None):
        plt.ylabel(ylab)

    if not (title is None):
        plt.title(title)

    plt.tight_layout()

    if outfile:
        plt.savefig(outfile)
    else:
        plt.show()
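
# --- Editor's illustrative sketch (not part of the original module) -------------------
# A minimal usage example for plot_weights, assuming the Gaussian kernel defined above.
# The `_demo_plot_weights` helper name is hypothetical; pass an outfile path to save the
# figure instead of displaying it.
def _demo_plot_weights(outfile=None):
    plot_weights(gaussian_weights(100), t=50, title='Gaussian kernel (var=100)',
                 outfile=outfile)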