# Source code for pharmaforge.dbutils.create_mongodb_collections

import os
import argparse
import h5py

from pathlib import Path
from pymongo import MongoClient

from pharmaforge.io import modify_molecule_name
from pharmaforge.labeling import count_molecules, count_water_molecules



def process_hdf5_folder(folder_path, database_name, level_of_theory=None, data_source=None,
                        mongo_uri="mongodb://localhost:27017"):
    """
    Process all HDF5 files in a folder and create MongoDB collections for each HDF5 file.

    Every ``*.h5`` / ``*.hdf5`` file is mapped to a collection named
    ``<prefix>_qdpi``, where ``<prefix>`` is the part of the file stem before
    the first underscore. If that collection does not exist yet it is
    populated with ``create_collection``. If it already exists and none of
    its documents carries ``theorylevels.<level_of_theory>``, the file is
    merged in with ``add_theory_to_collection``. If the theory level is
    already present the file is skipped, so a re-run does not duplicate data.

    Parameters
    ----------
    folder_path : str
        Path to the folder containing HDF5 files
    database_name : str
        Name of the MongoDB database to use
    level_of_theory : str, optional
        Level of theory to use for all collections. ``None`` falls back to
        ``"wB97M-D3(BJ)/def2-TZVPPD"``, the default of ``create_collection``.
    data_source : str, optional
        Data source identifier stored on documents of newly created
        collections. It is not forwarded when a theory level is added to an
        existing collection.
    mongo_uri : str, optional
        MongoDB connection string. Defaults to the local server on port 27017.

    Raises
    ------
    FileNotFoundError
        If no HDF5 files are found in the specified folder
    RuntimeError
        If the MongoDB client cannot be set up, or if an error occurs while
        processing a file (the original exception is chained as the cause)
    """
    # Passing None through would override create_collection's own default and
    # end up as a non-string document key, so resolve it once here.
    if level_of_theory is None:
        level_of_theory = "wB97M-D3(BJ)/def2-TZVPPD"

    # Validate the folder before opening a client so that the early-exit path
    # has nothing to clean up. Sorting makes the processing order deterministic.
    hdf5_files = sorted(f for f in os.listdir(folder_path) if f.endswith(('.h5', '.hdf5')))
    if not hdf5_files:
        raise FileNotFoundError(f"No HDF5 files found in {folder_path}")

    print(f"Found {len(hdf5_files)} HDF5 files")

    # Connect to MongoDB
    client = None
    try:
        client = MongoClient(mongo_uri)
        db = client[database_name]
    except Exception as e:
        if client is not None:
            client.close()
        raise RuntimeError(f"Error connecting to MongoDB: {e}") from e

    try:
        for hdf5_file in hdf5_files:
            try:
                # Part of the stem before the first underscore, plus "_qdpi"
                prefix = Path(hdf5_file).stem.split('_')[0]
                collection_name = f"{prefix}_qdpi"

                # Ensure the name is MongoDB-friendly
                collection_name = ''.join(
                    c if c.isalnum() or c == '_' else '_' for c in collection_name
                )

                file_path = os.path.join(folder_path, hdf5_file)

                print(f"\nProcessing {hdf5_file}...")
                print(f"Creating collection: {collection_name}")

                if collection_name in db.list_collection_names():
                    existing_collection = db[collection_name]
                    # Theory levels live under "theorylevels.<name>" in each
                    # document; only the _id is fetched to keep the probe cheap.
                    has_level = existing_collection.find_one(
                        {f"theorylevels.{level_of_theory}": {"$exists": True}},
                        {"_id": 1},
                    )
                    if has_level is not None:
                        print(
                            f"Collection {collection_name} already contains theory level "
                            f"{level_of_theory}; skipping {hdf5_file}"
                        )
                        continue
                    add_theory_to_collection(
                        existing_collection,
                        file_path,
                        level_of_theory,
                    )
                else:
                    create_collection(
                        db[collection_name],
                        file_path,
                        level_of_theory=level_of_theory,
                        data_source=data_source,
                    )

                print(f"Successfully processed {hdf5_file}")

            except Exception as e:
                raise RuntimeError(f"Error processing {hdf5_file}: {e}") from e
    finally:
        # Release the connection pool whether or not processing succeeded.
        client.close()

    print("\nProcessing complete!")
def _as_text(value):
    """Return *value* as ``str``, decoding UTF-8 when it is a bytes object."""
    return value.decode("utf-8") if isinstance(value, bytes) else str(value)


def create_collection(collection_name, hdf5_file_path, level_of_theory="wB97M-D3(BJ)/def2-TZVPPD", data_source=None):
    """
    Populate a MongoDB collection with one document per molecule group of an HDF5 file.

    Each top-level group of the file becomes a document holding the molecule
    id, SMILES string, type information, the formula/element data returned by
    ``modify_molecule_name`` and, when a ``set.000`` sub-group is present,
    coordinates plus the energies and forces stored under
    ``theorylevels[level_of_theory]``. ``atom_ener_coeff`` and ``Rel`` are
    added only when ``set.000`` contains ``atom_ener_coeff.npy``.

    Parameters
    ----------
    collection_name : pymongo.collection.Collection
        Collection object to insert into. Despite the parameter name this is
        not a string: ``insert_one`` is called on it directly.
    hdf5_file_path : str
        Path to the HDF5 file containing molecule data.
    level_of_theory : str
        Level of theory used for the calculations. ``None`` is treated as the
        default value, because a ``None`` key cannot be stored in a document.
    data_source : str
        Source of the data, if applicable. Stored on every document when it
        is not ``None``.

    Returns
    -------
    None

    Raises
    ------
    FileNotFoundError
        If the specified HDF5 file does not exist.
    """
    hdf5_file_path = Path(hdf5_file_path)
    if not hdf5_file_path.exists():
        raise FileNotFoundError(f"The file {hdf5_file_path} does not exist.")

    # Callers may forward an unset (None) level explicitly; fall back to the
    # documented default instead of producing a None document key.
    if level_of_theory is None:
        level_of_theory = "wB97M-D3(BJ)/def2-TZVPPD"

    print(f"Processing file: {hdf5_file_path} for collection: {collection_name}")

    with h5py.File(hdf5_file_path, 'r') as hdf_file:
        for molecule_id in hdf_file.keys():  # Each molecule (e.g., C10Cl0F0H16N2O2S0)
            group = hdf_file[molecule_id]

            # Read and decode the SMILES string once; it is reused below.
            smiles = _as_text(group["smiles"][()]) if "smiles" in group else None

            molecule_data = {
                "molecule_id": molecule_id,
                "smiles": smiles,
                "nopbc": bool(group["nopbc"][()]) if "nopbc" in group else None,
                "type.raw": group["type.raw"][:].tolist() if "type.raw" in group else [],
                "type_map.raw": (
                    [_as_text(x) for x in group["type_map.raw"][:]]
                    if "type_map.raw" in group else []
                ),
            }

            new_mol_name, contains_elements = modify_molecule_name(molecule_id)
            molecule_data["contains_elements"] = contains_elements
            molecule_data["chemical_formula"] = new_mol_name

            if smiles is not None:
                molecule_data["nmols"] = count_molecules(smiles)
                molecule_data["nwatermols"] = count_water_molecules(smiles)

            if "charge" in group:
                molecule_data["charge"] = int(group["charge"][()])

            # Placeholder entry so the theory level key exists even when the
            # group has no "set.000" data.
            molecule_data["theorylevels"] = {level_of_theory: {}}

            if "set.000" in group:
                set_000_group = group["set.000"]
                # Load the energies once; used for both the payload and the count.
                energies = set_000_group["energy.npy"][:]

                molecule_data["coordinates"] = set_000_group["coord.npy"][:].tolist()
                molecule_data["theorylevels"][level_of_theory] = {
                    "energies": energies.tolist(),
                    "forces": set_000_group["force.npy"][:].tolist(),
                }
                molecule_data["nconfigs"] = len(energies)

                if "atom_ener_coeff.npy" in set_000_group:
                    molecule_data["atom_ener_coeff"] = set_000_group["atom_ener_coeff.npy"][:].tolist()
                    molecule_data["Rel"] = True

            # Add data_source if provided
            if data_source is not None:
                molecule_data["data_source"] = data_source

            # Inserting the molecule document into MongoDB
            collection_name.insert_one(molecule_data)

    print(f"Data inserted successfully to {collection_name}")
def add_theory_to_collection(collection_name, hdf5_file_path, level_of_theory, data_source=None):
    """
    Add a new level of theory to an existing MongoDB collection using data from an HDF5 file.

    For every top-level group of the file that has a ``set.000`` sub-group,
    the energies and forces are written to ``theorylevels.<level_of_theory>``
    of the document whose ``molecule_id`` equals the group name.

    Parameters
    ----------
    collection_name : pymongo.collection.Collection
        The MongoDB collection to update.
    hdf5_file_path : str
        Path to the HDF5 file containing the new level of theory data.
    level_of_theory : str
        The name of the new level of theory to add.
    data_source : str, optional
        Source of the new data. When given it is stored next to the energies
        and forces of this theory level; existing top-level document fields
        are left untouched. Default ``None`` stores nothing extra.

    Returns
    -------
    None

    Raises
    ------
    FileNotFoundError
        If the specified HDF5 file does not exist.
    KeyError
        If the molecule ID does not exist in the collection.
    """
    hdf5_file_path = Path(hdf5_file_path)
    if not hdf5_file_path.exists():
        raise FileNotFoundError(f"The file {hdf5_file_path} does not exist.")

    print(f"Processing file: {hdf5_file_path} to add theory level: {level_of_theory}")

    theory_field = f"theorylevels.{level_of_theory}"

    with h5py.File(hdf5_file_path, 'r') as hdf_file:
        for molecule_id in hdf_file.keys():
            selector = {"molecule_id": molecule_id}

            # Existence probe only: project to _id so the stored coordinate
            # and force arrays are not pulled over the wire.
            if collection_name.find_one(selector, {"_id": 1}) is None:
                raise KeyError(f"Molecule ID {molecule_id} does not exist in the collection.")

            group = hdf_file[molecule_id]
            if "set.000" not in group:
                continue

            set_000_group = group["set.000"]
            theory_data = {
                "energies": set_000_group["energy.npy"][:].tolist(),
                "forces": set_000_group["force.npy"][:].tolist(),
            }
            if data_source is not None:
                theory_data["data_source"] = data_source

            # Update the document in the collection
            result = collection_name.update_one(selector, {"$set": {theory_field: theory_data}})

            if result.modified_count > 0:
                print(f"Updated molecule: {molecule_id} with new theory level: {level_of_theory}")
            elif result.matched_count > 0:
                # Matched but not modified: the stored data was already identical.
                print(f"Molecule: {molecule_id} already has identical data for theory level: {level_of_theory}")
            else:
                print(f"No document found for molecule: {molecule_id}. Skipping.")

    print(f"Finished adding theory level: {level_of_theory} to the collection.")
if __name__ == "__main__":
    # Command-line entry point: two positional arguments (folder, database)
    # and two optional flags, all handed to process_hdf5_folder.
    cli = argparse.ArgumentParser(
        description='Process HDF5 files and create MongoDB collections.'
    )

    # Required arguments
    required_args = (
        ('folder_path', 'Path to the folder containing HDF5 files'),
        ('database_name', 'Name of the MongoDB database'),
    )
    for arg_name, arg_help in required_args:
        cli.add_argument(arg_name, type=str, help=arg_help)

    # Optional arguments
    cli.add_argument(
        '--level-of-theory',
        type=str,
        default="wB97M-D3(BJ)/def2-TZVPPD",
        help='Level of theory for the collections (default: wB97M-D3(BJ)/def2-TZVPPD)',
    )
    cli.add_argument(
        '--data-source',
        type=str,
        help='Data source identifier for the collections',
    )

    options = cli.parse_args()

    # The namespace holds exactly folder_path, database_name, level_of_theory
    # and data_source, which are the keyword arguments of process_hdf5_folder.
    process_hdf5_folder(**vars(options))