Source code for stereo.core.stereo_exp_data

#!/usr/bin/env python3
# coding: utf-8
"""
@author: Ping Qiu  qiuping1@genomics.cn
@last modified by: Ping Qiu
@file:stereo_exp_data.py
@time:2021/03/22

change log:
    2021/08/12  add to_andata function , by wuyiran.
"""
from __future__ import annotations
from copy import deepcopy
from typing import Optional
from typing import Union
from warnings import warn

import anndata
import numba
import numpy as np
import pandas as pd
from scipy.sparse import (
    spmatrix,
    issparse,
    csr_matrix
)

from .cell import AnnBasedCell
from .cell import Cell
from .data import Data
from .gene import AnnBasedGene
from .gene import Gene
from ..log_manager import logger

def gene_name_2_gene_id(gene_name_input, real_gene_name, gene_id):
    gene_name_input = np.array(gene_name_input, dtype='U')
    tmp = gene_name_input[:, None] == real_gene_name
    ind = np.nonzero(tmp)[1]
    if len(ind) > 0:
        return gene_id[ind]
    return gene_name_input


[docs]class StereoExpData(Data):
[docs] def __init__( self, file_path: Optional[str] = None, file_format: Optional[str] = None, bin_type: Optional[str] = None, bin_size: Optional[int] = 100, exp_matrix: Optional[Union[np.ndarray, spmatrix]] = None, genes: Optional[Union[np.ndarray, Gene]] = None, cells: Optional[Union[np.ndarray, Cell]] = None, position: Optional[np.ndarray] = None, position_z: Optional[np.ndarray] = None, output: Optional[str] = None, partitions: Optional[int] = 1, offset_x: Optional[str] = None, offset_y: Optional[str] = None, attr: Optional[dict] = None, merged: bool = False, spatial_key: str = 'spatial' ): """ The core data object is designed for expression matrix of spatial omics, which can be set corresponding properties directly to initialize the data. Parameters ------------------- file_path the path to input file of expression matrix. file_format the format of input file. bin_type the type of bin, if the file format is Stereo-seq file including `'bins'` or `'cell_bins'`. bin_size the size of the bin to merge, when `bin_type` is `'bins'`. exp_matrix the expression matrix. genes the gene object which contains information of gene level. cells the cell object which contains information of cell level. position spatial location information. output the path to output file. partitions the number of multi-process cores, used when processing files in parallel. offset_x the x value of the offset . offset_y the y value of the offset . attr attribute information from GEF file. """ super(StereoExpData, self).__init__(file_path=file_path, file_format=file_format, partitions=partitions, output=output) self._exp_matrix = exp_matrix self._exon_matrix = None self._genes = genes if isinstance(genes, Gene) else Gene(gene_name=genes) self._cells = cells if isinstance(cells, Cell) else Cell(cell_name=cells) # self._raw_position = None # self._position = position # self._position_z = position_z self._position_offset = None self._position_min = None self._bin_type = bin_type self._bin_size = bin_size self._tl = None self._plt = None self._offset_x = offset_x self._offset_y = offset_y self._attr = attr if attr is not None else {'resolution': 500} self._merged = merged self._sn = self.get_sn_from_path(file_path) self.center_coordinates = False self.spatial_key = spatial_key self.__set_position(position, position_z, spatial_key) self._layers = None
def __set_position(self, position, position_z=None, spatial_key='spatial'): assert isinstance(spatial_key, str), "spatial_key must be str." if position is not None: assert position is not None, "position must be not None." assert isinstance(position, np.ndarray), "position must be np.ndarray." assert position.ndim == 2, "position must be 2 dimensions." assert position.shape[1] == 2, "the length of position's second dimension must be 2." assert position.shape[0] == self.n_cells, "the length of position must be equal to the number of cells." if position_z is not None: assert position is not None, "position must be gave when position_z is not None." assert isinstance(position_z, np.ndarray), "position_z must be np.ndarray." assert position_z.size == self.n_cells, "the length of position_z must be equal to the number of cells." if position_z.ndim == 1: position_z = position_z.reshape(-1, 1) position = np.concatenate([position, position_z], axis=1) if position is not None: self.cells_matrix[spatial_key] = position def get_sn_from_path(self, file_path): """ Get the SN information of input file. """ if file_path is None: return None from os import path return path.basename(file_path).split('.')[0].strip() @property def plt(self): """ Call the visualization module. """ if self._plt is None: from ..plots.plot_collection import PlotCollection self._plt = PlotCollection(self) return self._plt @property def tl(self): """ call StPipeline method. """ if self._tl is None: from .st_pipeline import StPipeline self._tl = StPipeline(self) return self._tl def sub_by_index(self, cell_index=None, gene_index=None, filter_raw=True): """ Get data subset by indexl list of cells or genes. :param cell_index: a list of cell index. :param gene_index: a list of gene index. :return: """ if cell_index is not None: self.exp_matrix = self.exp_matrix[cell_index, :] # self.position = self.position[cell_index, :] if self.position is not None else None # self.position_z = self.position_z[cell_index] if self.position_z is not None else None self.cells = self.cells.sub_set(cell_index) for key, value in self.layers.items(): self.layers[key] = value[cell_index, :] if gene_index is not None: self.exp_matrix = self.exp_matrix[:, gene_index] self.genes = self.genes.sub_set(gene_index) for key, value in self.layers.items(): self.layers[key] = value[:, gene_index] filter_raw = filter_raw and self.raw is not None if filter_raw: self.raw.sub_by_index(cell_index, gene_index, False) return self def sub_by_name( self, cell_name: Optional[Union[np.ndarray, list]] = None, gene_name: Optional[Union[np.ndarray, list]] = None, filter_raw: bool = True, copy: bool = True ): """ Get data subset by name list of cells or genes. :param cell_name: a list of cell name. :param gene_name: a list of gene name. :return: """ data = deepcopy(self) if copy else self cell_index, gene_index = None, None if cell_name is not None: cell_index = self.cells.obs.index.get_indexer(cell_name) cell_index = cell_index[cell_index != -1] if gene_name is not None: if data.genes.real_gene_name is not None: gene_name = gene_name_2_gene_id(gene_name, data.genes.real_gene_name, data.genes.gene_name) gene_index = self.genes.var.index.get_indexer(gene_name) gene_index = gene_index[gene_index != -1] return data.sub_by_index(cell_index, gene_index, filter_raw) def sub_exp_matrix_by_name( self, cell_name: Optional[Union[np.ndarray, list, str, int]] = None, gene_name: Optional[Union[np.ndarray, list, str]] = None, order_preserving: bool = True ) -> Union[np.ndarray, spmatrix]: new_exp_matrix = self.exp_matrix if cell_name is not None: if isinstance(cell_name, str) or isinstance(cell_name, int): cell_name = [cell_name] if order_preserving: # index = [np.argwhere(self.cell_names == c)[0][0] for c in cell_name] index = self.cells.obs.index.get_indexer(cell_name) index = index[index != -1] else: index = np.isin(self.cell_names, cell_name) new_exp_matrix = new_exp_matrix[index] if gene_name is not None: if isinstance(gene_name, str): gene_name = [gene_name] if order_preserving: # index = [np.argwhere(self.gene_names == g)[0][0] for g in gene_name] index = self.genes.var.index.get_indexer(gene_name) index = index[index != -1] else: index = np.isin(self.gene_names, gene_name) new_exp_matrix = new_exp_matrix[:, index] return new_exp_matrix def get_index(self, cell_list=None, gene_list=None, only_highly_genes=False): return self.cells.get_index(cell_list), self.genes.get_index(gene_list, only_highly_genes) def get_exp_matrix( self, use_raw: bool = False, layer: Optional[str] = None, cell_list: Optional[Union[np.ndarray, list, str, int]] = None, gene_list: Optional[Union[np.ndarray, list, str, int]] = None, only_highly_genes: bool = False, to_array: bool = False ) -> Union[np.ndarray, spmatrix]: cell_index, gene_index = self.get_index(cell_list, gene_list, only_highly_genes) if layer is not None: assert layer in self.layers, f"layer '{layer}' is not exist." exp_matrix = self.layers[layer] else: if use_raw is None: use_raw = True if self.raw is not None else False else: use_raw = use_raw and self.raw is not None if use_raw: if self.raw.shape != self.shape: exp_matrix = self.raw.get_exp_matrix(cell_list=self.cell_names, gene_list=self.gene_names) else: exp_matrix = self.raw.exp_matrix else: exp_matrix = self.exp_matrix if cell_index.size != self.cells.size: exp_matrix = exp_matrix[cell_index, :] if gene_index.size != self.genes.size: exp_matrix = exp_matrix[:, gene_index] if to_array: return exp_matrix.toarray() if issparse(exp_matrix) else exp_matrix else: return exp_matrix def check(self): """ Check whether the parameters meet the requirement. :return: """ super(StereoExpData, self).check() self.bin_type_check(self._bin_type) @staticmethod def bin_type_check(bin_type): """ Check whether the bin type is from specific options. :param bin_type: bin type value, 'bins' or 'cell_bins'. :return: """ if (bin_type is not None) and (bin_type not in ['bins', 'cell_bins']): logger.error(f"the bin type `{bin_type}` is not in the range, please check!") raise Exception @property def layers(self): if self._layers is None: from .data_component import Layers self._layers = Layers(self) return self._layers @property def shape(self): """ Get the shape of expression matrix. :return: """ return self.exp_matrix.shape @property def gene_names(self): """ Get the gene names. :return: """ return self.genes.gene_name @property def cell_names(self): """ Get the cell names. :return: """ return self.cells.cell_name @property def cell_borders(self): """ Get the cell borders. """ return self.cells.cell_border @property def genes(self): """ Get the gene object. :return: """ return self._genes @genes.setter def genes(self, gene): """ set the value of self._genes. :param gene: a object of Gene :return: """ self._genes = gene @property def genes_matrix(self): """ Get the genes matrix. """ return self._genes.matrix @property def genes_pairwise(self): """ Get the genes pairwise. """ return self._genes.pairwise @property def cells(self): """ Get the cell object. :return: """ return self._cells @cells.setter def cells(self, cell): """ set the value of self._cells. :param cell: a object of Cell :return: """ self._cells = cell @property def cells_matrix(self): """ Get the cells matrix. """ return self._cells.matrix @property def cells_pairwise(self): """ Get the cells pairwise. """ return self._cells.pairwise @property def n_cells(self): """ Get the number of cells. :return: """ return self.exp_matrix.shape[0] @property def n_genes(self): """ Get the number of genes. :return: """ return self.exp_matrix.shape[1] @property def exp_matrix(self) -> Union[np.ndarray, spmatrix]: """ Get the expression matrix. :return: """ return self._exp_matrix @exp_matrix.setter def exp_matrix(self, exp_matrix): """ set the value of self._exp_matrix. :param pos_array: np.ndarray or sparse.spmatrix. :return: """ self._exp_matrix = exp_matrix @property def exon_matrix(self) -> Union[np.ndarray, spmatrix]: """ Get the exon matrix. :return: """ return self._exon_matrix @exon_matrix.setter def exon_matrix(self, exon_matrix): """ set the value of self._exon_matrix. :param pos_array: np.ndarray or sparse.spmatrix. :return: """ self._exon_matrix = exon_matrix @property def bin_type(self): """ Get the bin type. :return: """ return self._bin_type @bin_type.setter def bin_type(self, b_type): """ set the value of self._bin_type. :param b_type: the value of bin type, 'bins' or 'cell_bins'. :return: """ self.bin_type_check(b_type) self._bin_type = b_type @property def bin_size(self): """ Get the bin size. :return: """ return self._bin_size @bin_size.setter def bin_size(self, bin_size): self._bin_size = bin_size # @property # def raw_position(self): # return self._raw_position # @raw_position.setter # def raw_position(self, pos): # self._raw_position = pos @property def spatial(self): return self.cells_matrix[self.spatial_key] @spatial.setter def spatial(self, spatial): assert isinstance(spatial, np.ndarray), "spatial must be np.ndarray." assert spatial.ndim in (2, 3), "spatial must be 2 or 3 dimensions." self.cells_matrix[self.spatial_key] = spatial @property def position(self): """ Get the information of spatial location. :return: """ # return self._position if self.spatial_key not in self.cells_matrix: return None if self.cells_matrix[self.spatial_key].shape[1] >= 2: return self.cells_matrix[self.spatial_key][:, 0:2] return None @position.setter def position(self, position): """ set the value of self._position. :param pos: the value of position, a np.ndarray . :return: """ # self._position = position assert isinstance(position, np.ndarray), "position must be np.ndarray." assert position.ndim == 2, "position must be 2 dimensions." assert position.shape[1] == 2, "the length of position's second dimension must be 2." assert position.shape[0] == self.n_cells, "the length of position must be equal to the number of cells." if self.spatial_key not in self.cells_matrix: self.cells_matrix[self.spatial_key] = position else: if self.cells_matrix[self.spatial_key].shape[1] == 1: self.cells_matrix[self.spatial_key] = np.concatenate([position, self.cells_matrix[self.spatial_key]], axis=1) else: self.cells_matrix[self.spatial_key][:, [0, 1]] = position @property def position_z(self): # return self._position_z if self.spatial_key not in self.cells_matrix: return None if self.cells_matrix[self.spatial_key].shape[1] >= 3: return self.cells_matrix[self.spatial_key][:, 2:3] if self.cells_matrix[self.spatial_key].shape[1] == 1: return self.cells_matrix[self.spatial_key] return None @position_z.setter def position_z(self, position_z): # self._position_z = position_z assert isinstance(position_z, np.ndarray), "position_z must be np.ndarray." assert position_z.size == self.n_cells, "the length of position_z must be equal to the number of cells." if position_z.ndim == 1: position_z = position_z.reshape(-1, 1) if self.spatial_key not in self.cells_matrix or self.cells_matrix[self.spatial_key].shape[1] == 1: self.cells_matrix[self.spatial_key] = position_z else: self.cells_matrix[self.spatial_key] = np.concatenate( [self.cells_matrix[self.spatial_key][:, [0, 1]], position_z], axis=1 ) @property def position_offset(self): """ Get the offset of position in gef. """ return self._position_offset @position_offset.setter def position_offset(self, position_offset): self._position_offset = position_offset @property def position_min(self): return self._position_min @position_min.setter def position_min(self, position_min): self._position_min = position_min @property def offset_x(self): """ Get the x value of the offset. :return: """ return self._offset_x @offset_x.setter def offset_x(self, min_x): """ :param min_x: offset of x. :return: """ self._offset_x = min_x @property def offset_y(self): """ Get the y value of the offset. :return: """ return self._offset_y @offset_y.setter def offset_y(self, min_y): """ :param min_y: offset of y. :return: """ self._offset_y = min_y @property def attr(self): """ Get the attribute information. :return: """ return self._attr @attr.setter def attr(self, attr): """ :param attr: dict of attr. :return: """ self._attr = attr @property def merged(self): """ Get the flag whether merged. """ return self._merged @merged.setter def merged(self, merged): self._merged = merged @property def sn(self): """ Get the sample name. """ return self._sn @sn.setter def sn(self, sn): self._sn = sn @property def raw(self): return self.tl.raw @property def resolution(self): if self.attr is not None and 'resolution' in self.attr: return self.attr['resolution'] else: return None def to_df(self): """ Transform StereoExpData object to pd.DataFrame. :return: """ df = pd.DataFrame( self.exp_matrix.toarray() if issparse(self.exp_matrix) else self.exp_matrix, columns=self.gene_names, index=self.cell_names ) return df def sparse2array(self): """ Transform expression matrix to array if it is parse matrix. :return: """ if issparse(self.exp_matrix): self.exp_matrix = self.exp_matrix.toarray() return self.exp_matrix def array2sparse(self): """ Transform expression matrix to sparse matrix if it is ndarray. :return: """ if not issparse(self.exp_matrix): self.exp_matrix = csr_matrix(self.exp_matrix) return self.exp_matrix def __str__(self): format_str = f"StereoExpData object with n_cells X n_genes = {self.shape[0]} X {self.shape[1]}" format_str += f"\nbin_type: {self.bin_type}" if self.bin_type == 'bins': format_str += f"\n{'bin_size: %d' % self.bin_size}" format_str += f"\noffset_x = {self.offset_x}" format_str += f"\noffset_y = {self.offset_y}" format_str += str(self.cells) format_str += str(self.genes) if self.cells_matrix: format_str += f"\ncells_matrix = {list(self.cells_matrix.keys())}" if self.genes_matrix: format_str += f"\ngenes_matrix = {list(self.genes_matrix.keys())}" if self.cells_pairwise: format_str += f"\ncells_pairwise = {list(self.cells_pairwise.keys())}" if self.genes_pairwise: format_str += f"\ngenes_pairwise = {list(self.genes_pairwise.keys())}" format_str += f"\n{str(self.layers)}" format_key_record = { key: value for key, value in self.tl.key_record.items() if value } # warn( # 'FutureWarning: `pca`, `neighbors`, `cluster`, `umap` will be inaccessible in result in future version.' # '\nMake sure your code access result from the right property, such as `pca` and `umap` will be in the ' # '`StereoExpData.cells_matrix`.', # category=FutureWarning # ) if format_key_record: format_str += f"\ntl.key_record: {format_key_record}" result_key = [] for rks in self.tl.key_record.values(): if rks is not None and len(rks) > 0: result_key += rks for rk in self.tl.result.keys(): if rk not in result_key: result_key.append(rk) format_str += f"\ntl.result: {result_key}" return format_str def __repr__(self): return self.__str__() def issparse(self): """ Check whether the matrix is sparse matrix type. """ return issparse(self.exp_matrix) def reset_position(self): if self.position_offset is not None: batches = np.unique(self.cells.batch) position = self.position for bno in batches: idx = np.where(self.cells.batch == bno)[0] position[idx] -= self.position_offset[bno] position[idx] += self.position_min[bno] self.position = position self.position_offset = None self.position_min = None def __add__(self, other): from stereo.core.ms_data import MSData if isinstance(other, StereoExpData): return MSData([self, other]) elif isinstance(other, MSData): return other.__add__(self) else: raise TypeError("only support StereoExpData and MSData!") def write(self, filename, to_anndata=False, **kwargs): if 'output' in kwargs: del kwargs['output'] kwargs.setdefault('output', filename) kwargs.setdefault('split_batches', False) if to_anndata: from stereo.io.reader import stereo_to_anndata stereo_to_anndata(self, **kwargs) else: from stereo.io.writer import write_h5ad write_h5ad(self, **kwargs) def to_ann_based(self): from stereo.io.reader import stereo_to_anndata adata = stereo_to_anndata(self, flavor='scanpy', split_batches=False) return AnnBasedStereoExpData(based_ann_data=adata) def _remove_unused_categories(self): self.cells._remove_unused_categories() self.genes._remove_unused_categories()
class AnnBasedStereoExpData(StereoExpData): def __init__( self, h5ad_file_path: str = None, based_ann_data: anndata.AnnData = None, bin_type: str = None, bin_size: int = None, spatial_key: str = 'spatial', *args, **kwargs ): super(AnnBasedStereoExpData, self).__init__(*args, **kwargs) if h5ad_file_path is None and based_ann_data is None: raise Exception("Must to input the 'h5ad_file_path' or 'based_ann_data'.") if h5ad_file_path is not None and based_ann_data is not None: raise Exception("'h5ad_file_path' and 'based_ann_data' only can input one of them") if based_ann_data: assert type(based_ann_data) is anndata.AnnData self._ann_data = based_ann_data else: self._ann_data = anndata.read_h5ad(h5ad_file_path) self._genes = AnnBasedGene(self._ann_data) self._cells = AnnBasedCell(self._ann_data) if 'resolution' in self._ann_data.uns: self.attr = {'resolution': self._ann_data.uns['resolution']} del self._ann_data.uns['resolution'] if 'merged' in self._ann_data.uns: self.merged = self._ann_data.uns['merged'] del self._ann_data.uns['merged'] if bin_type is not None and 'bin_type' not in self._ann_data.uns: self._ann_data.uns['bin_type'] = bin_type if bin_size is not None and 'bin_size' not in self._ann_data.uns: self._ann_data.uns['bin_size'] = bin_size if h5ad_file_path is not None and 'sn' not in self._ann_data.uns: sn = self.get_sn_from_path(h5ad_file_path) if sn is not None: self._ann_data.uns['sn'] = pd.DataFrame([[-1, sn]], columns=['batch', 'sn']) if 'position_offset' in self._ann_data.uns: self.position_offset = self._ann_data.uns['position_offset'] del self._ann_data.uns['position_offset'] if 'position_min' in self._ann_data.uns: self.position_min = self._ann_data.uns['position_min'] del self._ann_data.uns['position_min'] from .st_pipeline import AnnBasedStPipeline self._tl = AnnBasedStPipeline(self) if 'key_record' in self._ann_data.uns: key_record = self._ann_data.uns['key_record'] self._tl._key_record = self._ann_data.uns['key_record'] = {key: list(value) for key, value in key_record.items()} if 'result_keys' in self._ann_data.uns: result_keys = self._ann_data.uns['result_keys'] self._tl.result.keys().extend(result_keys) del self._ann_data.uns['result_keys'] if self._ann_data.raw: self._tl._raw = AnnBasedStereoExpData(based_ann_data=self._ann_data.raw.to_adata()) self.spatial_key = spatial_key self.file_format = 'h5ad' def __str__(self): # return str(self._ann_data) format_str = f"AnnBasedStereoExpData object with n_cells X n_genes = {self.shape[0]} X {self.shape[1]}" format_str += f"\nadata: id({id(self._ann_data)})" format_str += f"\nbin_type: {self.bin_type}" if self.bin_type == 'bins': format_str += f"\n{'bin_size: %d' % self.bin_size}" format_str += f"\noffset_x = {self.offset_x}" format_str += f"\noffset_y = {self.offset_y}" format_str += str(self.cells) format_str += str(self.genes) if self.cells_matrix: format_str += f"\ncells_matrix = {list(self.cells_matrix.keys())}" if self.genes_matrix: format_str += f"\ngenes_matrix = {list(self.genes_matrix.keys())}" if self.cells_pairwise: format_str += f"\ncells_pairwise = {list(self.cells_pairwise.keys())}" if self.genes_pairwise: format_str += f"\ngenes_pairwise = {list(self.genes_pairwise.keys())}" format_str += f"\n{str(self.layers)}" format_key_record = { key: value for key, value in self.tl.key_record.items() if value } if format_key_record: format_str += f"\ntl.key_record: {format_key_record}" result_key = [] for rks in self.tl.key_record.values(): if rks is not None and len(rks) > 0: result_key += rks for rk in self.tl.result.keys(): if rk not in result_key: result_key.append(rk) format_str += f"\ntl.result: {result_key}" return format_str def __repr__(self): return self.__str__() # def __getattr__(self, name: str): # if name.startswith('__'): # raise AttributeError # if hasattr(self._ann_data, name): # return getattr(self._ann_data, name) # else: # return None @property def layers(self): return self._ann_data.layers @property def exp_matrix(self): return self._ann_data.X @exp_matrix.setter def exp_matrix(self, pos_array: spmatrix): self._ann_data.X = pos_array @property def genes(self): return self._genes @genes.setter def genes(self, gene: AnnBasedGene): self._genes = gene @property def cells(self): return self._cells @cells.setter def cells(self, cell: AnnBasedCell): self._cells = cell @property def plt(self): """ Call the visualization module. """ if self._plt is None: from ..plots.plot_collection import PlotCollection self._plt = PlotCollection(self) return self._plt @property def tl(self): """ call StPipeline method. """ return self._tl @property def position(self): if self.spatial_key in self._ann_data.obsm: return self._ann_data.obsm[self.spatial_key][:, 0:2] elif 'x' in self._ann_data.obs.columns and 'y' in self._ann_data.obs.columns: return self._ann_data.obs[['x', 'y']].to_numpy() return None # @position.setter # def position(self, pos): # if 'spatial' in self._ann_data.obsm: # self._ann_data.obsm['spatial'][:, [0, 1]] = pos @property def position_z(self): if self.spatial_key in self._ann_data.obsm: if self._ann_data.obsm[self.spatial_key].shape[1] >= 3: return self._ann_data.obsm[self.spatial_key][:, 2:3] else: return None elif 'z' in self._ann_data.obs.columns: return self._ann_data.obs[['z']].to_numpy() return None @position.setter def position(self, position: np.ndarray): if len(position.shape) != 2: raise ValueError("the shape of position must be 2 dimensions.") if position.shape[1] != 2: raise ValueError("the length of position's second dimension must be 2.") if self.spatial_key in self._ann_data.obsm: self._ann_data.obsm[self.spatial_key][:, [0, 1]] = position elif 'x' in self._ann_data.obs.columns and 'y' in self._ann_data.obs.columns: self._ann_data.obs['x'] = position[:, 0] self._ann_data.obs['y'] = position[:, 1] else: self._ann_data.obsm[self.spatial_key] = position @position_z.setter def position_z(self, position_z: np.ndarray): if (position_z.shape) == 1: position_z = position_z.reshape(-1, 1) if self.spatial_key in self._ann_data.obsm: self._ann_data.obsm[self.spatial_key] = np.concatenate( [self._ann_data.obsm[self.spatial_key][:, [0, 1]], position_z], axis=1) else: self._ann_data.obs['z'] = position_z @property def bin_type(self): return self._ann_data.uns.get('bin_type', 'bins') @bin_type.setter def bin_type(self, bin_type): self.bin_type_check(bin_type) self._ann_data.uns['bin_type'] = bin_type @property def bin_size(self): return self._ann_data.uns.get('bin_size', 1) @bin_size.setter def bin_size(self, bin_size): self._ann_data.uns['bin_size'] = bin_size @property def sn(self): sn = None if 'sn' in self._ann_data.uns: sn_data: pd.DataFrame = self._ann_data.uns['sn'] if sn_data.shape[0] == 1: sn = sn_data.iloc[0]['sn'] else: sn = {} for _, row in sn_data.iterrows(): sn[row['batch']] = row['sn'] return sn @sn.setter def sn(self, sn): if isinstance(sn, str): sn_list = [['-1', sn]] elif isinstance(sn, dict): sn_list = [] for bno, sn in sn.items(): sn_list.append([bno, sn]) else: raise TypeError(f'sn must be type of str or dict, but now is {type(sn)}') self._ann_data.uns['sn'] = pd.DataFrame(sn_list, columns=['batch', 'sn']) def sub_by_index(self, cell_index=None, gene_index=None, filter_raw=True): if cell_index is not None: self._ann_data._inplace_subset_obs(cell_index) if gene_index is not None: self._ann_data._inplace_subset_var(gene_index) filter_raw = filter_raw and self.raw is not None if filter_raw: self.raw.sub_by_index(cell_index, gene_index, False) if gene_index is not None: self._ann_data._raw = self._ann_data.raw[:, gene_index] return self def sub_by_name( self, cell_name: Optional[Union[np.ndarray, list]] = None, gene_name: Optional[Union[np.ndarray, list]] = None, filter_raw: bool = True, copy: bool = True ): data = deepcopy(self) if copy else self if gene_name is not None: if data.genes.real_gene_name is not None: gene_name = gene_name_2_gene_id(gene_name, data.genes.real_gene_name, data.genes.gene_name) data.sub_by_index(cell_name, gene_name, filter_raw) return data @property def adata(self): return self._ann_data def write(self, filename, **kwargs): from stereo.io.reader import stereo_to_anndata if 'output' in kwargs: del kwargs['output'] kwargs.setdefault('output', filename) kwargs.setdefault('split_batches', False) stereo_to_anndata(self, **kwargs)