checkatlas
checkatlas.checkatlas
checkatlas.atlas
atlas_sampling(df_annot, type_df, args)
If args.plot_celllimit != 0 and args.plot_celllimit < len(df_annot), the atlas QC table will be sampled for MultiQC.
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/atlas.py
def atlas_sampling(
    df_annot: pd.DataFrame, type_df: str, args: argparse.Namespace
) -> pd.DataFrame:
    """
    Down-sample a QC table before it is handed to MultiQC.

    Sampling only takes place when ``args.plot_celllimit`` is non-zero and
    strictly smaller than the number of rows of ``df_annot``; in every
    other case the very same table object is returned.

    Args:
        df_annot (pd.DataFrame): Table to sample
        type_df (str): type of table (only used in the log messages)
        args (argparse.Namespace): arguments of checkatlas workflow

    Returns:
        pd.DataFrame: Sampled QC table
    """
    cell_limit = args.plot_celllimit
    nb_cells = len(df_annot)
    # A limit of 0 switches sampling off; a limit that is not below the
    # table size leaves nothing to sample.
    if cell_limit == 0 or cell_limit >= nb_cells:
        return df_annot
    logger.debug(f"Sample {type_df} table with {nb_cells} cells")
    sampled = df_annot.sample(cell_limit)
    logger.debug(f"{type_df} table sampled to {len(sampled)} cells")
    return sampled
clean_scanpy_atlas(adata, atlas_info)
Clean the Scanpy object to be sure to get all information out of it
- Make var names unique
- Make var unique for Raw matrix
- If OBS_CLUSTERS are present and in int32 -> be sure to transform them in categorical
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/atlas.py
def clean_scanpy_atlas(adata: AnnData, atlas_info: dict) -> AnnData:
    """
    Clean the Scanpy object to be sure to get all information out of it
    - Make var names unique
    - Make var unique for Raw matrix
    - If OBS_CLUSTERS are present and in int32/int64 -> be sure to
    transform them in categorical

    Args:
        adata (AnnData): atlas to analyse (modified in place)
        atlas_info (dict): info on the atlas

    Returns:
        AnnData: cleaned atlas (the same object that was passed in)
    """
    logger.debug(f"Clean scanpy: {atlas_info[checkatlas.ATLAS_NAME_KEY]}")

    def _all_unique(names) -> bool:
        """Return True when no name occurs more than once."""
        return len(set(names)) == len(names)

    # Make var names unique
    if _all_unique(adata.var_names):
        logger.debug("Var names unique")
    else:
        made_unique = False
        # var_names_make_unique() is attempted twice, as the workflow
        # always did, before falling back to manual suffixing.
        for _ in range(2):
            logger.debug(
                "Var names not unique, ran : adata.var_names_make_unique()"
            )
            adata.var_names_make_unique()
            if _all_unique(adata.var_names):
                logger.debug("Var names unique")
                made_unique = True
                break
        if not made_unique:
            # Still not unique: create unique var_names "by hand" by
            # appending the position of each variable to its name.
            logger.debug(
                "Var names still not unique, "
                "appending the var position to every var name"
            )
            adata.var.index = [
                f"{name}_{i}" for i, name in enumerate(adata.var_names)
            ]
            if _all_unique(adata.var_names):
                logger.debug("Var names unique")

    # Make var unique for Raw matrix
    if adata.raw is not None:
        if _all_unique(adata.raw.var_names):
            logger.debug("Var names for Raw unique")
        else:
            logger.debug("Var names for Raw not unique")
            adata.raw.var.index = [
                f"{name}_{i}" for i, name in enumerate(adata.raw.var_names)
            ]
            if _all_unique(adata.raw.var_names):
                logger.debug("Var names for Raw unique")

    # If OBS_CLUSTERS are present and in int32/int64 -> be sure to
    # transform them in categorical
    for obs_key in adata.obs_keys():
        if not any(pattern in obs_key for pattern in OBS_CLUSTERS):
            continue
        obs_dtype = adata.obs[obs_key].dtype
        if obs_dtype == np.int32 or obs_dtype == np.int64:
            adata.obs[obs_key] = pd.Categorical(adata.obs[obs_key])
    return adata
create_anndata_table(adata, atlas_info, args)
Create an html table with all AnnData arguments The html code will make all elements of the table visible in MultiQC
Parameters: |
|
---|
Source code in checkatlas/atlas.py
def create_anndata_table(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Create an html table with all AnnData arguments.
    The html code will make all elements of the table visible in MultiQC.

    One row is written for the atlas; each cell lists the keys of one
    AnnData slot (obs columns, obsm, var, varm, uns) wrapped in
    ``<code>`` tags and separated by ``<br>``.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    logger.debug(f"Create Adata table for {atlas_name}")
    csv_path = files.get_file_path(
        atlas_name, folders.ANNDATA, checkatlas.TSV_EXTENSION, args.path
    )

    def _as_code_list(keys) -> str:
        """Render keys as <code> elements separated by <br>."""
        return (
            "<code>" + "</code><br><code>".join(map(str, keys)) + "</code>"
        )

    # Build the single row in one go instead of chained
    # df[col][row] = ... assignments, which write to a temporary object
    # when pandas copy-on-write is active.
    df_summary = pd.DataFrame(
        {
            "atlas_obs": [_as_code_list(adata.obs.columns)],
            "obsm": [_as_code_list(adata.obsm_keys())],
            "var": [_as_code_list(adata.var_keys())],
            "varm": [_as_code_list(adata.varm_keys())],
            "uns": [_as_code_list(adata.uns_keys())],
        },
        index=[atlas_name],
    )
    # Default quoting (csv.QUOTE_MINIMAL) equals the former quoting=False
    df_summary.to_csv(csv_path, index=False, sep="\t")
create_metric_annot(adata, atlas_info, args)
Calc annotation metrics
Parameters: |
|
---|
Source code in checkatlas/atlas.py
def create_metric_annot(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Calc annotation metrics.

    The first viable obs key is used as reference; every other viable obs
    key is compared to it with each metric of ``args.metric_annot``. The
    table is only written when at least two viable obs keys exist.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = files.get_file_path(
        atlas_name,
        folders.ANNOTATION,
        checkatlas.TSV_EXTENSION,
        args.path,
    )
    obs_keys = get_viable_obs_annot(adata, args)
    # A reference plus at least one annotation to compare are required
    if len(obs_keys) < 2:
        logger.debug(f"No viable obs_key was found for {atlas_name}")
        return

    logger.debug(f"Calc annotation metrics for {atlas_name}")
    ref_obs, *compared_obs = obs_keys
    lines = []
    for obs_key in compared_obs:
        dict_line = {
            "Annot_Sample": [atlas_name + "_" + obs_key],
            "Reference": [ref_obs],
            "obs": [obs_key],
        }
        for metric in args.metric_annot:
            logger.debug(
                f"Calc {metric} for {atlas_name} "
                f"with obs {obs_key} vs ref_obs {ref_obs}"
            )
            dict_line[metric] = metrics.calc_metric_annot_scanpy(
                metric, adata, obs_key, ref_obs
            )
        lines.append(pd.DataFrame(dict_line))
    # Concatenate once: avoids re-copying the table at every iteration
    # and concatenating onto an empty DataFrame.
    df_annot = pd.concat(lines, ignore_index=True, axis=0)
    df_annot.to_csv(csv_path, index=False, sep="\t")
create_metric_cluster(adata, atlas_info, args)
Calc clustering metrics
Parameters: |
|
---|
Source code in checkatlas/atlas.py
def create_metric_cluster(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Calc clustering metrics.

    Every viable obs key is scored with each metric of
    ``args.metric_cluster`` on one obsm representation: the first obsm key
    containing "umap", else the first one containing "tsne", else an
    empty string.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = files.get_file_path(
        atlas_name,
        folders.CLUSTER,
        checkatlas.TSV_EXTENSION,
        args.path,
    )
    obs_keys = get_viable_obs_annot(adata, args)
    obsm_keys = get_viable_obsm(adata, args)

    # Plain substring tests replace the former ".*umap*." / ".*tsne*."
    # patterns, whose misplaced quantifier also matched e.g. "X_umax".
    obsm_umap_keys = [key for key in obsm_keys if "umap" in key]
    obsm_tsne_keys = [key for key in obsm_keys if "tsne" in key]
    if obsm_umap_keys:
        obsm_key_representation = obsm_umap_keys[0]
    elif obsm_tsne_keys:
        obsm_key_representation = obsm_tsne_keys[0]
    else:
        obsm_key_representation = ""
    logger.debug(
        f"Use obsm '{obsm_key_representation}' for clustering metrics"
    )

    if not obs_keys:
        logger.debug(f"No viable obs_key was found for {atlas_name}")
        return

    logger.debug(f"Calc clustering metrics for {atlas_name}")
    lines = []
    for obs_key in obs_keys:
        dict_line = {
            "Clust_Sample": [atlas_name + "_" + obs_key],
            "obs": [obs_key],
        }
        for metric in args.metric_cluster:
            logger.debug(
                f"Calc {metric} for {atlas_name} "
                f"with obs {obs_key} and obsm {obsm_key_representation}"
            )
            dict_line[metric] = metrics.calc_metric_cluster_scanpy(
                metric, adata, obs_key, obsm_key_representation
            )
        lines.append(pd.DataFrame(dict_line))
    # Concatenate once instead of growing the table inside the loop
    df_cluster = pd.concat(lines, ignore_index=True, axis=0)
    df_cluster.to_csv(csv_path, index=False, sep="\t")
create_metric_dimred(adata, atlas_info, args)
Calc dimensionality reduction metrics
Parameters: |
|
---|
Source code in checkatlas/atlas.py
def create_metric_dimred(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Calc dimensionality reduction metrics.

    Each viable obsm key is scored with every metric of
    ``args.metric_dimred``. Nothing is written when no obsm key is viable.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = files.get_file_path(
        atlas_name,
        folders.DIMRED,
        checkatlas.TSV_EXTENSION,
        args.path,
    )
    obsm_keys = get_viable_obsm(adata, args)
    if len(obsm_keys) == 0:
        logger.debug(f"No viable obsm_key was found for {atlas_name}")
        return

    logger.debug(f"Calc dim red metrics for {atlas_name}")
    lines = []
    for obsm_key in obsm_keys:
        dict_line = {
            "Dimred_Sample": [atlas_name + "_" + obsm_key],
            "obsm": [obsm_key],
        }
        for metric in args.metric_dimred:
            logger.debug(
                f"Calc {metric} for {atlas_name} with obsm {obsm_key}"
            )
            dict_line[metric] = metrics.calc_metric_dimred(
                metric, adata, obsm_key
            )
        lines.append(pd.DataFrame(dict_line))
    # Concatenate once instead of growing the table inside the loop
    df_dimred = pd.concat(lines, ignore_index=True, axis=0)
    df_dimred.to_csv(csv_path, index=False, sep="\t")
create_qc_plots(adata, atlas_info, args)
Display the atlas QC plot. Search for the OBS variables which correspond to total_RNA, total_UMI, MT_ratio and RT_ratio.
Parameters: |
|
---|
Source code in checkatlas/atlas.py
def create_qc_plots(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Save the QC violin plot of an atlas.

    Genes are flagged as mitochondrial ("MT-" prefix) or ribosomal
    ("RPS"/"RPL" prefix) in ``adata.var``, scanpy QC metrics are computed
    in place, and a multi-panel violin plot is saved through scanpy.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    sc.settings.figdir = folders.get_workingdir(args.path)
    sc.set_figure_params(dpi_save=80)
    qc_path = "".join((os.sep, atlas_name, checkatlas.QC_FIG_EXTENSION))
    logger.debug(f"Create QC violin plot for {atlas_name}")

    # Gene-name prefixes defining each QC gene family
    gene_prefixes = {"mt": "MT-", "ribo": ("RPS", "RPL")}
    for family, prefix in gene_prefixes.items():
        adata.var[family] = adata.var_names.str.startswith(prefix)

    sc.pp.calculate_qc_metrics(
        adata,
        qc_vars=list(gene_prefixes),
        percent_top=None,
        log1p=False,
        inplace=True,
    )

    violin_keys = [
        "n_genes_by_counts",
        "total_counts",
        "pct_counts_mt",
        "pct_counts_ribo",
    ]
    sc.pl.violin(
        adata,
        violin_keys,
        jitter=0.4,
        multi_panel=True,
        show=False,
        save=qc_path,
    )
create_qc_tables(adata, atlas_info, args)
Display the atlas QC table. Search for the OBS variables which correspond to total_RNA, total_UMI, MT_ratio and RT_ratio.
Parameters: |
|
---|
Source code in checkatlas/atlas.py
def create_qc_tables(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Write the QC table of an atlas.

    Mitochondrial ("MT-") and ribosomal ("RPS"/"RPL") genes are flagged in
    ``adata.var``; only the families actually present are passed to
    ``sc.pp.calculate_qc_metrics``. The obs columns selected by
    ``get_viable_obs_qc`` are then ranked (one ``cellrank_<column>`` per
    column), optionally sampled, indexed and written as a TSV file.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    qc_path = files.get_file_path(
        atlas_name, folders.QC, checkatlas.TSV_EXTENSION, args.path
    )
    logger.debug(f"Create QC tables for {atlas_name}")
    qc_genes = []
    # mitochondrial genes
    adata.var["mt"] = adata.var_names.str.startswith("MT-")
    if adata.var["mt"].any():
        qc_genes.append("mt")
        logger.debug(f"Mitochondrial genes in {atlas_name} for QC")
    else:
        logger.debug(f"No mitochondrial genes in {atlas_name} for QC")
    # ribosomal genes
    adata.var["ribo"] = adata.var_names.str.startswith(("RPS", "RPL"))
    # Test the "ribo" flag itself: the previous code tested "mt" here, so
    # ribosomal QC was driven by the presence of mitochondrial genes.
    if adata.var["ribo"].any():
        qc_genes.append("ribo")
        logger.debug(f"Ribosomal genes in {atlas_name} for QC")
    else:
        logger.debug(f"No ribosomal genes in {atlas_name} for QC")
    sc.pp.calculate_qc_metrics(
        adata,
        qc_vars=qc_genes,
        percent_top=None,
        log1p=False,
        inplace=True,
    )
    # Work on an independent copy so the columns added below never touch
    # adata.obs
    df_annot = adata.obs[get_viable_obs_qc(adata, args)].copy()
    # Rank cell by qc metric (the column list is fixed before the loop,
    # so freshly added cellrank_* columns are not ranked themselves)
    for header in df_annot.columns:
        if header != CELLINDEX_HEADER:
            new_header = f"cellrank_{header}"
            df_annot = df_annot.sort_values(header, ascending=False)
            df_annot.loc[:, [new_header]] = range(1, adata.n_obs + 1)
    # Sample QC table when more cells than args.plot_celllimit are present
    df_annot = atlas_sampling(df_annot, "QC", args)
    df_annot.loc[:, [CELLINDEX_HEADER]] = range(1, len(df_annot) + 1)
    # Default quoting (csv.QUOTE_MINIMAL) equals the former quoting=False
    df_annot.to_csv(qc_path, index=False, sep="\t")
create_summary_table(adata, atlas_info, args)
Create a table with all summarizing variables
Parameters: |
|
---|
Source code in checkatlas/atlas.py
def create_summary_table(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Create a table with all summarizing variables.

    A single row is written with the atlas type, number of cells and
    genes, whether ``adata.raw`` and ``adata.X`` are set, and the file
    extension and path taken from ``atlas_info``.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    atlas_type = atlas_info[checkatlas.ATLAS_TYPE_KEY]
    atlas_path = atlas_info[checkatlas.ATLAS_PATH_KEY]
    # The extension column used to be filled with the atlas name; read the
    # extension from atlas_info, as the Seurat summary table does.
    atlas_extension = atlas_info[checkatlas.ATLAS_EXTENSION_KEY]
    logger.debug(f"Create Summary table for {atlas_name}")
    csv_path = files.get_file_path(
        atlas_name, folders.SUMMARY, checkatlas.TSV_EXTENSION, args.path
    )
    # Build the single row in one go instead of chained
    # df[col][row] = ... assignments, which write to a temporary object
    # when pandas copy-on-write is active.
    df_summary = pd.DataFrame(
        {
            "AtlasFileType": [atlas_type],
            "NbCells": [adata.n_obs],
            "NbGenes": [adata.n_vars],
            "AnnData.raw": [adata.raw is not None],
            "AnnData.X": [adata.X is not None],
            "File_extension": [atlas_extension],
            "File_path": [atlas_path],
        },
        index=[atlas_name],
    )
    df_summary.to_csv(csv_path, index=False, sep="\t")
create_tsne_fig(adata, atlas_info, args)
Display the TSNE of celltypes Search for the OBS variable which correspond to the celltype annotation
Parameters: |
|
---|
Source code in checkatlas/atlas.py
def create_tsne_fig(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Save the t-SNE figure of an atlas.

    The first obsm key containing "tsne" is copied to ``adata.obsm["X_tsne"]``
    and plotted with scanpy, coloured by the first viable obs annotation
    when there is one. Nothing happens when no t-SNE reduction is found.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    sc.set_figure_params(dpi_save=150)
    # Search if tsne reduction exists. A substring test replaces the
    # former ".*tsne*." pattern, whose quantifier was misplaced.
    obsm_tsne_keys = [
        key for key in get_viable_obsm(adata, args) if "tsne" in key
    ]
    if not obsm_tsne_keys:
        return

    obsm_tsne = obsm_tsne_keys[0]
    logger.debug(
        f"Create t-SNE figure for {atlas_name} with obsm={obsm_tsne}"
    )
    # Set the t-sne to display
    embedding = adata.obsm[obsm_tsne]
    if isinstance(embedding, pd.DataFrame):
        # Transform to numpy if it is a pandas dataframe
        embedding = embedding.to_numpy()
    adata.obsm["X_tsne"] = embedding
    # Setting up figures directory
    sc.settings.figdir = folders.get_workingdir(args.path)
    tsne_path = os.sep + atlas_name + checkatlas.TSNE_EXTENSION
    # Exporting tsne
    obs_keys = get_viable_obs_annot(adata, args)
    if len(obs_keys) != 0:
        sc.pl.tsne(adata, color=obs_keys[0], show=False, save=tsne_path)
    else:
        sc.pl.tsne(adata, show=False, save=tsne_path)
create_umap_fig(adata, atlas_info, args)
Display the UMAP of celltypes Search for the OBS variable which correspond to the celltype annotation
Parameters: |
|
---|
Source code in checkatlas/atlas.py
def create_umap_fig(
    adata: AnnData, atlas_info: dict, args: argparse.Namespace
) -> None:
    """
    Save the UMAP figure of an atlas.

    The first obsm key containing "umap" is copied to ``adata.obsm["X_umap"]``
    and plotted with scanpy, coloured by the first viable obs annotation
    when there is one. Nothing happens when no UMAP reduction is found.

    Args:
        adata (AnnData): atlas to analyse
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): list of arguments from checkatlas workflow
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    sc.set_figure_params(dpi_save=150)
    # Search if umap reduction exists. A substring test replaces the
    # former ".*umap*." pattern, whose quantifier was misplaced.
    obsm_umap_keys = [
        key for key in get_viable_obsm(adata, args) if "umap" in key
    ]
    if not obsm_umap_keys:
        return

    obsm_umap = obsm_umap_keys[0]
    logger.debug(
        f"Create UMAP figure for {atlas_name} with obsm={obsm_umap}"
    )
    # Set the umap to display
    embedding = adata.obsm[obsm_umap]
    if isinstance(embedding, pd.DataFrame):
        # Transform to numpy if it is a pandas dataframe
        embedding = embedding.to_numpy()
    adata.obsm["X_umap"] = embedding
    # Setting up figures directory
    sc.settings.figdir = folders.get_workingdir(args.path)
    umap_path = os.sep + atlas_name + checkatlas.UMAP_EXTENSION
    # Exporting umap
    obs_keys = get_viable_obs_annot(adata, args)
    if len(obs_keys) != 0:
        sc.pl.umap(adata, color=obs_keys[0], show=False, save=umap_path)
    else:
        sc.pl.umap(adata, show=False, save=umap_path)
get_viable_obs_annot(adata, args)
Search in obs_keys a match to OBS_CLUSTERS values ! Remove obs_key with only one category ! Extract sorted obs_keys in same order then OBS_CLUSTERS
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/atlas.py
def get_viable_obs_annot(adata: AnnData, args: argparse.Namespace) -> list:
    """
    Search in obs_keys a match to the ``args.obs_cluster`` values.

    An obs key is kept when:
    - its name contains at least one value of ``args.obs_cluster``,
    - its dtype is categorical,
    - it holds no NaN value,
    - it has more than one category once NaN / "nan" categories are
      removed.

    Args:
        adata (AnnData): atlas to analyse
        args (argparse.Namespace): list of arguments from checkatlas workflow

    Returns:
        list: viable obs_keys, sorted alphabetically
    """
    # any() keeps each key once, even when several patterns match its name
    obs_keys = [
        obs_key
        for obs_key in adata.obs_keys()
        if any(pattern in obs_key for pattern in args.obs_cluster)
        and isinstance(adata.obs[obs_key].dtype, pd.CategoricalDtype)
    ]
    obs_keys_final = []
    for obs_key in obs_keys:
        annotations = adata.obs[obs_key]
        # Annotations holding NaN values are discarded
        if _object_dtype_isnan(annotations).any():
            continue
        categories_temp = annotations.cat.categories
        # remove nan if found
        categories = categories_temp.dropna()
        if categories.isin(["nan"]).any():
            categories = categories.delete(categories.get_loc("nan"))
        # Add obs_key with more than one category (with Nan removed).
        # "> 1" also rejects keys left with no category at all, which the
        # former "!= 1" test let through.
        if len(categories) > 1:
            logger.debug(
                f"Add obs_key {obs_key} with cat {categories_temp}"
            )
            obs_keys_final.append(obs_key)
    return sorted(obs_keys_final)
get_viable_obs_qc(adata, args)
Search in obs_keys a match to OBS_QC values Extract sorted obs_keys in same order then OBS_QC
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/atlas.py
def get_viable_obs_qc(adata: AnnData, args: argparse.Namespace) -> list:
    """
    Select the obs keys of the atlas that are listed in ``args.qc_display``.

    The keys are returned in the order given by ``adata.obs_keys()``.

    Args:
        adata (AnnData): atlas to analyse
        args (argparse.Namespace): list of arguments from checkatlas workflow

    Returns:
        list: obs_keys
    """
    return [key for key in adata.obs_keys() if key in args.qc_display]
get_viable_obsm(adata, args)
TODO: Search viable obsm for dimensionality reduction metric calculation. No filter on obsm is applied for now!
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/atlas.py
def get_viable_obsm(adata: AnnData, args: argparse.Namespace) -> list:
    """
    Search viable obsm for dimensionality reduction metric calc.

    ! No filter on obsm is applied for now: every key returned by
    ``adata.obsm_keys()`` is considered viable and ``args`` is unused !

    Args:
        adata (AnnData): atlas to analyse
        args (argparse.Namespace): list of arguments from checkatlas workflow

    Returns:
        list: obsm_keys
    """
    # TODO: restrict the keys to the ones listed in args.obsm_dimred
    obsm_keys = adata.obsm_keys()
    logger.debug(f"Add obsm {obsm_keys}")
    return obsm_keys
read_atlas(atlas_info)
Read Scanpy or Cellranger data : .h5ad or .h5
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/atlas.py
def read_atlas(atlas_info: dict) -> AnnData:
    """
    Read Scanpy or Cellranger data : .h5ad or .h5

    The reader is chosen from the atlas type stored in ``atlas_info``:
    current Cellranger, obsolete Cellranger, or a Scanpy .h5ad file for
    any other type.

    Args:
        atlas_info (dict): info about the atlas

    Returns:
        AnnData: the loaded atlas. NOTE: when an AnnDataReadError is
        raised while reading, a warning is logged and an empty dict is
        returned instead.
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    atlas_path = atlas_info[checkatlas.ATLAS_PATH_KEY]
    logger.info(f"Load {atlas_name} in {atlas_path}")
    try:
        atlas_type = atlas_info[checkatlas.ATLAS_TYPE_KEY]
        if atlas_type == cellranger.CELLRANGER_TYPE_CURRENT:
            logger.debug(f"Read Cellranger >= v3 results {atlas_path}")
            return cellranger.read_cellranger_current(atlas_info)
        if atlas_type == cellranger.CELLRANGER_TYPE_OBSOLETE:
            logger.debug(f"Read Cellranger < v3 results {atlas_path}")
            return cellranger.read_cellranger_obsolete(atlas_info)
        logger.debug(f"Read Scanpy file {atlas_path}")
        return sc.read_h5ad(atlas_path)
    except _io.utils.AnnDataReadError:
        logger.warning(f"AnnDataReadError, cannot read: {atlas_path}")
        return dict()
checkatlas.seurat
check_seurat_install()
Check if Seurat is installed, run installation if not
Source code in checkatlas/seurat.py
def check_seurat_install() -> None:
    """
    Check if Seurat is installed, run installation if not.

    The R packages "Seurat" and "SeuratObject" are looked up; the missing
    ones are installed with R's ``utils::install.packages`` after the
    personal library (``R_LIBS_USER``) is created and set as library path.
    """
    # import R's utility package
    utils = rpackages.importr("utils")
    # select a mirror for R packages
    utils.chooseCRANmirror(ind=1)  # select the first mirror in the list
    # R package names
    packnames = ("Seurat", "SeuratObject")
    # Selectively install what needs to be installed
    names_to_install = [x for x in packnames if not rpackages.isinstalled(x)]
    if not names_to_install:
        return
    # create personal library
    robjects.r("""dir.create(Sys.getenv("R_LIBS_USER"), recursive = TRUE)""")
    # add to the path; evaluate the snippet once and reuse its result for
    # the log message instead of running it a second time
    lib_paths = robjects.r(""".libPaths(Sys.getenv("R_LIBS_USER"))""")
    logger.debug(f"Set Rlibpaths: {lib_paths}")
    utils.install_packages(StrVector(names_to_install))
create_anndata_table(seurat, atlas_info, args=<class 'argparse.Namespace'>)
Create a table with all AnnData-like arguments in Seurat object :param seurat: :param atlas_name: :param atlas_path: :return:
Source code in checkatlas/seurat.py
def create_anndata_table(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Create a table with all AnnData-like arguments in Seurat object

    The ``atlas_obs`` cell lists the column names of ``seurat@meta.data``,
    ``obsm`` the names of ``seurat@reductions`` and ``uns`` the column
    names of ``seurat@misc`` (when not NULL); ``var`` and ``varm`` are
    left empty.

    NOTE(review): ``args`` defaults to the ``argparse.Namespace`` class
    itself, which looks like a mistyped annotation -- callers are expected
    to always pass the workflow arguments.

    :param seurat: Seurat object to analyse
    :param atlas_info: info on the atlas
    :param args: arguments of the checkatlas workflow
    :return: None
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    logger.debug(f"Create Adata table for {atlas_name}")
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.ANNDATA),
        atlas_name + checkatlas.TSV_EXTENSION,
    )
    # Create r_functions
    r_obs = robjects.r(
        "obs <- function(seurat){ return(colnames(seurat@meta.data))}"
    )
    r_obsm = robjects.r(
        "f<-function(seurat){return(names(seurat@reductions))}"
    )
    r_uns = robjects.r(
        "uns <- function(seurat){ return(colnames(seurat@misc))}"
    )
    obs_list = r_obs(seurat)
    obsm_list = r_obsm(seurat)
    var_list = [""]
    varm_list = [""]
    # Evaluate the R function once and fall back to an empty entry on NULL
    uns_result = r_uns(seurat)
    uns_list = [""] if isinstance(uns_result, NULLType) else uns_result

    def _as_code_list(keys) -> str:
        """Render keys as <code> elements separated by <br>."""
        return (
            "<code>" + "</code><br><code>".join(map(str, keys)) + "</code>"
        )

    # Build the single row in one go instead of chained
    # df[col][row] = ... assignments, which write to a temporary object
    # when pandas copy-on-write is active.
    df_summary = pd.DataFrame(
        {
            "atlas_obs": [_as_code_list(obs_list)],
            "obsm": [_as_code_list(obsm_list)],
            "var": [_as_code_list(var_list)],
            "varm": [_as_code_list(varm_list)],
            "uns": [_as_code_list(uns_list)],
        },
        index=[atlas_name],
    )
    # Default quoting (csv.QUOTE_MINIMAL) equals the former quoting=False
    df_summary.to_csv(csv_path, index=False, sep="\t")
create_metric_annot(seurat, atlas_info, args=<class 'argparse.Namespace'>)
Calc annotation metrics :param adata: :param atlas_path: :param atlas_info: :param args: :return:
Source code in checkatlas/seurat.py
def create_metric_annot(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Calc annotation metrics

    The first viable obs key is used as reference; every other viable obs
    key is compared to it with each metric of ``args.metric_annot``. The
    table is only written when at least two viable obs keys exist.

    NOTE(review): ``args`` defaults to the ``argparse.Namespace`` class
    itself, which looks like a mistyped annotation -- callers are expected
    to always pass the workflow arguments.

    :param seurat: Seurat object to analyse
    :param atlas_info: info on the atlas
    :param args: arguments of the checkatlas workflow
    :return: None
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.ANNOTATION),
        atlas_name + checkatlas.TSV_EXTENSION,
    )
    obs_keys = get_viable_obs_annot(seurat, args)
    # A reference plus at least one annotation to compare are required
    if len(obs_keys) < 2:
        logger.debug(f"No viable obs_key was found for {atlas_name}")
        return

    logger.debug(f"Calc annotation metrics for {atlas_name}")
    ref_obs = obs_keys[0]
    lines = []
    for obs_key in obs_keys[1:]:
        dict_line = {
            "Annot_Sample": [atlas_name + "_" + obs_key],
            "Reference": [ref_obs],
            "obs": [obs_key],
        }
        for metric in args.metric_annot:
            logger.debug(
                f"Calc {metric} for {atlas_name} "
                f"with obs {obs_key} vs ref_obs {ref_obs}"
            )
            dict_line[metric] = metrics.calc_metric_annot_seurat(
                metric, seurat, obs_key, ref_obs
            )
        lines.append(pd.DataFrame(dict_line))
    # Concatenate once instead of growing the table inside the loop
    df_annot = pd.concat(lines, ignore_index=True, axis=0)
    df_annot.to_csv(csv_path, index=False, sep="\t")
create_metric_cluster(seurat, atlas_info, args=<class 'argparse.Namespace'>)
Calc clustering metrics :param seurat: :param atlas_path: :param atlas_info: :param args: :return:
Source code in checkatlas/seurat.py
def create_metric_cluster(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Calc clustering metrics

    Every viable obs key is scored with each metric of
    ``args.metric_cluster`` on the "umap" representation.

    NOTE(review): ``args`` defaults to the ``argparse.Namespace`` class
    itself, which looks like a mistyped annotation -- callers are expected
    to always pass the workflow arguments.

    :param seurat: Seurat object to analyse
    :param atlas_info: info on the atlas
    :param args: arguments of the checkatlas workflow
    :return: None
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.CLUSTER),
        atlas_name + checkatlas.TSV_EXTENSION,
    )
    obs_keys = get_viable_obs_annot(seurat, args)
    obsm_key_representation = "umap"
    if len(obs_keys) == 0:
        logger.debug(f"No viable obs_key was found for {atlas_name}")
        return

    logger.debug(f"Calc clustering metrics for {atlas_name}")
    lines = []
    for obs_key in obs_keys:
        dict_line = {
            "Clust_Sample": [atlas_name + "_" + obs_key],
            "obs": [obs_key],
        }
        for metric in args.metric_cluster:
            logger.debug(
                f"Calc {metric} for {atlas_name} "
                f"with obs {obs_key} and obsm {obsm_key_representation}"
            )
            dict_line[metric] = metrics.calc_metric_cluster_seurat(
                metric, seurat, obs_key, obsm_key_representation
            )
        lines.append(pd.DataFrame(dict_line))
    # Concatenate once instead of growing the table inside the loop
    df_cluster = pd.concat(lines, ignore_index=True, axis=0)
    df_cluster.to_csv(csv_path, index=False, sep="\t")
create_metric_dimred(seurat, atlas_info, args=<class 'argparse.Namespace'>)
Calc dimensionality reduction metrics :param adata: :param atlas_path: :param atlas_info: :param args: :return:
Source code in checkatlas/seurat.py
def create_metric_dimred(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Calc dimensionality reduction metrics

    Dimensionality reduction metrics are not computed for Seurat objects
    yet: one line per viable obsm key is written, the metric columns of
    ``args.metric_dimred`` stay empty and a warning is logged for each
    metric.

    NOTE(review): ``args`` defaults to the ``argparse.Namespace`` class
    itself, which looks like a mistyped annotation -- callers are expected
    to always pass the workflow arguments.

    :param seurat: Seurat object to analyse
    :param atlas_info: info on the atlas
    :param args: arguments of the checkatlas workflow
    :return: None
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.DIMRED),
        atlas_name + checkatlas.TSV_EXTENSION,
    )
    header = ["Dimred_Sample", "obsm"] + args.metric_dimred
    obsm_keys = get_viable_obsm(seurat, args)
    if len(obsm_keys) == 0:
        logger.debug(f"No viable obsm_key was found for {atlas_name}")
        return

    logger.debug(f"Calc dim red metrics for {atlas_name}")
    lines = []
    for obsm_key in obsm_keys:
        for metric in args.metric_dimred:
            logger.debug(
                f"Calc {metric} for {atlas_name} with obsm {obsm_key}"
            )
            # TODO: compute the metric from the count matrix and the
            # embeddings of obsm_key once they can be extracted from the
            # Seurat object.
            logger.warning(
                "!!! Dim reduction metrics not available for Seurat"
                " at the moment !!!"
            )
        lines.append(
            {
                "Dimred_Sample": atlas_name + "_" + obsm_key,
                "obsm": obsm_key,
            }
        )
    # Build the table once; columns=header keeps the (empty) metric
    # columns in the output file.
    df_dimred = pd.DataFrame(lines, columns=header)
    df_dimred.to_csv(csv_path, index=False, sep="\t")
create_qc_plots(seurat, atlas_info, args=<class 'argparse.Namespace'>)
Display the atlas QC. Search for the OBS variables which correspond to total_RNA, total_UMI, MT_ratio and RT_ratio. :param path: :param adata: :param atlas_name: :param atlas_path: :return:
Source code in checkatlas/seurat.py
def create_qc_plots(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Save the QC violin plot of a Seurat atlas.

    An R helper draws a ``VlnPlot`` of the features listed as keys of
    SEURAT_TO_SCANPY_OBS and saves it with ``ggsave`` to the QC figure
    folder of the workflow.

    :param seurat: Seurat object to analyse
    :param atlas_info: info on the atlas
    :param args: arguments of the checkatlas workflow
    :return: None
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    fig_folder = folders.get_folder(args.path, folders.QC_FIG)
    qc_path = os.path.join(
        fig_folder, atlas_name + checkatlas.QC_FIG_EXTENSION
    )
    logger.debug(f"Create QC violin plot for {atlas_name}")
    importr("ggplot2")
    # R helper: one violin per feature, saved to qc_path
    violin_r_code = (
        "vln_plot <- function(seurat, obs, qc_path){"
        "vln <- VlnPlot(seurat, features = obs, ncol = length(obs));"
        "ggsave(qc_path, vln, width = 10, "
        "height = 4, dpi = 150)}"
    )
    plot_violin = robjects.r(violin_r_code)
    features = robjects.StrVector(list(SEURAT_TO_SCANPY_OBS))
    plot_violin(seurat, features, qc_path)
create_qc_tables(seurat, atlas_info, args=<class 'argparse.Namespace'>)
Display the atlas QC of seurat Search for the metadata variable which correspond to the total_RNA, total_UMI, MT_ratio, RT_ratio :param path: :param adata: :param atlas_name: :param atlas_path: :return:
Source code in checkatlas/seurat.py
def create_qc_tables(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Write the QC table of a Seurat atlas.

    The metadata of the Seurat object is converted to a pandas table,
    restricted to the viable QC columns and renamed with their scanpy
    names (SEURAT_TO_SCANPY_OBS). Each column is then ranked
    (``cellrank_<column>``), the table is optionally sampled, indexed and
    written as a TSV file.

    :param seurat: Seurat object to analyse
    :param atlas_info: info on the atlas
    :param args: arguments of the checkatlas workflow
    :return: None
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    qc_folder = folders.get_folder(args.path, folders.QC)
    qc_path = os.path.join(qc_folder, atlas_name + checkatlas.TSV_EXTENSION)
    logger.debug(f"Create QC tables for {atlas_name}")
    obs_keys = get_viable_obs_qc(seurat, args)
    fetch_metadata = robjects.r(
        "obs <- function(seurat){ return(seurat@meta.data)}"
    )
    r_metadata = fetch_metadata(seurat)
    with (ro.default_converter + pandas2ri.converter).context():
        df_metadata = ro.conversion.get_conversion().rpy2py(r_metadata)
    df_annot = df_metadata[obs_keys]
    # rename columns with scanpy names
    df_annot.columns = [
        SEURAT_TO_SCANPY_OBS[seurat_name] for seurat_name in df_annot.columns
    ]
    # Rank cell by qc metric
    for header in df_annot.columns:
        if header == atlas.CELLINDEX_HEADER:
            continue
        df_annot = df_annot.sort_values(header, ascending=False)
        df_annot.loc[:, [f"cellrank_{header}"]] = range(1, len(df_annot) + 1)
    # Sample QC table when more cells than args.plot_celllimit are present
    df_annot = atlas.atlas_sampling(df_annot, "QC", args)
    nb_rows = len(df_annot)
    df_annot.loc[:, [atlas.CELLINDEX_HEADER]] = range(1, nb_rows + 1)
    df_annot.to_csv(qc_path, index=False, quoting=False, sep="\t")
create_summary_table(seurat, atlas_info, args=<class 'argparse.Namespace'>)
Create a table with all interesting variables :param seurat: :param atlas_name: :param csv_path: :return:
Source code in checkatlas/seurat.py
def create_summary_table(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Write the one-row summary table of a Seurat atlas as a TSV file.

    The row holds the atlas file type, the number of cells (columns of the
    Seurat object), the number of genes (rows), two flags written under
    the ``AnnData.raw`` / ``AnnData.X`` headers (always False / True in
    this function), the file extension and the file path.

    Args:
        seurat (RS4): Seurat object to summarise
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): arguments of checkatlas workflow
    Returns:
        None: the table is written in the SUMMARY folder
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    logger.debug(f"Create Summary table for {atlas_name}")
    csv_path = os.path.join(
        folders.get_folder(args.path, folders.SUMMARY),
        atlas_name + checkatlas.TSV_EXTENSION,
    )
    r_nrow = robjects.r["nrow"]
    r_ncol = robjects.r["ncol"]
    # Build the row in one go. The previous chained assignment
    # (df[col][row] = value) writes to a temporary object under pandas
    # Copy-on-Write and would leave the table empty.
    summary = {
        "AtlasFileType": atlas_info[checkatlas.ATLAS_TYPE_KEY],
        "NbCells": r_ncol(seurat)[0],
        "NbGenes": r_nrow(seurat)[0],
        "AnnData.raw": False,
        "AnnData.X": True,
        "File_extension": atlas_info[checkatlas.ATLAS_EXTENSION_KEY],
        "File_path": atlas_info[checkatlas.ATLAS_PATH_KEY],
    }
    df_summary = pd.DataFrame([summary], index=[atlas_name])
    df_summary.to_csv(csv_path, index=False, sep="\t")
create_tsne_fig(seurat, atlas_info, args=<class 'argparse.Namespace'>)
Display the TSNE of celltypes Search for the OBS variable which correspond to the celltype annotation :param path: :param adata: :param atlas_name: :param atlas_path: :return:
Source code in checkatlas/seurat.py
def create_tsne_fig(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Save the t-SNE figure of a Seurat atlas, coloured by annotation.

    Nothing is drawn when ``names(seurat)`` holds no entry containing
    "tsne", or when get_viable_obs_annot returns no annotation key.
    Otherwise the first viable annotation key is used as ``group.by`` for
    Seurat's DimPlot and the plot is saved with ggsave in the TSNE folder.

    Args:
        seurat (RS4): Seurat object to plot
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): arguments of checkatlas workflow
    Returns:
        None: the figure is written to disk
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    # Search if tsne reduction exists.
    # The former pattern ".*tsne*." was malformed: it accepted any name
    # containing "tsn" followed by one character.
    r_names = robjects.r["names"]
    obsm_list = r_names(seurat)
    importr("ggplot2")
    if not any("tsne" in obsm_name for obsm_name in obsm_list):
        return
    logger.debug(f"Create t-SNE figure for {atlas_name}")
    # Setting up figures directory
    tsne_path = os.path.join(
        folders.get_folder(args.path, folders.TSNE),
        atlas_name + checkatlas.TSNE_EXTENSION,
    )
    obs_keys = get_viable_obs_annot(seurat, args)
    if not obs_keys:
        # Without an annotation key there is nothing to group by;
        # indexing obs_keys[0] would raise an IndexError.
        logger.warning(
            f"No viable annotation for {atlas_name}: t-SNE figure skipped"
        )
        return
    # Exporting tsne
    # NOTE(review): the reduction name is hard-coded to "tsne" in the R
    # call, even if the matching entry has a longer name.
    r_cmd = (
        "tsne <- function(seurat, obs_key, tsne_path){"
        "tsne_plot <- DimPlot(seurat, group.by = obs_key, "
        'reduction = "tsne");'
        "ggsave(tsne_path, tsne_plot, width = 10, "
        "height = 6, dpi = 76)}"
    )
    r_tsne = robjects.r(r_cmd)
    r_tsne(seurat, obs_keys[0], tsne_path)
create_umap_fig(seurat, atlas_info, args=<class 'argparse.Namespace'>)
Display the UMAP of celltypes Search for the OBS variable which correspond to the celltype annotation :param path: :param adata: :param atlas_name: :param atlas_path: :return:
Source code in checkatlas/seurat.py
def create_umap_fig(
    seurat: RS4, atlas_info: dict, args=argparse.Namespace
) -> None:
    """
    Save the UMAP figure of a Seurat atlas, coloured by annotation.

    Nothing is drawn when ``names(seurat)`` holds no entry containing
    "umap", or when get_viable_obs_annot returns no annotation key.
    Otherwise the first viable annotation key is used as ``group.by`` for
    Seurat's DimPlot and the plot is saved with ggsave in the UMAP folder.

    Args:
        seurat (RS4): Seurat object to plot
        atlas_info (dict): info on the atlas
        args (argparse.Namespace): arguments of checkatlas workflow
    Returns:
        None: the figure is written to disk
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    # Search if umap reduction exists.
    # The former pattern ".*umap*." was malformed: it accepted any name
    # containing "uma" followed by one character.
    r_names = robjects.r["names"]
    obsm_list = r_names(seurat)
    importr("ggplot2")
    if not any("umap" in obsm_name for obsm_name in obsm_list):
        return
    logger.debug(f"Create UMAP figure for {atlas_name}")
    # Setting up figures directory
    umap_path = os.path.join(
        folders.get_folder(args.path, folders.UMAP),
        atlas_name + checkatlas.UMAP_EXTENSION,
    )
    obs_keys = get_viable_obs_annot(seurat, args)
    if not obs_keys:
        # Without an annotation key there is nothing to group by;
        # indexing obs_keys[0] would raise an IndexError.
        logger.warning(
            f"No viable annotation for {atlas_name}: UMAP figure skipped"
        )
        return
    # Exporting umap
    # NOTE(review): the reduction name is hard-coded to "umap" in the R
    # call, even if the matching entry has a longer name.
    r_cmd = (
        "umap <- function(seurat, obs_key, umap_path){"
        "umap_plot <- DimPlot(seurat, group.by = obs_key, "
        'reduction = "umap");'
        "ggsave(umap_path, umap_plot, width = 10, "
        "height = 6, dpi = 76)}"
    )
    r_umap = robjects.r(r_cmd)
    r_umap(seurat, obs_keys[0], umap_path)
get_viable_obs_annot(seurat, args)
Search in obs_keys for a match to the OBS_CLUSTERS values. Obs_keys with only one category are removed. The matching obs_keys are returned sorted.
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/seurat.py
def get_viable_obs_annot(seurat: RS4, args: argparse.Namespace) -> list:
    """
    List the Seurat metadata columns usable as cell annotation.

    A column is kept when its name contains at least one of the
    ``args.obs_cluster`` patterns, its values come back from R as a
    FactorVector, and the factor does not have exactly one level.
    Each column is reported once, even if it matches several patterns.

    Args:
        seurat (RS4): Seurat object to inspect
        args (argparse.Namespace): arguments of checkatlas workflow;
            ``args.obs_cluster`` is read here
    Returns:
        list: the retained column names, sorted alphabetically
    """
    r_obs = robjects.r(
        "obs <- function(seurat){ return(colnames(seurat@meta.data))}"
    )
    obs_key_seurat = r_obs(seurat)
    r_annot = robjects.r(
        "type <- function(seurat, obs_key){ "
        "return(seurat[[obs_key]][[obs_key]])}"
    )
    obs_keys_final = list()
    for obs_key in obs_key_seurat:
        # any() keeps a column matching several patterns from being
        # added more than once
        if not any(pattern in obs_key for pattern in args.obs_cluster):
            continue
        # Fetch the column once and reuse it for both checks
        annotations = r_annot(seurat, obs_key)
        if not isinstance(annotations, FactorVector):
            continue
        # Remove keys with only one category
        if len(annotations.levels) != 1:
            logger.debug(
                f"Add obs_key {obs_key} with cat {annotations.levels}"
            )
            obs_keys_final.append(obs_key)
    return sorted(obs_keys_final)
get_viable_obs_qc(seurat, args)
Search in obs_keys for a match to the OBS_QC values. The matching obs_keys are returned in the same order as OBS_QC.
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/seurat.py
def get_viable_obs_qc(seurat: RS4, args: argparse.Namespace) -> list:
    """
    List the QC metadata columns present in the Seurat object.

    Each entry of ``args.qc_display`` is translated with
    SCANPY_TO_SEURAT_OBS and kept when the translated name is a column of
    the Seurat meta.data. The order of ``args.qc_display`` is preserved.

    Args:
        seurat (RS4): Seurat object to inspect
        args (argparse.Namespace): arguments of checkatlas workflow;
            ``args.qc_display`` is read here
    Returns:
        list: Seurat column names of the available QC metrics
    Raises:
        KeyError: if an entry of ``args.qc_display`` is missing from
            SCANPY_TO_SEURAT_OBS
    """
    r_obs = robjects.r(
        "obs <- function(seurat){ return(colnames(seurat@meta.data))}"
    )
    # Query R once instead of once per requested metric
    seurat_columns = set(r_obs(seurat))
    obs_keys = list()
    for obs_qc in args.qc_display:
        seurat_key = SCANPY_TO_SEURAT_OBS[obs_qc]
        if seurat_key in seurat_columns:
            obs_keys.append(seurat_key)
    return obs_keys
get_viable_obsm(seurat, args)
Search viable obsm for dimensionality reduction metric calculation. No filter on obsm is applied for now. :param seurat: :param args: :return:
Source code in checkatlas/seurat.py
def get_viable_obsm(seurat, args):
    """
    List the reductions of a Seurat object for dimensionality reduction
    metric calculation.

    No filter is applied for now: every name found in
    ``seurat@reductions`` is returned and ``args`` is not read.

    Args:
        seurat: Seurat object to inspect
        args: arguments of checkatlas workflow (currently unused)
    Returns:
        list: names of the reductions
    """
    r_obsm = robjects.r(
        "f<-function(seurat){return(names(seurat@reductions))}"
    )
    # The stray print() of each key was dropped; the debug log below
    # already reports the full list.
    obsm_keys = list(r_obsm(seurat))
    logger.debug(f"Add obsm {obsm_keys}")
    return obsm_keys
read_atlas(atlas_info)
Read Seurat object in python using rpy2
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/seurat.py
def read_atlas(atlas_info: dict) -> RS4:
    """Read Seurat object in python using rpy2

    The file is loaded with R's readRDS. The Seurat R package is imported
    only once the loaded object is confirmed to be of class "Seurat".

    Args:
        atlas_info (dict): info on the atlas; the name and path entries
            are read here
    Returns:
        RS4: the Seurat object, or None when the first class of the
            loaded R object is not "Seurat"
    """
    atlas_name = atlas_info[checkatlas.ATLAS_NAME_KEY]
    atlas_path = atlas_info[checkatlas.ATLAS_PATH_KEY]
    logger.info(f"Load {atlas_name} in {atlas_path}")
    # Pass the path as an argument instead of pasting it into R source:
    # a quote or backslash in the path would otherwise break (or alter)
    # the evaluated R code.
    r_read_rds = robjects.r["readRDS"]
    seurat = r_read_rds(str(atlas_path))
    rclass = robjects.r["class"]
    if rclass(seurat)[0] == "Seurat":
        importr("Seurat")
        return seurat
    logger.info(f"{atlas_name} is not a Seurat object")
    return None
checkatlas.cellranger
read_cellranger_current(atlas_info)
Read cellranger files.
Load first /outs/filtered_feature_bc_matrix.h5, then add (if found): Clustering, PCA, UMAP and TSNE.
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/cellranger.py
def read_cellranger_current(atlas_info: dict) -> AnnData:
    """
    Read cellranger files.

    The 10x h5 matrix given by the atlas path is loaded, then the
    ``analysis`` folder located next to it is searched for (if found):
    - graph-based clustering
    - k-means clustering (k = 10, or gex k = 5 for multiome atlas)
    - UMAP, t-SNE and PCA projections
    Folders whose path ends with "atac" are ignored for the graph-based
    clustering and for the projections.

    Args:
        atlas_info (dict): info on the atlas
    Returns:
        AnnData: scanpy object from cellranger
    """
    h5_path = atlas_info[checkatlas.ATLAS_PATH_KEY]
    cellranger_out_path = os.path.dirname(h5_path)
    cellranger_analysis_path = os.path.join(cellranger_out_path, "analysis")
    cellranger_clust_path = os.path.join(
        cellranger_analysis_path, "clustering"
    )

    def _search_projection(search_path: str) -> str:
        # Walk the whole tree; the break only leaves the current folder,
        # so a match found in a later folder replaces an earlier one.
        projection = ""
        for root, _subdirs, filenames in os.walk(search_path):
            for filename in filenames:
                is_projection = filename.endswith("projection.csv")
                if is_projection and not root.endswith("atac"):
                    projection = os.path.join(root, filename)
                    break
        return projection

    # Search graphclust
    graphclust_path = ""
    for root, subdirs, _filenames in os.walk(cellranger_clust_path):
        for subdir in subdirs:
            if not subdir.endswith("graphclust"):
                continue
            candidate = os.path.join(root, subdir, "clusters.csv")
            if os.path.exists(candidate) and not root.endswith("atac"):
                graphclust_path = candidate
                break

    # Search kmeans: the first hit wins, later folders are ignored
    kmeans_path = ""
    k_value = 0
    found_kmeans = False
    gex_kmeans_5 = os.path.join("gex", "kmeans_5")
    for root, subdirs, _filenames in os.walk(cellranger_clust_path):
        for subdir in subdirs:
            if found_kmeans:
                continue
            subdir_path = os.path.join(root, subdir)
            candidate = os.path.join(root, subdir, "clusters.csv")
            # Search the highest kmeans = 10
            if "kmeans_10" in subdir and os.path.exists(candidate):
                kmeans_path = candidate
                k_value = 10
                found_kmeans = True
                break
            # Or search the highest kmeans = 5 (for multiome atlas)
            if gex_kmeans_5 in subdir_path and os.path.exists(candidate):
                kmeans_path = candidate
                k_value = 5
                found_kmeans = True
                break

    # Search umap, t-SNE and pca
    rna_umap = _search_projection(
        os.path.join(cellranger_analysis_path, "umap")
    )
    rna_tsne = _search_projection(
        os.path.join(cellranger_analysis_path, "tsne")
    )
    rna_pca = _search_projection(
        os.path.join(cellranger_analysis_path, "pca")
    )

    # Manage multiome cellranger files
    gex_path = os.path.join(
        cellranger_analysis_path, "dimensionality_reduction", "gex"
    )
    # If gex exists, its parent dimensionality_reduction exists as well
    if os.path.exists(gex_path):
        rna_umap = os.path.join(gex_path, "umap_projection.csv")
        rna_tsne = os.path.join(gex_path, "tsne_projection.csv")
        rna_pca = os.path.join(gex_path, "pca_projection.csv")

    # Read 10x h5 file
    adata = sc.read_10x_h5(atlas_info[checkatlas.ATLAS_PATH_KEY])
    adata.var_names_make_unique()

    # Add cluster
    if os.path.exists(graphclust_path):
        df_graphclust = pd.read_csv(graphclust_path, index_col=0)
        adata.obs["cellranger_graphclust"] = df_graphclust["Cluster"]
    if os.path.exists(kmeans_path):
        df_kmeans = pd.read_csv(kmeans_path, index_col=0)
        kmeans_key = "cellranger_kmeans_" + str(k_value)
        adata.obs[kmeans_key] = df_kmeans["Cluster"]

    # Add reduction
    projections = (
        ("X_umap", rna_umap),
        ("X_tsne", rna_tsne),
        ("X_pca", rna_pca),
    )
    for obsm_key, projection_path in projections:
        if os.path.exists(projection_path):
            adata.obsm[obsm_key] = pd.read_csv(projection_path, index_col=0)
    return adata
read_cellranger_obsolete(atlas_info)
Read cellranger files.
Load first /outs/filtered_feature_bc_matrix.h5, then add (if found): Clustering, PCA, UMAP and TSNE.
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/cellranger.py
def read_cellranger_obsolete(atlas_info: dict) -> AnnData:
    """
    Read cellranger files (obsolete output layout).

    The matrix is loaded with scanpy's read_10x_mtx from the folder
    containing the atlas path. The cellranger output folder (two levels
    above the matrix folder) is then searched for (if found):
    - graph-based clustering
    - k-means clustering, highest k between 15 and 3
    - UMAP, t-SNE and PCA projections from ``analysis_csv``
    A projection is attached only if it has one row per cell.

    Args:
        atlas_info (dict): info on the atlas
    Returns:
        AnnData: scanpy object from cellranger
    """
    cellranger_path = atlas_info[checkatlas.ATLAS_PATH_KEY].replace(
        CELLRANGER_MATRIX_FILE, ""
    )
    cellranger_out_path = os.path.join(cellranger_path, os.pardir, os.pardir)
    cellranger_analysis_path = os.path.join(
        cellranger_out_path, "analysis_csv"
    )
    # Search graphclust
    graphclust_path = ""
    for root, subdirs, _filenames in os.walk(cellranger_out_path):
        for subdir in subdirs:
            if subdir.endswith("graphclust"):
                cluster_path = os.path.join(root, subdir, "clusters.csv")
                if os.path.exists(cluster_path):
                    graphclust_path = cluster_path
                    break
    # Search kmeans
    kmeans_path = ""
    k_value = 0
    for root, subdirs, _filenames in os.walk(cellranger_out_path):
        for subdir in subdirs:
            if subdir.endswith("kmeans"):
                # Search the highest kmeans from 15 to 3
                for k in reversed(range(3, 16)):
                    cluster_path = os.path.join(
                        root, subdir, str(k) + "_clusters", "clusters.csv"
                    )
                    if os.path.exists(cluster_path):
                        kmeans_path = cluster_path
                        k_value = k
                        break
    # get matrix folder
    matrix_folder = os.path.dirname(atlas_info[checkatlas.ATLAS_PATH_KEY])
    adata = sc.read_10x_mtx(matrix_folder)
    adata.var_names_make_unique()
    # Add cluster
    if os.path.exists(graphclust_path):
        df_cluster = pd.read_csv(graphclust_path, index_col=0)
        adata.obs["cellranger_graphclust"] = df_cluster["Cluster"]
    if os.path.exists(kmeans_path):
        df_cluster = pd.read_csv(kmeans_path, index_col=0)
        adata.obs["cellranger_kmeans_" + str(k_value)] = df_cluster["Cluster"]
    # Add reduction
    projections = {
        "X_umap": os.path.join(
            cellranger_analysis_path, "umap", "projection.csv"
        ),
        "X_tsne": os.path.join(
            cellranger_analysis_path, "tsne", "projection.csv"
        ),
        "X_pca": os.path.join(
            cellranger_analysis_path, "pca", "projection.csv"
        ),
    }
    for obsm_key, projection_path in projections.items():
        if not os.path.exists(projection_path):
            continue
        df_projection = pd.read_csv(projection_path, index_col=0)
        # Skip projections whose row count differs from the cell count.
        # The guard existed for t-SNE and PCA only; UMAP now follows the
        # same rule.
        if len(df_projection) == len(adata):
            adata.obsm[obsm_key] = df_projection
    return adata
checkatlas.metrics.metrics
annotation_to_num(annotation, ref_annotation)
Transforms the annotations from categorical to numerical
Parameters
adata partition_key reference
Returns
Source code in checkatlas/metrics/metrics.py
def annotation_to_num(annotation, ref_annotation):
    """
    Transforms the annotations from categorical to numerical

    Parameters
    ----------
    annotation
        labels to encode; must provide ``to_numpy()``
    ref_annotation
        reference labels to encode; must provide ``to_numpy()``

    Returns
    -------
    tuple
        (encoded annotation, encoded ref_annotation)
    """
    annotation_values = annotation.to_numpy()
    ref_values = ref_annotation.to_numpy()
    # Each array gets its own encoder, so the integer codes of the two
    # outputs are independent of each other.
    annotation_codes = LabelEncoder().fit_transform(annotation_values)
    ref_codes = LabelEncoder().fit_transform(ref_values)
    return annotation_codes, ref_codes
checkatlas.utils.folders
checkatlas_folders(path)
Check in path if the different checkatlas folders exists.
Create them if needed.
All folders are given by DICT_FOLDER
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/utils/folders.py
def checkatlas_folders(path: str) -> None:
    """Check in path if the different checkatlas folders exists.
    Create them if needed.
    All folders are given by DICT_FOLDER

    Folder paths are resolved through get_folder, so the folders created
    here are the ones the rest of the code reads from and writes to.

    Args:
        path (str): Search path for atlas given by user
    Returns:
        None: None
    """
    global_path = get_workingdir(path)
    try:
        # mkdir (not makedirs): a missing search path must still fail
        os.mkdir(global_path)
    except FileExistsError:
        # Already there, possibly created by a concurrent process
        pass
    for key_folder in DICT_FOLDER:
        # Previously the DICT_FOLDER key was joined to the working dir,
        # whereas get_folder uses the DICT_FOLDER value.
        temp_path = get_folder(path, key_folder)
        if not os.path.exists(temp_path):
            logger.debug(f"Create folder: {temp_path}")
            # exist_ok avoids a crash if the folder appears in between
            os.makedirs(temp_path, exist_ok=True)
get_folder(path, key_folder)
Get the folder path giving the search path and the folder key in DICT_FOLDER
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/utils/folders.py
def get_folder(path: str, key_folder: str) -> str:
    """Build the path of one checkatlas folder.

    The folder name is looked up in DICT_FOLDER and appended to the
    working directory of the search path.

    Args:
        path (str): Search path for atlas given by user
        key_folder (str): key folder in the DICT_FOLDER
            example: ANNDATA, SUMMARY, UMAP
    Returns:
        str: the folder path
    """
    working_dir = get_workingdir(path)
    folder_name = DICT_FOLDER[key_folder]
    return os.path.join(working_dir, folder_name)
get_workingdir(path)
Return the working directory, i.e. the search path joined with working_dir, where working_dir = checkatlas_files/
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/utils/folders.py
def get_workingdir(path: str) -> str:
    """Return the checkatlas working directory of a search path.

    The result is the search path joined with WORKING_DIR
    (``checkatlas_files/`` according to the original documentation).

    Args:
        path (str): Search path for atlas given by user
    Returns:
        str: os.path.join(path, WORKING_DIR)
    """
    working_dir = os.path.join(path, WORKING_DIR)
    return working_dir
checkatlas.utils.files
get_file_path(atlas_name, folder, extension, path)
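Build the path of an atlas file: the checkatlas folder selected by `folder`, joined with `atlas_name + extension`.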
Parameters: |
|
---|
Returns: |
|
---|
Source code in checkatlas/utils/files.py
def get_file_path(
    atlas_name: str, folder: str, extension: str, path: str
) -> str:
    """Build the path of a checkatlas file for one atlas.

    Args:
        atlas_name (str): name of the atlas, used as file base name
        folder (str): folder key passed to folders.get_folder
        extension (str): suffix appended to the atlas name
        path (str): search path passed to folders.get_folder
    Returns:
        str: folder path joined with ``atlas_name + extension``
    """
    target_folder = folders.get_folder(path, folder)
    file_name = atlas_name + extension
    return os.path.join(target_folder, file_name)
checkatlas.utils.checkatlas_arguments
get_version()
Get version of checkatlas from checkatlas/VERSION file :return: checkatlas version
Source code in checkatlas/utils/checkatlas_arguments.py
def get_version():
    """
    Get version of checkatlas from the VERSION file of the package.

    Only the first line of the file is read; it is returned exactly as
    ``readline()`` gives it (a trailing newline, if any, is kept).

    :return: checkatlas version
    """
    version_file = files(__package__).joinpath("VERSION")
    with open(version_file) as handle:
        first_line = handle.readline()
    return first_line