Source code for pharmaforge.interfaces.abstractio

import numpy as np

from abc import ABC, abstractmethod
from ase.neighborlist import NeighborList
from ase.constraints import FixAtoms, FixInternals
from ase.optimize import BFGS
from multiprocessing import Pool, Manager

from pharmaforge.queries import Query


[docs] class AbstractIO(ABC):
[docs] def ObtainDeepData(self, filename): """ Obtain the system data from the DeepIO interface and converts each configuration to an ASE atoms object. Parameters ---------- filename : str The name of the file containing the system data. Returns ------- None """ from pharmaforge.interfaces import DPDataInterface dpdata_inter = DPDataInterface() dpsystems = dpdata_inter.read(filename) self.systems = {} for system in dpsystems: ase_structures = system.to_ase_structure() self.systems[system.formula]=ase_structures
[docs] def ObtainQueryData(self, queryresults): """ Obtain the system data from a query Parameters ---------- queryresults : str The results of a query to obtain the system data. Returns ------- None See Also -------- pharmaforge.queries.Query.results_to_ase : Function to convert query results to ASE atoms object. """ self.systems = {} for result in queryresults: uniq_formula=result.get('molecule_id') self.systems[uniq_formula] = Query.result_to_ase(result)
[docs] @staticmethod def find_clusters(structure, cutoff=10.0): """Identify clusters of atoms in a structure based on a distance cutoff. Parameters ---------- structure : ase.Atoms The ASE atoms object containing the system data. cutoff : float The distance cutoff for identifying clusters. Returns ------- clusters : list of ase.Atoms A list of ASE atoms objects, each representing a cluster. """ neighbor_list = NeighborList([cutoff / 2.0] * len(structure), self_interaction=False, bothways=True) neighbor_list.update(structure) visited = set() clusters = [] def dfs(atom_idx, cluster): if atom_idx in visited: return visited.add(atom_idx) cluster.append(atom_idx) neighbors = neighbor_list.get_neighbors(atom_idx)[0] for neighbor in neighbors: dfs(neighbor, cluster) original_ener_coeff = structure.info.get("atom_ener_coeff") if original_ener_coeff is None: original_ener_coeff = np.ones(len(structure)) for atom_idx in range(len(structure)): if atom_idx not in visited: cluster_indices = [] dfs(atom_idx, cluster_indices) clusters.append(structure[cluster_indices]) # This subsets the original energy coefficients to the cluster clusters[-1].info["atom_ener_coeff"] = original_ener_coeff[cluster_indices] return clusters
[docs] def calculate(self, limit=0, verbose=False, num_workers=1): """ Calculate the energies and forces of the system using the interface. Parameters ---------- limit : int The maximum number of calculations to perform. Default is 0 (no limit). verbose : bool Whether to print progress information. Default is False. num_workers : int The number of worker processes to use for multiprocessing. Default is 1. Returns ------- energies : dict A dictionary containing the energies of the system. forces : dict A dictionary containing the forces acting on the atoms in the system. Raises ------ ValueError If no system data is found. If no calculator is found. """ if num_workers > 1 and self.allow_parallel: return self._caclulate_parallel(limit, verbose, num_workers) elif num_workers == 1 or not self.allow_parallel: return self._calculate_serial(limit, verbose) else: raise ValueError("num_workers must be greater than 0. Please set num_workers to 1 or greater.")
def _calculate_serial(self, limit=0, verbose=False): """ Calculates the energies and forces of the system using the interface. Parameters ---------- limit : int The maximum number of calculations to perform. Default is 0 (no limit). Returns ------- energies : dict A dictionary containing the energies of the system. forces : dict A dictionary containing the forces acting on the atoms in the system. Raises ------ ValueError If no system data is found. If no calculator is found. """ if not hasattr(self, 'systems'): raise ValueError("No system data found. Please obtain system data first.") if not hasattr(self, 'calculator'): raise ValueError("No calculator found. Please set a calculator first.") self.energies, self.forces = {},{} calc_count, system_count = 0, 0 for formula in self.systems: if verbose: print(f"Calculating system {system_count+1}/{len(self.systems)}") system = self.systems[formula] self.energies[formula], self.forces[formula] = [], [] for config_counter, ase_structure in enumerate(system): if verbose: print(f"Calculating configuration {config_counter} of {len(system)} in system {formula}.", flush=True) if hasattr(ase_structure, "info") and "charge" in ase_structure.info: charge = int(ase_structure.info.get("charge")) else: charge = 0 if limit > 0 and calc_count == limit: print(f"Limit of {limit} calculations reached.") print(f"Total calculations performed: {calc_count}") return self.energies, self.forces clusters = AbstractIO.find_clusters(ase_structure) total_energy = 0.0 total_forces = [] for cluster in clusters: atom_ener_coeff = cluster.info.get("atom_ener_coeff") if len(np.unique(atom_ener_coeff)) > 1: raise ValueError("Each atom in the cluster must have the same atom_ener_coeff ") cluster_coeff = np.unique(atom_ener_coeff)[0] cluster_charge = charge/len(cluster) calc = self._calculate(cluster, charge=cluster_charge) e, f = self._report_data(calc) total_energy += cluster_coeff * e total_forces.extend(cluster_coeff * f) total_forces = np.array(total_forces) self.energies[formula].append(total_energy) self.forces[formula].append(total_forces) calc_count += 1 system_count += 1 print(f"Total calculations performed: {calc_count}") return self.energies, self.forces
[docs] @staticmethod def process_structure(args): system_idx, structure_idx, structure, _calc_func, _report_func = args if hasattr(structure, "info") and "charge" in structure.info: charge = int(structure.info.get("charge")) else: charge = 0 clusters = AbstractIO.find_clusters(structure) total_energy = 0.0 total_forces = [] for cluster in clusters: atom_ener_coeff = cluster.info.get("atom_ener_coeff") if len(np.unique(atom_ener_coeff)) > 1: raise ValueError("Each atom in the cluster must have the same atom_ener_coeff ") cluster_coeff = np.unique(atom_ener_coeff)[0] cluster_charge = charge/len(cluster) calc = _calc_func(cluster, charge=cluster_charge) e, f = _report_func(calc) total_energy += cluster_coeff * e total_forces.extend(cluster_coeff * f) total_forces = np.array(total_forces) e = total_energy f = total_forces return system_idx, structure_idx, e, f
def _caclulate_parallel(self, limit=0, verbose=False, num_workers=4): """ Calculates the energies and forces of the system using multiprocessing. .. warning:: This method is incompatible with previous serial calculations. If you must run both, please run them in separate scripts. Parameters ---------- limit : int The maximum number of calculations to perform. Default is 0 (no limit). verbose : bool Whether to print progress information. Default is False. num_workers : int The number of worker processes to use for multiprocessing. Default is 4. Returns ------- energies : dict A dictionary containing the energies of the system. forces : dict A dictionary containing the forces acting on the atoms in the system. Raises ------ ValueError If no system data is found. If no calculator is found. """ if not hasattr(self, 'systems'): raise ValueError("No system data found. Please obtain system data first.") if not hasattr(self, 'calculator'): raise ValueError("No calculator found. Please set a calculator first.") manager = Manager() energies_mp = manager.dict() forces_mp = manager.dict() tasks = [] calc_count = 0 for system_idx, formula in enumerate(self.systems): if verbose: print(f"Preparing tasks for system {system_idx+1}/{len(self.systems)}") system = self.systems[formula] energies_mp[formula] = manager.list() forces_mp[formula] = manager.list() for structure_idx, structure in enumerate(system): if limit > 0 and calc_count >= limit: break tasks.append((formula, structure_idx, structure, self._calculate, self._report_data)) calc_count += 1 if limit > 0 and calc_count > limit: tasks = tasks[:limit] print(f"Total tasks prepared: {len(tasks)}") with Pool(num_workers) as pool: results = pool.map(AbstractIO.process_structure, tasks) print("Mapping Complete") for result in results: formula, structure_idx, e, f = result energies_mp[formula].append(e) forces_mp[formula].append(f) energies = {key: list(value) for key, value in energies_mp.items()} forces = {key: list(value) for key, value in forces_mp.items()} if verbose: print(f"Total calculations performed: {len(results)}") return energies, forces def _report_data(self, calc): """ Report the energies and forces of the system. Parameters ---------- calc : object The object containing the calculation results. Returns ------- e : float The energy of the system. f : list The forces acting on the atoms in the system. """ import os import sys from contextlib import redirect_stdout, redirect_stderr with open(os.devnull, 'w') as fnull: with redirect_stdout(fnull), redirect_stderr(fnull): e = calc.get_potential_energy() f = calc.get_forces() return e, f def _optimize(self, structure, constrain_mask=None, optimizer_opts={"fmax":0.01}): """Perform a geometry optimization on the structure. Parameters ---------- structure : ase.Atoms The ASE atoms object containing the system data. constrain_mask : list, optional A list of indices to fix during optimization. Default is None (no constraints). optimizer_opts : dict, optional Additional options for the optimizer. Default is an empty dictionary. Returns ------- structure : ase.Atoms The optimized ASE atoms object. """ if constrain_mask is not None: #structure.set_constraint(FixAtoms(indices=constrain_mask)) dihedral_cons = [structure.get_dihedral(*constrain_mask), constrain_mask] print(f"Setting dihedral constraint: {dihedral_cons}") del structure.constraints structure.set_constraint(FixInternals(dihedrals_deg=[dihedral_cons])) dyn = BFGS(structure, logfile='amber_opt.log') dyn.run(fmax=optimizer_opts.get("fmax", 0.01), steps=optimizer_opts.get("max_steps", 1000)) return structure @abstractmethod def _calculate(self, structure): """ Calculate the system data. Parameters ---------- structure : object The object containing the system data. Returns ------- calc : object The calculator object. """ pass