"""DEL denoising primitive.

The DEL Denoise primitive scores DEL screening data to identify compounds that
are strongly enriched in the target selection relative to background noise.

Two scoring modes:

``unified`` (default)
    Uses Poisson confidence intervals computed across all replicates
    simultaneously.  For each row the enrichment ratio is defined as
    the lower bound of the target Poisson interval divided by the upper
    bound of the control Poisson interval.  Values above 1 indicate
    enrichment above background.  This strategy keeps per-replicate
    columns intact and is the recommended approach when replicates are
    trusted individually.
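
    For example, target counts of ``[50, 55]`` against control counts of
    ``[5, 6]`` give a target lower bound of ~42.9 and a control upper
    bound of ~9.8, i.e. an enrichment ratio of ~4.4 (see
    ``get_enrichment_ratio``).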

``non_unified``
    Sums replicate counts into a single target total and a single
    control total, then computes a z-score for each column separately.
    The z-score formula is ``(p0 - p1) / sqrt(p1 * (1 - p1))``, where
    ``p0`` is the row's fractional count and ``p1 = 1 / n_rows``.
    This strategy is useful when replicate-level data is noisy or
    unavailable.
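
    For example, a row count of 30 against a column total of 115 across
    5 rows gives ``p0 = 30/115 ≈ 0.2609`` and ``p1 = 1/5 = 0.2``, so the
    z-score is ``(0.2609 - 0.2) / sqrt(0.2 * 0.8) ≈ 0.152``.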

Optionally, the primitive can collapse three-part (trisynthon) rows
into all pairwise (disynthon) combinations before scoring by setting
``use_disynthon_pairs=True``.
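
For example, a trisynthon row with fragments ``(A, B, C)`` contributes its
counts to the ``(A, B)``, ``(A, C)``, and ``(B, C)`` disynthon pairs.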
"""

import logging
import os
import json
import tempfile
from math import sqrt
from typing import Dict, List, Optional, Set, Tuple

import numpy as np
import pandas as pd
from scipy.stats import chi2

from deepchem_server.core.common import config
from deepchem_server.core.common.address import DeepchemAddress
from deepchem_server.core.common.cards import DataCard
from deepchem_server.core.common.progress_logger import log_progress


logger = logging.getLogger(__name__)

DEFAULT_SMILES_COLS = ["smiles_a", "smiles_b", "smiles_c"]
DEFAULT_CONTROL_COLS = ["seq_matrix_1", "seq_matrix_2", "seq_matrix_3"]
DEFAULT_TARGET_COLS = ["seq_target_1", "seq_target_2", "seq_target_3"]
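
# These defaults assume the common DEL CSV layout used in the doctests below:
# three synthon SMILES columns plus three control ("matrix") and three target
# replicate count columns.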


def poissfit(vec: pd.Series, alpha: float = 0.05) -> Tuple[float, float]:
    """Poisson confidence interval for replicate counts.

    Parameters
    ----------
    vec : pd.Series
        Replicate counts for one row.
    alpha : float
        Significance level (default 0.05 for 95% CI).

    Returns
    -------
    Tuple[float, float]
        ``(lower_bound, upper_bound)`` of the estimated Poisson rate.

    Examples
    --------
    >>> import pandas as pd
    >>> lower, upper = poissfit(pd.Series([10, 12, 11]))
    >>> round(lower, 4)
    7.5719
    >>> round(upper, 4)
    15.4481
    >>> lower < upper
    True
    """
    k_sum = vec.sum()
    n = len(vec)
    # Exact chi-square bounds for the Poisson rate, averaged over the
    # n replicates.
    lower = 0.5 * chi2.ppf(alpha / 2, 2 * k_sum) / n
    upper = 0.5 * chi2.ppf(1 - alpha / 2, 2 * (k_sum + 1)) / n
    return (lower, upper)


def get_enrichment_ratio(row: pd.Series,
                         control_cols: List[str],
                         target_cols: List[str],
                         alpha: float = 0.05) -> float:
    """Enrichment ratio: target_lower_bound / control_upper_bound.

    Parameters
    ----------
    row : pd.Series
        One row with control and target count columns.
    control_cols : List[str]
        Control count column names.
    target_cols : List[str]
        Target count column names.
    alpha : float
        Significance level for the confidence interval.

    Returns
    -------
    float
        Ratio or 0.0 when the control upper bound is zero.

    Examples
    --------
    >>> import pandas as pd
    >>> row = pd.Series({"ctrl_1": 5, "ctrl_2": 6, "tgt_1": 50, "tgt_2": 55})
    >>> ratio = get_enrichment_ratio(row, ["ctrl_1", "ctrl_2"], ["tgt_1", "tgt_2"])
    >>> round(ratio, 4)
    4.3633
    >>> ratio > 1.0
    True
    """
    _, c_upper = poissfit(row[control_cols], alpha)
    t_lower, _ = poissfit(row[target_cols], alpha)
    if c_upper == 0:
        return 0.0
    return t_lower / c_upper


def calculate_poisson_enrichment(df: pd.DataFrame,
                                 control_cols: List[str],
                                 target_cols: List[str],
                                 alpha: float = 0.05) -> pd.DataFrame:
    """Add a Poisson_Enrichment column to the DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        Input data with control and target count columns.
    control_cols : List[str]
        Control count column names.
    target_cols : List[str]
        Target count column names.
    alpha : float
        Significance level for confidence intervals.

    Returns
    -------
    pd.DataFrame
        Copy of dataframe with a Poisson_Enrichment column added.

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     "seq_matrix_1": [10, 20], "seq_matrix_2": [12, 18], "seq_matrix_3": [11, 22],
    ...     "seq_target_1": [50, 30], "seq_target_2": [55, 28], "seq_target_3": [48, 32],
    ... })
    >>> result = calculate_poisson_enrichment(
    ...     df,
    ...     ["seq_matrix_1", "seq_matrix_2", "seq_matrix_3"],
    ...     ["seq_target_1", "seq_target_2", "seq_target_3"],
    ... )
    >>> "Poisson_Enrichment" in result.columns
    True
    >>> list(result["Poisson_Enrichment"].round(4))
    [2.799, 0.9371]
    """
    result_df = df.copy()
    sub_df = result_df[control_cols + target_cols].astype(float)
    result_df["Poisson_Enrichment"] = sub_df.apply(
        lambda row: get_enrichment_ratio(row, control_cols, target_cols, alpha),
        axis=1)
    return result_df


def calculate_normalized_enrichment_score(row: pd.Series,
                                          total_sum: float,
                                          row_count: int,
                                          column_name: str) -> float:
    """Z-score for one row: (p0 - p1) / sqrt(p1 * (1 - p1)).

    Parameters
    ----------
    row : pd.Series
        One DataFrame row.
    total_sum : float
        Sum of column_name across all rows.
    row_count : int
        Number of rows in the DataFrame.
    column_name : str
        Column to read the count from.

    Returns
    -------
    float
        Normalized score.

    Examples
    --------
    >>> import pandas as pd
    >>> row = pd.Series({"count_col": 30})
    >>> score = calculate_normalized_enrichment_score(row, total_sum=115.0, row_count=5, column_name="count_col")
    >>> round(score, 4)
    0.1522
    """
    p0 = row[column_name] / total_sum
    p1 = 1 / row_count
    return (p0 - p1) / sqrt(p1 * (1 - p1))


def calculate_hit_threshold(df: pd.DataFrame, column_name: str, percentile: float) -> float:
    """Return the percentile cutoff for a column.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame.
    column_name : str
        Column to compute the percentile on.
    percentile : float
        Percentile value (0--100).

    Returns
    -------
    float
        The cutoff value.

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame({"Poisson_Enrichment": [0.1, 0.5, 1.2, 2.3, 0.8, 3.1, 0.3, 0.9, 1.5, 4.0]})
    >>> threshold = calculate_hit_threshold(df, "Poisson_Enrichment", 80.0)
    >>> threshold
    2.46
    """
    return np.percentile(df[column_name], percentile)


def get_disynthon_smiles(
    d1_idx: str,
    d2_idx: str,
    smiles_dict_inv: Dict[str, str],
    failed_smiles: Set,
    failed_combines: Set,
) -> Optional[str]:
    """Look up two fragments by index and merge them into one SMILES string.

    Parameters
    ----------
    d1_idx : str
        Index key for the first fragment.
    d2_idx : str
        Index key for the second fragment.
    smiles_dict_inv : Dict[str, str]
        Maps index keys to SMILES strings.
    failed_smiles : Set
        Collects invalid SMILES (modified in place).
    failed_combines : Set
        Collects pairs that fail to merge (modified in place).

    Returns
    -------
    Optional[str]
        Merged SMILES or None on failure.

    Examples
    --------
    >>> failed_smiles, failed_combines = set(), set()
    >>> get_disynthon_smiles("0", "1", {"0": "CCO", "1": "CCN"}, failed_smiles, failed_combines)
    'CCN.CCO'
    >>> get_disynthon_smiles("0", "9", {"0": "CCO", "1": "CCN"}, failed_smiles, failed_combines) is None
    True
    """
    from rdkit import Chem

    try:
        smi_1 = smiles_dict_inv[d1_idx]
        smi_2 = smiles_dict_inv[d2_idx]
    except KeyError:
        return None
    mol1 = Chem.MolFromSmiles(smi_1)
    if mol1 is None:
        failed_smiles.add(smi_1)
        return None
    mol2 = Chem.MolFromSmiles(smi_2)
    if mol2 is None:
        failed_smiles.add(smi_2)
        return None
    try:
        # CombineMols keeps the two fragments as disconnected components,
        # so the canonical SMILES comes out dot-separated.
        combined = Chem.CombineMols(mol1, mol2)
        return Chem.MolToSmiles(combined)
    except Exception:
        failed_combines.add((smi_1, smi_2))
        return None


def create_disynthon_pairs(
    df: pd.DataFrame,
    smiles_cols: List[str],
    count_cols: List[str],
    is_unified: bool,
) -> Tuple[pd.DataFrame, Dict[str, str]]:
    """Generate all pairwise groupings (AB, AC, BC) from three-part data.

    Parameters
    ----------
    df : pd.DataFrame
        Input data with SMILES and count columns.
    smiles_cols : List[str]
        Three SMILES column names.
    count_cols : List[str]
        Count columns to aggregate.
    is_unified : bool
        If True, keep individual count columns.  If False, pre-sum them
        into two totals before grouping.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, str]]
        (pair_df, smiles_dict).  pair_df has Disynthon_1, Disynthon_2 and
        aggregated counts.  smiles_dict maps SMILES to index strings.

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     "smiles_a": ["CCO", "CCO", "CCN"],
    ...     "smiles_b": ["CCN", "CCC", "CCC"],
    ...     "smiles_c": ["CCC", "CCO", "CCO"],
    ...     "seq_matrix_1": [5, 3, 7], "seq_target_1": [20, 10, 15],
    ... })
    >>> pair_df, smiles_dict = create_disynthon_pairs(
    ...     df, ["smiles_a", "smiles_b", "smiles_c"], ["seq_matrix_1", "seq_target_1"], is_unified=True
    ... )
    >>> "Disynthon_1" in pair_df.columns
    True
    >>> len(pair_df) > 0
    True
    """
    # Index every distinct fragment SMILES so the groupby works on compact
    # string keys instead of raw SMILES.
    smiles_set: set = set()
    for col in smiles_cols:
        smiles_set.update(df[col].dropna())
    smiles_list = list(smiles_set)
    smiles_dict = {smi: str(i) for i, smi in enumerate(smiles_list)}

    df_work = df.copy()
    for col in smiles_cols:
        df_work[col] = df_work[col].map(smiles_dict)
    df_work = df_work.dropna(subset=smiles_cols)

    if not is_unified:
        target_count_cols = [c for c in count_cols if "target" in c.lower()]
        control_count_cols = [c for c in count_cols if "matrix" in c.lower() or "control" in c.lower()]
        df_work["seq_target_sum"] = df_work[target_count_cols].sum(axis=1)
        df_work["seq_control_sum"] = df_work[control_count_cols].sum(axis=1)
        agg_cols = ["seq_target_sum", "seq_control_sum"]
    else:
        agg_cols = count_cols

    # One grouped frame per column pair: (a, b), (a, c), (b, c).
    pair_frames = []
    n = len(smiles_cols)
    for i in range(n):
        for j in range(i + 1, n):
            col1, col2 = smiles_cols[i], smiles_cols[j]
            agg_dict = {c: "sum" for c in agg_cols}
            pair_df = df_work.groupby([col1, col2]).agg(agg_dict).reset_index()
            pair_df.rename(columns={col1: "Disynthon_1", col2: "Disynthon_2"}, inplace=True)
            pair_frames.append(pair_df)
    result = pd.concat(pair_frames, ignore_index=True)
    return result, smiles_dict


def collapse_to_disynthons(
    df: pd.DataFrame,
    smiles_cols: List[str],
    control_cols: List[str],
    target_cols: List[str],
    is_unified: bool,
    aggregate_operation: str = "sum",
    min_count_threshold: int = 0,
) -> Tuple[pd.DataFrame, int]:
    """Collapse three-part rows into pairwise combinations.

    Parameters
    ----------
    df : pd.DataFrame
        Cleaned input (no NaN/duplicate rows).
    smiles_cols : List[str]
        Three SMILES column names.
    control_cols : List[str]
        Control count column names.
    target_cols : List[str]
        Target count column names.
    is_unified : bool
        If True, keep individual count columns.  If False, pre-sum into
        two totals.
    aggregate_operation : str
        How to combine duplicate counts: 'sum' or 'mean'.
    min_count_threshold : int
        Drop rows with total count below this value.

    Returns
    -------
    Tuple[pd.DataFrame, int]
        (collapsed_df, n_failed).  collapsed_df has a disynthons column
        and aggregated counts.  n_failed is the number of SMILES that
        could not be merged.

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     "smiles_a": ["CCO", "CCO", "CCN"],
    ...     "smiles_b": ["CCN", "CCC", "CCC"],
    ...     "smiles_c": ["CCC", "CCO", "CCO"],
    ...     "seq_matrix_1": [5, 3, 7], "seq_matrix_2": [6, 4, 8], "seq_matrix_3": [5, 3, 6],
    ...     "seq_target_1": [20, 10, 15], "seq_target_2": [22, 11, 16], "seq_target_3": [19, 9, 14],
    ... })
    >>> collapsed_df, n_failed = collapse_to_disynthons(
    ...     df,
    ...     smiles_cols=["smiles_a", "smiles_b", "smiles_c"],
    ...     control_cols=["seq_matrix_1", "seq_matrix_2", "seq_matrix_3"],
    ...     target_cols=["seq_target_1", "seq_target_2", "seq_target_3"],
    ...     is_unified=True,
    ... )
    >>> "disynthons" in collapsed_df.columns
    True
    >>> len(collapsed_df)
    4
    >>> n_failed
    0
    """
    count_cols = control_cols + target_cols
    pair_df, smiles_dict = create_disynthon_pairs(df, smiles_cols, count_cols, is_unified)
    smiles_dict_inv = {v: k for k, v in smiles_dict.items()}

    failed_smiles: Set = set()
    failed_combines: Set = set()
    pair_df["disynthons"] = pair_df.apply(
        lambda row: get_disynthon_smiles(row["Disynthon_1"], row["Disynthon_2"],
                                         smiles_dict_inv, failed_smiles, failed_combines),
        axis=1,
    )
    pair_df = pair_df[pair_df["disynthons"].notna()]
    n_failed = len(failed_smiles) + len(failed_combines)
    logger.info(f"del_denoise: disynthon collapse had {n_failed} SMILES combination failures")

    # Merge duplicate disynthons (the same pair can arise from several
    # trisynthon rows), then optionally drop low-count pairs.
    numeric_cols = [c for c in pair_df.columns if c not in ("Disynthon_1", "Disynthon_2", "disynthons")]
    agg_dict = {c: aggregate_operation for c in numeric_cols}
    pair_df = pair_df.groupby("disynthons").agg(agg_dict).reset_index()
    if min_count_threshold > 0:
        count_sum = pair_df[numeric_cols].sum(axis=1)
        pair_df = pair_df[count_sum >= min_count_threshold]
    return pair_df, n_failed


def del_denoise(
    dataset_address: str,
    output_key: str,
    strategy: str = "unified",
    control_cols: Optional[List[str]] = None,
    target_cols: Optional[List[str]] = None,
    add_hit_labels: bool = False,
    hit_percentile: float = 90.0,
    alpha: float = 0.05,
    drop_duplicates: bool = True,
    use_disynthon_pairs: bool = False,
    smiles_cols: Optional[List[str]] = None,
    aggregate_operation: str = "sum",
    min_count_threshold: int = 0,
) -> str:
    """Score DEL screening data to identify strong binders.

    Reads a CSV of raw sequencing counts, scores each compound using the
    chosen enrichment strategy, and writes the result back to the
    datastore.

    Scoring strategies
    ------------------
    **unified**
        Applies Poisson confidence intervals across all replicate columns
        simultaneously.  The enrichment score for each row is::

            Poisson_Enrichment = target_lower_CI / control_upper_CI

        where the CIs are computed via ``poissfit``.

    **non_unified**
        Sums replicate counts to form ``seq_target_sum`` and
        ``seq_control_sum``, then computes a z-score for each.

    Parameters
    ----------
    dataset_address : str
        Datastore address of the input CSV.
    output_key : str
        Name for the output CSV in the datastore.
    strategy : str
        'unified' (Poisson ratio) or 'non_unified' (z-score).
    control_cols : Optional[List[str]]
        Control count column names.
    target_cols : Optional[List[str]]
        Target count column names.
    add_hit_labels : bool
        Add binary 0/1 hit columns based on a percentile cutoff.
    hit_percentile : float
        Percentile cutoff for hits (0--100).  Used when add_hit_labels is
        True.
    alpha : float
        Significance level for Poisson intervals.  Used when strategy is
        'unified'.
    drop_duplicates : bool
        Remove duplicate SMILES rows before scoring.
    use_disynthon_pairs : bool
        Collapse three-part rows into pairwise combinations before scoring.
    smiles_cols : Optional[List[str]]
        Three SMILES column names for the pairwise collapse.  Used when
        use_disynthon_pairs is True.
    aggregate_operation : str
        'sum' or 'mean' for combining duplicate pair counts.  Used when
        use_disynthon_pairs is True.
    min_count_threshold : int
        Drop pair rows with total count below this value.  Used when
        use_disynthon_pairs is True.

    Returns
    -------
    str
        Datastore address of the output CSV.

    Raises
    ------
    ValueError
        If strategy is invalid or the datastore is not configured.

    References
    ----------
    "DeepChem-DEL: An Open Source Framework for Reproducible DEL Modeling
    and Benchmarking." (2025).
    https://doi.org/10.26434/chemrxiv-2025-f11mk

    Examples
    --------
    Unified scoring:

    >>> from deepchem_server.core.common.cards import DataCard
    >>> from deepchem_server.core.common import config
    >>> from deepchem_server.core.datastore import DiskDataStore
    >>> import tempfile, pandas as pd
    >>> disk_datastore = DiskDataStore('profile', 'project', tempfile.mkdtemp())
    >>> config.set_datastore(disk_datastore)
    >>> df = pd.DataFrame({
    ...     "smiles": ["CCO", "CCN", "CCC"],
    ...     "seq_matrix_1": [10, 20, 5], "seq_matrix_2": [12, 18, 6],
    ...     "seq_matrix_3": [11, 22, 4],
    ...     "seq_target_1": [50, 30, 8], "seq_target_2": [55, 28, 7],
    ...     "seq_target_3": [48, 32, 9],
    ... })
    >>> card = DataCard(address='', file_type='csv', data_type='pandas.DataFrame')
    >>> addr = disk_datastore.upload_data_from_memory(df, "raw_del.csv", card)
    >>> result_addr = del_denoise(dataset_address=addr, output_key="denoised")
    >>> result_addr
    'deepchem://profile/project/denoised.csv'

    With hit labels:

    >>> result_addr = del_denoise(
    ...     dataset_address=addr,
    ...     output_key="denoised_hits",
    ...     strategy="unified",
    ...     add_hit_labels=True,
    ...     hit_percentile=90.0,
    ... )
    >>> result_addr
    'deepchem://profile/project/denoised_hits.csv'

    Non-unified scoring:

    >>> result_addr = del_denoise(
    ...     dataset_address=addr,
    ...     output_key="denoised_nu",
    ...     strategy="non_unified",
    ...     add_hit_labels=True,
    ... )
    >>> result_addr
    'deepchem://profile/project/denoised_nu.csv'
    """
    if control_cols is None:
        control_cols = list(DEFAULT_CONTROL_COLS)
    if target_cols is None:
        target_cols = list(DEFAULT_TARGET_COLS)
    if smiles_cols is None:
        smiles_cols = list(DEFAULT_SMILES_COLS)

    # All params arrive as plain strings when called through the HTTP router.
    add_hit_labels = str(add_hit_labels).lower() == "true"
    drop_duplicates = str(drop_duplicates).lower() == "true"
    use_disynthon_pairs = str(use_disynthon_pairs).lower() == "true"
    hit_percentile = float(hit_percentile)
    alpha = float(alpha)
    min_count_threshold = int(min_count_threshold)
    if isinstance(control_cols, str):
        control_cols = json.loads(control_cols)
    if isinstance(target_cols, str):
        target_cols = json.loads(target_cols)
    if isinstance(smiles_cols, str):
        smiles_cols = json.loads(smiles_cols)

    datastore = config.get_datastore()
    if datastore is None:
        raise ValueError("Datastore not set")

    log_progress("del_denoise", 10, "downloading dataset")
    tmpdir = tempfile.TemporaryDirectory()
    local_path = os.path.join(tmpdir.name, "input.csv")
    datastore.download_object(dataset_address, local_path)

    log_progress("del_denoise", 15, "loading and cleaning data")
    df = pd.read_csv(local_path)
    n_input = len(df)

    if use_disynthon_pairs:
        # Clean on the synthon columns that drive the collapse.
        df = df.dropna(subset=smiles_cols)
        if drop_duplicates:
            df = df.drop_duplicates(subset=smiles_cols)
        is_unified = strategy == "unified"
        log_progress("del_denoise", 25, "collapsing trisynthons into disynthon pairs")
        df, n_failed = collapse_to_disynthons(
            df,
            smiles_cols,
            control_cols,
            target_cols,
            is_unified,
            aggregate_operation,
            min_count_threshold,
        )
        if not is_unified:
            control_cols = ["seq_control_sum"]
            target_cols = ["seq_target_sum"]
        smiles_col = "disynthons"
    else:
        smiles_col = "disynthons" if "disynthons" in df.columns else "smiles"
    df = df.dropna(subset=[smiles_col])
    if drop_duplicates:
        df = df.drop_duplicates(subset=[smiles_col])
    n_output = len(df)
    logger.info(f"del_denoise: {n_input} -> {n_output} rows after cleaning")

    if strategy == "unified":
        log_progress("del_denoise", 50, "computing Poisson enrichment scores")
        df = calculate_poisson_enrichment(df, control_cols, target_cols, alpha)
        if add_hit_labels:
            log_progress("del_denoise", 70, "computing hit labels")
            threshold = calculate_hit_threshold(df, "Poisson_Enrichment", hit_percentile)
            df["hits"] = (df["Poisson_Enrichment"] > threshold).astype(int)
    elif strategy == "non_unified":
        log_progress("del_denoise", 40, "summing replicate counts")
        df["seq_target_sum"] = df[target_cols].sum(axis=1)
        df["seq_control_sum"] = df[control_cols].sum(axis=1)

        log_progress("del_denoise", 55, "computing z-score enrichment (target)")
        total_target = df["seq_target_sum"].sum()
        total_control = df["seq_control_sum"].sum()
        row_count = len(df)
        df["Target_Enrichment_Score"] = df.apply(
            lambda row: calculate_normalized_enrichment_score(row, total_target, row_count, "seq_target_sum"),
            axis=1,
        )
        log_progress("del_denoise", 65, "computing z-score enrichment (control)")
        df["Control_Enrichment_Score"] = df.apply(
            lambda row: calculate_normalized_enrichment_score(row, total_control, row_count, "seq_control_sum"),
            axis=1,
        )
        if add_hit_labels:
            log_progress("del_denoise", 75, "computing hit labels")
            target_threshold = calculate_hit_threshold(df, "Target_Enrichment_Score", hit_percentile)
            control_threshold = calculate_hit_threshold(df, "Control_Enrichment_Score", hit_percentile)
            df["target_hits"] = (df["Target_Enrichment_Score"] > target_threshold).astype(int)
            df["control_hits"] = (df["Control_Enrichment_Score"] > control_threshold).astype(int)
    else:
        raise ValueError(f"Unknown strategy '{strategy}'. Must be 'unified' or 'non_unified'.")

    log_progress("del_denoise", 90, "uploading denoised dataset")
    if not output_key.endswith(".csv"):
        output_key = output_key + ".csv"
    output_key = DeepchemAddress.get_key(output_key)

    card_kwargs: Dict = dict(
        address="",
        file_type="csv",
        data_type="pandas.DataFrame",
        shape=(n_output, len(df.columns)),
        description=f"DEL denoised enrichment scores ({strategy})",
        strategy=strategy,
        parent=dataset_address,
        control_cols=control_cols,
        target_cols=target_cols,
        n_input_rows=n_input,
        n_output_rows=n_output,
        use_disynthon_pairs=use_disynthon_pairs,
        add_hit_labels=str(add_hit_labels),
        hit_percentile=hit_percentile,
    )
    if use_disynthon_pairs:
        card_kwargs["n_disynthons"] = n_output
        card_kwargs["aggregate_operation"] = aggregate_operation
        card_kwargs["min_count_threshold"] = min_count_threshold
        card_kwargs["n_failed_smiles"] = n_failed
    if strategy == "unified":
        card_kwargs["alpha"] = alpha
    card = DataCard(**card_kwargs)
    return datastore.upload_data_from_memory(df, output_key, card)
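

# A minimal, self-contained sketch (illustration only, not part of the
# primitive's public API): running this module as a script exercises the
# unified scoring helper directly on an in-memory frame, with no datastore
# configured.  The toy counts mirror the del_denoise docstring example.
if __name__ == "__main__":
    demo_df = pd.DataFrame({
        "smiles": ["CCO", "CCN", "CCC"],
        "seq_matrix_1": [10, 20, 5], "seq_matrix_2": [12, 18, 6],
        "seq_matrix_3": [11, 22, 4],
        "seq_target_1": [50, 30, 8], "seq_target_2": [55, 28, 7],
        "seq_target_3": [48, 32, 9],
    })
    scored = calculate_poisson_enrichment(demo_df, DEFAULT_CONTROL_COLS, DEFAULT_TARGET_COLS)
    print(scored[["smiles", "Poisson_Enrichment"]].to_string(index=False))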