import os
import time
import pandas as pd
import panel as pn
from stereo.algorithm.algorithm_base import AlgorithmBase
from .cell_cell_communication.exceptions import InvalidMicroEnvInput
from .cell_cell_communication.exceptions import PipelineResultInexistent
from .cell_cell_communication.spatial_scoloc import GetMicroEnvs
class GenCccMicroEnvs(AlgorithmBase):
    """Two-step generation of the micro-environments used by the CCC analysis."""

    def main(
            self,
            cluster_res_key: str = 'cluster',
            n_boot: int = 20,
            boot_prop: float = 0.8,
            dimension: int = 3,
            fill_rare: bool = True,
            min_num: int = 30,
            binsize: float = 2,
            eps: float = 1e-20,
            show_dividing_by_thresholds: bool = True,
            method: str = 'split',
            threshold: float = None,
            output_path: str = None,
            res_key: str = 'ccc_micro_envs'
    ):
        """
        Generate the micro-environment used for the CCC analysis.

        This function should be run twice because it includes two parts:

        1) Calculating how the different clusters are divided into different micro environments under different thresholds.
            You can choose an appropriate threshold based on the divided result.
            In order to run this part, you need to set the parameter `threshold` to None.
            The output is a dataframe formatted like below:

                threshold             subgroup_result
                0.44298617727504136   [{'1'}, {'2'}, {'3'}]
                0.625776310617184     [{'1', '2'}, {'3'}]

            The column `subgroup_result` is a list of sets in which each set, containing some clusters, represents a micro-environment.

        2) Generating the micro environments by setting an appropriate `method` and `threshold` based on the result of the first part.
            In this part, the parameters before `method` are all ignored, and so is `output_path`:
            the output directory recorded by the first part under `res_key` is reused.
            The output is a dataframe formatted like below:

                cell_type   microenviroment
                NKcells_1   microenv_0
                NKcells_0   microenv_0
                Tcells      microenv_1
                Myeloid     microenv_2

        :param cluster_res_key: the key which specifies the clustering result in data.tl.result.
        :param n_boot: number of bootstrap samples, default = 20.
        :param boot_prop: proportion of each bootstrap sample, default = 0.8.
        :param dimension: 2 or 3.
        :param fill_rare: bool, whether simulate cells for rare cell type when calculating kde.
        :param min_num: if a cell type has cells < min_num, it is considered rare.
        :param binsize: grid size used for kde.
        :param eps: fill eps to zero kde to avoid inf KL divergence.
        :param show_dividing_by_thresholds: whether to display the result while running the first part of this function.
        :param method: define micro environments using one of two methods:
                        1) `'mst'`: minimum spanning tree, or
                        2) `'split'`: pruning the fully connected tree based on a given threshold of KL,
                            then split the graph into multiple strongly connected component.
        :param threshold: the threshold to divide micro environment.
                        1) set it to None to run the first part of this function.
                        2) set it to an appropriate value to run the second part.
        :param output_path: the directory to save the result, if set it to None, the result is only stored in memory.
                        It must already exist; a timestamped `micro_envs_*` sub-directory is created inside it.
        :param res_key: set a key to store the result to data.tl.result, in second part, it must be set the same as first part.

        :return: in the first part, a `pn.widgets.DataFrame` showing the division under different thresholds
                when `show_dividing_by_thresholds` is True, otherwise None; the second part always returns None
                and stores its result under the `'micro_envs'` entry of the `res_key` result.

        :raises PipelineResultInexistent: if `cluster_res_key` (first part) or `res_key` (second part)
                is not found in the pipeline results.
        :raises InvalidMicroEnvInput: if `dimension` is neither 2 nor 3, or if it is 3 while the data
                has no z coordinates.
        :raises FileNotFoundError: if `output_path` is given but does not exist.
        :raises ValueError: if `method` is neither `'mst'` nor `'split'` (second part).
        """  # noqa
        if threshold is None:
            # ---- First part: compute how clusters split under each candidate threshold ----
            if cluster_res_key not in self.pipeline_res:
                raise PipelineResultInexistent(cluster_res_key)
            if dimension not in (2, 3):
                raise InvalidMicroEnvInput('Dimension number can only be 2 or 3.')
            # Explicit raise instead of `assert`: assertions are stripped under `python -O`,
            # which would let `os.makedirs` below silently create the missing parent path.
            if output_path is not None and not os.path.exists(output_path):
                raise FileNotFoundError(f"{output_path} is not exists.")
            # Validate the remaining inputs before touching the file system so that
            # invalid input does not leave an empty output directory behind.
            if dimension == 3 and self.stereo_exp_data.position_z is None:
                raise InvalidMicroEnvInput(
                    "The position of cells must has the third dimension while setting `dimension` to 3.")

            if output_path is not None:
                # Each run writes to its own timestamped sub-directory.
                timestamp = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
                output_path = os.path.join(output_path, f'micro_envs_{timestamp}')
                os.makedirs(output_path, exist_ok=True)

            cell_names = self.stereo_exp_data.cells.cell_name
            cell_type = self.pipeline_res[cluster_res_key]['group']
            meta = pd.DataFrame({'cell': cell_names, 'cell_type': cell_type}).reset_index(drop=True)
            coord = pd.DataFrame({
                'cell': cell_names,
                'coord_x': self.stereo_exp_data.position[:, 0],
                'coord_y': self.stereo_exp_data.position[:, 1],
            })
            if dimension == 3:
                coord['coord_z'] = self.stereo_exp_data.position_z

            gme = GetMicroEnvs()
            mst_in_boot, mst_final, pairwise_kl_divergence, split_by_different_threshold = gme.main(
                meta=meta,
                coord=coord,
                n_boot=n_boot,
                boot_prop=boot_prop,
                dimension=dimension,
                fill_rare=fill_rare,
                min_num=min_num,
                binsize=binsize,
                eps=eps,
                output_path=output_path
            )
            # Keep everything the second part needs, including the resolved output directory.
            self.pipeline_res[res_key] = {
                'output_path': output_path,
                'mst_in_boot': mst_in_boot,
                'mst_final': mst_final,
                'pairwise_kl_divergence': pairwise_kl_divergence,
                'split_by_different_threshold': split_by_different_threshold
            }
            print("Now, you can choose a appropriate threshold based on this function's result.")
            if show_dividing_by_thresholds:
                pn.extension()
                return pn.widgets.DataFrame(split_by_different_threshold, disabled=True, show_index=False,
                                            autosize_mode="fit_viewport", frozen_columns=1)
        else:
            # ---- Second part: build the micro environments from the stored first-part result ----
            if res_key not in self.pipeline_res:
                raise PipelineResultInexistent(res_key)
            if method not in ('mst', 'split'):
                raise ValueError(f"Invalid method({method}), choose it from 'mst' and 'split'")
            first_part_res = self.pipeline_res[res_key]
            # 'mst' works on the final minimum spanning tree, 'split' on the pairwise KL divergence.
            if method == 'mst':
                result_df = first_part_res['mst_final']
            else:
                result_df = first_part_res['pairwise_kl_divergence']
            # The directory created by the first part is reused; the `output_path` argument is ignored here.
            output_path = first_part_res['output_path']
            gme = GetMicroEnvs()
            micro_envs = gme.generate_micro_envs(method, threshold=threshold, result_df=result_df,
                                                 output_path=output_path)
            first_part_res['micro_envs'] = micro_envs