import numpy as np
from abc import ABC, abstractmethod
from ase.neighborlist import NeighborList
from ase.constraints import FixAtoms, FixInternals
from ase.optimize import BFGS
from multiprocessing import Pool, Manager
from pharmaforge.queries import Query
[docs]
class AbstractIO(ABC):
[docs]
def ObtainDeepData(self, filename):
""" Obtain the system data from the DeepIO interface and converts each configuration to an ASE atoms object.
Parameters
----------
filename : str
The name of the file containing the system data.
Returns
-------
None
"""
from pharmaforge.interfaces import DPDataInterface
dpdata_inter = DPDataInterface()
dpsystems = dpdata_inter.read(filename)
self.systems = {}
for system in dpsystems:
ase_structures = system.to_ase_structure()
self.systems[system.formula]=ase_structures
[docs]
def ObtainQueryData(self, queryresults):
""" Obtain the system data from a query
Parameters
----------
queryresults : str
The results of a query to obtain the system data.
Returns
-------
None
See Also
--------
pharmaforge.queries.Query.results_to_ase : Function to convert query results to ASE atoms object.
"""
self.systems = {}
for result in queryresults:
uniq_formula=result.get('molecule_id')
self.systems[uniq_formula] = Query.result_to_ase(result)
[docs]
@staticmethod
def find_clusters(structure, cutoff=10.0):
"""Identify clusters of atoms in a structure based on a distance cutoff.
Parameters
----------
structure : ase.Atoms
The ASE atoms object containing the system data.
cutoff : float
The distance cutoff for identifying clusters.
Returns
-------
clusters : list of ase.Atoms
A list of ASE atoms objects, each representing a cluster.
"""
neighbor_list = NeighborList([cutoff / 2.0] * len(structure), self_interaction=False, bothways=True)
neighbor_list.update(structure)
visited = set()
clusters = []
def dfs(atom_idx, cluster):
if atom_idx in visited:
return
visited.add(atom_idx)
cluster.append(atom_idx)
neighbors = neighbor_list.get_neighbors(atom_idx)[0]
for neighbor in neighbors:
dfs(neighbor, cluster)
original_ener_coeff = structure.info.get("atom_ener_coeff")
if original_ener_coeff is None:
original_ener_coeff = np.ones(len(structure))
for atom_idx in range(len(structure)):
if atom_idx not in visited:
cluster_indices = []
dfs(atom_idx, cluster_indices)
clusters.append(structure[cluster_indices])
# This subsets the original energy coefficients to the cluster
clusters[-1].info["atom_ener_coeff"] = original_ener_coeff[cluster_indices]
return clusters
[docs]
def calculate(self, limit=0, verbose=False, num_workers=1):
""" Calculate the energies and forces of the system using the interface.
Parameters
----------
limit : int
The maximum number of calculations to perform. Default is 0 (no limit).
verbose : bool
Whether to print progress information. Default is False.
num_workers : int
The number of worker processes to use for multiprocessing. Default is 1.
Returns
-------
energies : dict
A dictionary containing the energies of the system.
forces : dict
A dictionary containing the forces acting on the atoms in the system.
Raises
------
ValueError
If no system data is found.
If no calculator is found.
"""
if num_workers > 1 and self.allow_parallel:
return self._caclulate_parallel(limit, verbose, num_workers)
elif num_workers == 1 or not self.allow_parallel:
return self._calculate_serial(limit, verbose)
else:
raise ValueError("num_workers must be greater than 0. Please set num_workers to 1 or greater.")
def _calculate_serial(self, limit=0, verbose=False):
""" Calculates the energies and forces of the system using the interface.
Parameters
----------
limit : int
The maximum number of calculations to perform. Default is 0 (no limit).
Returns
-------
energies : dict
A dictionary containing the energies of the system.
forces : dict
A dictionary containing the forces acting on the atoms in the system.
Raises
------
ValueError
If no system data is found.
If no calculator is found.
"""
if not hasattr(self, 'systems'):
raise ValueError("No system data found. Please obtain system data first.")
if not hasattr(self, 'calculator'):
raise ValueError("No calculator found. Please set a calculator first.")
self.energies, self.forces = {},{}
calc_count, system_count = 0, 0
for formula in self.systems:
if verbose:
print(f"Calculating system {system_count+1}/{len(self.systems)}")
system = self.systems[formula]
self.energies[formula], self.forces[formula] = [], []
for config_counter, ase_structure in enumerate(system):
if verbose:
print(f"Calculating configuration {config_counter} of {len(system)} in system {formula}.", flush=True)
if hasattr(ase_structure, "info") and "charge" in ase_structure.info:
charge = int(ase_structure.info.get("charge"))
else:
charge = 0
if limit > 0 and calc_count == limit:
print(f"Limit of {limit} calculations reached.")
print(f"Total calculations performed: {calc_count}")
return self.energies, self.forces
clusters = AbstractIO.find_clusters(ase_structure)
total_energy = 0.0
total_forces = []
for cluster in clusters:
atom_ener_coeff = cluster.info.get("atom_ener_coeff")
if len(np.unique(atom_ener_coeff)) > 1:
raise ValueError("Each atom in the cluster must have the same atom_ener_coeff ")
cluster_coeff = np.unique(atom_ener_coeff)[0]
cluster_charge = charge/len(cluster)
calc = self._calculate(cluster, charge=cluster_charge)
e, f = self._report_data(calc)
total_energy += cluster_coeff * e
total_forces.extend(cluster_coeff * f)
total_forces = np.array(total_forces)
self.energies[formula].append(total_energy)
self.forces[formula].append(total_forces)
calc_count += 1
system_count += 1
print(f"Total calculations performed: {calc_count}")
return self.energies, self.forces
[docs]
@staticmethod
def process_structure(args):
system_idx, structure_idx, structure, _calc_func, _report_func = args
if hasattr(structure, "info") and "charge" in structure.info:
charge = int(structure.info.get("charge"))
else:
charge = 0
clusters = AbstractIO.find_clusters(structure)
total_energy = 0.0
total_forces = []
for cluster in clusters:
atom_ener_coeff = cluster.info.get("atom_ener_coeff")
if len(np.unique(atom_ener_coeff)) > 1:
raise ValueError("Each atom in the cluster must have the same atom_ener_coeff ")
cluster_coeff = np.unique(atom_ener_coeff)[0]
cluster_charge = charge/len(cluster)
calc = _calc_func(cluster, charge=cluster_charge)
e, f = _report_func(calc)
total_energy += cluster_coeff * e
total_forces.extend(cluster_coeff * f)
total_forces = np.array(total_forces)
e = total_energy
f = total_forces
return system_idx, structure_idx, e, f
def _caclulate_parallel(self, limit=0, verbose=False, num_workers=4):
""" Calculates the energies and forces of the system using multiprocessing.
.. warning:: This method is incompatible with previous serial calculations. If you must run both, please
run them in separate scripts.
Parameters
----------
limit : int
The maximum number of calculations to perform. Default is 0 (no limit).
verbose : bool
Whether to print progress information. Default is False.
num_workers : int
The number of worker processes to use for multiprocessing. Default is 4.
Returns
-------
energies : dict
A dictionary containing the energies of the system.
forces : dict
A dictionary containing the forces acting on the atoms in the system.
Raises
------
ValueError
If no system data is found.
If no calculator is found.
"""
if not hasattr(self, 'systems'):
raise ValueError("No system data found. Please obtain system data first.")
if not hasattr(self, 'calculator'):
raise ValueError("No calculator found. Please set a calculator first.")
manager = Manager()
energies_mp = manager.dict()
forces_mp = manager.dict()
tasks = []
calc_count = 0
for system_idx, formula in enumerate(self.systems):
if verbose:
print(f"Preparing tasks for system {system_idx+1}/{len(self.systems)}")
system = self.systems[formula]
energies_mp[formula] = manager.list()
forces_mp[formula] = manager.list()
for structure_idx, structure in enumerate(system):
if limit > 0 and calc_count >= limit:
break
tasks.append((formula, structure_idx, structure, self._calculate, self._report_data))
calc_count += 1
if limit > 0 and calc_count > limit:
tasks = tasks[:limit]
print(f"Total tasks prepared: {len(tasks)}")
with Pool(num_workers) as pool:
results = pool.map(AbstractIO.process_structure, tasks)
print("Mapping Complete")
for result in results:
formula, structure_idx, e, f = result
energies_mp[formula].append(e)
forces_mp[formula].append(f)
energies = {key: list(value) for key, value in energies_mp.items()}
forces = {key: list(value) for key, value in forces_mp.items()}
if verbose:
print(f"Total calculations performed: {len(results)}")
return energies, forces
def _report_data(self, calc):
""" Report the energies and forces of the system.
Parameters
----------
calc : object
The object containing the calculation results.
Returns
-------
e : float
The energy of the system.
f : list
The forces acting on the atoms in the system.
"""
import os
import sys
from contextlib import redirect_stdout, redirect_stderr
with open(os.devnull, 'w') as fnull:
with redirect_stdout(fnull), redirect_stderr(fnull):
e = calc.get_potential_energy()
f = calc.get_forces()
return e, f
def _optimize(self, structure, constrain_mask=None, optimizer_opts={"fmax":0.01}):
"""Perform a geometry optimization on the structure.
Parameters
----------
structure : ase.Atoms
The ASE atoms object containing the system data.
constrain_mask : list, optional
A list of indices to fix during optimization. Default is None (no constraints).
optimizer_opts : dict, optional
Additional options for the optimizer. Default is an empty dictionary.
Returns
-------
structure : ase.Atoms
The optimized ASE atoms object.
"""
if constrain_mask is not None:
#structure.set_constraint(FixAtoms(indices=constrain_mask))
dihedral_cons = [structure.get_dihedral(*constrain_mask), constrain_mask]
print(f"Setting dihedral constraint: {dihedral_cons}")
del structure.constraints
structure.set_constraint(FixInternals(dihedrals_deg=[dihedral_cons]))
dyn = BFGS(structure, logfile='amber_opt.log')
dyn.run(fmax=optimizer_opts.get("fmax", 0.01), steps=optimizer_opts.get("max_steps", 1000))
return structure
@abstractmethod
def _calculate(self, structure):
""" Calculate the system data.
Parameters
----------
structure : object
The object containing the system data.
Returns
-------
calc : object
The calculator object.
"""
pass