Source code for pharmaforge.io.hdf5_utils

import h5py
import re, shutil
import os, sys
import numpy as np

from typing import Tuple, List
from rdkit import Chem
from rdkit.Chem import rdDetermineBonds
from pathlib import Path

from pharmaforge.database import DataBase



[docs] def modify_molecule_name(mol_name: str) -> Tuple[str, List[str]]: """ Modifies the molecule name by removing zero count elements and creating a type map. Parameters ---------- mol_name : str The original name of the molecule, which contains elements and their counts. Returns ------- Tuple[str, List[str]] A tuple containing the modified molecule name and a list of element types. """ elements = re.findall(r'([A-Za-z]+)(\d+)', mol_name) new_name = ''.join(f"{el}{count}" for el, count in elements if int(count) > 0) type_map = [el for el, count in elements if int(count) > 0] return new_name, type_map
[docs] def modify_hdf5_file(original_file: str, new_file: str) -> None: """ Modifies an HDF5 file by renaming molecules and creating a type map. Parameters ---------- original_file : str Path to the original HDF5 file. new_file : str Path to the new HDF5 file to be created. Returns ------- None The function creates a new HDF5 file with modified molecule names and type maps. Raises ------ FileNotFoundError If the original file does not exist. ValueError If the original file is not an HDF5 file or if the original and new files are the same. """ original_file = Path(original_file) if not original_file.is_file(): raise FileNotFoundError(f"Original file '{original_file}' does not exist.") if original_file == new_file: raise ValueError(f"Original file '{original_file}' and new file '{new_file}' are the same.") with h5py.File(original_file, 'r') as orig_db, h5py.File(new_file, 'w') as new_db: for mol_name in orig_db.keys(): new_name, type_map = modify_molecule_name(mol_name) orig_db.copy(mol_name, new_db, name=new_name) mol_group = new_db[new_name] if 'type_map.raw' in mol_group: del mol_group['type_map.raw'] mol_group.create_dataset('type_map.raw', data=type_map) print(f"Modified HDF5 file created: {new_file}")
[docs] def process_multiple_hdf5_files(files: List[str]) -> None: """ Process multiple HDF5 files by modifying their contents and creating new files. Parameters ---------- files : List[str] A list of paths to the original HDF5 files to be processed. Returns ------- None The function creates new HDF5 files with modified contents based on the original files. Raises ------ None The function handles file operations and exceptions internally. """ for original_file in files: new_file = original_file.replace('.hdf5', '_modified.hdf5') modify_hdf5_file(original_file, new_file)
[docs] def save_molecule_to_xyz(db, db_name, mol, output_file='molecule_structure.xyz', config=0): """ Saves a molecule's coordinates and types to a temporary XYZ file. Parameters ---------- db : DataBase An instance of the DataBase class containing the HDF5 data. db_name : str The key of the database to access. mol : str The name of the molecule to be saved. output_file : str The name of the output XYZ file. config : int The configuration option for the XYZ file. Returns ------- None The function creates an XYZ file with the molecule's coordinates and types. Raises ------ None The function handles file operations and exceptions internally. """ # fetching molecule information db_instance = db.data[db_name] # Check if the molecule is in the database format OR if its in the deepmdkit-format if "set.000" in db_instance[mol]: a = db_instance[mol]['set.000']['coord.npy'][:] else: a = db_instance[mol]['coordinates'][:] type_raw = db.data[db_name][mol]['type.raw'][:] type_map = db.data[db_name][mol]['type_map.raw'][:] coords = np.reshape(a[config], (-1, 3)) num_atoms = len(type_raw) molecule_name = mol with open(output_file, 'w') as file: # Write the first two lines file.write(f"{num_atoms}\n") file.write(f"Molecule: {molecule_name}\n") for i, atom_type_index in enumerate(type_raw): atom_symbol = type_map[atom_type_index].decode("utf-8") # Decode bytes to string atom_coords = coords[i] file.write(f"{atom_symbol} {atom_coords[0]:.5f} {atom_coords[1]:.5f} {atom_coords[2]:.5f}\n") print("Molecular structure saved to molecule_structure.xyz")
[docs] def assign_smiles(molecule_name, use_suggested_charge=False, maxcount=2, xyz_file='molecule_structure.xyz'): """ Attempts to assign a SMILES notation to a molecule using its XYZ coordinates. The function tries multiple possible formal charges to determine bonds and generate a valid SMILES representation using RDKit. If all attempts fail, the molecule name is added to the failed_mols list. The use_suggested_charge option allows the function to use the suggested charge from the RDKit error message, which usually reads something like: `Final molecular charge (0) does not match input (-1); could not find valid bond ordering.` In this case, it would have originally tried charge 0, but then the next charge it will attempt is -1. The max_count option allows the user to set how many times the function will try this before giving up and not generating a SMILES at all. .. warning:: This function, with the use_suggested_charge option, will try to assign the charge based on the error message from RDKit. Thus, it is important to implemennt some degree of skepticism with respect to whether the charges are actually the true charges or not. Parameters ---------- molecule_name : str The name of the molecule for which to generate SMILES. failed_mols : list A list to keep track of failed molecules. use_suggested_charge : bool, optional If True, use the suggested charge from the error message. Default is False. maxcount : int, optional The maximum number of attempts to generate SMILES with different charges. Only works when use_suggested_charge is true. Default is 5. Otherwise, if use_suggested_charge is false, the function will only try charge 0. xyz_file : str, optional The path to the XYZ file containing the molecule's coordinates. Default is 'molecule_structure.xyz', which is the default for the save_molecule_to_xyz function. Returns ------- str or None The SMILES notation for the molecule, or None if generation failed. int or None The charge used to generate the SMILES notation, or None if generation failed. Raises ------ FileNotFoundError If the XYZ file does not exist. """ xyz_file = Path(xyz_file) if not xyz_file.is_file(): print(f"XYZ file '{xyz_file}' does not exist.") raise FileNotFoundError(f"XYZ file '{xyz_file}' does not exist.") raw_mol = Chem.rdmolfiles.MolFromXYZFile("molecule_structure.xyz") if raw_mol is None: print(f"Failed to load molecule '{molecule_name}' from XYZ file.") return None charge=0 stop=False count=0 while not stop: print("Trying charge:", charge) mol = Chem.Mol(raw_mol) # Create a fresh copy each time try: rdDetermineBonds.DetermineBonds(mol, charge=charge) smiles = Chem.MolToSmiles(mol) print(f"SMILES notation for {molecule_name} generated with charge {charge}: {smiles}") return smiles, charge # Return immediately on success except Exception as e: print(f"Charge {charge} failed for '{molecule_name}' with error: {e}") if use_suggested_charge == True: if "does not match input" not in str(e): break charge = int(str(e).split("input ")[-1].split(")")[0].split("(")[-1]) else: print(f"Stop hit, stopping...") stop=True count += 1 if count >= maxcount: print(f"Max attempts reached for molecule '{molecule_name}'.") return None, None continue # Try next charge # If all charges fail print(f"All attempts failed for molecule '{molecule_name}'.") return None, None
# def assign_smiles(molecule_name, failed_mols): # raw_mol = Chem.rdmolfiles.MolFromXYZFile("molecule_structure.xyz") # if raw_mol is None: # print(f"Failed to load molecule '{molecule_name}' from XYZ file.") # failed_mols.append(molecule_name) # return None # mol = Chem.Mol(raw_mol) # try: # rdDetermineBonds.DetermineBonds(mol, charge=0) # smiles = Chem.MolToSmiles(mol) # print(f"SMILES notation for {molecule_name} is {smiles}") # return smiles # except Exception as e: # print(f"Error determining bonds for molecule '{molecule_name}': {e}") # failed_mols.append(molecule_name) # return None
[docs] def verify_all_shared_molecules(database_instance, db_key1, db_key2): """ Verify that all shared molecules between two databases have identical attributes, including nested groups like 'set.000'. Parameters ---------- database_instance : DataBase An instance of the DataBase class containing the HDF5 data. db_key1 : str The key of the first database to compare. db_key2 : str The key of the second database to compare. Returns ------- None Prints the matched and unmatched molecules between the two databases. """ # Step 1: Find shared molecules between the two databases shared_molecules = database_instance.find_shared_molecules(db_key1, db_key2) if not shared_molecules: print(f"No shared molecules found between {db_key1} and {db_key2}.") return matched_mols = [] unmatched_mols = [] for mol_key in shared_molecules: db1_molecule = database_instance.data[db_key1][mol_key] db2_molecule = database_instance.data[db_key2][mol_key] all_features_same = True # Flag to track if all features are identical for feature in db1_molecule.keys(): if feature == 'set.000': # Handle nested datasets in 'set.000' for sub_feature in db1_molecule[feature].keys(): mol_data1 = db1_molecule[feature][sub_feature][:] mol_data2 = db2_molecule[feature][sub_feature][:] # Add detailed comparison debug output if not np.array_equal(mol_data1, mol_data2): all_features_same = False break # Exit sub_feature comparison if there's a mismatch elif feature == 'nopbc': # Handle scalar attribute 'nopbc' mol_data1 = db1_molecule[feature][()] mol_data2 = db2_molecule[feature][()] # Add detailed comparison debug output if not np.array_equal(mol_data1, mol_data2): all_features_same = False break # Exit feature comparison if there's a mismatch else: mol_data1 = db1_molecule[feature][:] mol_data2 = db2_molecule[feature][:] if not np.array_equal(mol_data1, mol_data2): all_features_same = False break if all_features_same: matched_mols.append(mol_key) else: unmatched_mols.append(mol_key) print("Matched molecules:", matched_mols) print("Unmatched molecules:", unmatched_mols)
[docs] def check_shared_molecules(database_instance): """ Check which molecules are shared between all databases in the DataBase instance. Parameters ---------- database_instance : DataBase An instance of the DataBase class containing the HDF5 data. Returns ------- None Prints the shared molecules between all databases. """ db_keys = list(database_instance.data.keys()) for i in range(len(db_keys)): for j in range(i + 1, len(db_keys)): db_key1 = db_keys[i] db_key2 = db_keys[j] shared_molecules = database_instance.find_shared_molecules(db_key1, db_key2) if shared_molecules: print(f"Databases {db_key1} and {db_key2} have {len(shared_molecules)} shared molecules: {shared_molecules}") else: print(f"Databases {db_key1} and {db_key2} do not have shared molecules.")
# This is a helper function which can be used when you don't have hdf5 files with smiles.
[docs] def new_hdf5_file_with_smiles(original_file: str, new_file: str, exist_ok=False, use_suggested_charge=False, maxcount=5) -> None: """ Create a new HDF5 file with SMILES notation for each molecule. This function reads the original HDF5 file, generating a SMILES string for each molecule and saving it in the new file. The function also handles the case where the original file and the new file are the same, and raises appropriate errors if the files do not exist or if the new file already exists. Parameters ---------- original_file : str Path to the original HDF5 file. new_file : str Path to the new HDF5 file to be created. exist_ok : bool If True, overwrite the existing file if it exists. Default is False. use_suggested_charge : bool If True, use the suggested charge from the error message. Default is False. (option for assign_charge) maxcount : int The maximum number of attempts to generate SMILES with different charges. Default is 5. (option for assign_charge) Raises ------ FileNotFoundError If the original file does not exist. ValueError If the original file is not an HDF5 file. FileExistsError If the new file already exists and exist_ok is False. OSError If the directory for the new file cannot be created. See Also -------- molecule_to_xyz : Function to save a molecule's coordinates and types to a temporary XYZ file. assign_smiles : Function to generate SMILES notation for a molecule using its XYZ coordinates. """ original_file = Path(original_file) if not original_file.is_file(): raise FileNotFoundError(f"Original file '{original_file}' does not exist.") if not original_file.suffix == '.hdf5': raise ValueError(f"Original file '{original_file}' is not an HDF5 file.") if original_file == new_file: raise ValueError(f"Original file '{original_file}' and new file '{new_file}' are the same.") new_file = Path(new_file) if new_file.is_file() and not exist_ok: raise FileExistsError(f"New file '{new_file}' already exists. Please choose a different name.") if new_file.suffix != '.hdf5': raise ValueError(f"New file '{new_file}' is not an HDF5 file.") # check that the path to the new file exists, and if not, make it if not new_file.parent.is_dir(): try: new_file.parent.mkdir(parents=True) except Exception as e: raise OSError(f"Could not create directory for new file '{new_file}': {e}") failed_mols = [] db = DataBase() db.add_data(original_file) #original_filename = os.path.basename(original_file) fstring = original_file.stem with h5py.File(original_file, 'r') as orig_db, h5py.File(new_file, 'w') as new_db: for mol_name in orig_db.keys(): # Check if molecule exists in the original database if mol_name not in db.data.get(fstring, {}): print(f"Molecule '{mol_name}' not found in original db data") continue # Copy molecule data to the new file orig_db.copy(mol_name, new_db) # Try configs of the molecule until a smiles is generated. found = False if found: break save_molecule_to_xyz(db, fstring, mol_name, config=0) # Generate SMILES try: smiles, charge = assign_smiles(mol_name, use_suggested_charge=use_suggested_charge, maxcount=maxcount) found = True except ValueError as e: print(f"{mol_name} failed due to an error in SMILES generation: {e}, moving to next config.") found = False continue if not found or smiles is None: print("Not found, trying openbabel...") try: print("Trying openbabel...") from openbabel import openbabel obConversion = openbabel.OBConversion() obConversion.SetInAndOutFormats("xyz", "smi") mol = openbabel.OBMol() obConversion.ReadFile(mol, "molecule_structure.xyz") smiles = str(obConversion.WriteString(mol)) charge = mol.GetTotalCharge() except Exception as e: print(f"Failed to generate SMILES for molecule '{mol_name}' in {fstring}: {e}") shutil.copyfile("molecule_structure.xyz", f"{mol_name}_failed.xyz") # Copy the original XYZ file back failed_mols.append(mol_name) continue if smiles is not None: smiles = smiles.strip().split("\t")[0] # Clean up the SMILES string print("SMILES generated:", smiles) # Add SMILES notation to the new HDF5 file if successful mol_group = new_db[mol_name] if smiles is not None: if "smiles" in mol_group: print(f"Deleted existing SMILES notation for '{mol_name}' to add a new one") del mol_group["smiles"] mol_group.create_dataset("smiles", data=smiles.encode("utf-8")) mol_group.create_dataset("charge", data=charge) else: print(f"Failed to generate SMILES for '{mol_name}', skipping SMILES addition.") print(f"Modified HDF5 file with SMILES notation created: '{new_file}'") if failed_mols: print("Failed to process the following molecules:", failed_mols)
if __name__ == "main": hdf5_files = ['ani.hdf5', 'comp6.hdf5', 'freesolvmd.hdf5', 'geom.hdf5', 'spice.hdf5', 're.hdf5', 'remd.hdf5' ] process_multiple_hdf5_files(hdf5_files)