Source code for stereo.io.reader

"""
@file: reader.py
@description:
@author: Ping Qiu
@email: qiuping1@genomics.cn
@last modified by: Nils Mechtel

change log:
    2021/03/05  add read_stereo_data function , by Ping Qiu.
    2021/08/12  move read_txt functions from StereoExpData here. Add read_ann_h5ad,
    andata_to_stereo function by Yiran Wu.
    2021/08/20
    2022/02/09  read raw data and result
"""
from copy import deepcopy
from typing import Optional, Union, List
import re

import h5py
import numpy as np
import pandas as pd
from anndata import AnnData
from scipy.sparse import csr_matrix
from shapely.geometry import MultiPoint
from shapely.geometry import Point
from typing_extensions import Literal

from stereo.core.cell import Cell
from stereo.core.constants import CHIP_RESOLUTION
from stereo.core.gene import Gene
from stereo.core.stereo_exp_data import AnnBasedStereoExpData
from stereo.core.stereo_exp_data import StereoExpData
from stereo.core.result import _BaseResult
from stereo.io import h5ad
from stereo.io.utils import(
    integrate_matrix_by_genes,
    transform_marker_genes_to_anndata,
    get_gem_comments
)
from stereo.log_manager import logger
from stereo.utils.read_write_utils import ReadWriteUtils


[docs]@ReadWriteUtils.check_file_exists def read_gem( file_path: str, sep: str = '\t', bin_type: str = "bins", bin_size: int = 100, is_sparse: bool = True, center_coordinates: bool = False, gene_name_index: bool = False ): """ Read the Stereo-seq GEM file, and generate the StereoExpData object. Parameters ------------- file_path the path to input file. sep the separator string. bin_type the bin type includes `'bins'` or `'cell_bins'`, default to `'bins'`. bin_size the size of bin to merge, when `bin_type` is set to `'bins'`. is_sparse the expression matrix is sparse matrix, if `True`, otherwise `np.ndarray`. center_coordinates if set it to True, the coordinate of each bin will be the center of the bin, otherwise, the coordinate of each bin will be the left-top corner of the bin. gene_name_index In a **v0.1** gem file, the column **geneID** actually is the **gene name**, but in **v0.2**, **geneID** is just the **ID** for genes and there is an additional column called **geneName** which is the **gene name**, When being **v0.2**, setting `gene_name_index` to True means setting column **geneName** as index, otherwise, setting column **geneID** as index and the column **geneName** is stored in `data.real_gene_names`, if **v0.1**, `gene_name_index` will be ignored and the column **geneID** will be set as index, regardless of **v0.1** or **v0.2**, the column set as index is stored in `data.gene_names`, the index mentioned here is the index of `data.genes`. Returns ------------- An object of StereoExpData. """ data = StereoExpData(file_path=file_path, file_format='gem', bin_type=bin_type, bin_size=bin_size) comments_lines, _ = get_gem_comments(str(data.file)) # df = pd.read_csv(str(data.file), sep=sep, comment='#', header=0) df = pd.read_csv(str(data.file), sep=sep, header=comments_lines, engine='pyarrow') if 'MIDCounts' in df.columns: df.rename(columns={'MIDCounts': 'UMICount'}, inplace=True) elif 'MIDCount' in df.columns: df.rename(columns={'MIDCount': 'UMICount'}, inplace=True) if 'CellID' in df.columns: df.rename(columns={'CellID': 'cell_id'}, inplace=True) if 'label' in df.columns: df.rename(columns={'label': 'cell_id'}, inplace=True) dropna_subset = ['geneID', 'x', 'y', 'UMICount'] if 'cell_id' in df.columns: dropna_subset.append('cell_id') if 'geneName' in df.columns: dropna_subset.append('geneName') df.dropna( subset=dropna_subset, axis=0, inplace=True ) gdf = None if data.bin_type == 'cell_bins': gdf = parse_cell_bin_coor(df) else: if center_coordinates: df = parse_bin_coor(df, bin_size) else: df = parse_bin_coor_no_offset(df, bin_size) if gene_name_index and 'geneName' in df.columns: df['geneID'] = df['geneName'] cells = df['cell_id'].unique() genes = df['geneID'].unique() cells_dict = dict(zip(cells, range(0, len(cells)))) genes_dict = dict(zip(genes, range(0, len(genes)))) rows = df['cell_id'].map(cells_dict) cols = df['geneID'].map(genes_dict) # logger.info(f'the martrix has {len(cells)} cells, and {len(genes)} genes.') exp_matrix = csr_matrix((df['UMICount'], (rows, cols)), shape=(cells.shape[0], genes.shape[0]), dtype=np.int32) data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() data.cells = Cell(cell_name=cells) data.genes = Gene(gene_name=genes) if not gene_name_index and 'geneName' in df.columns: gene_names = df.groupby(by='geneID').aggregate({'geneName': lambda n: np.unique(n)[0]})['geneName'] data.genes['real_gene_name'] = gene_names if data.bin_type == 'bins': # data.position = df.loc[:, ['x_center', 'y_center']].drop_duplicates().values data.position = df.loc[:, ['bin_x', 'bin_y']].drop_duplicates().values else: data.position = gdf.loc[cells][['x_center', 'y_center']].values data.cells.cell_point = gdf.loc[cells]['cell_point'].values data.position = data.position.astype(np.uint32) data.offset_x = df['x'].min() data.offset_y = df['y'].min() resolution = 500 for chip_name in CHIP_RESOLUTION.keys(): if data.sn[0:len(chip_name)] == chip_name: resolution = CHIP_RESOLUTION[chip_name] break data.attr = { 'minX': df['x'].min(), 'minY': df['y'].min(), 'maxX': df['x'].max(), 'maxY': df['y'].max(), 'minExp': data.exp_matrix.min(), # noqa 'maxExp': data.exp_matrix.max(), # noqa 'resolution': resolution, } data.center_coordinates = center_coordinates logger.info(f'the martrix has {data.cell_names.size} cells, and {data.gene_names.size} genes.') return data
def parse_bin_coor(df, bin_size): """ merge bins to a bin unit according to the bin size, also calculate the center coordinate of bin unit, and generate cell id of bin unit using the coordinate after merged. :param df: a dataframe of the bin file. :param bin_size: the size of bin to merge. :return: """ x_min = df['x'].min() y_min = df['y'].min() df['bin_x'] = merge_bin_coor(df['x'].values, x_min, bin_size) df['bin_y'] = merge_bin_coor(df['y'].values, y_min, bin_size) df['cell_id'] = df['bin_x'].astype(str) + '_' + df['bin_y'].astype(str) df['bin_x'] = get_bin_center(df['bin_x'], x_min, bin_size) df['bin_y'] = get_bin_center(df['bin_y'], y_min, bin_size) return df def parse_bin_coor_no_offset(df: pd.DataFrame, bin_size: int): df['bin_x'] = ((df['x'] // bin_size) * bin_size).astype(np.uint64) df['bin_y'] = ((df['y'] // bin_size) * bin_size).astype(np.uint64) df['cell_id'] = np.bitwise_or( np.left_shift(df['bin_x'], 32), df['bin_y'] ) df.sort_values(by=['geneID', 'cell_id'], inplace=True) df['cell_id'] = df['cell_id'].astype('U') return df def parse_cell_bin_coor(df): gdf = df.groupby('cell_id').apply(lambda x: make_multipoint(x)) return gdf def make_multipoint(x): p = [Point(i) for i in zip(x['x'], x['y'])] mlp = MultiPoint(p).convex_hull x_center = mlp.centroid.x y_center = mlp.centroid.y return pd.Series({'cell_point': mlp, 'x_center': x_center, 'y_center': y_center}) def merge_bin_coor(coor: np.ndarray, coor_min: int, bin_size: int): return np.floor((coor - coor_min) / bin_size).astype(np.int32) def get_bin_center(bin_coor: np.ndarray, coor_min: int, bin_size: int): return bin_coor * bin_size + coor_min + int(bin_size / 2) def to_interval(interval_string: str): if interval_string.lower() == 'nan': return np.NaN [left, right] = interval_string[1:-1].split(', ') interval = pd.Interval(float(left), float(right)) return interval @ReadWriteUtils.check_file_exists def read_stereo_h5ad( file_path: str, use_raw: bool = True, use_result: bool = True, bin_type: str = None, bin_size: int = None ): """ Read the H5ad file, and generate the StereoExpData object. Parameters ---------------------- file_path the path to input H5ad file. use_raw whether to read data of `self.raw`. use_result whether to read `result` and `res_key`. bin_type the bin type includes `'bins'` or `'cell_bins'`. bin_size the size of bin to merge, when `bin_type` is set to `'bins'`. Returns -------------------- An object of StereoExpData. """ data = StereoExpData(file_path=file_path) if not data.file.exists(): logger.error('the input file is not exists, please check!') raise FileExistsError('the input file is not exists, please check!') with h5py.File(data.file, mode='r') as f: data = _read_stereo_h5ad_from_group(f, data, use_raw, use_result, bin_type, bin_size) return data def _read_stereo_h5ad_from_group(f: Union[h5py.File, h5py.Group], data: StereoExpData, use_raw, use_result, bin_type=None, bin_size=None): # read data data.bin_type = bin_type if bin_type is not None else 'bins' data.bin_size = bin_size if bin_size is not None else 1 not_data_attr_keys = {'bin_type', 'bin_size', 'merged'} if f.attrs is not None: data.attr = {} for key, value in f.attrs.items(): if key not in not_data_attr_keys: data.attr[key] = value else: setattr(data, key, value) for k in f.keys(): if k == 'cells': data.cells = h5ad.read_group(f[k]) elif k == 'genes': data.genes = h5ad.read_group(f[k]) if 'mean_bin' in data.genes: data.genes['mean_bin'] = [to_interval(interval_string) for interval_string in data.genes['mean_bin']] elif k == 'position': data.spatial = h5ad.read_dataset(f[k]) elif k == 'position_offset': data.position_offset = h5ad.read_group(f[k]) elif k == 'position_min': data.position_min = h5ad.read_group(f[k]) elif k == 'bin_type': data.bin_type = h5ad.read_dataset(f[k]) elif k == 'bin_size': data.bin_size = h5ad.read_dataset(f[k]) elif k == 'merged': data.merged = h5ad.read_dataset(f[k]) elif k == 'exp_matrix': if isinstance(f[k], h5py.Group): data.exp_matrix = h5ad.read_group(f[k]) else: data.exp_matrix = h5ad.read_dataset(f[k]) elif k == 'sn': sn_data = h5ad.read_group(f[k]) if sn_data.shape[0] == 1: data.sn = str(sn_data['sn'][0]) else: data.sn = {} for _, row in sn_data.iterrows(): batch, sn = row[0], row[1] data.sn[str(batch)] = str(sn) elif k == 'layers': for layer_key in f[k].keys(): if isinstance(f[k][layer_key], h5py.Group): data.layers[layer_key] = h5ad.read_group(f[k][layer_key]) else: data.layers[layer_key] = h5ad.read_dataset(f[k][layer_key]) # read raw if use_raw is True and 'exp_matrix@raw' in f.keys(): data.tl.raw = StereoExpData() if isinstance(f['exp_matrix@raw'], h5py.Group): data.tl.raw.exp_matrix = h5ad.read_group(f['exp_matrix@raw']) else: data.tl.raw.exp_matrix = h5ad.read_dataset(f['exp_matrix@raw']) if 'cells@raw' in f.keys(): data.tl.raw.cells = h5ad.read_group(f['cells@raw']) else: data.tl.raw.cells = deepcopy(data.cells) if 'genes@raw' in f.keys(): data.tl.raw.genes = h5ad.read_group(f['genes@raw']) else: data.tl.raw.genes = deepcopy(data.genes) if 'position@raw' in f.keys(): position = h5ad.read_dataset(f['position@raw']) data.tl.raw.position = position[:, [0, 1]] if position.shape[1] >= 3: data.tl.raw.position_z = position[:, [2]] else: data.tl.raw.position = deepcopy(data.position) # read key_record and result if use_result is True and 'key_record' in f.keys(): h5ad.read_key_record(f['key_record'], data.tl.key_record) _read_stereo_h5_result(data.tl.key_record, data, f) return data def _read_stereo_h5_result(key_record: dict, data: StereoExpData, f: Union[h5py.File, h5py.Group]): import ast from ..utils.pipeline_utils import cell_cluster_to_gene_exp_cluster key_record = deepcopy(key_record) for analysis_key in list(key_record.keys()): res_keys = key_record[analysis_key] for res_key in res_keys: if analysis_key == 'hvg': # hvg_df = h5ad.read_group(f[f'{res_key}@hvg']) # # str to interval # if 'mean_bin' in hvg_df.columns: # hvg_df['mean_bin'] = [to_interval(interval_string) for interval_string in hvg_df['mean_bin']] # data.tl.result[res_key] = hvg_df hvg_columns = h5ad.read_dataset(f[f'{res_key}@hvg']) dict.setdefault(data.tl.result, res_key, list(hvg_columns)) if analysis_key in ['pca', 'umap', 'totalVI', 'spatial_alignment_integration']: data.tl.result[res_key] = pd.DataFrame(h5ad.read_dataset(f[f'{res_key}@{analysis_key}'])) if analysis_key == 'pca': variance_ratio_key = f'{res_key}_variance_ratio' if f'{variance_ratio_key}@{analysis_key}_variance_ratio' in f.keys(): data.tl.result[variance_ratio_key] = h5ad.read_dataset(f[f'{variance_ratio_key}@{analysis_key}_variance_ratio']) # noqa if analysis_key == 'neighbors': neighbor_res = { # 'neighbor': h5ad.read_group(f[f'neighbor@{res_key}@neighbors']), 'neighbor': None, 'connectivities': h5ad.read_group(f[f'connectivities@{res_key}@neighbors']), 'nn_dist': h5ad.read_group(f[f'nn_dist@{res_key}@neighbors']) } if f'neighbor@{res_key}@neighbors' in f: neighbor_res['neighbor'] = h5ad.read_group(f[f'neighbor@{res_key}@neighbors']) if f'params@{res_key}@neighbors' in f: neighbor_res['params'] = h5ad.read_group(f[f'params@{res_key}@neighbors']) data.tl.result[res_key] = neighbor_res if analysis_key == 'cluster': if f'{res_key}@cluster' in f: data.tl.result[res_key] = h5ad.read_group(f[f'{res_key}@cluster']) gene_cluster_res_key = f'gene_exp_{res_key}' if ('gene_exp_cluster' not in data.tl.key_record) or ( gene_cluster_res_key not in data.tl.key_record['gene_exp_cluster']): gene_cluster_res = cell_cluster_to_gene_exp_cluster(data, res_key) if gene_cluster_res is not False: data.tl.result[gene_cluster_res_key] = gene_cluster_res data.tl.reset_key_record('gene_exp_cluster', gene_cluster_res_key) if analysis_key == 'sct': data.tl.result[res_key] = [ { 'counts': h5ad.read_group(f[f'exp_matrix@{res_key}@sct_counts']), 'data': h5ad.read_group(f[f'exp_matrix@{res_key}@sct_data']), 'scale.data': h5ad.read_group(f[f'exp_matrix@{res_key}@sct_scale']), }, { 'top_features': h5ad.read_dataset(f[f'genes@{res_key}@sct_top_features']), 'umi_genes': h5ad.read_dataset(f[f'genes@{res_key}@sct']), 'umi_cells': h5ad.read_dataset(f[f'cells@{res_key}@sct']), } ] data.tl.result[res_key][0]['scale.data'] = pd.DataFrame( data.tl.result[res_key][0]['scale.data'].toarray(), columns=data.tl.result[res_key][1]['umi_cells'], index=h5ad.read_dataset(f[f'genes@{res_key}@sct_scale_genename']), ) if analysis_key == 'gene_exp_cluster': data.tl.result[res_key] = h5ad.read_group(f[f'{res_key}@gene_exp_cluster']) if analysis_key == 'marker_genes': clusters = h5ad.read_dataset(f[f'clusters_record@{res_key}@marker_genes']) data.tl.result[res_key] = {} for cluster in clusters: cluster_key = f'{cluster}@{res_key}@marker_genes' if cluster != 'parameters': data.tl.result[res_key][cluster] = h5ad.read_group(f[cluster_key]) else: parameters_df: pd.DataFrame = h5ad.read_group(f[cluster_key]) data.tl.result[res_key]['parameters'] = {} for _, row in parameters_df.iterrows(): name = row['name'] value = row['value'] if value.lower() == 'true': value = True elif value.lower() == 'false': value = False data.tl.result[res_key]['parameters'][name] = value if analysis_key == 'cell_cell_communication': data.tl.result[res_key] = {} for key in ['means', 'significant_means', 'deconvoluted', 'pvalues']: full_key = f'{res_key}@{key}@cell_cell_communication' if full_key in f.keys(): data.tl.result[res_key][key] = h5ad.read_group(f[full_key]) parameters_df: pd.DataFrame = h5ad.read_group(f[f'{res_key}@parameters@cell_cell_communication']) data.tl.result[res_key]['parameters'] = {} for i, row in parameters_df.iterrows(): name = row['name'] value = row['value'] data.tl.result[res_key]['parameters'][name] = value if analysis_key == 'regulatory_network_inference': data.tl.result[res_key] = {} for key in ['regulons', 'auc_matrix', 'adjacencies']: full_key = f'{res_key}@{key}@regulatory_network_inference' if full_key in f.keys(): if key == 'regulons': data.tl.result[res_key][key] = ast.literal_eval(h5ad.read_dataset(f[full_key])) else: data.tl.result[res_key][key] = h5ad.read_group(f[full_key]) if analysis_key in ['co_occurrence']: data.tl.result[res_key] = {} for full_key in f.keys(): if not full_key.endswith(analysis_key): continue data_key = full_key.split('@')[1] data.tl.result[res_key][data_key] = h5ad.read_group(f[full_key]) def _read_anndata_from_group(f: h5py.Group) -> AnnBasedStereoExpData: from distutils.version import StrictVersion from anndata import __version__ as anndata_version if StrictVersion(anndata_version) < StrictVersion('0.8.0'): from anndata._io.utils import read_attribute as read_elem else: from anndata._io.specs.registry import read_elem adata = AnnData( **{k: read_elem(f[k]) for k in f.keys()} ) data = AnnBasedStereoExpData(based_ann_data=adata) # if 'key_record' in adata.uns: # data.tl.key_record = {k: list(v) for k, v in adata.uns['key_record'].items()} # del adata.uns['key_record'] data.merged = f.attrs.get('merged', False) data.spatial_key = f.attrs.get('spatial_key', 'spatial') return data
[docs]@ReadWriteUtils.check_file_exists def read_h5ms(file_path, use_raw=True, use_result=True): """ Load a h5ms file as an object of MSData :param file_path: The path of h5ms file to be loaded. :param use_raw: Whether to load the raw data of each StereoExpData in MSData, defaults to True. :param use_result: Whether to load the analysis results which had been saved in h5ms file, defaults to True. :return: An object of MSData """ from stereo.core.ms_data import MSData with h5py.File(file_path, mode='r') as f: # ms_data = MSData() data_list = [] merged_data = None names = [] var_type = None relationship = None scopes_data = {} result_keys = {} # result = {} for k in f.keys(): if k == 'sample': slice_keys = list(f[k].keys()) slice_keys.sort(key=lambda k: int(k.split('_')[1])) for one_slice_key in slice_keys: # data = _read_stereo_h5ad_from_group(f[k][one_slice_key], StereoExpData(), use_raw, use_result) encoding_type = f[k][one_slice_key].attrs.get('encoding-type', 'stereo_exp_data') if encoding_type == 'anndata': data = _read_anndata_from_group(f[k][one_slice_key]) else: data = _read_stereo_h5ad_from_group(f[k][one_slice_key], StereoExpData(), use_raw, use_result) data_list.append(data) elif k == 'sample_merged': for mk in f[k].keys(): # scope_data = _read_stereo_h5ad_from_group(f[k][mk], StereoExpData(), use_raw, use_result) encoding_type = f[k][mk].attrs.get('encoding-type', 'stereo_exp_data') if encoding_type == 'anndata': scope_data = _read_anndata_from_group(f[k][mk]) else: scope_data = _read_stereo_h5ad_from_group(f[k][mk], StereoExpData(), use_raw, use_result) scopes_data[mk] = scope_data if f[k][mk].attrs is not None: merged_from_all = f[k][mk].attrs.get('merged_from_all', False) if merged_from_all: merged_data = scope_data # merged_data = StereoExpData() # merged_data = _read_stereo_h5ad_from_group(f[k], merged_data, use_raw, use_result) # noqa elif k == 'names': names = h5ad.read_dataset(f[k]) if isinstance(names, np.ndarray): names = names.tolist() elif k == 'var_type': var_type = h5ad.read_dataset(f[k]) elif k == 'relationship': relationship = h5ad.read_dataset(f[k]) elif k == 'result_keys': for rk in f[k].keys(): result_keys[rk] = list(h5ad.read_dataset(f[k][rk])) # elif k == 'mss': # for key in f['mss'].keys(): # data = StereoExpData() # data.tl.result = {} # h5ad.read_key_record(f['mss'][key]['key_record'], data.tl.key_record) # _read_stereo_h5_result(data.tl.key_record, data, f['mss'][key]) # result[key] = data.tl.result else: logger.warn(f"{k} not in rules, did not read from h5ms") ms_data = MSData( _data_list=data_list, _names=names, _var_type=var_type, _relationship=relationship ) ms_data.merged_data = merged_data # ms_data.tl.result = result ms_data.scopes_data = scopes_data ms_data.tl.result_keys = result_keys return ms_data
[docs]@ReadWriteUtils.check_file_exists def read_seurat_h5ad( file_path: str, use_raw: bool = False ): """ Read the H5ad file in Anndata format of Seurat, and generate the StereoExpData object. Parameters ------------------ file_path the path of input H5ad file. use_raw whether to read data of `self.raw`. Returns ---------------------- An object of StereoExpData. """ data = StereoExpData(file_path=file_path) # basic # attributes = ["obsm", "varm", "obsp", "varp", "uns", "layers"] # df_attributes = ["obs", "var"] with h5py.File(data.file, mode='r') as f: if 'raw' not in f.keys(): use_raw = False for k in f.keys(): if k == "raw" or k.startswith("raw."): continue if k == "X": if isinstance(f[k], h5py.Dataset): data.exp_matrix = h5ad.read_dense_as_sparse(f[k], csr_matrix, 10000) else: data.exp_matrix = h5ad.read_group(f[k]) elif k == "obs": cells_df = h5ad.read_dataframe(f[k]) data.cells.cell_name = cells_df.index.values data.cells.total_counts = cells_df['total_counts'] if 'total_counts' in cells_df.keys() else None data.cells.pct_counts_mt = cells_df['pct_counts_mt'] if 'pct_counts_mt' in cells_df.keys() else None data.cells.n_genes_by_counts = cells_df[ 'n_genes_by_counts'] if 'n_genes_by_counts' in cells_df.keys() else None data.position = cells_df[['x', 'y']].to_numpy(dtype=np.uint32) for cluster_key in f['obs']['__categories'].keys(): if cluster_key == 'orig.ident': continue data.tl.result[cluster_key] = pd.DataFrame( {'bins': data.cells.cell_name, 'group': cells_df[cluster_key].values}) data.tl.key_record['cluster'].append(cluster_key) elif k == "var": genes_df = h5ad.read_dataframe(f[k]) data.genes.gene_name = genes_df.index.values if 'highly_variable' in genes_df: data.tl.result['highly_variable_genes'] = pd.DataFrame({ 'means': genes_df.means.values, 'dispersions': genes_df.dispersions.values, 'dispersions_norm': genes_df.dispersions_norm.values, 'highly_variable': genes_df.highly_variable.values == 1 }, index=genes_df.index.values) data.tl.key_record['hvg'].append('highly_variable_genes') elif k == 'obsm': for key in f['obsm'].keys(): if key == 'X_spatial': continue if key == 'X_pca': data.tl.result['pca'] = pd.DataFrame(h5ad.read_dataset(f['obsm']['X_pca'])) data.tl.key_record['pca'].append('pca') elif key == 'X_umap': data.tl.result['umap'] = pd.DataFrame(h5ad.read_dataset(f['obsm']['X_umap'])) data.tl.key_record['umap'].append('umap') else: # Base case pass if use_raw: data.tl.raw = StereoExpData() if isinstance(f['raw']['X'], h5py.Dataset): data.tl.raw.exp_matrix = h5ad.read_dense_as_sparse(f['raw']['X'], csr_matrix, 10000) else: data.tl.raw.exp_matrix = h5ad.read_group(f['raw']['X']) if 'obs' in f['raw']: cells_df = h5ad.read_dataframe(f[k]) data.tl.raw.cells.cell_name = cells_df.index.values data.position = cells_df[['x', 'y']].to_numpy(dtype=np.uint32) else: data.tl.raw.cells.cell_name = data.cells.cell_name.copy() data.tl.raw.position = data.position.copy() if 'var' in f['raw']: genes_df = h5ad.read_dataframe(f['raw']['var']) data.tl.raw.genes.gene_name = genes_df.index.values else: data.tl.raw.genes.gene_name = data.genes.gene_name.copy() return data
@ReadWriteUtils.check_file_exists def read_ann_h5ad( file_path: str, spatial_key: Optional[str] = "spatial", bin_type: str = None, bin_size: int = None, resolution: Optional[int] = 500 ): """ Read the H5ad file in Anndata format of Scanpy, and generate the StereoExpData object. Parameters ------------------ file_path the path to input H5ad file. spatial_key use `.obsm['spatial_key']` as coordiante information. bin_type the bin type includes `'bins'` or `'cell_bins'`, default to `'bins'`. bin_size the size of bin to merge, when `bin_type` is set to `'bins'`. resolution the resolution of chip, default 500nm. Returns --------------- An object of StereoExpData. """ data = StereoExpData(file_path=file_path, bin_type=bin_type, bin_size=bin_size) # basic # attributes = ["obsm", "varm", "obsp", "varp", "uns", "layers"] # df_attributes = ["obs", "var"] with h5py.File(data.file, mode='r') as f: for k in f.keys(): if k == "raw" or k.startswith("raw."): continue if k == "X": if isinstance(f[k], h5py.Group): data.exp_matrix = h5ad.read_group(f[k]) else: data.exp_matrix = h5ad.read_dataset(f[k]) elif k == "raw": assert False, "unexpected raw format" elif k == "obs": cells_df = h5ad.read_dataframe(f[k]) data.cells.cell_name = cells_df.index.values data.cells.total_counts = cells_df['total_counts'] if 'total_counts' in cells_df.keys() else None data.cells.pct_counts_mt = cells_df['pct_counts_mt'] if 'pct_counts_mt' in cells_df.keys() else None data.cells.n_genes_by_counts = cells_df[ 'n_genes_by_counts'] if 'n_genes_by_counts' in cells_df.keys() else None elif k == "var": genes_df = h5ad.read_dataframe(f[k]) data.genes.gene_name = genes_df.index.values elif k == 'obsm': if spatial_key is not None: if isinstance(f[k], h5py.Group): position = h5ad.read_group(f[k])[spatial_key] else: position = h5ad.read_dataset(f[k])[spatial_key] data.position = position[:, [0, 1]] if position.shape[1] >= 3: data.position_z = position[:, [2]] elif k == 'uns': uns = h5ad.read_group(f[k]) if 'bin_type' in uns: bin_type = uns['bin_type'] if 'bin_size' in uns: bin_size = uns['bin_size'] if 'resolution' in uns: resolution = uns['resolution'] if 'sn' in uns: sn_data = uns['sn'] if sn_data.shape[0] == 1: data.sn = str(sn_data['sn'][0]) else: data.sn = {} for _, row in sn_data.iterrows(): batch, sn = row[0], row[1] data.sn[str(batch)] = str(sn) else: # Base case pass data.bin_type = bin_type data.bin_size = bin_size data.attr = {'resolution': resolution} return data # @ReadWriteUtils.check_file_exists
[docs]def read_h5ad( file_path: str = None, anndata: AnnData = None, flavor: str = 'scanpy', bin_type: str = None, bin_size: int = None, spatial_key: str = 'spatial', **kwargs ) -> Union[StereoExpData, AnnBasedStereoExpData]: """ Read a h5ad file or load a AnnData object Parameters ------------------ file_path the path of the h5ad file. anndata the object of AnnData to be loaded, only available while `flavor` is `'scanpy'`. `file_path` and `anndata` only can input one of them. flavor the format of the h5ad file, defaults to `'scanpy'`. `scanpy`: AnnData format of scanpy `stereopy`: h5 format of stereo bin_type the bin type includes `'bins'` and `'cell_bins'`. bin_size the size of bin to merge, when `bin_type` is set to `'bins'`. spatial_key the key of spatial information in AnnData.obsm, default to `'spatial'`. Only available while `flavor` is `'scanpy'`. Returns --------------- An object of StereoExpData while `flavor` is `'stereopy'` or an object of AnnBasedStereoExpData while `flavor` is `'scanpy'` """ flavor = flavor.lower() if flavor == 'stereopy': if file_path is None: raise ValueError("The 'file_path' must be input.") if kwargs is None: kwargs = {} kwargs['bin_type'] = bin_type kwargs['bin_size'] = bin_size return read_stereo_h5ad(file_path, **kwargs) elif flavor == 'scanpy': if file_path is None and anndata is None: raise Exception("Must to input the 'file_path' or 'anndata'.") if file_path is not None and anndata is not None: raise Exception("'file_path' and 'anndata' only can input one of them") return AnnBasedStereoExpData(h5ad_file_path=file_path, based_ann_data=anndata, bin_type=bin_type, bin_size=bin_size, spatial_key=spatial_key, **kwargs) else: raise ValueError("Invalid value for 'flavor'")
def anndata_to_stereo( andata: AnnData, use_raw: bool = False, spatial_key: Optional[str] = None, resolution: Optional[int] = 500 ): """ Transform the Anndata object into StereoExpData format. Parameters ----------------------- andata the input Anndata object. use_raw use `anndata.raw.X` if True, otherwise `anndata.X`. spatial_key use `.obsm['spatial_key']` as coordiante information. resolution the resolution of chip, default 500nm. Returns --------------------- An object of StereoExpData. """ # data matrix,including X,raw,layer data = StereoExpData() data.exp_matrix = andata.raw.X if use_raw else andata.X # obs -> cell data.cells.cell_name = np.array(andata.obs_names) data.cells.n_genes_by_counts = andata.obs[ 'n_genes_by_counts'] if 'n_genes_by_counts' in andata.obs.columns.tolist() else None data.cells.total_counts = andata.obs['total_counts'] if 'total_counts' in andata.obs.columns.tolist() else None data.cells.pct_counts_mt = andata.obs['pct_counts_mt'] if 'pct_counts_mt' in andata.obs.columns.tolist() else None # var data.genes.gene_name = np.array(andata.var_names) data.genes.n_cells = andata.var['n_cells'] if 'n_cells' in andata.var.columns.tolist() else None data.genes.n_counts = andata.var['n_counts'] if 'n_counts' in andata.var.columns.tolist() else None # position if spatial_key is not None: position = andata.obsm[spatial_key] data.position = position[:, [0, 1]] if position.shape[1] >= 3: data.position_z = position[:, [2]] if 'bin_type' in andata.uns: data.bin_type = andata.uns['bin_type'] if 'bin_size' in andata.uns: data.bin_size = andata.uns['bin_size'] if 'resolution' in andata.uns: resolution = andata.uns['resolution'] data.attr = {'resolution': resolution} return data
[docs]def stereo_to_anndata( data: StereoExpData, flavor: Literal['scanpy', 'seurat'] = 'scanpy', sample_id: str = "sample", reindex: bool = False, output: str = None, base_adata: AnnData = None, split_batches: bool = True, compression: Optional[Literal["gzip", "lzf"]] = 'gzip' ) -> AnnData: """ Transform the StereoExpData object into Anndata format. Parameters ----------------------- data the input StereoExpData object. flavor if you want to convert the output file into h5ad of Seurat, please set `'seurat'`. sample_id the sample name which will be set as `orig.ident` in obs. reindex if `True`, the cell index will be reindexed as `{sample_id}:{position_x}_{position_y}` format. output the path to output h5ad file. base_adata the input Anndata object. split_batches Whether to save each batch to a single file if it is a merged data, default to True. compression: The compression method to be used when saving data as a h5ad file, None means uncompressed, default to gzip. Returns ----------------- An object of Anndata. """ if data.merged and split_batches: from os import path from ..utils.data_helper import split data_list = split(data) batch = np.unique(data.cells.batch) adata_list = [] if output is not None: name, ext = path.splitext(output) for bno, d in zip(batch, data_list): if output is not None: boutput = f"{name}-{d.sn}{ext}" else: boutput = None adata = stereo_to_anndata(d, flavor=flavor, sample_id=sample_id, reindex=reindex, output=boutput, split_batches=False) adata_list.append(adata) return adata_list from scipy.sparse import issparse if isinstance(data, AnnBasedStereoExpData) and base_adata is None: base_adata = data._ann_data.copy() if base_adata is None: adata = AnnData(shape=data.exp_matrix.shape, dtype=np.float64, obs=data.cells.to_df(), var=data.genes.to_df()) adata.X = data.exp_matrix else: adata = base_adata # sample id logger.info(f"Adding {sample_id} in adata.obs['orig.ident'].") adata.obs['orig.ident'] = pd.Categorical([sample_id] * adata.obs.shape[0], categories=[sample_id]) if (data.position is not None) and ('spatial' not in adata.obsm): logger.info("Adding data.position as adata.obsm['spatial'] .") if data.position_z is not None: adata.obsm['spatial'] = np.concatenate([data.position, data.position_z], axis=1) else: adata.obsm['spatial'] = data.position logger.info("Adding data.position as adata.obs['x'] and adata.obs['y'] .") cell_names_index = data.cell_names.astype('str') adata.obs['x'] = pd.DataFrame(data.position[:, 0], index=cell_names_index) adata.obs['y'] = pd.DataFrame(data.position[:, 1], index=cell_names_index) if data.position_z is not None: adata.obs['z'] = pd.DataFrame(data.position_z, index=cell_names_index) if flavor != 'seurat': if data.bin_type is not None: adata.uns['bin_type'] = data.bin_type if data.bin_size is not None: adata.uns['bin_size'] = 1 if data.bin_type == 'cell_bins' else data.bin_size if data.attr is not None and 'resolution' in data.attr: adata.uns['resolution'] = data.attr['resolution'] if data.bin_type == 'cell_bins' and data.cells.cell_border is not None: adata.obsm['cell_border'] = deepcopy(data.cells.cell_border) if 'key_record' not in adata.uns: adata.uns['key_record'] = deepcopy(data.tl.key_record) if data.position_offset is not None: adata.uns['position_offset'] = deepcopy(data.position_offset) if data.position_min is not None: adata.uns['position_min'] = deepcopy(data.position_min) adata.uns['merged'] = data.merged if data.sn is not None: if isinstance(data.sn, str): sn_list = [['-1', data.sn]] else: sn_list = [] for bno, sn in data.sn.items(): sn_list.append([bno, sn]) adata.uns['sn'] = pd.DataFrame(sn_list, columns=['batch', 'sn']) for key, value in data.layers.items(): adata.layers[key] = deepcopy(value) for key in data.tl.key_record.keys(): if data.tl.key_record[key]: if key == 'hvg': res_key = data.tl.key_record[key][-1] logger.info(f"Adding data.tl.result['{res_key}'] into adata.var .") adata.uns[key] = {'params': {}, 'source': 'stereopy', 'method': key} for i in data.tl.result[res_key]: adata.var[i] = data.tl.result[res_key][i] if 'mean_bin' in adata.var.columns: adata.var.drop(columns='mean_bin', inplace=True) elif key == 'sct': res_key = data.tl.key_record[key][-1] zero_index_data = data.tl.result[res_key][0] one_index_data = data.tl.result[res_key][1] logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['sct_'] .") adata.uns['sct_counts'] = csr_matrix(zero_index_data['counts'].T) adata.uns['sct_data'] = csr_matrix(zero_index_data['data'].T) adata.uns['sct_scale'] = csr_matrix(zero_index_data['scale.data'].T.to_numpy()) adata.uns['sct_scale_genename'] = list(zero_index_data['scale.data'].index) adata.uns['sct_top_features'] = list(one_index_data['top_features']) adata.uns['sct_cellname'] = list(one_index_data['umi_cells'].astype('str')) adata.uns['sct_genename'] = list(one_index_data['umi_genes']) elif key in ['pca', 'umap', 'tsne', 'totalVI', 'spatial_alignment_integration']: # pca :we do not keep variance and PCs(for varm which will be into feature.finding in pca of seurat.) res_key = data.tl.key_record[key][-1] sc_key = f'X_{key}' logger.info(f"Adding data.tl.result['{res_key}'] into adata.obsm['{sc_key}'] .") adata.obsm[sc_key] = data.tl.result[res_key].values if key == 'pca': variance_ratio_key = f'{res_key}_variance_ratio' if variance_ratio_key in data.tl.result: logger.info(f"Adding data.tl.result['{variance_ratio_key}'] into adata.uns['{key}_variance_ratio'] .") adata.uns[variance_ratio_key] = data.tl.result[variance_ratio_key] elif key == 'neighbors': # neighbor :seurat use uns for conversion to @graph slot, but scanpy canceled neighbors of uns at present. # noqa # so this part could not be converted into seurat straightly. for res_key in data.tl.key_record[key]: sc_con = 'connectivities' if res_key == 'neighbors' else f'{res_key}_connectivities' sc_dis = 'distances' if res_key == 'neighbors' else f'{res_key}_distances' logger.info(f"Adding data.tl.result['{res_key}']['connectivities'] into adata.obsp['{sc_con}'] .") logger.info(f"Adding data.tl.result['{res_key}']['nn_dist'] into adata.obsp['{sc_dis}'] .") adata.obsp[sc_con] = data.tl.result[res_key]['connectivities'] adata.obsp[sc_dis] = data.tl.result[res_key]['nn_dist'] logger.info(f"Adding info into adata.uns['{res_key}'].") adata.uns[res_key] = {} adata.uns[res_key]['connectivities_key'] = sc_con adata.uns[res_key]['distances_key'] = sc_dis if 'params' in data.tl.result[res_key]: adata.uns[res_key]['params'] = data.tl.result[res_key]['params'] elif key == 'cluster': cell_name_index = data.cells.cell_name.astype('str') for res_key in data.tl.key_record[key]: logger.info(f"Adding data.tl.result['{res_key}'] into adata.obs['{res_key}'] .") adata.obs[res_key] = pd.DataFrame(data.tl.result[res_key]['group'].values, index=cell_name_index) elif key in ('gene_exp_cluster', 'cell_cell_communication'): for res_key in data.tl.key_record[key]: # logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['{key}@{res_key}']") # adata.uns[f"{key}@{res_key}"] = data.tl.result[res_key] logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['{res_key}']") adata.uns[res_key] = data.tl.result[res_key] # elif key == 'regulatory_network_inference': # for res_key in data.tl.key_record[key]: # logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['{res_key}'] .") # regulon_key = f'{res_key}_regulons' # res_key_data = data.tl.result[res_key] # adata.uns[regulon_key] = res_key_data['regulons'] # auc_matrix_key = f'{res_key}_auc_matrix' # adata.uns[auc_matrix_key] = res_key_data['auc_matrix'] # adjacencies_key = f'{res_key}_adjacencies' # adata.uns[adjacencies_key] = res_key_data['adjacencies'] elif key in ('co_occurrence', 'regulatory_network_inference'): for res_key in data.tl.key_record[key]: logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['{res_key}'] .") adata.uns[res_key] = data.tl.result[res_key] elif key == 'marker_genes': for res_key in data.tl.key_record[key]: uns_key = _BaseResult.RENAME_DICT.get(res_key, res_key) adata.uns[uns_key] = transform_marker_genes_to_anndata(data.tl.result[res_key]) elif key == 'spatial_hotspot': for res_key in data.tl.key_record[key]: if res_key in adata.uns: del adata.uns[res_key] if 'key_record' in adata.uns: adata.uns['key_record']['spatial_hotspot'] = [] else: continue if data.tl.raw is not None: if flavor == 'seurat': # keep same shape between @counts and @data for seurat,because somtimes dim of sct are not the same. logger.info("Adding data.tl.raw.exp_matrix as adata.uns['raw_counts'] .") adata.uns['raw_counts'] = data.tl.raw.exp_matrix if issparse(data.tl.raw.exp_matrix) \ else csr_matrix(data.tl.raw.exp_matrix) list_cell_names = data.tl.raw.cell_names.astype(str) adata.uns['raw_cellname'] = list(list_cell_names) adata.uns['raw_genename'] = list(data.tl.raw.gene_names) if data.tl.raw.position is not None and reindex: logger.info("Reindex as adata.uns['raw_cellname'] .") raw_sample = pd.DataFrame(['sample'] * data.tl.raw.cell_names.shape[0], index=list_cell_names) raw_x = pd.DataFrame(data.tl.raw.position[:, 0].astype(str), index=list_cell_names) raw_y = pd.DataFrame(data.tl.raw.position[:, 1].astype(str), index=list_cell_names) new_ix = np.array(raw_sample + "_" + raw_x + "_" + raw_y).tolist() adata.uns['raw_cellname'] = new_ix else: logger.info("Adding data.tl.raw.exp_matrix as adata.raw .") raw_exp = data.tl.raw.exp_matrix raw_genes = data.tl.raw.genes.to_df() raw_genes.dropna(axis=1, how='all', inplace=True) raw_adata = AnnData(shape=raw_exp.shape, var=raw_genes, dtype=np.float64) raw_adata.X = raw_exp adata.raw = raw_adata if reindex: logger.info("Reindex adata.X .") new_ix = (adata.obs['orig.ident'].astype(str) + ":" + adata.obs['x'].astype(str) + "_" + adata.obs['y'].astype(str)).tolist() adata.obs.index = new_ix if 'sct_cellname' in adata.uns.keys(): logger.info("Reindex as adata.uns['sct_cellname'] .") adata.uns['sct_cellname'] = new_ix if flavor == 'seurat': logger.info("Rename QC info.") adata.obs.rename(columns={'total_counts': "nCount_Spatial", "n_genes_by_counts": "nFeature_Spatial", "pct_counts_mt": 'percent.mito'}, inplace=True) logger.info("Finished conversion to anndata.") if output is not None: adata.write_h5ad(output, compression=compression) logger.info(f"Finished output to {output}") return adata
# def check_file(path, prefix, suffix): # filename = f"{path}/{prefix}{suffix}" # if os.path.isfile(filename): # return filename # elif suffix in {"matrix.mtx", "barcodes.tsv"} and os.path.isfile(f"{filename}.gz"): # return f'{filename}.gz' # elif suffix == "genes.tsv" and os.path.isfile(f'{path}/{prefix}features.tsv.gz'): # return f'{path}/{prefix}features.tsv.gz' # else: # # logger.error(f"{path} is not exist!") # # raise FileExistsError(f"can not find {path}/{prefix}{suffix}(or with .gz)!") # raise ValueError(f"can not find {filename}(or with .gz).") # def read_10x_data(path, prefix="", gene_ex_only=True): # """ # read 10x data # # :param path: the dictionary of the input files # :param prefix: the prefix of the input files # :param gene_ex_only: return gene expression data only if is True # :return: anndata # """ # # 1. check file status. # # if not os.path.exists(path): # # logger.error(f"{path} is not exist!") # # raise FileExistsError(f"{path} is not exist!") # basic_fileset = {'barcodes.tsv', 'genes.tsv', 'matrix.mtx'} # genefile = (f"{path}/{prefix}genes.tsv") # featurefile = (f"{path}/{prefix}features.tsv.gz") # adata = read_10x_mtx(path, prefix) # if os.path.isfile(genefile) or not gene_ex_only: # return adata # else: # gex_rows = list(map(lambda x: x == 'Gene Expression', adata.var['feature_types'])) # return adata[:, gex_rows].copy() # def read_10x_mtx(path, prefix="", var_names='gene_symbols', make_unique=True): # mtxfile = check_file(path, prefix, "matrix.mtx") # genesfile = check_file(path, prefix, "genes.tsv") # barcodesfile = check_file(path, prefix, "barcodes.tsv") # adata = read_mtx(mtxfile).T # transpose # genes = pd.read_csv(genesfile, header=None, sep='\t') # gene_id = genes[0].values # gene_symbol = genes[1].values # if var_names == 'gene_symbols': # var_names = genes[1].values # # if make_unique: # # var_names = anndata.utils.make_index_unique(pd.Index(var_names)) # adata.var_names = var_names # adata.var['gene_ids'] = genes[0].values # elif var_names == 'gene_ids': # adata.var_names = genes[0].values # adata.var['gene_symbols'] = genes[1].values # else: # raise ValueError("`var_names` needs to be 'gene_symbols' or 'gene_ids'") # if os.path.isfile(f"{path}/{prefix}features.tsv.gz"): # adata.var['feature_types'] = genes[2].values # # adata.obs_names = pd.read_csv(barcodesfile, header=None)[0].values # return adata
[docs]@ReadWriteUtils.check_file_exists def read_gef( file_path: str, bin_type: str = "bins", bin_size: int = 100, is_sparse: bool = True, gene_list: Optional[list] = None, region: Optional[list] = None, gene_name_index: Optional[bool] = False, num_threads: int = -1 ): """ Read the GEF (.h5) file, and generate the StereoExpData object. Parameters --------------- file_path the path to input file. bin_type the bin type includes `'bins'` or `'cell_bins'`. bin_size the size of bin to merge, which only takes effect when the `bin_type` is set as `'bins'`. is_sparse the matrix is sparse matrix, if `True`, otherwise `np.ndarray`. gene_list select targeted data based on the gene list. region restrict data to the region condition, like [minX, maxX, minY, maxY]. gene_name_index In a gef file whose version is 3 or lower, there is only a column called **geneName** which is the **gene name**, but in the version higher than 3, additional column called **geneID** is added, which is the **ID** for genes, When being higher version, setting `gene_name_index` to True means setting column **geneName** as index, otherwise, setting column **geneID** as index and the column **geneName** is stored in `data.real_gene_names`, if lower version, `gene_name_index` will be ignored and the column **geneID** will be set as index, regardless of lower or higher version, the column set as index is stored in `data.gene_names`, the index mentioned here is the index of `data.genes`. num_threads the number of threads to read the data, only available when `bin_type` is `'bins'`. -1 means to use all the cores of the machine. Returns ------------------------ An object of StereoExpData. """ logger.info('read_gef begin ...') from gefpy.utils import gef_is_cell_bin is_cell_bin = gef_is_cell_bin(file_path) if bin_type == 'cell_bins': if not is_cell_bin: raise Exception('This file is not the type of CellBin.') data = StereoExpData(file_path=file_path, file_format='gef', bin_type=bin_type, bin_size=bin_size) from gefpy.cgef_reader_cy import CgefR gef = CgefR(file_path, True) cellborders_coord_list, coord_count_per_cell = gef.get_cellborders([]) border_points_count_per_cell = int(coord_count_per_cell / 2) cell_borders = cellborders_coord_list.reshape((-1, border_points_count_per_cell, 2)) if gene_list is not None or region is not None: if gene_list is None: gene_list = [] if region is None: region = [] uniq_cell, gene_names, count, cell_ind, gene_ind, dnb_cnt, cell_area, gene_id = gef.get_filtered_data(region, gene_list) gene_num = gene_names.size cell_num = uniq_cell.size if cell_num == 0 or gene_num == 0: raise Exception('Can not find the data based on the gene list or region.') exp_matrix = csr_matrix((count, (cell_ind, gene_ind)), shape=(cell_num, gene_num), dtype=np.uint32) uniq_cell_borders = cell_borders[np.in1d(gef.get_cell_names(), uniq_cell)] data.cells = Cell(cell_name=uniq_cell, cell_border=uniq_cell_borders) data.cells['dnbCount'] = dnb_cnt data.cells['area'] = cell_area if len(gene_id[0]) == 0: gene_name_index = True if gene_name_index: if len(gene_id[0]) > 0: exp_matrix, gene_names = integrate_matrix_by_genes(gene_names, cell_num, exp_matrix.data, exp_matrix.indices, exp_matrix.indptr) data.genes = Gene(gene_name=gene_names) else: data.genes = Gene(gene_name=gene_id) data.genes['real_gene_name'] = gene_names data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() data.position = np.array( list((zip(np.right_shift(uniq_cell, 32), np.bitwise_and(uniq_cell, 0xffffffff))))).astype('uint32') else: cell_names = gef.get_cell_names() cell_num = gef.get_cell_num() gene_names, gene_id = gef.get_gene_names() gene_names = gene_names.astype('U') gene_id = gene_id.astype('U') gene_num = gef.get_gene_num() indices, indptr, count = gef.get_sparse_matrix_indices(order='cell') exp_matrix = csr_matrix((count, indices, indptr), shape=(cell_num, gene_num), dtype=np.uint32) data.cells = Cell(cell_name=cell_names, cell_border=cell_borders) cells = gef.get_cells() data.cells['dnbCount'] = cells['dnbCount'] data.cells['area'] = cells['area'] if len(gene_id[0]) == 0: gene_name_index = True if gene_name_index: if len(gene_id[0]) > 0: exp_matrix, gene_names = integrate_matrix_by_genes(gene_names, cell_num, count, indices, indptr) data.genes = Gene(gene_name=gene_names) else: data.genes = Gene(gene_name=gene_id) data.genes['real_gene_name'] = gene_names data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() data.position = np.zeros(shape=(cell_num, 2), dtype=np.uint32) data.position[:, 0] = cells['x'] data.position[:, 1] = cells['y'] data.attr = { 'resolution': read_gef_info(file_path)['resolution'] } logger.info(f'the matrix has {data.cell_names.size} cells, and {data.gene_names.size} genes.') gef.cgef_close() return data else: if is_cell_bin: raise Exception('This file is not the type of SquareBin.') from gefpy.bgef_reader_cy import BgefR if num_threads <= 0: from multiprocessing import cpu_count num_threads = cpu_count() gef = BgefR(file_path, bin_size, num_threads, True) data = StereoExpData(file_path=file_path, file_format='gef', bin_type=bin_type, bin_size=bin_size) data.offset_x, data.offset_y = gef.get_offset() gef_attr = gef.get_exp_attr() data.attr = { 'minX': gef_attr[0], 'minY': gef_attr[1], 'maxX': gef_attr[2], 'maxY': gef_attr[3], 'maxExp': gef_attr[4], 'resolution': gef_attr[5], } if gene_list is not None or region is not None: if gene_list is None: gene_list = [] if region is None: region = [] uniq_cell, gene_names, count, cell_ind, gene_ind, gene_id = gef.get_filtered_data(region, gene_list) cell_num = uniq_cell.size gene_num = gene_names.size if cell_num == 0 or gene_num == 0: raise Exception('Can not find the data based on the gene list or region.') data.cells = Cell(cell_name=uniq_cell) exp_matrix = csr_matrix((count, (cell_ind, gene_ind)), shape=(cell_num, gene_num), dtype=np.uint32) if len(gene_id[0]) == 0: gene_name_index = True if gene_name_index: if len(gene_id[0]) > 0: exp_matrix, gene_names = integrate_matrix_by_genes(gene_names, cell_num, exp_matrix.data, exp_matrix.indices, exp_matrix.indptr) data.genes = Gene(gene_name=gene_names) else: data.genes = Gene(gene_name=gene_id) data.genes['real_gene_name'] = gene_names data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() data.position = np.array( list((zip(np.right_shift(uniq_cell, 32), np.bitwise_and(uniq_cell, 0xffffffff))))).astype('uint32') else: cell_names = gef.get_cell_names() cell_num = gef.get_cell_num() gene_names, gene_id = gef.get_gene_names() gene_num = gef.get_gene_num() data.cells = Cell(cell_name=cell_names) if len(gene_id[0]) == 0: # an old version gef file, no gene id gene_name_index = True cell_ind, gene_ind, count = gef.get_sparse_matrix_indices2() exp_matrix = csr_matrix((count, (cell_ind, gene_ind)), shape=(cell_num, gene_num), dtype=np.uint32) if gene_name_index: if len(gene_id[0]) > 0: exp_matrix, gene_names = integrate_matrix_by_genes(gene_names, cell_num, exp_matrix.data, exp_matrix.indices, exp_matrix.indptr) data.genes = Gene(gene_name=gene_names) else: data.genes = Gene(gene_name=gene_id) data.genes['real_gene_name'] = gene_names data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() data.position = np.array(list( (zip(np.right_shift(cell_names, 32), np.bitwise_and(cell_names, 0xffffffff))))).astype('uint32') logger.info(f'the matrix has {data.cell_names.size} cells, and {data.gene_names.size} genes.') logger.info('read_gef end.') return data
[docs]@ReadWriteUtils.check_file_exists def read_gef_info(file_path: str): """ Read the property information of the GEF `(.h5)` file. Parameters ------------- file_path the path to input file. Returns -------------------- An attribute dictionary. """ from gefpy.utils import gef_is_cell_bin bin_type = gef_is_cell_bin(file_path) h5_file = h5py.File(file_path, 'r') info_dict = {} if not bin_type: logger.info('This is GEF file which contains traditional bin infomation.') logger.info('bin_type: bins') info_dict['bin_list'] = list(h5_file['geneExp'].keys()) logger.info('Bin size list: {0}'.format(info_dict['bin_list'])) if type(h5_file['geneExp']['bin1']['expression'].attrs['resolution']) is np.ndarray: info_dict['resolution'] = h5_file['geneExp']['bin1']['expression'].attrs['resolution'][0] else: info_dict['resolution'] = h5_file['geneExp']['bin1']['expression'].attrs['resolution'] logger.info('Resolution: {0}'.format(info_dict['resolution'])) info_dict['gene_count'] = h5_file['geneExp']['bin1']['gene'].shape[0] logger.info('Gene count: {0}'.format(info_dict['gene_count'])) maxX = h5_file['geneExp']['bin1']['expression'].attrs['maxX'][0] minX = h5_file['geneExp']['bin1']['expression'].attrs['minX'][0] maxY = h5_file['geneExp']['bin1']['expression'].attrs['maxY'][0] minY = h5_file['geneExp']['bin1']['expression'].attrs['minY'][0] info_dict['offsetX'] = minX logger.info('offsetX: {0}'.format(info_dict['offsetX'])) info_dict['offsetY'] = minY logger.info('offsetY: {0}'.format(info_dict['offsetY'])) info_dict['width'] = maxX - minX logger.info('Width: {0}'.format(info_dict['width'])) info_dict['height'] = maxY - minY logger.info('Height: {0}'.format(info_dict['height'])) info_dict['maxExp'] = h5_file['geneExp']['bin1']['expression'].attrs['maxExp'][0] logger.info('Max Exp: {0}'.format(info_dict['maxExp'])) else: logger.info('This is GEF file which contains cell bin infomation.') logger.info('bin_type: cell_bins') from gefpy.cgef_reader_cy import CgefR cgef = CgefR(file_path) info_dict['cell_num'] = cgef.get_cell_num() logger.info('Number of cells: {0}'.format(info_dict['cell_num'])) info_dict['gene_num'] = cgef.get_gene_num() logger.info('Number of gene: {0}'.format(info_dict['gene_num'])) info_dict['resolution'] = h5_file.attrs['resolution'][0] logger.info('Resolution: {0}'.format(info_dict['resolution'])) info_dict['offsetX'] = h5_file.attrs['offsetX'][0] logger.info('offsetX: {0}'.format(info_dict['offsetX'])) info_dict['offsetY'] = h5_file.attrs['offsetY'][0] logger.info('offsetY: {0}'.format(info_dict['offsetY'])) info_dict['averageGeneCount'] = h5_file['cellBin']['cell'].attrs['averageGeneCount'][0] logger.info('Average number of genes: {0}'.format(info_dict['averageGeneCount'])) info_dict['maxGeneCount'] = h5_file['cellBin']['cell'].attrs['maxGeneCount'][0] logger.info('Maximum number of genes: {0}'.format(info_dict['maxGeneCount'])) info_dict['averageExpCount'] = h5_file['cellBin']['cell'].attrs['averageExpCount'][0] logger.info('Average expression: {0}'.format(info_dict['averageExpCount'])) info_dict['maxExpCount'] = h5_file['cellBin']['cell'].attrs['maxExpCount'][0] logger.info('Maximum expression: {0}'.format(info_dict['maxExpCount'])) return info_dict
[docs]@ReadWriteUtils.check_file_exists def mudata_to_msdata( file_path: str = None, sample_names: Optional[Union[np.ndarray, List[str], None]] = None, scope_names: Optional[Union[np.ndarray, List[str], None]] = None, entire_merged_data_name: Optional[str] = None ): """ Read a h5mu file and convert it to a MSData object. :param file_path: The path of the MuData file, defaults to None :param sample_names: The names of single samples that are saved in the MuData object, defaults to None, if None, the names starting with 'sample_' will be used. :param scope_names: The names of merged samples that are saved in the MuData object, defaults to None, if None, the names like 'scope_[0,1,2]' will be used. :param entire_merged_data_name: The name of the merged sample which is merged from all samples, default to None, if None, use the one like 'scope_[0,1,2]' whose square brackets contain index sequence of all samples. :return: The MSData object. .. note:: You need to install the mudata package before using this function: pip install mudata """ try: from mudata import read_h5mu except ImportError: raise ImportError("Please install mudata first: `pip install mudata`.") from stereo.core.ms_data import MSData mudata = read_h5mu(file_path) mod_keys = list(mudata.mod.keys()) if sample_names is None: sample_names = [] left_mod_keys = [] for k in mod_keys: match = re.match(r'^sample_\d+$', k) if match: sample_names.append(k) else: left_mod_keys.append(k) sample_names.sort(key=lambda x: int(x.split('_')[1])) mod_keys = left_mod_keys data_list = [AnnBasedStereoExpData(based_ann_data=mudata[n]) for n in sample_names if n in mudata.mod] if len(data_list) == 0: raise ValueError("No sample data found in the MuData object.") if 'names' in mudata.uns: names = list(mudata.uns['names']) else: names = sample_names var_type = mudata.uns.get('var_type', 'intersect') relationship = mudata.uns.get('relationship', 'other') relationship_info = mudata.uns.get('relationship_info', {}) ms_data = MSData( _data_list=data_list, _names=names, _var_type=var_type, _relationship=relationship, _relationship_info=relationship_info ) if entire_merged_data_name is None: entire_merged_data_name = ms_data.generate_scope_key(ms_data.names) entire_merged_data = None if scope_names is None: scope_names = [] left_mod_keys = [] for k in mod_keys: match = re.match(r'^scope_\[\d+(,\d+)*\]$', k) if match: scope_names.append(k) else: left_mod_keys.append(k) mod_keys = left_mod_keys scopes_data = { n: AnnBasedStereoExpData(based_ann_data=mudata[n]) for n in scope_names if n in mudata.mod } for k in scopes_data.keys(): if k == entire_merged_data_name: entire_merged_data = scopes_data[k] if not re.match(r'^scope_\[\d+(,\d+)*\]$', k): del scopes_data[k] entire_merged_data_name = ms_data.generate_scope_key(ms_data.names) scopes_data[entire_merged_data_name] = entire_merged_data break if len(scopes_data) > 0: ms_data.scopes_data = scopes_data ms_data.merged_data = entire_merged_data if 'result_keys' in mudata.uns: for n, k in mudata.uns['result_keys'].items(): if n not in ms_data.scopes_data: continue ms_data.tl.result_keys[n] = list(k) del mudata return ms_data