Source code for stereo.algorithm.time_series_analysis

import numpy as np
import pandas as pd
from sklearn.mixture import GaussianMixture

from stereo.algorithm.algorithm_base import AlgorithmBase
from stereo.constant import AlternativeType
from stereo.constant import FEATURE_P
from stereo.constant import FUZZY_C_RESULT
from stereo.constant import FUZZY_C_WEIGHT
from stereo.constant import GREATER_P
from stereo.constant import GREATER_PVALUE
from stereo.constant import LESS_P
from stereo.constant import LESS_PVALUE
from stereo.constant import LOG_FC
from stereo.constant import PValCombinationType
from stereo.constant import RunMethodType
from stereo.constant import SCORES
from stereo.constant import UseColType


class TimeSeriesAnalysis(AlgorithmBase):

[docs] def main( self, run_method=RunMethodType.tvg_marker.value, use_col=UseColType.timepoint.value, branch=None, p_val_combination=PValCombinationType.fdr.value, cluster_number=6, **kwargs ): """ :param run_method: the model type when the algorithm is run, default = `tvg_marker`. `tvg_marker`: Calculate time variable gene based on expression of celltypes in branch `other`: Use fuzzy C means cluster method to cluster genes based on 1-p_value of celltypes in branch :param use_col: the col in obs representing celltype or clustering :param branch: celltypes order in use_col :param p_val_combination: p_value combination method to use, choosing from ['fisher', 'mean', 'FDR'], default to 'FDR', only for `tvg_marker` run_method :param cluster_number: number of cluster, defaults to 6, only for `other` run_method other parameters: .. note:: All the parameters below are key word arguments and only for `tvg_marker` run_method. :param statistical_test_method: one tail statistical test method to be used for up or down regulated examination between time point, choosing from ['t-test', 'permutation', 'wilcoxon'], default to 't-test'] :param permutation_batch: permutation_batch to control memory consumption if statistical_test_method == 'permutation', the higher permutation_batch require higher memory consumption. default to 100. :param permutation_n_resamples: the times of resamples if statistical_test_method == 'permutation', default to 999. .. note:: All the parameters below are key word arguments and only for `other` run_method. :param spatial_weight: the weight to combine spatial feature, defaults to 1. :param n_spatial_feature: n top features to combine of spatial feature, defaults to 2. :param temporal_mean_threshold: filter out genes of which mean absolute temporal feature <= temporal_mean_threshold, defaults to 0.85. :param temporal_top_threshold: filter out genes of which top absolute temporal feature < temporal_top_threshold, defaults to 1. :param cluster_method: method to cluster gene based on spatial and temporal feature, choose from ['fuzzy_C_means', 'gaussian_mixture'], default to fuzzy_C_means. :param Epsilon: max value to finish iteration if cluster_method=='fuzzy_C_means', defaults to 1e-7. :param w_size: window size to rasterizing spatial expression, default to 20. :param use_col: the col in obs representing celltype or clustering, default to None. :param branch: celltypes order in use_col, default to None. :param seed: fix seed in numpy to keep output constant in every run. """ # noqa if run_method == RunMethodType.tvg_marker.value: self.TVG_marker( use_col=use_col, branch=branch, p_val_combination=p_val_combination, **kwargs ) else: self.fuzzy_C_gene_pattern_cluster( cluster_number=cluster_number, branch=branch, use_col=use_col, **kwargs )
def _statistic_mean_diff(self, y, x, axis=0): return np.mean(y, axis=axis) - np.mean(x, axis=axis) def TVG_marker(self, use_col, branch, p_val_combination=PValCombinationType.fisher.value, statistical_test_method = 't-test', permutation_batch=100, permutation_n_resamples=999): """ Calculate time variable gene based on expression of celltypes in branch :param use_col: the col in obs representing celltype or clustering :param branch: celltypes order in use_col :param p_val_combination: p_value combination method to use, choosing from ['fisher', 'mean', 'FDR'] :param statistical_test_method: one tail statistical test method to be used for up or down regulated examination between time point. choosing from ['t-test', 'permutation', 'wilcoxon'], default as 't-test' :param permutation_batch: permutation_batch to control memory consumption if statistical_test_method = 'permutation', the higher permutation_batch require higher memory consumption. default as 100. :param permutation_n_resamples: the times of resamples if statistical_test_method = 'permutation', default as 999. :return: stereo_exp_data contains Time Variabel Gene marker result """ from scipy.stats import ttest_ind from scipy.stats import permutation_test from scipy.stats import ranksums from scipy import sparse from scipy import stats label2exp = {} for x in branch: cell_list = self.stereo_exp_data.cells.to_df().loc[self.stereo_exp_data.cells[use_col] == x,].index test_exp_data = self.stereo_exp_data.sub_by_name(cell_name=cell_list.to_list()) if sparse.issparse(test_exp_data.exp_matrix): label2exp[x] = test_exp_data.exp_matrix.todense() else: label2exp[x] = np.mat(test_exp_data.exp_matrix) logFC = [] less_pvalue = [] greater_pvalue = [] scores = [] if statistical_test_method == 't-test': for i in range(len(branch)-1): score, pvalue = ttest_ind(label2exp[branch[i+1]], label2exp[branch[i]], axis=0, alternative='less') #np.nan_to_num(score, nan=0, copy = False) less_pvalue.append(np.nan_to_num(pvalue, nan=1, copy = False)) score, pvalue = ttest_ind(label2exp[branch[i+1]], label2exp[branch[i]], axis=0, alternative='greater') greater_pvalue.append(np.nan_to_num(pvalue, nan=1, copy = False)) logFC.append(np.array(np.log2((np.mean(label2exp[branch[i+1]], axis=0)+1e-9)/(np.mean(label2exp[branch[i]], axis=0)+1e-9)))[0]) scores.append(score) elif statistical_test_method == 'permutation': #print(statistical_test_method) for i in range(len(branch)-1): tmp_permutation = permutation_test((label2exp[branch[i+1]], label2exp[branch[i]]), self._statistic_mean_diff, n_resamples=permutation_n_resamples, batch=permutation_batch, vectorized=True, alternative='less',random_state=42, axis=0) score = tmp_permutation.statistic #pvalue = tmp_permutation.pvalue null_distribution = tmp_permutation.null_distribution pvalue = np.sum(score >= null_distribution, axis=0)/permutation_n_resamples #np.nan_to_num(score, nan=0, copy = False) less_pvalue.append(np.nan_to_num(pvalue, nan=1, copy = False)) pvalue = np.sum(score <= null_distribution, axis=0)/permutation_n_resamples greater_pvalue.append(np.nan_to_num(pvalue, nan=1, copy = False)) logFC.append(np.array(np.log2((np.mean(label2exp[branch[i+1]], axis=0)+1e-9)/(np.mean(label2exp[branch[i]], axis=0)+1e-9)))[0]) scores.append(score) elif statistical_test_method == 'wilcoxon': for i in range(len(branch)-1): score, pvalue = ranksums(label2exp[branch[i+1]], label2exp[branch[i]], axis=0, alternative='less') #np.nan_to_num(score, nan=0, copy = False) less_pvalue.append(np.nan_to_num(pvalue, nan=1, copy = False)) score, pvalue = ranksums(label2exp[branch[i+1]], label2exp[branch[i]], axis=0, alternative='greater') greater_pvalue.append(np.nan_to_num(pvalue, nan=1, copy = False)) logFC.append(np.array(np.log2((np.mean(label2exp[branch[i+1]], axis=0)+1e-9)/(np.mean(label2exp[branch[i]], axis=0)+1e-9)))[0]) scores.append(score) else: raise ValueError("statistical_test_method should be selected from ['t-test', 'permutation', 'wilcoxon']") self.stereo_exp_data.genes_matrix[SCORES] = np.array(scores).T self.stereo_exp_data.genes_matrix[SCORES] = np.nan_to_num(self.stereo_exp_data.genes_matrix[SCORES]) self.stereo_exp_data.genes_matrix[GREATER_P] = np.array(greater_pvalue).T self.stereo_exp_data.genes_matrix[LESS_P] = np.array(less_pvalue).T logFC = np.array(logFC).T self.stereo_exp_data.genes_matrix[LOG_FC] = logFC logFC = np.mean(logFC, axis=1) if p_val_combination == PValCombinationType.mean.value: less_pvalue = np.mean(np.array(less_pvalue), axis=0) greater_pvalue = np.mean(np.array(greater_pvalue), axis=0) elif p_val_combination == PValCombinationType.fisher.value: tmp = self.stereo_exp_data.genes_matrix[LESS_P].copy() tmp[tmp == 0] = np.min(tmp[tmp != 0]) tmp = np.sum(-2 * np.log(tmp), axis=1) less_pvalue = 1 - stats.chi2.cdf(tmp, self.stereo_exp_data.genes_matrix[LESS_P].shape[1]) tmp = self.stereo_exp_data.genes_matrix[GREATER_P].copy() tmp[tmp == 0] = np.min(tmp[tmp != 0]) tmp = np.sum(-2 * np.log(tmp), axis=1) greater_pvalue = 1 - stats.chi2.cdf(tmp, self.stereo_exp_data.genes_matrix[GREATER_P].shape[1]) elif p_val_combination == PValCombinationType.fdr.value: less_pvalue = 1 - np.prod(1 - self.stereo_exp_data.genes_matrix[LESS_P], axis=1) greater_pvalue = 1 - np.prod(1 - self.stereo_exp_data.genes_matrix[GREATER_P], axis=1) self.stereo_exp_data.genes[LESS_PVALUE] = less_pvalue self.stereo_exp_data.genes[GREATER_PVALUE] = greater_pvalue self.stereo_exp_data.genes[LOG_FC] = logFC def fuzzy_C(self, data, cluster_number, MAX=10000, m=2, Epsilon=1e-7, seed=20240523): """ fuzzy C means algorithm to cluster, helper function used in fuzzy_C_gene_pattern_cluster :param data: pd.DataFrame object for fuzzy C means cluster, each col represent a feature, each row represent a obsversion # noqa :param seed: fix seed in numpy to keep output constant in every run. :param cluster_number: number of cluster :param MAX: max value to random initialize :param m: degree of membership, default = 2 :param Epsilon: max value to finish iteration :return: fuzzy C means cluster result """ np.random.seed(seed) assert m > 1 import time import copy from scipy import spatial U = np.random.randint(1, int(MAX), (len(data), cluster_number)) U = U / np.sum(U, axis=1, keepdims=True) epoch = 0 tik = time.time() while True: epoch += 1 U_old = copy.deepcopy(U) U1 = U ** m U2 = np.expand_dims(U1, axis=2) U2 = np.repeat(U2, data.shape[1], axis=2) data1 = np.expand_dims(data, axis=1) data1 = np.repeat(data1, cluster_number, axis=1) dummy_sum_num = np.sum(U2 * data1, axis=0) dummy_sum_dum = np.sum(U1, axis=0) C = (dummy_sum_num.T / dummy_sum_dum).T # initializing distance matrix distance_matrix = spatial.distance_matrix(data, C) # update U distance_matrix_1 = np.expand_dims(distance_matrix, axis=1) distance_matrix_1 = np.repeat(distance_matrix_1, cluster_number, axis=1) distance_matrix_2 = np.expand_dims(distance_matrix, axis=2) distance_matrix_2 = np.repeat(distance_matrix_2, cluster_number, axis=2) U = np.sum((distance_matrix_1 / distance_matrix_2) ** (2 / (m - 1)), axis=1) U = 1 / U if epoch % 100 == 0: print('epoch {} : time cosumed{:.4f}s, loss:{}'.format(epoch, time.time() - tik, np.max(np.abs(U - U_old)))) tik = time.time() if np.max(np.abs(U - U_old)) < Epsilon: break return U def gene_spatial_feature(self, w_size=20): """ Use pca on rasterizing spatial expression to represent gene spatial feature :param w_size: window size to rasterizing spatial expression, :return: stereoexpdata.genes_matrix['spatial_info'] # a gene matrix to represent spatial feature """ from scipy import sparse from stereo.algorithm import dim_reduce from stereo.algorithm import normalization from stereo.algorithm import scale loc = self.stereo_exp_data.position.copy() loc = (loc / w_size).astype('int').astype('str') loc = np.array(['_'.join(x) for x in loc]) Exp_matrix = self.stereo_exp_data.exp_matrix if not sparse.issparse(Exp_matrix): Exp_matrix = sparse.csr_array(Exp_matrix) ci, gi = Exp_matrix.nonzero() values = Exp_matrix.data cl = loc[ci] cl = pd.Series(cl).astype('category') X = sparse.csr_matrix((values, (gi, cl.cat.codes.to_numpy())), shape=(np.max(gi) + 1, len(cl.cat.categories))) X = normalization.normalize_total(X, target_sum=1e4) X = np.log1p(X) X = scale.scale(X, zero_center=True, max_value=None) X_pca = dim_reduce.pca(X, min(50 , X.shape[1] -1))['x_pca'] self.stereo_exp_data.tl.result['spatial_feature'] = X_pca self.stereo_exp_data.genes_matrix['spatial_feature'] = X_pca def fuzzy_C_gene_pattern_cluster(self, cluster_number, spatial_weight=1, n_spatial_feature=2, cluster_method = 'fuzzy_C_means', temporal_mean_threshold=0.85, temporal_top_threshold=1, Epsilon=1e-7, w_size=None, use_col=None, branch=None, seed=20240523): """ Use fuzzy C means cluster method to cluster genes based on 1-p_value of celltypes in branch :param cluster_number: number of cluster :param spatial_weight: the weight to combine spatial feature :param n_spatial_feature: n top features to combine of spatial feature :param cluster_method: method to cluster gene based on spatial and temporal feature. choose from ['fuzzy_C_means', 'gaussian_mixture'], default as fuzzy_C_means. :param temporal_mean_threshold: filter out genes of which mean absolute temporal feature <= temporal_mean_threshold # noqa :param temporal_top_threshold: filter out genes of which top absolute temporal feature < temporal_top_threshold :param Epsilon: max value to finish iteration :param w_size: window size to rasterizing spatial expression, see also data.tl.gene_spatial_feature :param use_col: the col in obs representing celltype or clustering :param branch: celltypes order in use_col :param seed: fix seed in numpy to keep output constant in every run. :return: stereo_exp_data contains fuzzy_C_result """ if (GREATER_P not in self.stereo_exp_data.genes_matrix) or (LESS_P not in self.stereo_exp_data.genes_matrix): if use_col == None: # noqa print("greater_p and less_p not in stereo_exp_data.genes_matrix, you should run with run_method='tvg_marker' first") else: self.TVG_marker(use_col=use_col, branch=branch) sig = ((1 - self.stereo_exp_data.genes_matrix[GREATER_P]) >= ( 1 - self.stereo_exp_data.genes_matrix[LESS_P])).astype( int) sig[sig == 0] = -1 self.stereo_exp_data.genes_matrix[FEATURE_P] = np.max( [(1 - self.stereo_exp_data.genes_matrix[GREATER_P]), (1 - self.stereo_exp_data.genes_matrix[LESS_P])], axis=0) * sig # filtter useless gene useful_index_1 = ( np.mean(np.abs(self.stereo_exp_data.genes_matrix[FEATURE_P]), axis=1) > temporal_mean_threshold) useful_index_2 = ( np.max(np.abs(self.stereo_exp_data.genes_matrix[FEATURE_P]), axis=1) >= temporal_top_threshold) useful_index = useful_index_1 & useful_index_2 useless_index = ~useful_index if 'spatial_feature' not in self.stereo_exp_data.genes_matrix: if w_size is None: self.gene_spatial_feature() else: self.gene_spatial_feature(w_size) temporal_feature = self.stereo_exp_data.genes_matrix[FEATURE_P] spatial_feature = spatial_weight * self.stereo_exp_data.genes_matrix['spatial_feature'][:, :n_spatial_feature] merge_feature = np.concatenate([temporal_feature, spatial_feature], axis=1) if cluster_method == 'fuzzy_C_means': fuzzy_C_weight = self.fuzzy_C(merge_feature[useful_index], cluster_number, Epsilon=Epsilon, seed=seed) elif cluster_method == 'gaussian_mixture': gm = GaussianMixture(n_components=cluster_number, random_state=seed).fit(merge_feature[useful_index]) gm_weight = gm.predict_proba(merge_feature[useful_index]) fuzzy_C_weight = gm_weight else: raise ValueError("cluster_method should be selected from ['fuzzy_C_means', 'gaussian_mixture']") df = {x: fuzzy_C_weight[i] for i, x in enumerate(self.stereo_exp_data.genes.to_df().index[useful_index])} for x in self.stereo_exp_data.genes.to_df().index[useless_index]: df[x] = np.zeros(cluster_number) df = pd.DataFrame(df).T df = df.loc[self.stereo_exp_data.genes.to_df().index] self.stereo_exp_data.genes_matrix[FUZZY_C_WEIGHT] = df.to_numpy() fuzzy_c_result = np.argmax(self.stereo_exp_data.genes_matrix[FUZZY_C_WEIGHT], axis=1) + 1 fuzzy_c_result[useless_index] = 0 self.stereo_exp_data.genes[FUZZY_C_RESULT] = fuzzy_c_result