Source code for Bering.tools._cell_annotation

import logging
import numpy as np
import pandas as pd

from anndata import AnnData
import scanpy as sc
from scipy.sparse import csr_matrix

from ..objects import Bering_Graph as BrGraph

def _ensemble_annotation(
    bg: BrGraph,
    min_transcripts: int = 50,
    min_dominant_nodes: int = 30,
    min_dominant_ratio: float = 0.6,
):
    '''
    annotate cells based on ensemble approach
    '''
    df_spots = bg.spots_all.copy()

    # get cell abundance 1
    df_abundance = pd.DataFrame(df_spots.groupby(['predicted_cells']).size())
    df_abundance.columns = ['n_cells']
    df_abundance = df_abundance.sort_values(by = ['n_cells'], ascending = False)

    cells_1 = np.unique(df_abundance.loc[df_abundance['n_cells'] > min_transcripts, :].index.values)

    # get cell abundance 2
    df_abundance = pd.DataFrame(df_spots.groupby(['predicted_cells', 'predicted_node_labels']).size())
    df_abundance.columns = ['n_cells']
    df_abundance = df_abundance.sort_values(by = ['n_cells'], ascending = False)
    df_abundance.reset_index(inplace = True)

    cells_2 = np.unique(df_abundance.loc[df_abundance['n_cells'] > min_dominant_nodes, 'predicted_cells'].values)

    # filter out qualified cells
    df_perc = (
        df_spots.groupby("predicted_cells")["predicted_node_labels"]\
            .apply(lambda x: x.value_counts(normalize=True).head(1))\
                .mul(100)\
                    .rename_axis(['predicted_cells','predicted_node_labels'])\
                        .reset_index(name='Perc')
    )
    cells_3 = df_perc.loc[(df_perc['predicted_node_labels'] != 'background')&(df_perc['Perc'] >= min_dominant_ratio), 'predicted_cells'].values

    qualified_cells = np.intersect1d(np.intersect1d(cells_1, cells_2), cells_3)

    df_perc.set_index('predicted_cells', inplace = True)
    qualified_labels = df_perc.loc[qualified_cells, 'predicted_node_labels'].values
    label_dict = dict(zip(qualified_cells, qualified_labels))

    all_labels = np.array(['Unknown'] * df_spots.shape[0], dtype = 'object')
    all_cells = np.array([-1] * df_spots.shape[0])
    predicted_cells = df_spots['predicted_cells'].values

    for cell, label in zip(qualified_cells, qualified_labels):
        indices = np.where(predicted_cells == cell)[0]
        all_labels[indices] = label
        all_cells[indices] = cell

    df_spots['ensembled_cells'] = all_cells
    df_spots['ensembled_labels'] = all_labels

    bg.spots_predicted = df_spots.copy()

    return df_spots, label_dict

def _create_anndata(
    df_spots_all: pd.DataFrame,
    label_dict: dict,
):
    '''
    Create anndata for segmented cells
    '''
    # ensembled annotations
    df_spots = df_spots_all.loc[df_spots_all['ensembled_cells'] != -1, :].copy()
    df_expr = df_spots.groupby(['ensembled_cells', 'features']).size().unstack(fill_value = 0)
    
    df_cells = pd.DataFrame(index = df_expr.index.values)
    df_cells['n_counts'] = np.sum(df_expr.values, axis = 1)
    df_cells['n_genes'] = np.sum(df_expr.values != 0, axis = 1)
    df_cells['cx'] = pd.DataFrame(df_spots.groupby(['ensembled_cells']).mean()['x']).loc[df_cells.index.values, 'x'].values
    df_cells['cy'] = pd.DataFrame(df_spots.groupby(['ensembled_cells']).mean()['y']).loc[df_cells.index.values, 'y'].values
    df_cells['predicted_labels'] = [label_dict[i] for i in df_cells.index.values]

    df_features = pd.DataFrame(index = df_expr.columns)
    df_features['n_cells'] = np.sum(df_expr.values != 0, axis = 0)

    adata_ensembl = AnnData(
        X = csr_matrix(df_expr.values),
        obs = df_cells,
        var = df_features,
    )

    # pure segmented annotations
    df_expr = df_spots_all.groupby(['predicted_cells', 'features']).size().unstack(fill_value = 0)
    
    df_cells = pd.DataFrame(index = df_expr.index.values)
    df_cells['n_counts'] = np.sum(df_expr.values, axis = 1)
    df_cells['n_genes'] = np.sum(df_expr.values != 0, axis = 1)
    df_cells['cx'] = pd.DataFrame(df_spots_all.groupby(['predicted_cells'])['x'].mean()).loc[df_cells.index.values, 'x'].values
    df_cells['cy'] = pd.DataFrame(df_spots_all.groupby(['predicted_cells'])['y'].mean()).loc[df_cells.index.values, 'y'].values

    df_features = pd.DataFrame(index = df_expr.columns)
    df_features['n_cells'] = np.sum(df_expr.values != 0, axis = 0)

    adata_segmented = AnnData(
        X = csr_matrix(df_expr.values),
        obs = df_cells,
        var = df_features,
    )

    return adata_ensembl, adata_segmented

[docs]def cell_annotation( bg: BrGraph, min_transcripts: int = 50, min_dominant_nodes: int = 30, min_dominant_ratio: float = 0.6, ): ''' Annotate segmented cells based on ensemble strategy. A cell is considered as a qualified cell if it satisfies all of the following criteria: - Number of transcripts > min_transcripts; - Number of dominant nodes > min_dominant_nodes; - Ratio of dominant nodes > min_dominant_ratio. Other than that, the cell is not considered as a segmented cell and transcipts within this cell are assigned to the background. Parameters ---------- bg: BrGraph Bering graph object min_transcripts: int Minimum number of transcripts for a cell to be considered min_dominant_nodes: int Minimum number of dominant nodes for a cell to be considered min_dominant_ratio: float Minimum ratio of dominant nodes for a cell to be considered ''' df_spots, label_dict = _ensemble_annotation( bg = bg, min_transcripts = min_transcripts, min_dominant_nodes = min_dominant_nodes, min_dominant_ratio = min_dominant_ratio, ) adata_ensembl, adata_segmented = _create_anndata(df_spots, label_dict) return df_spots, adata_ensembl, adata_segmented
[docs]def cell_analyze( adata: AnnData, min_counts: int = 10, target_sum: int = 1000, n_neighbors: int = 10, ): ''' Run a standard analysis pipeline from single-cell data with raw counts. The pipeline includes: - Filter cells with low counts; - Normalize total counts; - Scale counts; - PCA; - Neighbor graph construction; - UMAP; - Leiden clustering. Parameters ---------- adata: AnnData Anndata object. It can be the output of :func:`~.cell_annotation` function. min_counts: int Minimum number of counts for a gene to be considered target_sum: int Target sum of counts for normalization n_neighbors: int Number of neighbors for KNN graph construction Returns ------- adata: AnnData Anndata object with updated fields ''' sc.pp.filter_cells(adata, min_counts = min_counts) sc.pp.normalize_total(adata, target_sum = target_sum) sc.pp.log1p(adata) sc.pp.scale(adata) sc.tl.pca(adata) sc.pp.neighbors(adata, n_neighbors = n_neighbors) sc.tl.umap(adata) sc.tl.leiden(adata) return adata