checkatlas

checkatlas.checkatlas

checkatlas.atlas

atlas_sampling(df_annot, type_df, args)

If args.plot_celllimit != 0 and args.plot_celllimit < len(df_annot), the atlas QC table is sampled down to args.plot_celllimit cells for MultiQC.

Parameters:
  • df_annot (pd.DataFrame) – Table to sample

  • type_df (str) – type of table

  • args (argparse.Namespace) – arguments of checkatlas workflow

Returns:
  • pd.DataFrame – Sampled QC table

Source code in checkatlas/atlas.py
def atlas_sampling(
    df_annot: pd.DataFrame, type_df: str, args: argparse.Namespace
) -> pd.DataFrame:
    """
    Sample a table down to args.plot_celllimit rows for MultiQC

    The table is returned untouched when args.plot_celllimit is 0 or
    when it already holds no more rows than args.plot_celllimit.

    Args:
        df_annot (pd.DataFrame): Table to sample
        type_df (str): type of table, only used in the log messages
        args (argparse.Namespace): arguments of checkatlas workflow

    Returns:
        pd.DataFrame: Sampled table (or the input table itself)
    """
    cell_limit = args.plot_celllimit
    nb_cells = len(df_annot)
    # Nothing to do: sampling disabled or table already small enough
    if cell_limit == 0 or cell_limit >= nb_cells:
        return df_annot
    logger.debug(f"Sample {type_df} table with {nb_cells} cells")
    sampled = df_annot.sample(cell_limit)
    logger.debug(f"{type_df} table sampled to {len(sampled)} cells")
    return sampled

clean_scanpy_atlas(adata, atlas_info)

Clean the Scanpy object to be sure to get all information out of it

  • Make var names unique
  • Make var unique for Raw matrix
  • If OBS_CLUSTERS are present and stored as int32 or int64 -> be sure to transform them into categorical
Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

Returns:
  • AnnData – cleaned atlas

Source code in checkatlas/atlas.py
def clean_scanpy_atlas(adata: AnnData, atlas_info: dict) -> AnnData:
    """
    Clean the Scanpy object to be sure to get all information out of it

    - Make var names unique
    - Make var unique for Raw matrix
    - If OBS_CLUSTERS are present and in int32 or int64 -> be sure to
    transform them in categorical

    The atlas is modified in place and also returned.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas

    Returns:
        AnnData: cleaned atlas
    """
    logger.debug(f"Clean scanpy: {atlas_info[checkatlas.ATLAS_NAME_KEY]}")

    # Make var names unique
    if len(set(adata.var_names)) == len(adata.var_names):
        logger.debug("Var names unique")
    else:
        # var_names_make_unique() is tried twice (the second pass
        # sometimes helps) before creating unique names "by hand"
        for _ in range(2):
            logger.debug(
                "Var names not unique, ran : adata.var_names_make_unique()"
            )
            adata.var_names_make_unique()
            if len(set(adata.var_names)) == len(adata.var_names):
                logger.debug("Var names unique")
                break
        else:
            # Still not unique: suffix every name with its position
            logger.debug(
                "Var names still not unique, add index suffix to var names"
            )
            adata.var.index = [
                f"{name}_{i}" for i, name in enumerate(adata.var_names)
            ]
            if len(set(adata.var_names)) == len(adata.var_names):
                logger.debug("Var names unique")

    # Make var unique for Raw matrix
    if adata.raw is not None:
        raw_names = adata.raw.var_names
        if len(set(raw_names)) == len(raw_names):
            logger.debug("Var names for Raw unique, transform ")
        else:
            logger.debug("Var names for Raw not unique")
            adata.raw.var.index = [
                f"{name}_{i}" for i, name in enumerate(adata.raw.var_names)
            ]
            raw_names = adata.raw.var_names
            if len(set(raw_names)) == len(raw_names):
                logger.debug("Var names for Raw unique")

    # If OBS_CLUSTERS are present and in int32 or int64 -> be sure to
    # transform them in categorical
    for obs_key in adata.obs_keys():
        if not any(pattern in obs_key for pattern in OBS_CLUSTERS):
            continue
        obs_dtype = adata.obs[obs_key].dtype
        if obs_dtype == np.int32 or obs_dtype == np.int64:
            adata.obs[obs_key] = pd.Categorical(adata.obs[obs_key])
    return adata

create_anndata_table(adata, atlas_info, args)

Create an html table with all AnnData arguments The html code will make all elements of the table visible in MultiQC

Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Source code in checkatlas/atlas.py
def create_anndata_table(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Create an html table with all AnnData arguments
    The html code will make all elements of the table visible in MultiQC

    The table has a single row and one column per AnnData slot
    (obs, obsm, var, varm, uns). It is written as a tab-separated file.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]

    logger.debug(f"Create Adata table for {atlas_name}")
    csv_path = files.get_file_path(
        atlas_name, folders.ANNDATA, checkatlas.TSV_EXTENSION, args.path
    )
    # Names found in each AnnData slot, keyed by the output column name
    slot_names = {
        "atlas_obs": list(adata.obs.columns),
        "obsm": list(adata.obsm_keys()),
        "var": list(adata.var_keys()),
        "varm": list(adata.varm_keys()),
        "uns": list(adata.uns_keys()),
    }
    # Build the row in one go: chained assignment (df[col][row] = value)
    # does not update the frame when pandas Copy-on-Write is enabled
    df_summary = pd.DataFrame(
        {
            slot: ["<code>" + "</code><br><code>".join(names) + "</code>"]
            for slot, names in slot_names.items()
        },
        index=[atlas_name],
    )
    # quoting=False is equal to csv.QUOTE_MINIMAL (0), the pandas default
    df_summary.to_csv(csv_path, index=False, quoting=False, sep="\t")

create_metric_annot(adata, atlas_info, args)

Calc annotation metrics

Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Source code in checkatlas/atlas.py
def create_metric_annot(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Calc annotation metrics

    The first viable obs key is used as the reference annotation and
    every other viable obs key is compared to it with each metric of
    args.metric_annot. Nothing is written when fewer than two viable
    obs keys are found.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = files.get_file_path(
        atlas_name,
        folders.ANNOTATION,
        checkatlas.TSV_EXTENSION,
        args.path,
    )
    header = ["Annot_Sample", "Reference", "obs"] + args.metric_annot
    obs_keys = get_viable_obs_annot(adata, args)
    if len(obs_keys) > 1:
        logger.debug(f"Calc annotation metrics for {atlas_name}")
        ref_obs = obs_keys[0]
        # Collect one frame per obs key and concatenate once at the end:
        # growing a frame with pd.concat in the loop is quadratic and
        # concatenating onto an empty frame is deprecated in pandas
        df_lines = []
        for obs_key in obs_keys[1:]:
            dict_line = {
                "Annot_Sample": [atlas_name + "_" + obs_key],
                "Reference": [ref_obs],
                "obs": [obs_key],
            }
            for metric in args.metric_annot:
                logger.debug(
                    f"Calc {metric} for {atlas_name} "
                    f"with obs {obs_key} vs ref_obs {ref_obs}"
                )
                metric_value = metrics.calc_metric_annot_scanpy(
                    metric, adata, obs_key, ref_obs
                )
                dict_line[metric] = metric_value
            df_lines.append(pd.DataFrame(dict_line))
        df_annot = pd.concat(df_lines, ignore_index=True, axis=0)[header]
        df_annot.to_csv(csv_path, index=False, sep="\t")
    else:
        logger.debug(f"No viable obs_key was found for {atlas_name}")

create_metric_cluster(adata, atlas_info, args)

Calc clustering metrics

Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Source code in checkatlas/atlas.py
def create_metric_cluster(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Calc clustering metrics

    Each metric of args.metric_cluster is computed for every viable obs
    key, on the first obsm key containing "umap", else on the first one
    containing "tsne", else with an empty obsm key.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = files.get_file_path(
        atlas_name,
        folders.CLUSTER,
        checkatlas.TSV_EXTENSION,
        args.path,
    )
    header = ["Clust_Sample", "obs"] + args.metric_cluster
    obs_keys = get_viable_obs_annot(adata, args)
    obsm_keys = get_viable_obsm(adata, args)
    # Plain substring tests: the former patterns ".*umap*." / ".*tsne*."
    # had the quantifier on the wrong character and only matched the
    # usual key names by accident
    obsm_umap_keys = [key for key in obsm_keys if "umap" in key]
    obsm_tsne_keys = [key for key in obsm_keys if "tsne" in key]
    obsm_key_representation = ""
    if obsm_umap_keys:
        obsm_key_representation = obsm_umap_keys[0]
    elif obsm_tsne_keys:
        obsm_key_representation = obsm_tsne_keys[0]
    logger.debug(
        f"Use obsm '{obsm_key_representation}' for clustering metrics"
    )

    if len(obs_keys) > 0:
        logger.debug(f"Calc clustering metrics for {atlas_name}")
        # Collect one frame per obs key and concatenate once at the end
        # instead of growing an initially empty frame inside the loop
        df_lines = []
        for obs_key in obs_keys:
            dict_line = {
                "Clust_Sample": [atlas_name + "_" + obs_key],
                "obs": [obs_key],
            }
            for metric in args.metric_cluster:
                logger.debug(
                    f"Calc {metric} for {atlas_name} "
                    f"with obs {obs_key} and obsm {obsm_key_representation}"
                )
                metric_value = metrics.calc_metric_cluster_scanpy(
                    metric, adata, obs_key, obsm_key_representation
                )
                dict_line[metric] = metric_value
            df_lines.append(pd.DataFrame(dict_line))
        df_cluster = pd.concat(df_lines, ignore_index=True, axis=0)[header]
        df_cluster.to_csv(csv_path, index=False, sep="\t")
    else:
        logger.debug(f"No viable obs_key was found for {atlas_name}")

create_metric_dimred(adata, atlas_info, args)

Calc dimensionality reduction metrics

Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Source code in checkatlas/atlas.py
def create_metric_dimred(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Calc dimensionality reduction metrics

    Each metric of args.metric_dimred is computed for every viable obsm
    key. Nothing is written when no viable obsm key is found.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = files.get_file_path(
        atlas_name,
        folders.DIMRED,
        checkatlas.TSV_EXTENSION,
        args.path,
    )
    header = ["Dimred_Sample", "obsm"] + args.metric_dimred
    obsm_keys = get_viable_obsm(adata, args)
    if len(obsm_keys) > 0:
        logger.debug(f"Calc dim red metrics for {atlas_name}")
        # Collect one frame per obsm key and concatenate once at the end
        # instead of growing an initially empty frame inside the loop
        df_lines = []
        for obsm_key in obsm_keys:
            dict_line = {
                "Dimred_Sample": [atlas_name + "_" + obsm_key],
                "obsm": [obsm_key],
            }
            for metric in args.metric_dimred:
                logger.debug(
                    f"Calc {metric} for {atlas_name} with obsm {obsm_key}"
                )
                metric_value = metrics.calc_metric_dimred(
                    metric, adata, obsm_key
                )
                dict_line[metric] = metric_value
            df_lines.append(pd.DataFrame(dict_line))
        df_dimred = pd.concat(df_lines, ignore_index=True, axis=0)[header]
        df_dimred.to_csv(csv_path, index=False, sep="\t")
    else:
        logger.debug(f"No viable obsm_key was found for {atlas_name}")

create_qc_plots(adata, atlas_info, args)

Display the atlas QC plot. Search for the OBS variables which correspond to total_RNA, total_UMI, MT_ratio, RT_ratio

Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Source code in checkatlas/atlas.py
def create_qc_plots(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Save the QC violin plot of the atlas

    Genes are flagged as mitochondrial ("MT-" prefix) or ribosomal
    ("RPS"/"RPL" prefix) in adata.var, the scanpy QC metrics are computed
    in place and the resulting violin plot is saved in the working
    directory of args.path.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    # Figures are saved by scanpy inside the checkatlas working directory
    sc.settings.figdir = folders.get_workingdir(args.path)
    sc.set_figure_params(dpi_save=80)
    figure_suffix = "".join(
        (os.sep, atlas_name, checkatlas.QC_FIG_EXTENSION)
    )
    logger.debug(f"Create QC violin plot for {atlas_name}")

    # Flag mitochondrial and ribosomal genes from their name prefix
    gene_flags = ["mt", "ribo"]
    adata.var["mt"] = adata.var_names.str.startswith("MT-")
    adata.var["ribo"] = adata.var_names.str.startswith(("RPS", "RPL"))
    sc.pp.calculate_qc_metrics(
        adata,
        qc_vars=gene_flags,
        percent_top=None,
        log1p=False,
        inplace=True,
    )

    violin_keys = [
        "n_genes_by_counts",
        "total_counts",
        "pct_counts_mt",
        "pct_counts_ribo",
    ]
    sc.pl.violin(
        adata,
        violin_keys,
        jitter=0.4,
        multi_panel=True,
        show=False,
        save=figure_suffix,
    )

create_qc_tables(adata, atlas_info, args)

Display the atlas QC table. Search for the OBS variables which correspond to total_RNA, total_UMI, MT_ratio, RT_ratio

Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Source code in checkatlas/atlas.py
def create_qc_tables(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Create the atlas QC table

    Genes are flagged as mitochondrial ("MT-" prefix) or ribosomal
    ("RPS"/"RPL" prefix) in adata.var and the scanpy QC metrics are
    computed in place, using only the gene groups actually found.
    The obs columns selected by get_viable_obs_qc are then ranked,
    sampled if needed and written as a tab-separated file.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    qc_path = files.get_file_path(
        atlas_name, folders.QC, checkatlas.TSV_EXTENSION, args.path
    )
    logger.debug(f"Create QC tables for {atlas_name}")
    qc_genes = []
    # mitochondrial genes
    adata.var["mt"] = adata.var_names.str.startswith("MT-")
    if adata.var["mt"].any():
        qc_genes.append("mt")
        logger.debug(f"Mitochondrial genes in {atlas_name} for QC")
    else:
        logger.debug(f"No mitochondrial genes in {atlas_name} for QC")
    # ribosomal genes
    adata.var["ribo"] = adata.var_names.str.startswith(("RPS", "RPL"))
    # The "ribo" flag is tested here (the "mt" flag used to be tested
    # a second time by mistake)
    if adata.var["ribo"].any():
        qc_genes.append("ribo")
        logger.debug(f"Ribosomal genes in {atlas_name} for QC")
    else:
        logger.debug(f"No ribosomal genes in {atlas_name} for QC")

    sc.pp.calculate_qc_metrics(
        adata,
        qc_vars=qc_genes,
        percent_top=None,
        log1p=False,
        inplace=True,
    )
    # Work on a copy so that the rank columns are not written into a
    # slice of adata.obs
    df_annot = adata.obs[get_viable_obs_qc(adata, args)].copy()
    # Rank cell by qc metric (only the columns present before ranking)
    for header in list(df_annot.columns):
        if header != CELLINDEX_HEADER:
            df_annot = df_annot.sort_values(header, ascending=False)
            df_annot[f"cellrank_{header}"] = range(1, len(df_annot) + 1)

    # Sample QC table when more cells than args.plot_celllimit are present
    df_annot = atlas_sampling(df_annot, "QC", args)
    df_annot[CELLINDEX_HEADER] = range(1, len(df_annot) + 1)
    # quoting=False is equal to csv.QUOTE_MINIMAL (0), the pandas default
    df_annot.to_csv(qc_path, index=False, quoting=False, sep="\t")

create_summary_table(adata, atlas_info, args)

Create a table with all summarizing variables

Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Source code in checkatlas/atlas.py
def create_summary_table(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Create a table with all summarizing variables

    The table has a single row and is written as a tab-separated file.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas (name, type and path are read)
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    atlas_type = atlas_info[checkatlas.ATLAS_TYPE_KEY]
    atlas_path = atlas_info[checkatlas.ATLAS_PATH_KEY]
    logger.debug(f"Create Summary table for {atlas_name}")
    csv_path = files.get_file_path(
        atlas_name, folders.SUMMARY, checkatlas.TSV_EXTENSION, args.path
    )
    # Build the row in one go: chained assignment (df[col][row] = value)
    # does not update the frame when pandas Copy-on-Write is enabled
    df_summary = pd.DataFrame(
        {
            "AtlasFileType": [atlas_type],
            "NbCells": [adata.n_obs],
            "NbGenes": [adata.n_vars],
            "AnnData.raw": [adata.raw is not None],
            "AnnData.X": [adata.X is not None],
            # NOTE(review): this column holds the atlas name, not a file
            # extension - confirm this is what the report expects
            "File_extension": [atlas_name],
            "File_path": [atlas_path],
        },
        index=[atlas_name],
    )
    df_summary.to_csv(csv_path, index=False, sep="\t")

create_tsne_fig(adata, atlas_info, args)

Display the TSNE of celltypes Search for the OBS variable which correspond to the celltype annotation

Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Source code in checkatlas/atlas.py
def create_tsne_fig(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Save the t-SNE figure of the atlas

    The first obsm key containing "tsne" is copied to adata.obsm["X_tsne"]
    and plotted, colored by the first viable obs key when there is one.
    Nothing is done when no such obsm key exists.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    sc.set_figure_params(dpi_save=150)
    # Search if tsne reduction exists. A plain substring test replaces
    # the former pattern ".*tsne*.", whose quantifier was misplaced
    obsm_tsne_keys = [
        key for key in get_viable_obsm(adata, args) if "tsne" in key
    ]
    if not obsm_tsne_keys:
        return
    obsm_tsne = obsm_tsne_keys[0]
    logger.debug(f"Create t-SNE figure for {atlas_name} with obsm={obsm_tsne}")
    # Set the t-sne to display
    if isinstance(adata.obsm[obsm_tsne], pd.DataFrame):
        # Transform to numpy if it is a pandas dataframe
        adata.obsm["X_tsne"] = adata.obsm[obsm_tsne].to_numpy()
    else:
        adata.obsm["X_tsne"] = adata.obsm[obsm_tsne]
    # Setting up figures directory
    sc.settings.figdir = folders.get_workingdir(args.path)
    tsne_path = os.sep + atlas_name + checkatlas.TSNE_EXTENSION
    # Exporting tsne
    obs_keys = get_viable_obs_annot(adata, args)
    if len(obs_keys) != 0:
        sc.pl.tsne(adata, color=obs_keys[0], show=False, save=tsne_path)
    else:
        sc.pl.tsne(adata, show=False, save=tsne_path)

create_umap_fig(adata, atlas_info, args)

Display the UMAP of celltypes Search for the OBS variable which correspond to the celltype annotation

Parameters:
  • adata (AnnData) – atlas to analyse

  • atlas_info (dict) – info on the atlas

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Source code in checkatlas/atlas.py
def create_umap_fig(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Save the UMAP figure of the atlas

    The first obsm key containing "umap" is copied to adata.obsm["X_umap"]
    and plotted, colored by the first viable obs key when there is one.
    Nothing is done when no such obsm key exists.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    sc.set_figure_params(dpi_save=150)
    # Search if umap reduction exists. A plain substring test replaces
    # the former pattern ".*umap*.", whose quantifier was misplaced
    obsm_umap_keys = [
        key for key in get_viable_obsm(adata, args) if "umap" in key
    ]
    if not obsm_umap_keys:
        return
    obsm_umap = obsm_umap_keys[0]
    logger.debug(f"Create UMAP figure for {atlas_name} with obsm={obsm_umap}")
    # Set the umap to display
    if isinstance(adata.obsm[obsm_umap], pd.DataFrame):
        # Transform to numpy if it is a pandas dataframe
        adata.obsm["X_umap"] = adata.obsm[obsm_umap].to_numpy()
    else:
        adata.obsm["X_umap"] = adata.obsm[obsm_umap]
    # Setting up figures directory
    sc.settings.figdir = folders.get_workingdir(args.path)
    umap_path = os.sep + atlas_name + checkatlas.UMAP_EXTENSION
    # Exporting umap
    obs_keys = get_viable_obs_annot(adata, args)
    if len(obs_keys) != 0:
        sc.pl.umap(adata, color=obs_keys[0], show=False, save=umap_path)
    else:
        sc.pl.umap(adata, show=False, save=umap_path)

get_viable_obs_annot(adata, args)

Search in obs_keys a match to OBS_CLUSTERS values. Obs_keys with only one category are removed. The remaining obs_keys are returned sorted alphabetically.

Parameters:
  • adata (AnnData) – atlas to analyse

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Returns:
  • list – obs_keys

Source code in checkatlas/atlas.py
def get_viable_obs_annot(adata: AnnData, args: argparse.Namespace) -> list:
    """
    Search in obs_keys a match to args.obs_cluster values

    An obs_key is kept when:
    - its name contains one of the args.obs_cluster values
    - it is categorical
    - it contains no NaN value
    - it has more than one category ("nan" category excluded)

    Args:
        adata (AnnData): atlas to analyse
        args (argparse.Namespace): list of arguments from checkatlas workflow

    Returns:
        list: obs_keys, sorted alphabetically
    """
    # Categorical keys matching args.obs_cluster. any() keeps a key once
    # even when several patterns match its name
    candidate_keys = [
        obs_key
        for obs_key in adata.obs_keys()
        if any(pattern in obs_key for pattern in args.obs_cluster)
        and isinstance(adata.obs[obs_key].dtype, pd.CategoricalDtype)
    ]
    obs_keys_final = []
    for obs_key in candidate_keys:
        annotations = adata.obs[obs_key]
        # Skip annotations containing NaN values
        if _object_dtype_isnan(annotations).any():
            continue
        categories_temp = annotations.cat.categories
        # remove nan if found
        categories = categories_temp.dropna()
        if "nan" in categories:
            categories = categories.delete(categories.get_loc("nan"))
        # Add obs_key with more than one category (with Nan removed)
        if len(categories) > 1:
            logger.debug(f"Add obs_key {obs_key} with cat {categories_temp}")
            obs_keys_final.append(obs_key)
    return sorted(obs_keys_final)

get_viable_obs_qc(adata, args)

Search in obs_keys a match to OBS_QC values. The matching obs_keys are returned in the order given by adata.obs_keys().

Parameters:
  • adata (AnnData) – atlas to analyse

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Returns:
  • list – obs_keys

Source code in checkatlas/atlas.py
def get_viable_obs_qc(adata: AnnData, args: argparse.Namespace) -> list:
    """
    Select the obs_keys of the atlas which are listed in args.qc_display

    The keys are returned in the order given by adata.obs_keys().

    Args:
        adata (AnnData): atlas to analyse
        args (argparse.Namespace): list of arguments from checkatlas workflow

    Returns:
        list: obs_keys
    """
    return [key for key in adata.obs_keys() if key in args.qc_display]

get_viable_obsm(adata, args)

TODO: search viable obsm for dimensionality reduction metric calculation. No filter on obsm is applied for now!

Parameters:
  • adata (AnnData) – atlas to analyse

  • args (argparse.Namespace) – list of arguments from checkatlas workflow

Returns:
  • list – obsm_keys

Source code in checkatlas/atlas.py
def get_viable_obsm(adata: AnnData, args: argparse.Namespace) -> list:
    """
    Search viable obsm for dimensionality reduction metric calc.

    ! No filter on obsm is applied for now: every obsm key of the atlas
    is returned and args is not used !

    Args:
        adata (AnnData): atlas to analyse
        args (argparse.Namespace): list of arguments from checkatlas workflow

    Returns:
        list: obsm_keys
    """
    # TODO: only keep the obsm keys listed in args.obsm_dimred
    viable_keys = adata.obsm_keys()
    logger.debug(f"Add obsm {viable_keys}")
    return viable_keys

read_atlas(atlas_info)

Read Scanpy or Cellranger data : .h5ad or .h5

Parameters:
  • atlas_info (dict) – info about the atlas

Returns:
  • AnnData – scanpy object from .h5ad

Source code in checkatlas/atlas.py
def read_atlas(atlas_info: dict) -> AnnData:
    """
    Read Scanpy or Cellranger data : .h5ad or .h5

    The reader is chosen from the atlas type stored in atlas_info:
    current Cellranger, obsolete Cellranger, or Scanpy .h5ad otherwise.

    Args:
        atlas_info (dict): info about the atlas

    Returns:
        AnnData: scanpy object read from the atlas file. An empty dict
            is returned instead when an AnnDataReadError is caught.
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    atlas_path = atlas_info[checkatlas.ATLAS_PATH_KEY]
    logger.info(f"Load {atlas_name} in {atlas_path}")
    try:
        atlas_type = atlas_info[checkatlas.ATLAS_TYPE_KEY]
        if atlas_type == cellranger.CELLRANGER_TYPE_CURRENT:
            logger.debug(f"Read Cellranger >= v3 results {atlas_path}")
            return cellranger.read_cellranger_current(atlas_info)
        if atlas_type == cellranger.CELLRANGER_TYPE_OBSOLETE:
            logger.debug(f"Read Cellranger < v3 results {atlas_path}")
            return cellranger.read_cellranger_obsolete(atlas_info)
        # Any other type is read as a Scanpy file
        logger.debug(f"Read Scanpy file {atlas_path}")
        return sc.read_h5ad(atlas_path)
    except _io.utils.AnnDataReadError:
        logger.warning(f"AnnDataReadError, cannot read: {atlas_path}")
        return {}

checkatlas.seurat

check_seurat_install()

Check if Seurat is installed, run installation if not

Source code in checkatlas/seurat.py
def check_seurat_install() -> None:
    """
    Check if Seurat is installed, run installation if not

    When Seurat or SeuratObject is missing, the R personal library
    (R_LIBS_USER) is created, set as library path, and the missing
    packages are installed from the first CRAN mirror.
    """
    # import R's utility package
    utils = rpackages.importr("utils")
    # select a mirror for R packages
    utils.chooseCRANmirror(ind=1)  # select the first mirror in the list
    # R package names
    packnames = ("Seurat", "SeuratObject")
    # Only install what is not installed yet
    names_to_install = [x for x in packnames if not rpackages.isinstalled(x)]
    if names_to_install:
        # create personal library
        robjects.r(
            """dir.create(Sys.getenv("R_LIBS_USER"), recursive = TRUE)"""
        )
        # add to the path; the R call is run once and its result logged
        # (it used to be run a second time only to build the log message)
        lib_paths = robjects.r(""".libPaths(Sys.getenv("R_LIBS_USER"))""")
        logger.debug(f"Set Rlibpaths: {lib_paths}")
        utils.install_packages(StrVector(names_to_install))

create_anndata_table(seurat, atlas_info, args=<class 'argparse.Namespace'>)

Create a table with all AnnData-like arguments in the Seurat object. Parameters: seurat, atlas_info, args.

Source code in checkatlas/seurat.py
def create_anndata_table(
    seurat: RS4, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Create a table with all AnnData-like arguments in Seurat object

    The table has a single row and one column per AnnData slot. The
    "var" and "varm" columns are left empty for Seurat objects.

    Args:
        seurat (RS4): Seurat object to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas
            workflow. It used to be declared as ``args=argparse.Namespace``,
            a default value that could never work as args.path is read.
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    logger.debug(f"Create Adata table for {atlas_name}")
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.ANNDATA),
        atlas_name + checkatlas.TSV_EXTENSION,
    )

    # Create r_functions
    r_obs = robjects.r(
        "obs <- function(seurat){ return(colnames(seurat@meta.data))}"
    )
    r_obsm = robjects.r(
        "f<-function(seurat){return(names(seurat@reductions))}"
    )
    r_uns = robjects.r(
        "uns <- function(seurat){ return(colnames(seurat@misc))}"
    )

    # Names found in each slot, keyed by the output column name
    slot_names = {
        "atlas_obs": r_obs(seurat),
        "obsm": r_obsm(seurat),
        "var": [""],
        "varm": [""],
        "uns": r_uns(seurat),
    }
    # Create AnnData table. The row is built in one go: chained
    # assignment (df[col][row] = value) does not update the frame when
    # pandas Copy-on-Write is enabled
    row = {}
    for slot, names in slot_names.items():
        # R returns NULL when the slot holds no names
        if isinstance(names, NULLType):
            names = [""]
        row[slot] = ["<code>" + "</code><br><code>".join(names) + "</code>"]
    df_summary = pd.DataFrame(row, index=[atlas_name])
    # quoting=False is equal to csv.QUOTE_MINIMAL (0), the pandas default
    df_summary.to_csv(csv_path, index=False, quoting=False, sep="\t")

create_metric_annot(seurat, atlas_info, args=<class 'argparse.Namespace'>)

Calc annotation metrics. Parameters: seurat, atlas_info, args.

Source code in checkatlas/seurat.py
def create_metric_annot(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Calc annotation metrics and write them as a tab-separated table.

    The first viable obs key is used as the reference annotation; every
    other viable obs key is compared to it with each metric listed in
    args.metric_annot. Nothing is written when fewer than two viable
    obs keys are available.

    Args:
        seurat (RS4): Seurat object to analyse
        atlas_info (dict): info on the atlas, the atlas name is read from it
        args (argparse.Namespace): arguments of checkatlas workflow,
            args.path and args.metric_annot are used

    Returns:
        None: the table is written in the ANNOTATION folder
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.ANNOTATION),
        atlas_name + checkatlas.TSV_EXTENSION,
    )
    obs_keys = get_viable_obs_annot(seurat, args)
    # A reference plus at least one annotation to compare are required
    if len(obs_keys) < 2:
        logger.debug(f"No viable obs_key was found for {atlas_name}")
        return

    logger.debug(f"Calc annotation metrics for {atlas_name}")
    ref_obs = obs_keys[0]
    lines = []
    for obs_key in obs_keys[1:]:
        dict_line = {
            "Annot_Sample": [atlas_name + "_" + obs_key],
            "Reference": [ref_obs],
            "obs": [obs_key],
        }
        for metric in args.metric_annot:
            logger.debug(
                f"Calc {metric} for {atlas_name} "
                f"with obs {obs_key} vs ref_obs {ref_obs}"
            )
            metric_value = metrics.calc_metric_annot_seurat(
                metric, seurat, obs_key, ref_obs
            )
            dict_line[metric] = metric_value
        lines.append(pd.DataFrame(dict_line))
    # Concatenate once: avoids growing the table row by row and the
    # deprecated concatenation with an empty DataFrame
    df_annot = pd.concat(lines, ignore_index=True, axis=0)
    df_annot.to_csv(csv_path, index=False, sep="\t")

create_metric_cluster(seurat, atlas_info, args=<class 'argparse.Namespace'>)

Calc clustering metrics :param seurat: :param atlas_path: :param atlas_info: :param args: :return:

Source code in checkatlas/seurat.py
def create_metric_cluster(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Calc clustering metrics and write them as a tab-separated table.

    Each metric listed in args.metric_cluster is computed for every viable
    obs key, always on the "umap" representation. Nothing is written when
    no viable obs key is available.

    Args:
        seurat (RS4): Seurat object to analyse
        atlas_info (dict): info on the atlas, the atlas name is read from it
        args (argparse.Namespace): arguments of checkatlas workflow,
            args.path and args.metric_cluster are used

    Returns:
        None: the table is written in the CLUSTER folder
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.CLUSTER),
        atlas_name + checkatlas.TSV_EXTENSION,
    )
    obs_keys = get_viable_obs_annot(seurat, args)
    obsm_key_representation = "umap"
    if not obs_keys:
        logger.debug(f"No viable obs_key was found for {atlas_name}")
        return

    logger.debug(f"Calc clustering metrics for {atlas_name}")
    lines = []
    for obs_key in obs_keys:
        dict_line = {
            "Clust_Sample": [atlas_name + "_" + obs_key],
            "obs": [obs_key],
        }
        for metric in args.metric_cluster:
            logger.debug(
                f"Calc {metric} for {atlas_name} "
                f"with obs {obs_key} and obsm {obsm_key_representation}"
            )
            metric_value = metrics.calc_metric_cluster_seurat(
                metric, seurat, obs_key, obsm_key_representation
            )
            dict_line[metric] = metric_value
        lines.append(pd.DataFrame(dict_line))
    # Concatenate once: avoids growing the table row by row and the
    # deprecated concatenation with an empty DataFrame
    df_cluster = pd.concat(lines, ignore_index=True, axis=0)
    df_cluster.to_csv(csv_path, index=False, sep="\t")

create_metric_dimred(seurat, atlas_info, args=<class 'argparse.Namespace'>)

Calc dimensionality reduction metrics :param adata: :param atlas_path: :param atlas_info: :param args: :return:

Source code in checkatlas/seurat.py
def create_metric_dimred(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Write the dimensionality reduction metric table of a Seurat atlas.

    Dimensionality reduction metrics are not computed for Seurat at the
    moment: one line is written per viable obsm key and the metric columns
    listed in args.metric_dimred are left empty. Nothing is written when no
    viable obsm key is available.

    Args:
        seurat (RS4): Seurat object to analyse
        atlas_info (dict): info on the atlas, the atlas name is read from it
        args (argparse.Namespace): arguments of checkatlas workflow,
            args.path and args.metric_dimred are used

    Returns:
        None: the table is written in the DIMRED folder
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.DIMRED),
        atlas_name + checkatlas.TSV_EXTENSION,
    )
    header = ["Dimred_Sample", "obsm"] + args.metric_dimred
    obsm_keys = get_viable_obsm(seurat, args)
    if not obsm_keys:
        logger.debug(f"No viable obsm_key was found for {atlas_name}")
        return

    logger.debug(f"Calc dim red metrics for {atlas_name}")
    lines = []
    for obsm_key in obsm_keys:
        for metric in args.metric_dimred:
            logger.debug(
                f"Calc {metric} for {atlas_name} with obsm {obsm_key}"
            )
            # TODO: compute the metric from the count matrix
            # (seurat@assays$RNA@counts) and the embeddings of obsm_key
            # with metrics.calc_metric_dimred once supported for Seurat
            logger.warning(
                "!!! Dim reduction metrics not available for Seurat"
                " at the moment !!!"
            )
        lines.append(
            {
                "Dimred_Sample": atlas_name + "_" + obsm_key,
                "obsm": obsm_key,
            }
        )
    # Build the table once; metric columns absent from the lines stay empty
    df_dimred = pd.DataFrame(lines, columns=header)
    df_dimred.to_csv(csv_path, index=False, sep="\t")

create_qc_plots(seurat, atlas_info, args=<class 'argparse.Namespace'>)

Display the atlas QC. Search for the OBS variables which correspond to the total_RNA, total_UMI, MT_ratio, RT_ratio. :param path: :param adata: :param atlas_name: :param atlas_path: :return:

Source code in checkatlas/seurat.py
def create_qc_plots(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Save the QC violin plot of a Seurat atlas.

    All keys of SEURAT_TO_SCANPY_OBS are plotted side by side with the
    Seurat VlnPlot function and the figure is saved with ggsave
    (width 10, height 4, dpi 150).

    Args:
        seurat (RS4): Seurat object to plot
        atlas_info (dict): info on the atlas, the atlas name is read from it
        args (argparse.Namespace): arguments of checkatlas workflow,
            args.path is used to locate the output folder

    Returns:
        None: the figure is written in the QC_FIG folder
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    fig_folder = folders.get_folder(args.path, folders.QC_FIG)
    qc_path = os.path.join(
        fig_folder, atlas_name + checkatlas.QC_FIG_EXTENSION
    )
    logger.debug(f"Create QC violin plot for {atlas_name}")
    importr("ggplot2")
    # R helper: draw one violin per feature and save the figure
    r_violin = robjects.r(
        "vln_plot <- function(seurat, obs, qc_path){"
        "vln <- VlnPlot(seurat, features = obs, ncol = length(obs));"
        "ggsave(qc_path, vln, width = 10, "
        "height = 4, dpi = 150)}"
    )
    qc_features = robjects.StrVector(list(SEURAT_TO_SCANPY_OBS))
    r_violin(seurat, qc_features, qc_path)

create_qc_tables(seurat, atlas_info, args=<class 'argparse.Namespace'>)

Display the atlas QC of seurat Search for the metadata variable which correspond to the total_RNA, total_UMI, MT_ratio, RT_ratio :param path: :param adata: :param atlas_name: :param atlas_path: :return:

Source code in checkatlas/seurat.py
def create_qc_tables(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Write the QC table of a Seurat atlas.

    The viable QC columns of the Seurat metadata are extracted, renamed
    with their scanpy names and a cellrank_<name> column is added for each
    of them (rank 1 = highest value). The table is then sampled with
    atlas.atlas_sampling and a cell index column is added.

    Args:
        seurat (RS4): Seurat object to analyse
        atlas_info (dict): info on the atlas, the atlas name is read from it
        args (argparse.Namespace): arguments of checkatlas workflow

    Returns:
        None: the table is written as a tab-separated file in the QC folder
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    qc_folder = folders.get_folder(args.path, folders.QC)
    qc_path = os.path.join(qc_folder, atlas_name + checkatlas.TSV_EXTENSION)
    logger.debug(f"Create QC tables for {atlas_name}")
    obs_keys = get_viable_obs_qc(seurat, args)
    r_meta = robjects.r("obs <- function(seurat){ return(seurat@meta.data)}")
    r_metadata = r_meta(seurat)
    with (ro.default_converter + pandas2ri.converter).context():
        # Convert the R metadata table to pandas and keep the QC columns
        df_metadata = ro.conversion.get_conversion().rpy2py(r_metadata)
        df_annot = df_metadata[obs_keys]
        # Seurat column names -> scanpy column names
        df_annot.columns = [
            SEURAT_TO_SCANPY_OBS[seurat_name]
            for seurat_name in df_annot.columns
        ]

        # Rank cells by each QC metric, highest value first
        qc_headers = [
            qc_name
            for qc_name in df_annot.columns
            if qc_name != atlas.CELLINDEX_HEADER
        ]
        for qc_name in qc_headers:
            rank_header = f"cellrank_{qc_name}"
            df_annot = df_annot.sort_values(qc_name, ascending=False)
            df_annot.loc[:, [rank_header]] = range(1, len(df_annot) + 1)

        # Keep at most args.plot_celllimit cells, then number them
        df_annot = atlas.atlas_sampling(df_annot, "QC", args)
        df_annot.loc[:, [atlas.CELLINDEX_HEADER]] = range(1, len(df_annot) + 1)
        df_annot.to_csv(qc_path, index=False, quoting=False, sep="\t")

create_summary_table(seurat, atlas_info, args=<class 'argparse.Namespace'>)

Create a table with all interesting variables :param seurat: :param atlas_name: :param csv_path: :return:

Source code in checkatlas/seurat.py
def create_summary_table(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Write the one-row summary table of a Seurat atlas.

    The table reports the atlas type, the number of cells (R ncol) and
    genes (R nrow), the file extension and the file path. AnnData.raw is
    always written as False and AnnData.X as True.

    Args:
        seurat (RS4): Seurat object to summarise
        atlas_info (dict): info on the atlas (name, type, extension, path)
        args (argparse.Namespace): arguments of checkatlas workflow,
            args.path is used to locate the output folder

    Returns:
        None: the table is written as a tab-separated file in the SUMMARY
        folder
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    logger.debug(f"Create Summary table for {atlas_name}")
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.SUMMARY),
        atlas_name + checkatlas.TSV_EXTENSION,
    )
    r_nrow = robjects.r["nrow"]
    r_ncol = robjects.r["ncol"]
    # Seurat stores cells as columns and genes as rows
    ncells = r_ncol(seurat)[0]
    ngenes = r_nrow(seurat)[0]
    x_raw = False
    x_norm = True

    # Create summary table (dict order = column order)
    summary_values = {
        "AtlasFileType": atlas_info[checkatlas.ATLAS_TYPE_KEY],
        "NbCells": ncells,
        "NbGenes": ngenes,
        "AnnData.raw": x_raw,
        "AnnData.X": x_norm,
        "File_extension": atlas_info[checkatlas.ATLAS_EXTENSION_KEY],
        "File_path": atlas_info[checkatlas.ATLAS_PATH_KEY],
    }
    header = list(summary_values)
    df_summary = pd.DataFrame(index=[atlas_name], columns=header)
    # Single .loc assignment per cell: chained indexing
    # (df[col][row] = value) may write to a temporary copy
    for column, value in summary_values.items():
        df_summary.loc[atlas_name, column] = value
    df_summary.to_csv(csv_path, index=False, sep="\t")

create_tsne_fig(seurat, atlas_info, args=<class 'argparse.Namespace'>)

Display the TSNE of celltypes Search for the OBS variable which correspond to the celltype annotation :param path: :param adata: :param atlas_name: :param atlas_path: :return:

Source code in checkatlas/seurat.py
def create_tsne_fig(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Save the t-SNE figure of a Seurat atlas.

    The figure is only created when one of the names returned by R
    names(seurat) contains "tsne" and at least one viable obs key exists.
    Cells are grouped by the first viable obs key and plotted with DimPlot
    on the reduction called "tsne" (width 10, height 6, dpi 76).

    Args:
        seurat (RS4): Seurat object to plot
        atlas_info (dict): info on the atlas, the atlas name is read from it
        args (argparse.Namespace): arguments of checkatlas workflow

    Returns:
        None: the figure is written in the TSNE folder
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    # Search if tsne reduction exists
    r_names = robjects.r["names"]
    obsm_list = r_names(seurat)
    importr("ggplot2")
    if not any("tsne" in obsm_key for obsm_key in obsm_list):
        return

    logger.debug(f"Create t-SNE figure for {atlas_name}")
    obs_keys = get_viable_obs_annot(seurat, args)
    if not obs_keys:
        # Nothing to group the cells by: skip instead of failing on
        # obs_keys[0]
        logger.debug(
            f"No viable obs_key was found for {atlas_name}, "
            "t-SNE figure not created"
        )
        return

    # Setting up figures directory
    tsne_path = os.path.join(
        folders.get_folder(args.path, folders.TSNE),
        atlas_name + checkatlas.TSNE_EXTENSION,
    )
    # Exporting tsne
    r_cmd = (
        "tsne <- function(seurat, obs_key, tsne_path){"
        "tsne_plot <- DimPlot(seurat, group.by = obs_key, "
        'reduction = "tsne");'
        "ggsave(tsne_path, tsne_plot, width = 10, "
        "height = 6, dpi = 76)}"
    )
    r_tsne = robjects.r(r_cmd)
    r_tsne(seurat, obs_keys[0], tsne_path)

create_umap_fig(seurat, atlas_info, args=<class 'argparse.Namespace'>)

Display the UMAP of celltypes Search for the OBS variable which correspond to the celltype annotation :param path: :param adata: :param atlas_name: :param atlas_path: :return:

Source code in checkatlas/seurat.py
def create_umap_fig(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Save the UMAP figure of a Seurat atlas.

    The figure is only created when one of the names returned by R
    names(seurat) contains "umap" and at least one viable obs key exists.
    Cells are grouped by the first viable obs key and plotted with DimPlot
    on the reduction called "umap" (width 10, height 6, dpi 76).

    Args:
        seurat (RS4): Seurat object to plot
        atlas_info (dict): info on the atlas, the atlas name is read from it
        args (argparse.Namespace): arguments of checkatlas workflow

    Returns:
        None: the figure is written in the UMAP folder
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    # Search if umap reduction exists
    r_names = robjects.r["names"]
    obsm_list = r_names(seurat)
    importr("ggplot2")
    if not any("umap" in obsm_key for obsm_key in obsm_list):
        return

    logger.debug(f"Create UMAP figure for {atlas_name}")
    obs_keys = get_viable_obs_annot(seurat, args)
    if not obs_keys:
        # Nothing to group the cells by: skip instead of failing on
        # obs_keys[0]
        logger.debug(
            f"No viable obs_key was found for {atlas_name}, "
            "UMAP figure not created"
        )
        return

    # Setting up figures directory
    umap_path = os.path.join(
        folders.get_folder(args.path, folders.UMAP),
        atlas_name + checkatlas.UMAP_EXTENSION,
    )
    # Exporting umap
    r_cmd = (
        "umap <- function(seurat, obs_key, umap_path){"
        "umap_plot <- DimPlot(seurat, group.by = obs_key, "
        'reduction = "umap");'
        "ggsave(umap_path, umap_plot, width = 10, "
        "height = 6, dpi = 76)}"
    )
    r_umap = robjects.r(r_cmd)
    r_umap(seurat, obs_keys[0], umap_path)

get_viable_obs_annot(seurat, args)

Search in obs_keys for a match to the OBS_CLUSTERS values. Obs_keys with only one category are removed. The retained obs_keys are returned sorted by name.

Parameters:
  • seurat (RS4) – description

  • args (argparse.Namespace) – description

Returns:
  • listdescription

Source code in checkatlas/seurat.py
def get_viable_obs_annot(seurat: RS4, args: argparse.Namespace) -> list:
    """
    Search the Seurat metadata columns usable as cell annotation.

    A metadata column is kept when:
    - its name contains at least one value of args.obs_cluster
    - it is a factor (FactorVector)
    - its number of levels is different from 1

    Args:
        seurat (RS4): Seurat object to analyse
        args (argparse.Namespace): arguments of checkatlas workflow,
            args.obs_cluster is used

    Returns:
        list: retained obs keys, each listed once, sorted by name
    """
    r_obs = robjects.r(
        "obs <- function(seurat){ return(colnames(seurat@meta.data))}"
    )
    r_annot = robjects.r(
        "type <- function(seurat, obs_key){ "
        "return(seurat[[obs_key]][[obs_key]])}"
    )
    obs_keys_final = list()
    for obs_key in r_obs(seurat):
        # any(): a key matching several obs_cluster values is kept once
        if not any(
            obs_key_celltype in obs_key
            for obs_key_celltype in args.obs_cluster
        ):
            continue
        # Fetch the column from R only once per key
        annotations = r_annot(seurat, obs_key)
        if not isinstance(annotations, FactorVector):
            continue
        # Remove keys with only one category
        if len(annotations.levels) != 1:
            logger.debug(
                f"Add obs_key {obs_key} with cat {annotations.levels}"
            )
            obs_keys_final.append(obs_key)
    return sorted(obs_keys_final)

get_viable_obs_qc(seurat, args)

Search in obs_keys for a match to the OBS_QC values. The matching obs_keys are returned in the same order as OBS_QC.

Parameters:
  • seurat (RS4) – description

  • args (argparse.Namespace) – description

Returns:
  • listdescription

Source code in checkatlas/seurat.py
def get_viable_obs_qc(seurat: RS4, args: argparse.Namespace) -> list:
    """
    Search the Seurat metadata columns usable for QC display.

    Each value of args.qc_display is translated to its Seurat name with
    SCANPY_TO_SEURAT_OBS and kept when the Seurat metadata has a column
    with that name.

    Args:
        seurat (RS4): Seurat object to analyse
        args (argparse.Namespace): arguments of checkatlas workflow,
            args.qc_display is used

    Returns:
        list: Seurat names of the QC columns found, in args.qc_display order

    Raises:
        KeyError: if a value of args.qc_display is not a key of
            SCANPY_TO_SEURAT_OBS
    """
    r_obs = robjects.r(
        "obs <- function(seurat){ return(colnames(seurat@meta.data))}"
    )
    # Ask R for the metadata column names once, not once per QC name
    seurat_obs = list(r_obs(seurat))
    obs_keys = list()
    for obs_qc in args.qc_display:
        seurat_qc = SCANPY_TO_SEURAT_OBS[obs_qc]
        if seurat_qc in seurat_obs:
            obs_keys.append(seurat_qc)
    return obs_keys

get_viable_obsm(seurat, args)

Search viable obsm for dimensionality reduction metric calculation. ! No filter on obsm is applied for now ! :param seurat: :param args: :return:

Source code in checkatlas/seurat.py
def get_viable_obsm(seurat, args):
    """
    Search viable obsm for dimensionality reduction metric calc.

    ! No filter on obsm is applied for now !
    All the names of seurat@reductions are returned and args is not used.

    Args:
        seurat (RS4): Seurat object to analyse
        args (argparse.Namespace): arguments of checkatlas workflow

    Returns:
        list: names of the reductions of the Seurat object
    """
    # TODO: only keep the reductions listed in args.obsm_dimred
    r_obsm = robjects.r(
        "f<-function(seurat){return(names(seurat@reductions))}"
    )
    obsm_keys = list(r_obsm(seurat))
    logger.debug(f"Add obsm {obsm_keys}")
    return obsm_keys

read_atlas(atlas_info)

Read Seurat object in python using rpy2

Parameters:
  • atlas_path (str) – description

Returns:
  • RS4description

Source code in checkatlas/seurat.py
def read_atlas(atlas_info: dict) -> RS4:
    """Read Seurat object in python using rpy2

    The file is loaded with the R function readRDS. The Seurat R package
    is only imported when the loaded object is of R class "Seurat".

    Args:
        atlas_info (dict): info on the atlas, the atlas name and the
            atlas path are read from it

    Returns:
        RS4: the Seurat object, or None when the loaded object is not of
        R class "Seurat"
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    atlas_path = atlas_info[checkatlas.ATLAS_PATH_KEY]
    logger.info(f"Load {atlas_name} in " f"{atlas_path}")
    # Pass the path as an argument instead of pasting it into R source
    # code: quotes or backslashes in the path would otherwise be
    # interpreted by the R parser
    r_read_rds = robjects.r["readRDS"]
    seurat = r_read_rds(str(atlas_path))
    rclass = robjects.r["class"]
    if rclass(seurat)[0] == "Seurat":
        importr("Seurat")
        return seurat
    else:
        logger.info(f"{atlas_name} is not a Seurat object")
        return None

checkatlas.cellranger

read_cellranger_current(atlas_info)

Read cellranger files.

Load first /outs/filtered_feature_bc_matrix.h5. Then add (if found): Clustering, PCA, UMAP, t-SNE.

Parameters:
  • atlas_path (dict) – info on the atlas

Returns:
  • AnnData – scanpy object from cellranger

Source code in checkatlas/cellranger.py
def _find_rna_projection(search_path: str) -> str:
    """
    Search a cellranger analysis folder for a projection.csv file.

    Folders whose path ends with "atac" are ignored. When several folders
    contain a match, the one visited last by os.walk is returned.

    Args:
        search_path (str): folder to walk

    Returns:
        str: path of the projection file, "" when none was found
    """
    projection_path = ""
    for root, _dirs, files in os.walk(search_path):
        if root.endswith("atac"):
            continue
        for file_name in files:
            if file_name.endswith("projection.csv"):
                projection_path = os.path.join(root, file_name)
                break
    return projection_path


def read_cellranger_current(atlas_info: dict) -> AnnData:
    """
    Read cellranger files.

    Load first the 10x h5 file given by the atlas path
    Then add (if found in the "analysis" folder next to it):
    - Clustering (graphclust, and kmeans with k=10 or k=5 for multiome)
    - PCA
    - UMAP
    - TSNE

    When analysis/dimensionality_reduction/gex exists (multiome output),
    the projections are read from that folder instead.

    Args:
        atlas_info (dict): info on the atlas

    Returns:
        AnnData: scanpy object from cellranger
    """
    cellranger_out_path = os.path.dirname(
        atlas_info[checkatlas.ATLAS_PATH_KEY]
    )
    cellranger_analysis_path = os.path.join(cellranger_out_path, "analysis")
    cellranger_clust_path = os.path.join(
        cellranger_analysis_path, "clustering"
    )
    cellranger_umap_path = os.path.join(cellranger_analysis_path, "umap")
    cellranger_tsne_path = os.path.join(cellranger_analysis_path, "tsne")
    cellranger_pca_path = os.path.join(cellranger_analysis_path, "pca")

    # Search graphclust
    # The break only leaves the inner loop: a match found in a folder
    # visited later replaces an earlier one
    graphclust_path = ""
    for root, dirs, _files in os.walk(cellranger_clust_path):
        for dir_name in dirs:
            if dir_name.endswith("graphclust"):
                cluster_path = os.path.join(root, dir_name, "clusters.csv")
                if os.path.exists(cluster_path) and not root.endswith("atac"):
                    graphclust_path = cluster_path
                    break
    # Search kmeans: the first match found is kept
    kmeans_path = ""
    k_value = 0
    found_kmeans = False
    for root, dirs, _files in os.walk(cellranger_clust_path):
        for dir_name in dirs:
            # Search the highest kmeans = 10
            dir_prefix = "kmeans_10"
            if dir_prefix in dir_name and not found_kmeans:
                cluster_path = os.path.join(root, dir_name, "clusters.csv")
                if os.path.exists(cluster_path):
                    kmeans_path = cluster_path
                    k_value = 10
                    found_kmeans = True
                    break
            # Or search the highest kmeans = 5 (for multiome atlas)
            dir_prefix = os.path.join("gex", "kmeans_5")
            if dir_prefix in os.path.join(root, dir_name) and not found_kmeans:
                cluster_path = os.path.join(root, dir_name, "clusters.csv")
                if os.path.exists(cluster_path):
                    kmeans_path = cluster_path
                    k_value = 5
                    found_kmeans = True
                    break

    # Search umap, t-SNE and pca projections
    rna_umap = _find_rna_projection(cellranger_umap_path)
    rna_tsne = _find_rna_projection(cellranger_tsne_path)
    rna_pca = _find_rna_projection(cellranger_pca_path)

    # Manage multiome cellranger files
    dim_red_path = os.path.join(
        cellranger_analysis_path, "dimensionality_reduction"
    )
    if os.path.exists(dim_red_path):
        gex_path = os.path.join(dim_red_path, "gex")
        if os.path.exists(gex_path):
            rna_umap = os.path.join(gex_path, "umap_projection.csv")
            rna_tsne = os.path.join(gex_path, "tsne_projection.csv")
            rna_pca = os.path.join(gex_path, "pca_projection.csv")

    # Read 10x h5 file
    adata = sc.read_10x_h5(atlas_info[checkatlas.ATLAS_PATH_KEY])
    adata.var_names_make_unique()

    # Add cluster (an empty path never exists, so missing files are skipped)
    if os.path.exists(graphclust_path):
        df_cluster = pd.read_csv(graphclust_path, index_col=0)
        adata.obs["cellranger_graphclust"] = df_cluster["Cluster"]
    if os.path.exists(kmeans_path):
        df_cluster = pd.read_csv(kmeans_path, index_col=0)
        adata.obs["cellranger_kmeans_" + str(k_value)] = df_cluster["Cluster"]

    # Add reduction
    if os.path.exists(rna_umap):
        df_umap = pd.read_csv(rna_umap, index_col=0)
        adata.obsm["X_umap"] = df_umap
    if os.path.exists(rna_tsne):
        df_tsne = pd.read_csv(rna_tsne, index_col=0)
        adata.obsm["X_tsne"] = df_tsne
    if os.path.exists(rna_pca):
        df_pca = pd.read_csv(rna_pca, index_col=0)
        adata.obsm["X_pca"] = df_pca

    return adata

read_cellranger_obsolete(atlas_info)

Read cellranger files.

Load first the 10x matrix folder containing the atlas file (read with sc.read_10x_mtx). Then add (if found): Clustering, PCA, UMAP, t-SNE.

Parameters:
  • atlas_path (dict) – info on the atlas

Returns:
  • AnnData – scanpy object from cellranger

Source code in checkatlas/cellranger.py
def read_cellranger_obsolete(atlas_info: dict) -> AnnData:
    """
    Read cellranger files.

    Load first the 10x matrix folder containing the atlas file
    (sc.read_10x_mtx)
    Then add (if found):
    - Clustering (graphclust, and the highest kmeans from 15 to 3)
    - PCA
    - UMAP
    - TSNE
    Projections are read from the "analysis_csv" folder and are only added
    when they have as many lines as the atlas has cells.

    Args:
        atlas_info (dict): info on the atlas

    Returns:
        AnnData: scanpy object from cellranger
    """
    cellranger_path = atlas_info[checkatlas.ATLAS_PATH_KEY].replace(
        CELLRANGER_MATRIX_FILE, ""
    )
    cellranger_out_path = os.path.join(cellranger_path, os.pardir, os.pardir)
    cellranger_analysis_path = os.path.join(
        cellranger_out_path, "analysis_csv"
    )

    cellranger_umap_path = os.path.join(cellranger_analysis_path, "umap")
    cellranger_tsne_path = os.path.join(cellranger_analysis_path, "tsne")
    cellranger_pca_path = os.path.join(cellranger_analysis_path, "pca")

    # Search graphclust
    # The break only leaves the inner loop: a match found in a folder
    # visited later replaces an earlier one
    graphclust_path = ""
    for root, dirs, _files in os.walk(cellranger_out_path):
        for dir_name in dirs:
            if dir_name.endswith("graphclust"):
                cluster_path = os.path.join(root, dir_name, "clusters.csv")
                if os.path.exists(cluster_path):
                    graphclust_path = cluster_path
                    break
    # Search kmeans
    kmeans_path = ""
    k_value = 0
    for root, dirs, _files in os.walk(cellranger_out_path):
        for dir_name in dirs:
            if dir_name.endswith("kmeans"):
                # Search the highest kmeans from 15 to 3
                for k in reversed(range(3, 16)):
                    cluster_path = os.path.join(
                        root, dir_name, str(k) + "_clusters", "clusters.csv"
                    )
                    if os.path.exists(cluster_path):
                        kmeans_path = cluster_path
                        k_value = k
                        break

    rna_umap = os.path.join(cellranger_umap_path, "projection.csv")
    rna_tsne = os.path.join(cellranger_tsne_path, "projection.csv")
    rna_pca = os.path.join(cellranger_pca_path, "projection.csv")

    # get matrix folder
    matrix_folder = os.path.dirname(atlas_info[checkatlas.ATLAS_PATH_KEY])
    adata = sc.read_10x_mtx(matrix_folder)
    adata.var_names_make_unique()

    # Add cluster (an empty path never exists, so missing files are skipped)
    if os.path.exists(graphclust_path):
        df_cluster = pd.read_csv(graphclust_path, index_col=0)
        adata.obs["cellranger_graphclust"] = df_cluster["Cluster"]
    if os.path.exists(kmeans_path):
        df_cluster = pd.read_csv(kmeans_path, index_col=0)
        adata.obs["cellranger_kmeans_" + str(k_value)] = df_cluster["Cluster"]

    # Add reduction, skipping projections that do not cover every cell
    if os.path.exists(rna_umap):
        df_umap = pd.read_csv(rna_umap, index_col=0)
        if len(df_umap) == len(adata):
            adata.obsm["X_umap"] = df_umap
    if os.path.exists(rna_tsne):
        df_tsne = pd.read_csv(rna_tsne, index_col=0)
        if len(df_tsne) == len(adata):
            adata.obsm["X_tsne"] = df_tsne
    if os.path.exists(rna_pca):
        df_pca = pd.read_csv(rna_pca, index_col=0)
        if len(df_pca) == len(adata):
            adata.obsm["X_pca"] = df_pca
    return adata

checkatlas.metrics.metrics

annotation_to_num(annotation, ref_annotation)

Transforms the annotations from categorical to numerical

Parameters

adata partition_key reference

Returns
Source code in checkatlas/metrics/metrics.py
def annotation_to_num(annotation, ref_annotation):
    """
    Encode two categorical annotations as numerical labels

    Each annotation is converted with to_numpy() and encoded with its own
    LabelEncoder, fitted on that annotation only.

    Parameters
    ----------
    annotation
        annotation to encode, must provide to_numpy()
    ref_annotation
        reference annotation to encode, must provide to_numpy()

    Returns
    -------
    tuple
        (encoded annotation, encoded reference annotation)
    """
    values = annotation.to_numpy()
    ref_values = ref_annotation.to_numpy()
    # One independent encoder per annotation
    encoded_values = LabelEncoder().fit(values).transform(values)
    encoded_ref = LabelEncoder().fit(ref_values).transform(ref_values)
    return encoded_values, encoded_ref

checkatlas.utils.folders

checkatlas_folders(path)

Check in path if the different checkatlas folders exists.
Create them if needed. All folders are given by DICT_FOLDER

Parameters:
  • path (str) – Search path for atlas given by user

Returns:
  • None – None

Source code in checkatlas/utils/folders.py
def checkatlas_folders(path: str) -> None:
    """Check in path if the different checkatlas folders exists.<br>
    Create them if needed.
    All folders are given by DICT_FOLDER

    The folder names are the values of DICT_FOLDER, the same names
    get_folder() resolves. A folder created in the meantime by another
    process is not an error.

    Args:
        path (str): Search path for atlas given by user

    Returns:
        None: None
    """
    global_path = get_workingdir(path)
    try:
        os.mkdir(global_path)
    except FileExistsError:
        # Already there, possibly created by a concurrent run
        pass

    for key_folder in DICT_FOLDER:
        temp_path = os.path.join(global_path, DICT_FOLDER[key_folder])
        if os.path.exists(temp_path):
            continue
        logger.debug(f"Create folder: {temp_path}")
        try:
            os.mkdir(temp_path)
        except FileExistsError:
            # Created by a concurrent run between the check and the mkdir
            pass

get_folder(path, key_folder)

Get the folder path giving the search path and the folder key in DICT_FOLDER

Parameters:
  • path (str) – Search path for atlas given by user

  • key_folder (str) – key folder in the DICT_FOLDER example: ANNDATA, SUMMARY, UMAP

Returns:
  • str – the folder path

Source code in checkatlas/utils/folders.py
def get_folder(path: str, key_folder: str) -> str:
    """Build the path of one checkatlas folder.

    The folder name is looked up in DICT_FOLDER and appended to the
    working directory of the search path.

    Args:
        path (str): Search path for atlas given by user
        key_folder (str): key folder in the DICT_FOLDER
            example: ANNDATA, SUMMARY, UMAP

    Returns:
        str: the folder path
    """
    working_dir = get_workingdir(path)
    folder_name = DICT_FOLDER[key_folder]
    return os.path.join(working_dir, folder_name)

get_workingdir(path)

Return the working_dir = path of search + working_dir with working_dir = checkatlas_files/

Parameters:
  • path (str) – Search path for atlas given by user

Returns:
  • str – os.path.join(path, working_dir)

Source code in checkatlas/utils/folders.py
def get_workingdir(path: str) -> str:
    """Build the checkatlas working directory of a search path.

    The working directory is the search path joined with WORKING_DIR.

    Args:
        path (str): Search path for atlas given by user

    Returns:
        str: os.path.join(path, WORKING_DIR)
    """
    working_dir = os.path.join(path, WORKING_DIR)
    return working_dir

checkatlas.utils.files

get_file_path(atlas_name, folder, extension, path)

summary

Parameters:
  • atlas_name (str) – description

  • args (argparse.Namespace) – description

Returns:
  • strdescription

Source code in checkatlas/utils/files.py
def get_file_path(
    atlas_name: str, folder: str, extension: str, path: str
) -> str:
    """Build the path of a checkatlas file of an atlas.

    The file is named atlas_name + extension and located in the
    checkatlas folder returned by folders.get_folder(path, folder).

    Args:
        atlas_name (str): name of the atlas
        folder (str): key of the checkatlas folder
        extension (str): extension appended to the atlas name
        path (str): Search path for atlas given by user

    Returns:
        str: the file path
    """
    target_folder = folders.get_folder(path, folder)
    file_name = atlas_name + extension
    return os.path.join(target_folder, file_name)

checkatlas.utils.checkatlas_arguments

get_version()

Get version of checkatlas from checkatlas/VERSION file :return: checkatlas version

Source code in checkatlas/utils/checkatlas_arguments.py
def get_version():
    """
    Get version of checkatlas from checkatlas/VERSION file

    The first line of the file is returned as read by readline(), any
    trailing newline included.
    :return: checkatlas version
    """
    version_file = files(__package__).joinpath("VERSION")
    # Open through the resource object itself: unlike the builtin open(),
    # this does not require the package to live on the file system
    with version_file.open(encoding="utf-8") as file:
        return file.readline()