import os
import argparse
import h5py
from pathlib import Path
from pymongo import MongoClient
from pharmaforge.io import modify_molecule_name
from pharmaforge.labeling import count_molecules, count_water_molecules
def process_hdf5_folder(folder_path, database_name, level_of_theory=None, data_source=None):
"""
    Process all HDF5 files in a folder and create a MongoDB collection for each file.
Parameters
----------
folder_path : str
Path to the folder containing HDF5 files
database_name : str
Name of the MongoDB database to use
level_of_theory : str, optional
Level of theory to use for all collections
data_source : str, optional
Data source identifier for all collections
Raises
------
FileNotFoundError
If no HDF5 files are found in the specified folder
RuntimeError
If an error occurs while processing a file
TODO
----
- Remove hardcoded MongoDB connection string
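    Examples
    --------
    A hypothetical invocation (folder, database, and source names are placeholders)::

        process_hdf5_folder(
            "/path/to/hdf5_folder",
            "pharmaforge",
            level_of_theory="wB97M-D3(BJ)/def2-TZVPPD",
            data_source="example_source",
        )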
"""
    # Connect to MongoDB (MongoClient connects lazily, so ping the server to surface errors here)
    try:
        client = MongoClient("mongodb://localhost:27017")
        client.admin.command("ping")
        db = client[database_name]
    except Exception as e:
        raise RuntimeError(f"Error connecting to MongoDB: {e}")
# Get all HDF5 files in the folder
hdf5_files = [f for f in os.listdir(folder_path) if f.endswith(('.h5', '.hdf5'))]
if not hdf5_files:
raise FileNotFoundError(f"No HDF5 files found in {folder_path}")
print(f"Found {len(hdf5_files)} HDF5 files")
# Process each HDF5 file
for hdf5_file in hdf5_files:
try:
# Get the part before the first underscore and append _qdpi
base_name = Path(hdf5_file).stem # Get the base name without extension
prefix = base_name.split('_')[0] # Take the part before first underscore
collection_name = f"{prefix}_qdpi"
# Ensure the name is MongoDB-friendly
collection_name = ''.join(c if c.isalnum() or c == '_' else '_' for c in collection_name)
# Get the full file path
file_path = os.path.join(folder_path, hdf5_file)
print(f"\nProcessing {hdf5_file}...")
print(f"Creating collection: {collection_name}")
            # Check if the collection already exists
            if collection_name in db.list_collection_names():
                # Add the theory level only if the stored documents do not already contain it
                existing_collection = db[collection_name]
                theory_already_present = existing_collection.find_one(
                    {f"theorylevels.{level_of_theory}": {"$exists": True}}
                )
                if not theory_already_present:
                    add_theory_to_collection(
                        existing_collection,
                        file_path,
                        level_of_theory=level_of_theory
                    )
                else:
                    print(f"Collection {collection_name} already contains {level_of_theory}; skipping.")
else:
# Create the collection
collection = db[collection_name]
# Process the file
create_collection(
collection,
file_path,
level_of_theory=level_of_theory,
data_source=data_source
)
print(f"Successfully processed {hdf5_file}")
except Exception as e:
raise RuntimeError(f"Error processing {hdf5_file}: {e}")
print("\nProcessing complete!")
def create_collection(collection_name, hdf5_file_path, level_of_theory="wB97M-D3(BJ)/def2-TZVPPD", data_source=None):
"""
Function to create mongodb collections using the hdf5 files having smiles notation
This adds extra fields including atom_coeff if present, and related configuration fields
Parameters
----------
collection_name : str
Name of the MongoDB collection to create.
hdf5_file_path : str
Path to the HDF5 file containing molecule data.
level_of_theory : str
Level of theory used for the calculations.
data_source : str
Source of the data, if applicable.
Returns
-------
None
Raises
------
FileNotFoundError
If the specified HDF5 file does not exist.
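    Examples
    --------
    A hypothetical call (database handle and file name are placeholders)::

        create_collection(db["ligands_qdpi"], "ligands.h5",
                          level_of_theory="wB97M-D3(BJ)/def2-TZVPPD",
                          data_source="example_source")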
"""
hdf5_file_path = Path(hdf5_file_path)
if not hdf5_file_path.exists():
raise FileNotFoundError(f"The file {hdf5_file_path} does not exist.")
print(f"Processing file: {hdf5_file_path} for collection: {collection_name}")
with h5py.File(hdf5_file_path, 'r') as hdf_file:
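        # Expected per-molecule layout (an assumption inferred from the keys read below,
        # following dpdata/DeePMD-style naming):
        #   <molecule_id>/smiles, nopbc, charge, type.raw, type_map.raw
        #   <molecule_id>/set.000/coord.npy, energy.npy, force.npy, atom_ener_coeff.npy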
for molecule_id in hdf_file.keys(): # Each molecule (e.g., C10Cl0F0H16N2O2S0)
molecule_data = {
"molecule_id": molecule_id,
"smiles": hdf_file[molecule_id]["smiles"][()].decode("utf-8") if "smiles" in hdf_file[molecule_id] else None,
"nopbc": bool(hdf_file[molecule_id]["nopbc"][()]) if "nopbc" in hdf_file[molecule_id] else None,
"type.raw": hdf_file[molecule_id]["type.raw"][:].tolist() if "type.raw" in hdf_file[molecule_id] else [],
"type_map.raw": [x.decode("utf-8") for x in hdf_file[molecule_id]["type_map.raw"][:]] if "type_map.raw" in hdf_file[molecule_id] else []
}
new_mol_name, contains_elements = modify_molecule_name(molecule_id)
molecule_data["contains_elements"] = contains_elements
molecule_data["chemical_formula"] = new_mol_name
if "smiles" in hdf_file[molecule_id]:
nmols = count_molecules(hdf_file[molecule_id]["smiles"][()].decode("utf-8"))
nwatermols = count_water_molecules(hdf_file[molecule_id]["smiles"][()].decode("utf-8"))
molecule_data["nmols"] = nmols
molecule_data["nwatermols"] = nwatermols
if "charge" in hdf_file[molecule_id]:
molecule_data["charge"] = int(hdf_file[molecule_id]["charge"][()])
molecule_data["theorylevels"] = {
level_of_theory: {}
}
if "set.000" in hdf_file[molecule_id]:
molecule_data["coordinates"] = hdf_file[molecule_id]["set.000"]["coord.npy"][:].tolist()
set_000_group = hdf_file[molecule_id]["set.000"]
molecule_data["theorylevels"][level_of_theory] = {
"energies": set_000_group["energy.npy"][:].tolist(),
"forces": set_000_group["force.npy"][:].tolist(),
}
molecule_data["nconfigs"] = len(set_000_group["energy.npy"][:])
                # Check if atom_ener_coeff.npy exists in set.000
                has_atom_coeff = "atom_ener_coeff.npy" in set_000_group
if has_atom_coeff:
molecule_data["atom_ener_coeff"] = set_000_group["atom_ener_coeff.npy"][:].tolist()
molecule_data["Rel"] = True
# Add data_source if provided
if data_source is not None:
molecule_data["data_source"] = data_source
# Inserting the molecule document into MongoDB
collection_name.insert_one(molecule_data)
print(f"Data inserted successfully to {collection_name}")
def add_theory_to_collection(collection_name, hdf5_file_path, level_of_theory):
"""
Add a new level of theory to an existing MongoDB collection using data from an HDF5 file.
Parameters
----------
collection_name : pymongo.collection.Collection
The MongoDB collection to update.
hdf5_file_path : str
Path to the HDF5 file containing the new level of theory data.
level_of_theory : str
The name of the new level of theory to add.
Returns
-------
None
Raises
------
FileNotFoundError
If the specified HDF5 file does not exist.
KeyError
If the molecule ID does not exist in the collection.
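    Examples
    --------
    A hypothetical call (collection handle and file name are placeholders)::

        add_theory_to_collection(db["ligands_qdpi"], "ligands_new_theory.h5",
                                 level_of_theory="B3LYP/def2-TZVP")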
"""
hdf5_file_path = Path(hdf5_file_path)
if not hdf5_file_path.exists():
raise FileNotFoundError(f"The file {hdf5_file_path} does not exist.")
print(f"Processing file: {hdf5_file_path} to add theory level: {level_of_theory}")
with h5py.File(hdf5_file_path, 'r') as hdf_file:
for molecule_id in hdf_file.keys():
            if not collection_name.find_one({"molecule_id": molecule_id}):
                raise KeyError(f"Molecule ID {molecule_id} does not exist in the collection.")
if "set.000" in hdf_file[molecule_id]:
set_000_group = hdf_file[molecule_id]["set.000"]
theory_data = {
"energies": set_000_group["energy.npy"][:].tolist(),
"forces": set_000_group["force.npy"][:].tolist(),
}
# Update the document in the collection
result = collection_name.update_one(
{"molecule_id": molecule_id},
{"$set": {f"theorylevels.{level_of_theory}": theory_data}}
)
                if result.modified_count > 0:
                    print(f"Updated molecule: {molecule_id} with new theory level: {level_of_theory}")
                else:
                    print(f"No changes made for molecule: {molecule_id}; the theory level may already be present.")
print(f"Finished adding theory level: {level_of_theory} to the collection.")
return
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Process HDF5 files and create MongoDB collections.')
# Required arguments
parser.add_argument('folder_path', type=str,
help='Path to the folder containing HDF5 files')
parser.add_argument('database_name', type=str,
help='Name of the MongoDB database')
# Optional arguments
parser.add_argument('--level-of-theory', type=str,
default="wB97M-D3(BJ)/def2-TZVPPD",
help='Level of theory for the collections (default: wB97M-D3(BJ)/def2-TZVPPD)')
parser.add_argument('--data-source', type=str,
help='Data source identifier for the collections')
args = parser.parse_args()
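    # Example invocation (script name and paths are placeholders):
    #   python process_hdf5_folder.py /path/to/hdf5_folder pharmaforge \
    #       --level-of-theory "wB97M-D3(BJ)/def2-TZVPPD" --data-source example_source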
process_hdf5_folder(
folder_path=args.folder_path,
database_name=args.database_name,
level_of_theory=args.level_of_theory,
data_source=args.data_source
)