Source code for deepchem_server.core.feat

import ast
import multiprocessing as mp
import os
import re
import tempfile
from typing import Dict, Iterable, List, Optional, Set, Union

import deepchem as dc
import pandas as pd
from rdkit import Chem

from deepchem_server.core import config
from deepchem_server.core.address import DeepchemAddress
from deepchem_server.core.cards import DataCard


featurizer_map = {
    "ecfp": dc.feat.CircularFingerprint,
    "graphconv": dc.feat.ConvMolFeaturizer,
    "weave": dc.feat.WeaveFeaturizer,
    "molgraphconv": dc.feat.MolGraphConvFeaturizer,
    "dummy": dc.feat.DummyFeaturizer,
    "grover": dc.feat.GroverFeaturizer,
    "rdkitconformer": dc.feat.RDKitConformerFeaturizer,
    "dmpnn": dc.feat.DMPNNFeaturizer,
}
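
# Example (illustrative, not part of the original module): a featurizer string
# is looked up in `featurizer_map` and instantiated, optionally with kwargs.
# The ECFP kwargs below (`radius`, `size`) are shown only for illustration.
#
#   feat = featurizer_map["ecfp"](radius=2, size=2048)
#   features = feat.featurize(["CCO", "c1ccccc1"])  # featurize a few SMILES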


def split_dataset(dataset_path: str, file_type: str, n_partition: int,
                  available_checkpoints: List[int]) -> List[str]:
    """Split the dataset into n partitions.

    Parameters
    ----------
    dataset_path : str
        The path to the dataset.
    file_type : str
        The type of the dataset (e.g., 'csv', 'sdf').
    n_partition : int
        The number of partitions to split the dataset into.
    available_checkpoints : list of int
        List of checkpoint partition IDs that are already available.

    Returns
    -------
    list of str
        The list of file paths of the partitioned datasets.

    Raises
    ------
    NotImplementedError
        If the file type is not supported for featurization.
    """
    basedir = os.path.dirname(dataset_path)
    datasets = []
    if file_type == 'csv':
        df = pd.read_csv(dataset_path)
        part_size = df.shape[0] // n_partition
        overflow = df.shape[0] % n_partition  # remainder of the datapoints after n equal partitions
        for i in range(n_partition):
            if i in available_checkpoints:
                continue
            start = i * part_size
            end = (i + 1) * part_size
            if i == n_partition - 1:
                end += overflow  # adds overflow datapoints to the last partition
            df_subset = df.iloc[start:end]
            dest_dir = os.path.join(basedir, f'part{i}')
            os.makedirs(dest_dir, exist_ok=True)
            csv_path = os.path.join(dest_dir, f'part{i}.csv')
            datasets.append(csv_path)
            df_subset.to_csv(csv_path, index=False)
    elif file_type == 'sdf':
        suppl = Chem.SDMolSupplier(dataset_path)
        mol_l = []
        for mol in suppl:
            mol_l.append(mol)
        n_rows = len(mol_l)
        part_size = n_rows // n_partition
        overflow = n_rows % n_partition  # remainder of the datapoints after n equal partitions
        for i in range(n_partition):
            if i in available_checkpoints:
                continue
            start = i * part_size
            end = (i + 1) * part_size
            if i == n_partition - 1:
                end += overflow  # adds overflow datapoints to the last partition
            dest_dir = os.path.join(basedir, f'part{i}')
            os.makedirs(dest_dir, exist_ok=True)
            sdf_path = os.path.join(dest_dir, f'part{i}.sdf')
            datasets.append(sdf_path)
            writer = Chem.SDWriter(sdf_path)
            for mol in mol_l[start:end]:
                writer.write(mol)
            writer.close()  # flush and close the writer so the partition file is complete
    else:
        raise NotImplementedError(f"Featurization of {dataset_path} not supported.")
    return datasets
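
# Usage sketch for `split_dataset` (hypothetical paths, assuming a local CSV):
#
#   parts = split_dataset("/tmp/work/temp.csv", file_type="csv",
#                         n_partition=4, available_checkpoints=[0])
#   # parts -> ["/tmp/work/part1/part1.csv", "/tmp/work/part2/part2.csv",
#   #           "/tmp/work/part3/part3.csv"]
#   # (partition 0 is skipped because it is already available as a checkpoint)
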
def featurize_part(
    main_dataset_address: str,
    dataset_path: str,
    file_type: str,
    featurizer: dc.feat.Featurizer,
    dataset_column: str,
    label_column: Optional[str],
    checkpoint_output_key: str,
    nproc: int,
) -> None:
    """Featurize a part of the dataset.

    Parameters
    ----------
    main_dataset_address : str
        Address of the main dataset being featurized.
    dataset_path : str
        The path to the dataset partition to featurize.
    file_type : str
        The type of the dataset (e.g., 'csv', 'sdf').
    featurizer : dc.feat.Featurizer
        The featurizer to use.
    dataset_column : str
        The column containing the input for the featurizer.
    label_column : str, optional
        The target column in case this dataset is going to be used for training purposes.
    checkpoint_output_key : str
        The output key for the checkpoint '.partial' folder.
    nproc : int
        The total number of partitions being processed.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If datastore is not set.
    NotImplementedError
        If the file type is not supported for featurization.
    """
    datastore = config.get_datastore()
    if datastore is None:
        raise ValueError("Datastore not set")
    dest_dir = os.path.dirname(dataset_path)
    if file_type == 'csv':
        if label_column is not None:
            loader = dc.data.CSVLoader(tasks=[label_column], feature_field=dataset_column, featurizer=featurizer)
        else:
            loader = dc.data.CSVLoader(tasks=[], feature_field=dataset_column, featurizer=featurizer)
        dataset = loader.create_dataset(dataset_path)
    elif file_type == 'sdf':
        if label_column is not None:
            loader = dc.data.SDFLoader(tasks=[label_column], featurizer=featurizer, sanitize=True)
        else:
            loader = dc.data.SDFLoader(tasks=[], featurizer=featurizer, sanitize=True)
        dataset = loader.create_dataset(dataset_path)
    else:
        raise NotImplementedError(f"Featurization of '{file_type}' not supported.")
    dataset.move(dest_dir)
    # extract the partition id from a filename of the form 'part<N>.<ext>'
    checkpoint_id = os.path.basename(dataset_path).split('.')[0][len('part'):]
    card = DataCard(address='',
                    file_type='dir',
                    featurizer=featurizer,
                    data_type=type(dataset).__name__,
                    checkpoint_id=checkpoint_id,
                    n_core=nproc,
                    parent=main_dataset_address,
                    description=f"featurized partition of {main_dataset_address} : {checkpoint_id} of {nproc - 1}")
    datastore.upload_data_from_memory(dataset,
                                      checkpoint_output_key + f'/_checkpoint/part_{checkpoint_id}_of_{nproc - 1}',
                                      card)
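
# For reference (derived from the code above, with a hypothetical output key):
# each partition is uploaded under the checkpoint folder of the requested
# output key, e.g. for output key "featurized_data" with nproc=4 and
# partition id 2 the upload key is
#
#   "featurized_data.partial/_checkpoint/part_2_of_3"
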
def featurize_multi_core(
    main_dataset_address: str,
    raw_dataset_path: str,
    file_type: str,
    feat: dc.feat.Featurizer,
    dataset_column: str,
    label_column: Optional[str],
    basedir: str,
    nproc: int,
    checkpoint_output_key: str,
    available_checkpoints: List[int],
) -> Iterable[Union[List, str]]:
    """Featurize the dataset in parallel.

    Parameters
    ----------
    main_dataset_address : str
        Address of the main dataset being featurized.
    raw_dataset_path : str
        The path to the raw dataset.
    file_type : str
        The type of the dataset (e.g., 'csv', 'sdf').
    feat : dc.feat.Featurizer
        The featurizer to use.
    dataset_column : str
        The column containing the input for the featurizer.
    label_column : str, optional
        The target column in case this dataset is going to be used for training purposes.
    basedir : str
        The base directory where the dataset is stored.
    nproc : int
        The number of partitions to split the dataset into.
    checkpoint_output_key : str
        The output key for the checkpoint '.partial' folder.
    available_checkpoints : list of int
        The list of checkpoint ids already completed in the previous run (if any).

    Returns
    -------
    list
        A list containing [datasets, merge_dir] where datasets is a list of
        DiskDataset objects and merge_dir is the directory path for merging.
    """
    dataset_paths = split_dataset(raw_dataset_path, file_type, nproc, available_checkpoints)
    processes = []
    for dataset_path in dataset_paths:
        p = mp.Process(target=featurize_part,
                       args=(main_dataset_address, dataset_path, file_type, feat, dataset_column, label_column,
                             checkpoint_output_key, nproc))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    merge_dir = os.path.join(basedir, 'merged')
    os.makedirs(merge_dir, exist_ok=True)
    datasets = []
    for i in range(nproc):
        dataset_dir = os.path.join(basedir, f'part{i}')
        datasets.append(dc.data.DiskDataset(data_dir=dataset_dir))
    return [datasets, merge_dir]
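
# Usage sketch (assumes a configured datastore and a local copy of the raw
# dataset; the addresses, paths, and column names below are hypothetical):
#
#   datasets, merge_dir = featurize_multi_core(
#       "user/raw_data.csv", "/tmp/work/temp.csv", "csv",
#       dc.feat.CircularFingerprint(), "smiles", None,
#       basedir="/tmp/work", nproc=4,
#       checkpoint_output_key="featurized_data.partial",
#       available_checkpoints=[])
#   dataset = dc.data.DiskDataset.merge(datasets, merge_dir=merge_dir)
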
def featurize(
    dataset_address: str,
    featurizer: str,
    output: str,
    dataset_column: str,
    feat_kwargs: Dict = dict(),
    label_column: Optional[str] = None,
    n_core: Optional[int] = None,
    single_core_threshold: Optional[int] = 250,
) -> Optional[str]:
    """Featurize the dataset at the given address with the specified featurizer.

    Writes output to the datastore.

    If the compute node has more than 1 CPU core, the featurization is done by
    splitting the dataset into parts of equal size and featurizing each part in
    parallel. The featurized parts are then merged into a single dataset and
    written to the datastore. The number of parts is equal to the number of
    cores available on the machine. If the compute node has only 1 CPU core,
    the featurization is done in a single process.

    Restart support:
    The featurize primitive saves a `(output).partial` folder where checkpoints
    are stored until completion. To resume a failed featurize execution, rerun
    the featurize primitive with the same arguments; the checkpoints are
    restored from the `(output).partial` folder, and the folder is deleted once
    the featurization process is complete.
    Note: The restart fails if `n_core` is smaller than the `n_core` used
    before the restart. Additionally, the checkpoints must belong to the same
    dataset address as the initial run; otherwise, they will not be considered
    for the restart.

    Parameters
    ----------
    dataset_address : str
        The deepchem address of the dataset to featurize.
    featurizer : str
        Has to be a featurizer string in mappings.
    output : str
        The name of the output featurized dataset in your workspace.
    dataset_column : str
        Column containing the input for the featurizer.
    feat_kwargs : dict, optional
        Keyword arguments to pass to the featurizer on initialization, by default {}.
    label_column : str, optional
        The target column in case this dataset is going to be used for training purposes.
    n_core : int, optional
        The number of cores to use for featurization.
    single_core_threshold : int, optional
        The dataset size threshold in megabytes above which multicore
        featurization will be used, by default 250.

    Returns
    -------
    str
        Deepchem address of the featurized dataset.

    Raises
    ------
    ValueError
        If the featurizer is not recognized, if the input column is not
        specified for CSV files, or if the datastore is not set.
    FileExistsError
        If the output address already exists in the datastore.
    NotImplementedError
        If the dataset format is not supported for featurization.
    """
    # TODO: Allow a list of label columns for multitask learning
    # TODO: Handle parsing of dictionary via parser
    featurizer = featurizer.lower()
    if featurizer not in featurizer_map:
        raise ValueError(f"Featurizer not recognized.\nAvailable featurizers: {featurizer_map}")
    if dataset_address.endswith('csv'):
        if dataset_column == 'None' or dataset_column is None:
            raise ValueError("Please specify input column.")
    if isinstance(feat_kwargs, str):
        feat_kwargs = ast.literal_eval(feat_kwargs)
    if label_column == 'None':
        label_column = None
    feat_kwargs_restore = {}
    if feat_kwargs:
        if "features_generator" in feat_kwargs:
            feat_generator = feat_kwargs["features_generator"]
            feat_kwargs_restore["features_generator"] = feat_generator
            feat_kwargs["features_generator"] = featurizer_map[feat_generator]()
        feat = featurizer_map[featurizer](**feat_kwargs)
    else:
        feat = featurizer_map[featurizer]()
    assert dataset_address.endswith('csv') or dataset_address.endswith('sdf')
    datastore = config.get_datastore()
    if datastore is None:
        raise ValueError("Datastore not set")
    output_key = DeepchemAddress.get_key(output)
    checkpoint_output_key = output_key + ".partial"
    tempdir = tempfile.TemporaryDirectory()
    basedir = os.path.join(tempdir.name)
    if datastore.exists(output_key):
        raise FileExistsError(
            f"Output address {output_key} already exists in datastore. Please choose a different output name.")

    # check if a _checkpoint/ folder exists in the given output folder in the datastore
    available_checkpoints = []
    _storage_loc = datastore.storage_loc.rstrip("/")
    pattern = re.compile(fr"{_storage_loc}/{checkpoint_output_key}/_checkpoint/part_\d+_of_\d+\.cdc$")
    n_core_set: Set[int] = set()
    for item in datastore._get_datastore_objects(_storage_loc):
        match = pattern.search(item)
        if match:
            card = datastore.get(item)
            if card and hasattr(card, 'parent') and card.parent == dataset_address:
                n_core_set.add(card.n_core)
                available_checkpoints.append(int(card.checkpoint_id))
                chkpt_tmp_path: str = os.path.join(tempdir.name, f'part{int(card.checkpoint_id)}')
                chkpt_address = item[:-4]  # removes the trailing '.cdc'
                datastore.download_object(chkpt_address, chkpt_tmp_path)
    if len(n_core_set) == 1:
        n_core = list(n_core_set)[0]
        if n_core > os.cpu_count():  # type: ignore
            raise Exception(
                f"Current job config is insufficient to restart the job as it requires at least {n_core // 2} vcpus")
    elif len(n_core_set) > 1:
        raise Exception("Checkpoints found with more than one partition type")

    if n_core is None:
        nproc = os.cpu_count()
    else:
        nproc = n_core

    if dataset_address.endswith('csv'):
        raw_dataset_path = os.path.join(tempdir.name, 'temp.csv')
        dataset_size = datastore.get_file_size(dataset_address)
        datastore.download_object(dataset_address, raw_dataset_path)
        file_size = dataset_size / (1024 * 1024)  # bytes to megabytes
        if nproc and nproc > 1 and file_size and single_core_threshold and file_size > single_core_threshold:
            datasets, merge_dir = featurize_multi_core(dataset_address, raw_dataset_path, 'csv', feat, dataset_column,
                                                       label_column, basedir, nproc, checkpoint_output_key,
                                                       available_checkpoints)
            dataset = dc.data.DiskDataset.merge(datasets, merge_dir=merge_dir)
        else:
            if label_column is not None:
                loader = dc.data.CSVLoader(tasks=[label_column], feature_field=dataset_column, featurizer=feat)
            else:
                loader = dc.data.CSVLoader(tasks=[], feature_field=dataset_column, featurizer=feat)
            dataset = loader.create_dataset(raw_dataset_path)
    elif dataset_address.endswith('sdf'):
        raw_dataset_path = os.path.join(tempdir.name, 'temp.sdf')
        dataset_size = datastore.get_file_size(dataset_address)
        datastore.download_object(dataset_address, raw_dataset_path)
        file_size = dataset_size / (1024 * 1024)  # bytes to megabytes
        if nproc and nproc > 1 and file_size and single_core_threshold and file_size > single_core_threshold:
            datasets, merge_dir = featurize_multi_core(dataset_address, raw_dataset_path, 'sdf', feat, dataset_column,
                                                       label_column, basedir, nproc, checkpoint_output_key,
                                                       available_checkpoints)
            dataset = dc.data.DiskDataset.merge(datasets, merge_dir=merge_dir)
        else:
            if label_column is not None:
                loader = dc.data.SDFLoader(tasks=[label_column], featurizer=feat, sanitize=True)
            else:
                loader = dc.data.SDFLoader(tasks=[], featurizer=feat, sanitize=True)
            dataset = loader.create_dataset(raw_dataset_path)
    else:
        raise NotImplementedError(f"Featurization of {dataset_address} not supported.")

    for key, value in feat_kwargs_restore.items():
        feat_kwargs[key] = value
    card = DataCard(address='',
                    file_type='dir',
                    featurizer=featurizer,
                    data_type=type(dataset).__name__,
                    feat_kwargs=feat_kwargs)
    featurized_address = datastore.upload_data_from_memory(dataset, output_key, card)
    if checkpoint_output_key + '/' in datastore.list_data():
        datastore.delete_object(address=_storage_loc + "/" + checkpoint_output_key, kind='dir')
    return featurized_address
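
# Usage sketch for the `featurize` primitive (assumes `config.get_datastore()`
# returns a configured datastore; the addresses and column names below are
# hypothetical):
#
#   address = featurize(
#       dataset_address="user/zinc_subset.csv",
#       featurizer="ecfp",
#       output="zinc_ecfp",
#       dataset_column="smiles",
#       label_column="logp",
#   )
#
# If a multi-core run fails part way, rerunning the same call resumes from the
# checkpoints stored under "zinc_ecfp.partial" in the datastore.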