Source code for stereo.io.reader

"""
@file: reader.py
@description:
@author: Ping Qiu
@email: qiuping1@genomics.cn
@last modified by: Nils Mechtel

change log:
    2021/03/05  add read_stereo_data function , by Ping Qiu.
    2021/08/12  move read_txt functions from StereoExpData here. Add read_ann_h5ad,
    andata_to_stereo function by Yiran Wu.
    2021/08/20
    2022/02/09  read raw data and result
"""
from copy import deepcopy
from typing import Optional
from typing import Union
from natsort import natsorted

import h5py
import numpy as np
import pandas as pd
from anndata import AnnData
from scipy.sparse import csr_matrix
from shapely.geometry import MultiPoint
from shapely.geometry import Point
from typing_extensions import Literal

from stereo.core.cell import Cell
from stereo.core.constants import CHIP_RESOLUTION
from stereo.core.gene import Gene
from stereo.core.stereo_exp_data import AnnBasedStereoExpData
from stereo.core.stereo_exp_data import StereoExpData
from stereo.core.result import _BaseResult
from stereo.io import h5ad
from stereo.io.utils import(
    remove_genes_number,
    integrate_matrix_by_genes,
    transform_marker_genes_to_anndata,
    get_gem_comments
)
from stereo.log_manager import logger
from stereo.utils.read_write_utils import ReadWriteUtils


[docs]@ReadWriteUtils.check_file_exists def read_gem( file_path: str, sep: str = '\t', bin_type: str = "bins", bin_size: int = 100, is_sparse: bool = True, bin_coord_offset: bool = False, gene_name_index: bool = False ): """ Read the Stereo-seq GEM file, and generate the StereoExpData object. Parameters ------------- file_path the path to input file. sep the separator string. bin_type the bin type includes `'bins'` or `'cell_bins'`, default to `'bins'`. bin_size the size of bin to merge, when `bin_type` is set to `'bins'`. is_sparse the expression matrix is sparse matrix, if `True`, otherwise `np.ndarray`. bin_coord_offset if set it to True, the coordinates of bins are calculated as ((gene_coordinates - min_coordinates) // bin_size) * bin_size + min_coordinates + bin_size/2 gene_name_index In a v0.1 gem file, the column geneID is the gene name actually, but in a v0.2, geneID just a ID for genes and there is an additional column called geneName where is the gene name, When the version of gem file is v0.2, set `gene_name_index` to True to set column geneName as index, otherwise, set column geneID, if a v0.1 gem file, `gene_name_index` will be ignored and column geneID is set as index. Returns ------------- An object of StereoExpData. """ data = StereoExpData(file_path=file_path, file_format='gem', bin_type=bin_type, bin_size=bin_size) comments_lines, _ = get_gem_comments(str(data.file)) # df = pd.read_csv(str(data.file), sep=sep, comment='#', header=0) df = pd.read_csv(str(data.file), sep=sep, header=comments_lines, engine='pyarrow') if 'MIDCounts' in df.columns: df.rename(columns={'MIDCounts': 'UMICount'}, inplace=True) elif 'MIDCount' in df.columns: df.rename(columns={'MIDCount': 'UMICount'}, inplace=True) if 'CellID' in df.columns: df.rename(columns={'CellID': 'cell_id'}, inplace=True) if 'label' in df.columns: df.rename(columns={'label': 'cell_id'}, inplace=True) dropna_subset = ['geneID', 'x', 'y', 'UMICount'] if 'cell_id' in df.columns: dropna_subset.append('cell_id') if 'geneName' in df.columns: dropna_subset.append('geneName') df.dropna( subset=dropna_subset, axis=0, inplace=True ) gdf = None if data.bin_type == 'cell_bins': gdf = parse_cell_bin_coor(df) else: if bin_coord_offset: df = parse_bin_coor(df, bin_size) else: df = parse_bin_coor_no_offset(df, bin_size) if gene_name_index and 'geneName' in df.columns: df['geneID'] = df['geneName'] cells = df['cell_id'].unique() genes = df['geneID'].unique() cells_dict = dict(zip(cells, range(0, len(cells)))) genes_dict = dict(zip(genes, range(0, len(genes)))) rows = df['cell_id'].map(cells_dict) cols = df['geneID'].map(genes_dict) # logger.info(f'the martrix has {len(cells)} cells, and {len(genes)} genes.') exp_matrix = csr_matrix((df['UMICount'], (rows, cols)), shape=(cells.shape[0], genes.shape[0]), dtype=np.int32) data.cells = Cell(cell_name=cells) data.genes = Gene(gene_name=genes) if not gene_name_index and 'geneName' in df.columns: gene_names = df.groupby(by='geneID').aggregate({'geneName': lambda n: np.unique(n)[0]})['geneName'] data.genes['real_gene_name'] = gene_names data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() if data.bin_type == 'bins': # data.position = df.loc[:, ['x_center', 'y_center']].drop_duplicates().values data.position = df.loc[:, ['bin_x', 'bin_y']].drop_duplicates().values else: data.position = gdf.loc[cells][['x_center', 'y_center']].values data.cells.cell_point = gdf.loc[cells]['cell_point'].values data.position = data.position.astype(np.uint32) data.offset_x = df['x'].min() data.offset_y = df['y'].min() resolution = 500 for chip_name in CHIP_RESOLUTION.keys(): if data.sn[0:len(chip_name)] == chip_name: resolution = CHIP_RESOLUTION[chip_name] break data.attr = { 'minX': df['x'].min(), 'minY': df['y'].min(), 'maxX': df['x'].max(), 'maxY': df['y'].max(), 'minExp': data.exp_matrix.min(), # noqa 'maxExp': data.exp_matrix.max(), # noqa 'resolution': resolution, } data.bin_coord_offset = bin_coord_offset logger.info(f'the martrix has {data.cell_names.size} cells, and {data.gene_names.size} genes.') return data
def parse_bin_coor(df, bin_size): """ merge bins to a bin unit according to the bin size, also calculate the center coordinate of bin unit, and generate cell id of bin unit using the coordinate after merged. :param df: a dataframe of the bin file. :param bin_size: the size of bin to merge. :return: """ x_min = df['x'].min() y_min = df['y'].min() df['bin_x'] = merge_bin_coor(df['x'].values, x_min, bin_size) df['bin_y'] = merge_bin_coor(df['y'].values, y_min, bin_size) df['cell_id'] = df['bin_x'].astype(str) + '_' + df['bin_y'].astype(str) df['bin_x'] = get_bin_center(df['bin_x'], x_min, bin_size) df['bin_y'] = get_bin_center(df['bin_y'], y_min, bin_size) return df def parse_bin_coor_no_offset(df: pd.DataFrame, bin_size: int): df['bin_x'] = ((df['x'] // bin_size) * bin_size).astype(np.uint64) df['bin_y'] = ((df['y'] // bin_size) * bin_size).astype(np.uint64) df['cell_id'] = np.bitwise_or( np.left_shift(df['bin_x'], 32), df['bin_y'] ) df.sort_values(by=['geneID', 'cell_id'], inplace=True) df['cell_id'] = df['cell_id'].astype('U') return df def parse_cell_bin_coor(df): gdf = df.groupby('cell_id').apply(lambda x: make_multipoint(x)) return gdf def make_multipoint(x): p = [Point(i) for i in zip(x['x'], x['y'])] mlp = MultiPoint(p).convex_hull x_center = mlp.centroid.x y_center = mlp.centroid.y return pd.Series({'cell_point': mlp, 'x_center': x_center, 'y_center': y_center}) def merge_bin_coor(coor: np.ndarray, coor_min: int, bin_size: int): return np.floor((coor - coor_min) / bin_size).astype(np.int32) def get_bin_center(bin_coor: np.ndarray, coor_min: int, bin_size: int): return bin_coor * bin_size + coor_min + int(bin_size / 2) def to_interval(interval_string): [left, right] = interval_string[1:-1].split(', ') interval = pd.Interval(float(left), float(right)) return interval
[docs]@ReadWriteUtils.check_file_exists def read_stereo_h5ad( file_path: str, use_raw: bool = True, use_result: bool = True, bin_type: str = None, bin_size: int = None ): """ Read the H5ad file, and generate the StereoExpData object. Parameters ---------------------- file_path the path to input H5ad file. use_raw whether to read data of `self.raw`. use_result whether to read `result` and `res_key`. bin_type the bin type includes `'bins'` or `'cell_bins'`. bin_size the size of bin to merge, when `bin_type` is set to `'bins'`. Returns -------------------- An object of StereoExpData. """ data = StereoExpData(file_path=file_path) if not data.file.exists(): logger.error('the input file is not exists, please check!') raise FileExistsError('the input file is not exists, please check!') with h5py.File(data.file, mode='r') as f: data = _read_stereo_h5ad_from_group(f, data, use_raw, use_result, bin_type, bin_size) return data
def _read_stereo_h5ad_from_group(f: Union[h5py.File, h5py.Group], data: StereoExpData, use_raw, use_result, bin_type=None, bin_size=None): # read data data.bin_type = bin_type if bin_type is not None else 'bins' data.bin_size = bin_size if bin_size is not None else 1 not_data_attr_keys = {'bin_type', 'bin_size', 'merged'} if f.attrs is not None: data.attr = {} for key, value in f.attrs.items(): if key not in not_data_attr_keys: data.attr[key] = value else: setattr(data, key, value) for k in f.keys(): if k == 'cells': data.cells = h5ad.read_group(f[k]) elif k == 'genes': data.genes = h5ad.read_group(f[k]) elif k == 'position': position = h5ad.read_dataset(f[k]) data.position = position[:, [0, 1]] if position.shape[1] >= 3: data.position_z = position[:, [2]] elif k == 'bin_type': data.bin_type = h5ad.read_dataset(f[k]) elif k == 'bin_size': data.bin_size = h5ad.read_dataset(f[k]) elif k == 'merged': data.merged = h5ad.read_dataset(f[k]) elif k == 'exp_matrix': if isinstance(f[k], h5py.Group): data.exp_matrix = h5ad.read_group(f[k]) else: data.exp_matrix = h5ad.read_dataset(f[k]) elif k == 'sn': sn_data = h5ad.read_group(f[k]) if sn_data.shape[0] == 1: data.sn = str(sn_data['sn'][0]) else: data.sn = {} for _, row in sn_data.iterrows(): batch, sn = row[0], row[1] data.sn[str(batch)] = str(sn) # read raw if use_raw is True and 'exp_matrix@raw' in f.keys(): data.tl.raw = StereoExpData() if isinstance(f['exp_matrix@raw'], h5py.Group): data.tl.raw.exp_matrix = h5ad.read_group(f['exp_matrix@raw']) else: data.tl.raw.exp_matrix = h5ad.read_dataset(f['exp_matrix@raw']) if 'cells@raw' in f.keys(): data.tl.raw.cells = h5ad.read_group(f['cells@raw']) else: data.tl.raw.cells = deepcopy(data.cells) if 'genes@raw' in f.keys(): data.tl.raw.genes = h5ad.read_group(f['genes@raw']) else: data.tl.raw.genes = deepcopy(data.genes) if 'position@raw' in f.keys(): position = h5ad.read_dataset(f['position@raw']) data.tl.raw.position = position[:, [0, 1]] if position.shape[1] >= 3: data.tl.raw.position_z = position[:, [2]] else: data.tl.raw.position = deepcopy(data.position) # read key_record and result if use_result is True and 'key_record' in f.keys(): h5ad.read_key_record(f['key_record'], data.tl.key_record) _read_stereo_h5_result(data.tl.key_record, data, f) return data def _read_stereo_h5_result(key_record: dict, data: StereoExpData, f: Union[h5py.File, h5py.Group]): import ast from ..utils.pipeline_utils import cell_cluster_to_gene_exp_cluster key_record = deepcopy(key_record) for analysis_key in list(key_record.keys()): res_keys = key_record[analysis_key] for res_key in res_keys: if analysis_key == 'hvg': hvg_df = h5ad.read_group(f[f'{res_key}@hvg']) # str to interval if 'mean_bin' in hvg_df.columns: hvg_df['mean_bin'] = [to_interval(interval_string) for interval_string in hvg_df['mean_bin']] data.tl.result[res_key] = hvg_df if analysis_key in ['pca', 'umap', 'totalVI', 'spatial_alignment_integration']: data.tl.result[res_key] = pd.DataFrame(h5ad.read_dataset(f[f'{res_key}@{analysis_key}'])) if analysis_key == 'pca': variance_ratio_key = f'{res_key}_variance_ratio' if f'{variance_ratio_key}@{analysis_key}_variance_ratio' in f.keys(): data.tl.result[variance_ratio_key] = h5ad.read_dataset(f[f'{variance_ratio_key}@{analysis_key}_variance_ratio']) # noqa if analysis_key == 'neighbors': data.tl.result[res_key] = { # 'neighbor': h5ad.read_group(f[f'neighbor@{res_key}@neighbors']), 'connectivities': h5ad.read_group(f[f'connectivities@{res_key}@neighbors']), 'nn_dist': h5ad.read_group(f[f'nn_dist@{res_key}@neighbors']) } if f'neighbor@{res_key}@neighbors' in f: data.tl.result[res_key]['neighbor'] = h5ad.read_group(f[f'neighbor@{res_key}@neighbors']) if analysis_key == 'cluster': data.tl.result[res_key] = h5ad.read_group(f[f'{res_key}@cluster']) gene_cluster_res_key = f'gene_exp_{res_key}' if ('gene_exp_cluster' not in data.tl.key_record) or ( gene_cluster_res_key not in data.tl.key_record['gene_exp_cluster']): gene_cluster_res = cell_cluster_to_gene_exp_cluster(data, res_key) if gene_cluster_res is not False: data.tl.result[gene_cluster_res_key] = gene_cluster_res data.tl.reset_key_record('gene_exp_cluster', gene_cluster_res_key) if analysis_key == 'sct': data.tl.result[res_key] = [ { 'counts': h5ad.read_group(f[f'exp_matrix@{res_key}@sct_counts']), 'data': h5ad.read_group(f[f'exp_matrix@{res_key}@sct_data']), 'scale.data': h5ad.read_group(f[f'exp_matrix@{res_key}@sct_scale']), }, { 'top_features': h5ad.read_dataset(f[f'genes@{res_key}@sct_top_features']), 'umi_genes': h5ad.read_dataset(f[f'genes@{res_key}@sct']), 'umi_cells': h5ad.read_dataset(f[f'cells@{res_key}@sct']), } ] data.tl.result[res_key][0]['scale.data'] = pd.DataFrame( data.tl.result[res_key][0]['scale.data'].toarray(), columns=data.tl.result[res_key][1]['umi_cells'], index=h5ad.read_dataset(f[f'genes@{res_key}@sct_scale_genename']), ) if analysis_key == 'gene_exp_cluster': data.tl.result[res_key] = h5ad.read_group(f[f'{res_key}@gene_exp_cluster']) if analysis_key == 'marker_genes': clusters = h5ad.read_dataset(f[f'clusters_record@{res_key}@marker_genes']) data.tl.result[res_key] = {} for cluster in clusters: cluster_key = f'{cluster}@{res_key}@marker_genes' if cluster != 'parameters': data.tl.result[res_key][cluster] = h5ad.read_group(f[cluster_key]) else: parameters_df: pd.DataFrame = h5ad.read_group(f[cluster_key]) data.tl.result[res_key]['parameters'] = {} for _, row in parameters_df.iterrows(): name = row['name'] value = row['value'] if value.lower() == 'true': value = True elif value.lower() == 'false': value = False data.tl.result[res_key]['parameters'][name] = value if analysis_key == 'cell_cell_communication': data.tl.result[res_key] = {} for key in ['means', 'significant_means', 'deconvoluted', 'pvalues']: full_key = f'{res_key}@{key}@cell_cell_communication' if full_key in f.keys(): data.tl.result[res_key][key] = h5ad.read_group(f[full_key]) parameters_df: pd.DataFrame = h5ad.read_group(f[f'{res_key}@parameters@cell_cell_communication']) data.tl.result[res_key]['parameters'] = {} for i, row in parameters_df.iterrows(): name = row['name'] value = row['value'] data.tl.result[res_key]['parameters'][name] = value if analysis_key == 'regulatory_network_inference': data.tl.result[res_key] = {} for key in ['regulons', 'auc_matrix', 'adjacencies']: full_key = f'{res_key}@{key}@regulatory_network_inference' if full_key in f.keys(): if key == 'regulons': data.tl.result[res_key][key] = ast.literal_eval(h5ad.read_dataset(f[full_key])) else: data.tl.result[res_key][key] = h5ad.read_group(f[full_key]) if analysis_key in ['co_occurrence']: data.tl.result[res_key] = {} for full_key in f.keys(): if not full_key.endswith(analysis_key): continue data_key = full_key.split('@')[1] data.tl.result[res_key][data_key] = h5ad.read_group(f[full_key])
[docs]@ReadWriteUtils.check_file_exists def read_h5ms(file_path, use_raw=True, use_result=True): """ Load a h5ms file as an object of MSData :param file_path: The path of h5ms file to be loaded. :param use_raw: Whether to load the raw data of each StereoExpData in MSData, defaults to True. :param use_result: Whether to load the analysis results which had been saved in h5ms file, defaults to True. :return: An object of MSData """ from stereo.core.ms_data import MSData with h5py.File(file_path, mode='r') as f: # ms_data = MSData() data_list = [] merged_data = None names = [] var_type = None relationship = None scopes_data = {} result_keys = {} # result = {} for k in f.keys(): if k == 'sample': for one_slice_key in f[k].keys(): data = StereoExpData() data_list.append( _read_stereo_h5ad_from_group(f[k][one_slice_key], data, use_raw, use_result)) # noqa elif k == 'sample_merged': for mk in f[k].keys(): scope_data = StereoExpData() scope_data = _read_stereo_h5ad_from_group(f[k][mk], scope_data, use_raw, use_result) scopes_data[mk] = scope_data if f[k][mk].attrs is not None: merged_from_all = f[k][mk].attrs.get('merged_from_all', False) if merged_from_all: merged_data = scope_data # merged_data = StereoExpData() # merged_data = _read_stereo_h5ad_from_group(f[k], merged_data, use_raw, use_result) # noqa elif k == 'names': names = h5ad.read_dataset(f[k]) if isinstance(names, np.ndarray): names = names.tolist() elif k == 'var_type': var_type = h5ad.read_dataset(f[k]) elif k == 'relationship': relationship = h5ad.read_dataset(f[k]) elif k == 'result_keys': for rk in f[k].keys(): result_keys[rk] = list(h5ad.read_dataset(f[k][rk])) # elif k == 'mss': # for key in f['mss'].keys(): # data = StereoExpData() # data.tl.result = {} # h5ad.read_key_record(f['mss'][key]['key_record'], data.tl.key_record) # _read_stereo_h5_result(data.tl.key_record, data, f['mss'][key]) # result[key] = data.tl.result else: logger.warn(f"{k} not in rules, did not read from h5ms") ms_data = MSData( _data_list=data_list, _names=names, _var_type=var_type, _relationship=relationship ) ms_data.merged_data = merged_data # ms_data.tl.result = result ms_data.scopes_data = scopes_data ms_data.tl.result_keys = result_keys return ms_data
[docs]@ReadWriteUtils.check_file_exists def read_seurat_h5ad( file_path: str, use_raw: bool = False ): """ Read the H5ad file in Anndata format of Seurat, and generate the StereoExpData object. Parameters ------------------ file_path the path of input H5ad file. use_raw whether to read data of `self.raw`. Returns ---------------------- An object of StereoExpData. """ data = StereoExpData(file_path=file_path) # basic # attributes = ["obsm", "varm", "obsp", "varp", "uns", "layers"] # df_attributes = ["obs", "var"] with h5py.File(data.file, mode='r') as f: if 'raw' not in f.keys(): use_raw = False for k in f.keys(): if k == "raw" or k.startswith("raw."): continue if k == "X": if isinstance(f[k], h5py.Dataset): data.exp_matrix = h5ad.read_dense_as_sparse(f[k], csr_matrix, 10000) else: data.exp_matrix = h5ad.read_group(f[k]) elif k == "obs": cells_df = h5ad.read_dataframe(f[k]) data.cells.cell_name = cells_df.index.values data.cells.total_counts = cells_df['total_counts'] if 'total_counts' in cells_df.keys() else None data.cells.pct_counts_mt = cells_df['pct_counts_mt'] if 'pct_counts_mt' in cells_df.keys() else None data.cells.n_genes_by_counts = cells_df[ 'n_genes_by_counts'] if 'n_genes_by_counts' in cells_df.keys() else None data.position = cells_df[['x', 'y']].to_numpy(dtype=np.uint32) for cluster_key in f['obs']['__categories'].keys(): if cluster_key == 'orig.ident': continue data.tl.result[cluster_key] = pd.DataFrame( {'bins': data.cells.cell_name, 'group': cells_df[cluster_key].values}) data.tl.key_record['cluster'].append(cluster_key) elif k == "var": genes_df = h5ad.read_dataframe(f[k]) data.genes.gene_name = genes_df.index.values if 'highly_variable' in genes_df: data.tl.result['highly_variable_genes'] = pd.DataFrame({ 'means': genes_df.means.values, 'dispersions': genes_df.dispersions.values, 'dispersions_norm': genes_df.dispersions_norm.values, 'highly_variable': genes_df.highly_variable.values == 1 }, index=genes_df.index.values) data.tl.key_record['hvg'].append('highly_variable_genes') elif k == 'obsm': for key in f['obsm'].keys(): if key == 'X_spatial': continue if key == 'X_pca': data.tl.result['pca'] = pd.DataFrame(h5ad.read_dataset(f['obsm']['X_pca'])) data.tl.key_record['pca'].append('pca') elif key == 'X_umap': data.tl.result['umap'] = pd.DataFrame(h5ad.read_dataset(f['obsm']['X_umap'])) data.tl.key_record['umap'].append('umap') else: # Base case pass if use_raw: data.tl.raw = StereoExpData() if isinstance(f['raw']['X'], h5py.Dataset): data.tl.raw.exp_matrix = h5ad.read_dense_as_sparse(f['raw']['X'], csr_matrix, 10000) else: data.tl.raw.exp_matrix = h5ad.read_group(f['raw']['X']) if 'obs' in f['raw']: cells_df = h5ad.read_dataframe(f[k]) data.tl.raw.cells.cell_name = cells_df.index.values data.position = cells_df[['x', 'y']].to_numpy(dtype=np.uint32) else: data.tl.raw.cells.cell_name = data.cells.cell_name.copy() data.tl.raw.position = data.position.copy() if 'var' in f['raw']: genes_df = h5ad.read_dataframe(f['raw']['var']) data.tl.raw.genes.gene_name = genes_df.index.values else: data.tl.raw.genes.gene_name = data.genes.gene_name.copy() return data
[docs]@ReadWriteUtils.check_file_exists def read_ann_h5ad( file_path: str, spatial_key: Optional[str] = "spatial", bin_type: str = None, bin_size: int = None, resolution: Optional[int] = 500 ): """ Read the H5ad file in Anndata format of Scanpy, and generate the StereoExpData object. Parameters ------------------ file_path the path to input H5ad file. spatial_key use `.obsm['spatial_key']` as coordiante information. bin_type the bin type includes `'bins'` or `'cell_bins'`, default to `'bins'`. bin_size the size of bin to merge, when `bin_type` is set to `'bins'`. resolution the resolution of chip, default 500nm. Returns --------------- An object of StereoExpData. """ data = StereoExpData(file_path=file_path, bin_type=bin_type, bin_size=bin_size) # basic # attributes = ["obsm", "varm", "obsp", "varp", "uns", "layers"] # df_attributes = ["obs", "var"] with h5py.File(data.file, mode='r') as f: for k in f.keys(): if k == "raw" or k.startswith("raw."): continue if k == "X": if isinstance(f[k], h5py.Group): data.exp_matrix = h5ad.read_group(f[k]) else: data.exp_matrix = h5ad.read_dataset(f[k]) elif k == "raw": assert False, "unexpected raw format" elif k == "obs": cells_df = h5ad.read_dataframe(f[k]) data.cells.cell_name = cells_df.index.values data.cells.total_counts = cells_df['total_counts'] if 'total_counts' in cells_df.keys() else None data.cells.pct_counts_mt = cells_df['pct_counts_mt'] if 'pct_counts_mt' in cells_df.keys() else None data.cells.n_genes_by_counts = cells_df[ 'n_genes_by_counts'] if 'n_genes_by_counts' in cells_df.keys() else None elif k == "var": genes_df = h5ad.read_dataframe(f[k]) data.genes.gene_name = genes_df.index.values elif k == 'obsm': if spatial_key is not None: if isinstance(f[k], h5py.Group): position = h5ad.read_group(f[k])[spatial_key] else: position = h5ad.read_dataset(f[k])[spatial_key] data.position = position[:, [0, 1]] if position.shape[1] >= 3: data.position_z = position[:, [2]] elif k == 'uns': uns = h5ad.read_group(f[k]) if 'bin_type' in uns: bin_type = uns['bin_type'] if 'bin_size' in uns: bin_size = uns['bin_size'] if 'resolution' in uns: resolution = uns['resolution'] if 'sn' in uns: sn_data = uns['sn'] if sn_data.shape[0] == 1: data.sn = str(sn_data['sn'][0]) else: data.sn = {} for _, row in sn_data.iterrows(): batch, sn = row[0], row[1] data.sn[str(batch)] = str(sn) else: # Base case pass data.bin_type = bin_type data.bin_size = bin_size data.attr = {'resolution': resolution} return data
# @ReadWriteUtils.check_file_exists
[docs]def read_h5ad( file_path: str = None, anndata: AnnData = None, flavor: str = 'scanpy', bin_type: str = None, bin_size: int = None, **kwargs ) -> Union[StereoExpData, AnnBasedStereoExpData]: """ Read a h5ad file or load a AnnData object Parameters ------------------ file_path the path of the h5ad file. anndata the object of AnnData which to be loaded, only available while `flavor` is `'scanpy'`. `file_path` and `anndata` only can input one of them. flavor the format of the h5ad file, defaults to `'scanpy'`. `scanpy`: AnnData format of scanpy `stereopy`: h5ad format of stereo bin_type the bin type includes `'bins'` or `'cell_bins'`. bin_size the size of bin to merge, when `bin_type` is set to `'bins'`. Returns --------------- An object of StereoExpData while `flavor` is `'stereopy'` or an object of AnnBasedStereoExpData while `flavor` is `'scanpy'` """ flavor = flavor.lower() if flavor == 'stereopy': if file_path is None: raise ValueError("The 'file_path' must be input.") if kwargs is None: kwargs = {} kwargs['bin_type'] = bin_type kwargs['bin_size'] = bin_size return read_stereo_h5ad(file_path, **kwargs) elif flavor == 'scanpy': if file_path is None and anndata is None: raise Exception("Must to input the 'file_path' or 'anndata'.") if file_path is not None and anndata is not None: raise Exception("'file_path' and 'anndata' only can input one of them") return AnnBasedStereoExpData(h5ad_file_path=file_path, based_ann_data=anndata, bin_type=bin_type, bin_size=bin_size, **kwargs) else: raise ValueError("Invalid value for 'flavor'")
[docs]def anndata_to_stereo( andata: AnnData, use_raw: bool = False, spatial_key: Optional[str] = None, resolution: Optional[int] = 500 ): """ Transform the Anndata object into StereoExpData format. Parameters ----------------------- andata the input Anndata object. use_raw use `anndata.raw.X` if True, otherwise `anndata.X`. spatial_key use `.obsm['spatial_key']` as coordiante information. resolution the resolution of chip, default 500nm. Returns --------------------- An object of StereoExpData. """ # data matrix,including X,raw,layer data = StereoExpData() data.exp_matrix = andata.raw.X if use_raw else andata.X # obs -> cell data.cells.cell_name = np.array(andata.obs_names) data.cells.n_genes_by_counts = andata.obs[ 'n_genes_by_counts'] if 'n_genes_by_counts' in andata.obs.columns.tolist() else None data.cells.total_counts = andata.obs['total_counts'] if 'total_counts' in andata.obs.columns.tolist() else None data.cells.pct_counts_mt = andata.obs['pct_counts_mt'] if 'pct_counts_mt' in andata.obs.columns.tolist() else None # var data.genes.gene_name = np.array(andata.var_names) data.genes.n_cells = andata.var['n_cells'] if 'n_cells' in andata.var.columns.tolist() else None data.genes.n_counts = andata.var['n_counts'] if 'n_counts' in andata.var.columns.tolist() else None # position if spatial_key is not None: position = andata.obsm[spatial_key] data.position = position[:, [0, 1]] if position.shape[1] >= 3: data.position_z = position[:, [2]] if 'bin_type' in andata.uns: data.bin_type = andata.uns['bin_type'] if 'bin_size' in andata.uns: data.bin_size = andata.uns['bin_size'] if 'resolution' in andata.uns: resolution = andata.uns['resolution'] data.attr = {'resolution': resolution} return data
[docs]def stereo_to_anndata( data: StereoExpData, flavor: Literal['scanpy', 'seurat'] = 'scanpy', sample_id: str = "sample", reindex: bool = False, output: str = None, base_adata: AnnData = None, split_batches: bool = True, compression: Optional[Literal["gzip", "lzf"]] = 'gzip' ): """ Transform the StereoExpData object into Anndata format. Parameters ----------------------- data the input StereoExpData object. flavor if you want to convert the output file into h5ad of Seurat, please set `'seurat'`. sample_id the sample name which will be set as `orig.ident` in obs. reindex if `True`, the cell index will be reindexed as `{sample_id}:{position_x}_{position_y}` format. output the path to output h5ad file. base_adata the input Anndata object. split_batches Whether to save each batch to a single file if it is a merged data, default to True. compression: The compression method to be used when saving data as a h5ad file, None means uncompressed, default to gzip. Returns ----------------- An object of Anndata. """ if data.merged and split_batches: from os import path from ..utils.data_helper import split data_list = split(data) batch = np.unique(data.cells.batch) adata_list = [] if output is not None: name, ext = path.splitext(output) for bno, d in zip(batch, data_list): if output is not None: boutput = f"{name}-{d.sn}{ext}" else: boutput = None adata = stereo_to_anndata(d, flavor=flavor, sample_id=sample_id, reindex=reindex, output=boutput, split_batches=False) adata_list.append(adata) return adata_list from scipy.sparse import issparse if isinstance(data, AnnBasedStereoExpData) and base_adata is None: base_adata = data._ann_data.copy() if base_adata is None: adata = AnnData(shape=data.exp_matrix.shape, dtype=np.float64, obs=data.cells.to_df(), var=data.genes.to_df()) adata.X = data.exp_matrix else: adata = base_adata # sample id logger.info(f"Adding {sample_id} in adata.obs['orig.ident'].") adata.obs['orig.ident'] = pd.Categorical([sample_id] * adata.obs.shape[0], categories=[sample_id]) if (data.position is not None) and ('spatial' not in adata.obsm): logger.info("Adding data.position as adata.obsm['spatial'] .") if data.position_z is not None: adata.obsm['spatial'] = np.concatenate([data.position, data.position_z], axis=1) else: adata.obsm['spatial'] = data.position logger.info("Adding data.position as adata.obs['x'] and adata.obs['y'] .") cell_names_index = data.cell_names.astype('str') adata.obs['x'] = pd.DataFrame(data.position[:, 0], index=cell_names_index) adata.obs['y'] = pd.DataFrame(data.position[:, 1], index=cell_names_index) if data.position_z is not None: adata.obs['z'] = pd.DataFrame(data.position_z, index=cell_names_index) if flavor != 'seurat': if data.bin_type is not None: adata.uns['bin_type'] = data.bin_type if data.bin_size is not None: adata.uns['bin_size'] = 1 if data.bin_type == 'cell_bins' else data.bin_size if data.attr is not None and 'resolution' in data.attr: adata.uns['resolution'] = data.attr['resolution'] if data.bin_type == 'cell_bins' and data.cells.cell_border is not None: adata.obsm['cell_border'] = data.cells.cell_border if data.sn is not None: if isinstance(data.sn, str): sn_list = [['-1', data.sn]] else: sn_list = [] for bno, sn in data.sn.items(): sn_list.append([bno, sn]) adata.uns['sn'] = pd.DataFrame(sn_list, columns=['batch', 'sn']) for key in data.tl.key_record.keys(): if data.tl.key_record[key]: if key == 'hvg': res_key = data.tl.key_record[key][-1] logger.info(f"Adding data.tl.result['{res_key}'] into adata.var .") adata.uns[key] = {'params': {}, 'source': 'stereopy', 'method': key} for i in data.tl.result[res_key]: if i == 'mean_bin': continue adata.var[i] = data.tl.result[res_key][i] elif key == 'sct': res_key = data.tl.key_record[key][-1] zero_index_data = data.tl.result[res_key][0] one_index_data = data.tl.result[res_key][1] logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['sct_'] .") adata.uns['sct_counts'] = csr_matrix(zero_index_data['counts'].T) adata.uns['sct_data'] = csr_matrix(zero_index_data['data'].T) adata.uns['sct_scale'] = csr_matrix(zero_index_data['scale.data'].T.to_numpy()) adata.uns['sct_scale_genename'] = list(zero_index_data['scale.data'].index) adata.uns['sct_top_features'] = list(one_index_data['top_features']) adata.uns['sct_cellname'] = list(one_index_data['umi_cells'].astype('str')) adata.uns['sct_genename'] = list(one_index_data['umi_genes']) elif key in ['pca', 'umap', 'tsne', 'totalVI', 'spatial_alignment_integration']: # pca :we do not keep variance and PCs(for varm which will be into feature.finding in pca of seurat.) res_key = data.tl.key_record[key][-1] sc_key = f'X_{key}' logger.info(f"Adding data.tl.result['{res_key}'] into adata.obsm['{sc_key}'] .") adata.obsm[sc_key] = data.tl.result[res_key].values elif key == 'neighbors': # neighbor :seurat use uns for conversion to @graph slot, but scanpy canceled neighbors of uns at present. # noqa # so this part could not be converted into seurat straightly. for res_key in data.tl.key_record[key]: sc_con = 'connectivities' if res_key == 'neighbors' else f'{res_key}_connectivities' sc_dis = 'distances' if res_key == 'neighbors' else f'{res_key}_distances' logger.info(f"Adding data.tl.result['{res_key}']['connectivities'] into adata.obsp['{sc_con}'] .") logger.info(f"Adding data.tl.result['{res_key}']['nn_dist'] into adata.obsp['{sc_dis}'] .") adata.obsp[sc_con] = data.tl.result[res_key]['connectivities'] adata.obsp[sc_dis] = data.tl.result[res_key]['nn_dist'] logger.info(f"Adding info into adata.uns['{res_key}'].") adata.uns[res_key] = {} adata.uns[res_key]['connectivities_key'] = sc_con adata.uns[res_key]['distance_key'] = sc_dis elif key == 'cluster': cell_name_index = data.cells.cell_name.astype('str') for res_key in data.tl.key_record[key]: logger.info(f"Adding data.tl.result['{res_key}'] into adata.obs['{res_key}'] .") adata.obs[res_key] = pd.DataFrame(data.tl.result[res_key]['group'].values, index=cell_name_index) elif key in ('gene_exp_cluster', 'cell_cell_communication'): for res_key in data.tl.key_record[key]: # logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['{key}@{res_key}']") # adata.uns[f"{key}@{res_key}"] = data.tl.result[res_key] logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['{res_key}']") adata.uns[res_key] = data.tl.result[res_key] # elif key == 'regulatory_network_inference': # for res_key in data.tl.key_record[key]: # logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['{res_key}'] .") # regulon_key = f'{res_key}_regulons' # res_key_data = data.tl.result[res_key] # adata.uns[regulon_key] = res_key_data['regulons'] # auc_matrix_key = f'{res_key}_auc_matrix' # adata.uns[auc_matrix_key] = res_key_data['auc_matrix'] # adjacencies_key = f'{res_key}_adjacencies' # adata.uns[adjacencies_key] = res_key_data['adjacencies'] elif key in ('co_occurrence', 'regulatory_network_inference'): for res_key in data.tl.key_record[key]: logger.info(f"Adding data.tl.result['{res_key}'] into adata.uns['{res_key}'] .") adata.uns[res_key] = data.tl.result[res_key] elif key == 'marker_genes': for res_key in data.tl.key_record[key]: uns_key = _BaseResult.RENAME_DICT.get(res_key, res_key) adata.uns[uns_key] = transform_marker_genes_to_anndata(data.tl.result[res_key]) else: continue if data.tl.raw is not None: if flavor == 'seurat': # keep same shape between @counts and @data for seurat,because somtimes dim of sct are not the same. logger.info("Adding data.tl.raw.exp_matrix as adata.uns['raw_counts'] .") adata.uns['raw_counts'] = data.tl.raw.exp_matrix if issparse(data.tl.raw.exp_matrix) \ else csr_matrix(data.tl.raw.exp_matrix) list_cell_names = data.tl.raw.cell_names.astype(str) adata.uns['raw_cellname'] = list(list_cell_names) adata.uns['raw_genename'] = list(data.tl.raw.gene_names) if data.tl.raw.position is not None and reindex: logger.info("Reindex as adata.uns['raw_cellname'] .") raw_sample = pd.DataFrame(['sample'] * data.tl.raw.cell_names.shape[0], index=list_cell_names) raw_x = pd.DataFrame(data.tl.raw.position[:, 0].astype(str), index=list_cell_names) raw_y = pd.DataFrame(data.tl.raw.position[:, 1].astype(str), index=list_cell_names) new_ix = np.array(raw_sample + "_" + raw_x + "_" + raw_y).tolist() adata.uns['raw_cellname'] = new_ix else: logger.info("Adding data.tl.raw.exp_matrix as adata.raw .") raw_exp = data.tl.raw.exp_matrix raw_genes = data.tl.raw.genes.to_df() raw_genes.dropna(axis=1, how='all', inplace=True) raw_adata = AnnData(shape=raw_exp.shape, var=raw_genes, dtype=np.float64) raw_adata.X = raw_exp adata.raw = raw_adata if reindex: logger.info("Reindex adata.X .") new_ix = (adata.obs['orig.ident'].astype(str) + ":" + adata.obs['x'].astype(str) + "_" + adata.obs['y'].astype(str)).tolist() adata.obs.index = new_ix if 'sct_cellname' in adata.uns.keys(): logger.info("Reindex as adata.uns['sct_cellname'] .") adata.uns['sct_cellname'] = new_ix if flavor == 'seurat': logger.info("Rename QC info.") adata.obs.rename(columns={'total_counts': "nCount_Spatial", "n_genes_by_counts": "nFeature_Spatial", "pct_counts_mt": 'percent.mito'}, inplace=True) logger.info("Finished conversion to anndata.") if output is not None: adata.write_h5ad(output, compression=compression) logger.info(f"Finished output to {output}") return adata
# def check_file(path, prefix, suffix): # filename = f"{path}/{prefix}{suffix}" # if os.path.isfile(filename): # return filename # elif suffix in {"matrix.mtx", "barcodes.tsv"} and os.path.isfile(f"{filename}.gz"): # return f'{filename}.gz' # elif suffix == "genes.tsv" and os.path.isfile(f'{path}/{prefix}features.tsv.gz'): # return f'{path}/{prefix}features.tsv.gz' # else: # # logger.error(f"{path} is not exist!") # # raise FileExistsError(f"can not find {path}/{prefix}{suffix}(or with .gz)!") # raise ValueError(f"can not find {filename}(or with .gz).") # def read_10x_data(path, prefix="", gene_ex_only=True): # """ # read 10x data # # :param path: the dictionary of the input files # :param prefix: the prefix of the input files # :param gene_ex_only: return gene expression data only if is True # :return: anndata # """ # # 1. check file status. # # if not os.path.exists(path): # # logger.error(f"{path} is not exist!") # # raise FileExistsError(f"{path} is not exist!") # basic_fileset = {'barcodes.tsv', 'genes.tsv', 'matrix.mtx'} # genefile = (f"{path}/{prefix}genes.tsv") # featurefile = (f"{path}/{prefix}features.tsv.gz") # adata = read_10x_mtx(path, prefix) # if os.path.isfile(genefile) or not gene_ex_only: # return adata # else: # gex_rows = list(map(lambda x: x == 'Gene Expression', adata.var['feature_types'])) # return adata[:, gex_rows].copy() # def read_10x_mtx(path, prefix="", var_names='gene_symbols', make_unique=True): # mtxfile = check_file(path, prefix, "matrix.mtx") # genesfile = check_file(path, prefix, "genes.tsv") # barcodesfile = check_file(path, prefix, "barcodes.tsv") # adata = read_mtx(mtxfile).T # transpose # genes = pd.read_csv(genesfile, header=None, sep='\t') # gene_id = genes[0].values # gene_symbol = genes[1].values # if var_names == 'gene_symbols': # var_names = genes[1].values # # if make_unique: # # var_names = anndata.utils.make_index_unique(pd.Index(var_names)) # adata.var_names = var_names # adata.var['gene_ids'] = genes[0].values # elif var_names == 'gene_ids': # adata.var_names = genes[0].values # adata.var['gene_symbols'] = genes[1].values # else: # raise ValueError("`var_names` needs to be 'gene_symbols' or 'gene_ids'") # if os.path.isfile(f"{path}/{prefix}features.tsv.gz"): # adata.var['feature_types'] = genes[2].values # # adata.obs_names = pd.read_csv(barcodesfile, header=None)[0].values # return adata
[docs]@ReadWriteUtils.check_file_exists def read_gef( file_path: str, bin_type: str = "bins", bin_size: int = 100, is_sparse: bool = True, gene_list: Optional[list] = None, region: Optional[list] = None, gene_name_index: Optional[bool] = False ): """ Read the GEF (.h5) file, and generate the StereoExpData object. Parameters --------------- file_path the path to input file. bin_type the bin type includes `'bins'` or `'cell_bins'`. bin_size the size of bin to merge, which only takes effect when the `bin_type` is set as `'bins'`. is_sparse the matrix is sparse matrix, if `True`, otherwise `np.ndarray`. gene_list select targeted data based on the gene list. region restrict data to the region condition, like [minX, maxX, minY, maxY]. gene_name_index `True` to set gene name as index if the version of gef file is 4 or greater, otherwise to set gene id, if the version is 3 or less, `gene_name_index` would be forced to `True` because there is no gene id in this case. Returns ------------------------ An object of StereoExpData. """ logger.info('read_gef begin ...') from gefpy.utils import gef_is_cell_bin is_cell_bin = gef_is_cell_bin(file_path) if bin_type == 'cell_bins': if not is_cell_bin: raise Exception('This file is not the type of CellBin.') data = StereoExpData(file_path=file_path, file_format='gef', bin_type=bin_type, bin_size=bin_size) from gefpy.cgef_reader_cy import CgefR gef = CgefR(file_path, True) cellborders_coord_list, coord_count_per_cell = gef.get_cellborders([]) border_points_count_per_cell = int(coord_count_per_cell / 2) cell_borders = cellborders_coord_list.reshape((-1, border_points_count_per_cell, 2)) if gene_list is not None or region is not None: if gene_list is None: gene_list = [] if region is None: region = [] uniq_cell, gene_names, count, cell_ind, gene_ind, dnb_cnt, cell_area, gene_id = gef.get_filtered_data(region, gene_list) gene_num = gene_names.size cell_num = uniq_cell.size exp_matrix = csr_matrix((count, (cell_ind, gene_ind)), shape=(cell_num, gene_num), dtype=np.uint32) position = np.array( list((zip(np.right_shift(uniq_cell, 32), np.bitwise_and(uniq_cell, 0xffffffff))))).astype('uint32') data.position = position # logger.info(f'the matrix has {cell_num} cells, and {gene_num} genes.') uniq_cell_borders = cell_borders[np.in1d(gef.get_cell_names(), uniq_cell)] data.cells = Cell(cell_name=uniq_cell, cell_border=uniq_cell_borders) data.cells['dnbCount'] = dnb_cnt data.cells['area'] = cell_area if len(gene_id[0]) == 0: gene_name_index = True gene_names = remove_genes_number(gene_names) if gene_name_index: if len(gene_id[0]) > 0: exp_matrix, gene_names = integrate_matrix_by_genes(gene_names, cell_num, exp_matrix.data, exp_matrix.indices, exp_matrix.indptr) data.genes = Gene(gene_name=gene_names) else: data.genes = Gene(gene_name=gene_id) # data.genes['gene_name_underline'] = gene_names data.genes['real_gene_name'] = gene_names data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() else: # from gefpy.cell_exp_reader import CellExpReader # cell_bin_gef = CellExpReader(file_path) # data.position = cell_bin_gef.positions # logger.info(f'the matrix has {cell_bin_gef.cell_num} cells, and {cell_bin_gef.gene_num} genes.') # exp_matrix = csr_matrix((cell_bin_gef.count, (cell_bin_gef.rows, cell_bin_gef.cols)), # shape=(cell_bin_gef.cell_num, cell_bin_gef.gene_num), dtype=np.uint32) # data.cells = Cell(cell_name=cell_bin_gef.cells, cell_border=cell_borders) # data.cells['dnbCount'] = cell_bin_gef.dnbCount # data.cells['area'] = cell_bin_gef.area # data.genes = Gene(gene_name=cell_bin_gef.genes) # data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() cell_names = gef.get_cell_names() cell_num = gef.get_cell_num() gene_names, gene_id = gef.get_gene_names() gene_names = gene_names.astype('U') gene_id = gene_id.astype('U') gene_num = gef.get_gene_num() # logger.info(f'the matrix has {cell_num} cells, and {gene_num} genes.') indices, indptr, count = gef.get_sparse_matrix_indices(order='cell') exp_matrix = csr_matrix((count, indices, indptr), shape=(cell_num, gene_num), dtype=np.uint32) data.cells = Cell(cell_name=cell_names, cell_border=cell_borders) cells = gef.get_cells() data.cells['dnbCount'] = cells['dnbCount'] data.cells['area'] = cells['area'] data.position = np.zeros(shape=(cell_num, 2), dtype=np.uint32) data.position[:, 0] = cells['x'] data.position[:, 1] = cells['y'] if len(gene_id[0]) == 0: gene_name_index = True gene_names = remove_genes_number(gene_names) if gene_name_index: if len(gene_id[0]) > 0: exp_matrix, gene_names = integrate_matrix_by_genes(gene_names, cell_num, count, indices, indptr) data.genes = Gene(gene_name=gene_names) else: data.genes = Gene(gene_name=gene_id) # data.genes['gene_name_underline'] = gene_names data.genes['real_gene_name'] = gene_names data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() data.attr = { 'resolution': read_gef_info(file_path)['resolution'] } logger.info(f'the matrix has {data.cell_names.size} cells, and {data.gene_names.size} genes.') gef.cgef_close() del gef return data else: if is_cell_bin: raise Exception('This file is not the type of SquareBin.') from gefpy.bgef_reader_cy import BgefR gef = BgefR(file_path, bin_size, 4, True) data = StereoExpData(file_path=file_path, bin_type=bin_type, bin_size=bin_size) data.offset_x, data.offset_y = gef.get_offset() gef_attr = gef.get_exp_attr() data.attr = { 'minX': gef_attr[0], 'minY': gef_attr[1], 'maxX': gef_attr[2], 'maxY': gef_attr[3], 'maxExp': gef_attr[4], 'resolution': gef_attr[5], } if gene_list is not None or region is not None: if gene_list is None: gene_list = [] if region is None: region = [] uniq_cell, gene_names, count, cell_ind, gene_ind, gene_id = gef.get_filtered_data(region, gene_list) cell_num = uniq_cell.size gene_num = gene_names.size data.position = np.array( list((zip(np.right_shift(uniq_cell, 32), np.bitwise_and(uniq_cell, 0xffffffff))))).astype('uint32') data.cells = Cell(cell_name=uniq_cell) exp_matrix = csr_matrix((count, (cell_ind, gene_ind)), shape=(cell_num, gene_num), dtype=np.uint32) if len(gene_id[0]) == 0: gene_name_index = True gene_names = remove_genes_number(gene_names) if gene_name_index: if len(gene_id[0]) > 0: exp_matrix, gene_names = integrate_matrix_by_genes(gene_names, cell_num, exp_matrix.data, exp_matrix.indices, exp_matrix.indptr) data.genes = Gene(gene_name=gene_names) else: data.genes = Gene(gene_name=gene_id) # data.genes['gene_name_underline'] = gene_names data.genes['real_gene_name'] = gene_names data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() else: # gene_num = gef.get_gene_num() # uniq_cells, rows, count = gef.get_exp_data() # cell_num = len(uniq_cells) # logger.info(f'the matrix has {cell_num} cells, and {gene_num} genes.') # cols, uniq_genes, _ = gef.get_gene_data() # data.position = np.array(list( # (zip(np.right_shift(uniq_cells, 32), np.bitwise_and(uniq_cells, 0xffffffff))))).astype('uint32') # exp_matrix = csr_matrix((count, (rows, cols)), shape=(cell_num, gene_num), dtype=np.uint32) # data.cells = Cell(cell_name=uniq_cells) # data.genes = Gene(gene_name=uniq_genes) # data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() cell_names = gef.get_cell_names() cell_num = gef.get_cell_num() gene_names, gene_id = gef.get_gene_names() gene_num = gef.get_gene_num() data.position = np.array(list( (zip(np.right_shift(cell_names, 32), np.bitwise_and(cell_names, 0xffffffff))))).astype('uint32') data.cells = Cell(cell_name=cell_names) if len(gene_id[0]) == 0: # an old version gef file, no gene id gene_name_index = True cell_ind, gene_ind, count = gef.get_sparse_matrix_indices2() exp_matrix = csr_matrix((count, (cell_ind, gene_ind)), shape=(cell_num, gene_num), dtype=np.uint32) gene_names = remove_genes_number(gene_names) if gene_name_index: if len(gene_id[0]) > 0: exp_matrix, gene_names = integrate_matrix_by_genes(gene_names, cell_num, exp_matrix.data, exp_matrix.indices, exp_matrix.indptr) data.genes = Gene(gene_name=gene_names) else: data.genes = Gene(gene_name=gene_id) # data.genes['gene_name_underline'] = gene_names data.genes['real_gene_name'] = gene_names data.exp_matrix = exp_matrix if is_sparse else exp_matrix.toarray() logger.info(f'the matrix has {data.cell_names.size} cells, and {data.gene_names.size} genes.') logger.info('read_gef end.') return data
[docs]@ReadWriteUtils.check_file_exists def read_gef_info(file_path: str): """ Read the property information of the GEF `(.h5)` file. Parameters ------------- file_path the path to input file. Returns -------------------- An attribute dictionary. """ from gefpy.utils import gef_is_cell_bin bin_type = gef_is_cell_bin(file_path) h5_file = h5py.File(file_path, 'r') info_dict = {} if not bin_type: logger.info('This is GEF file which contains traditional bin infomation.') logger.info('bin_type: bins') info_dict['bin_list'] = list(h5_file['geneExp'].keys()) logger.info('Bin size list: {0}'.format(info_dict['bin_list'])) if type(h5_file['geneExp']['bin1']['expression'].attrs['resolution']) is np.ndarray: info_dict['resolution'] = h5_file['geneExp']['bin1']['expression'].attrs['resolution'][0] else: info_dict['resolution'] = h5_file['geneExp']['bin1']['expression'].attrs['resolution'] logger.info('Resolution: {0}'.format(info_dict['resolution'])) info_dict['gene_count'] = h5_file['geneExp']['bin1']['gene'].shape[0] logger.info('Gene count: {0}'.format(info_dict['gene_count'])) maxX = h5_file['geneExp']['bin1']['expression'].attrs['maxX'][0] minX = h5_file['geneExp']['bin1']['expression'].attrs['minX'][0] maxY = h5_file['geneExp']['bin1']['expression'].attrs['maxY'][0] minY = h5_file['geneExp']['bin1']['expression'].attrs['minY'][0] info_dict['offsetX'] = minX logger.info('offsetX: {0}'.format(info_dict['offsetX'])) info_dict['offsetY'] = minY logger.info('offsetY: {0}'.format(info_dict['offsetY'])) info_dict['width'] = maxX - minX logger.info('Width: {0}'.format(info_dict['width'])) info_dict['height'] = maxY - minY logger.info('Height: {0}'.format(info_dict['height'])) info_dict['maxExp'] = h5_file['geneExp']['bin1']['expression'].attrs['maxExp'][0] logger.info('Max Exp: {0}'.format(info_dict['maxExp'])) else: logger.info('This is GEF file which contains cell bin infomation.') logger.info('bin_type: cell_bins') from gefpy.cgef_reader_cy import CgefR cgef = CgefR(file_path) info_dict['cell_num'] = cgef.get_cell_num() logger.info('Number of cells: {0}'.format(info_dict['cell_num'])) info_dict['gene_num'] = cgef.get_gene_num() logger.info('Number of gene: {0}'.format(info_dict['gene_num'])) info_dict['resolution'] = h5_file.attrs['resolution'][0] logger.info('Resolution: {0}'.format(info_dict['resolution'])) info_dict['offsetX'] = h5_file.attrs['offsetX'][0] logger.info('offsetX: {0}'.format(info_dict['offsetX'])) info_dict['offsetY'] = h5_file.attrs['offsetY'][0] logger.info('offsetY: {0}'.format(info_dict['offsetY'])) info_dict['averageGeneCount'] = h5_file['cellBin']['cell'].attrs['averageGeneCount'][0] logger.info('Average number of genes: {0}'.format(info_dict['averageGeneCount'])) info_dict['maxGeneCount'] = h5_file['cellBin']['cell'].attrs['maxGeneCount'][0] logger.info('Maximum number of genes: {0}'.format(info_dict['maxGeneCount'])) info_dict['averageExpCount'] = h5_file['cellBin']['cell'].attrs['averageExpCount'][0] logger.info('Average expression: {0}'.format(info_dict['averageExpCount'])) info_dict['maxExpCount'] = h5_file['cellBin']['cell'].attrs['maxExpCount'][0] logger.info('Maximum expression: {0}'.format(info_dict['maxExpCount'])) return info_dict
# @ReadWriteUtils.check_file_exists # def read_h5ad(file_path: str, flavor: str = 'scanpy'): # ''' # :param file_path: h5ad file path. # :return: `StereoExpData`-like `AnnBasedStereoExpData` obj # ''' # if flavor == 'scanpy': # from stereo.core.stereo_exp_data import AnnBasedStereoExpData # return AnnBasedStereoExpData(file_path) # elif flavor == 'seurat': # raise NotImplementedError # else: # raise Exception