""" This module provides a class for querying the MongoDB database and accessing the data store in it. """
import re
from pathlib import Path
from pprint import pprint
from typing import Dict, Any, List

import h5py
import numpy as np
from ase import Atoms
class Query:
    """Parse a small text query language into a MongoDB filter and export the matches.

    The query string is tokenized and translated into a dictionary that can be
    passed to ``collection.find``. It can contain the keywords 'and', 'or',
    'not', 'any', 'only' and field-operator-value conditions. The class also
    provides methods to display the parsed query, apply it to a database, and
    write the results to an HDF5 file.

    Parameters
    ----------
    querystring : str
        The query string to be parsed.

    Attributes
    ----------
    querystring : str
        The original query string.
    parsed_query : dict
        The parsed query in dictionary format.
    results : list
        The documents returned by the last call to :meth:`apply` (empty before that).
    hdf5_filename : str or None
        Path of the HDF5 file written by the last call to
        :meth:`results_to_deepmdkit`, or None if no file has been written yet.
    theorylevels : dict
        Maps each theory level found in the results to the number of
        result documents that contain it.

    Notes
    -----
    The query string should be in the format:
        field operator value
    Queries can be run using the apply method of the Query class, like this::

        query = "nmols eq 5"
        q = Query(query)
        results = q.apply(db, "ani_qdpi")
        for molecule in results:
            print(str(molecule.get('molecule_id')))

    See Also
    --------
    pymongo : The MongoDB driver for Python.
    h5py : The HDF5 file format library for Python.
    """

    # A token is a bracketed list (which may contain spaces), a stray
    # unterminated "[..." fragment, or a run of characters up to the next
    # whitespace or "[".
    _TOKEN_PATTERN = re.compile(r"\[[^\]]*\]|\[\S*|[^\s\[]+")
    # Operators that map one-to-one onto "$<operator>" in the MongoDB filter.
    _COMPARISON_OPERATORS = ("gt", "lt", "gte", "lte", "eq", "ne", "all")
    # Order matters: 'and' is split first, so 'or' groups more tightly.
    _LOGICAL_KEYWORDS = (("and", "$and"), ("or", "$or"))

    def __init__(self, querystring: str):
        self.querystring = querystring
        self.parsed_query = self.parse_query(self.querystring)
        self.theorylevels = {}
        self.results = []
        self.hdf5_filename = None

    def parse_query(self, querystring: str) -> Dict[str, Any]:
        """Parses the query string and returns a dictionary of the query.

        The query string is split into whitespace-separated tokens (a bracketed
        list such as ``[H, O]`` counts as one token). Keywords are only
        recognized as whole tokens, so field names or values that merely
        contain 'or', 'and', 'not', 'any' or 'only' are left untouched.

        Parameters
        ----------
        querystring : str
            The query string to be parsed.

        Returns
        -------
        dict
            A dictionary representing the parsed query. An empty (or
            whitespace-only) query string yields an empty dictionary.

        Raises
        ------
        ValueError
            If the query string is invalid or contains unsupported operators.

        Notes
        -----
        The query string should be in the format:
            field operator value
        where 'field' is the name of the field to query, 'operator' is the comparison operator
        (e.g., 'gt', 'lt', 'gte', 'lte', 'eq', 'ne', 'any', 'all', 'only'), and 'value' is the
        value to compare against. A two-token query ``field value`` is an equality match.

        A query is split on 'and' before it is split on 'or', so
        ``a eq 1 and b eq 2 or c eq 3`` is read as ``a AND (b OR c)``.
        'not' applies to the single condition that follows it.

        Here are some examples::

            example_queries=[
                "nmols eq 5",
                "not nmols eq 5",
                "nmols eq 1",
                "contains_elements any [H,N,O,C]",
                "contains_elements any [H,O] and contains_elements any [C]",
                "contains_elements any [H,O] and not contains_elements any [C]",
                "contains_elements any [H,N,O,C] or nmols gt 1",
                "molecular_charge eq -1",
                "not molecular_charge eq 0",
            ]

        Examples
        --------
        >>> from pharmaforge.queries.Query import Query
        >>> q = Query("nmols eq 5")
        >>> q.parse_query("nmols eq 5")
        {'nmols': {'$eq': 5}}
        >>> q.parse_query("not nmols eq 5")
        {'nmols': {'$not': {'$eq': 5}}}
        >>> q.parse_query("contains_elements any [H,N,O,C]")
        {'contains_elements': {'$not': {'$nin': ['H', 'N', 'O', 'C']}}}
        >>> q.parse_query("contains_elements any [H,O] and contains_elements any [C]")
        {'$and': [{'contains_elements': {'$not': {'$nin': ['H', 'O']}}}, {'contains_elements': {'$not': {'$nin': ['C']}}}]}
        >>> q.parse_query("contains_elements any [H,O] and not contains_elements any [C]")
        {'$and': [{'contains_elements': {'$not': {'$nin': ['H', 'O']}}}, {'contains_elements': {'$not': {'$not': {'$nin': ['C']}}}}]}
        >>> q.parse_query("contains_elements any [H,N,O,C] or nmols gt 1")
        {'$or': [{'contains_elements': {'$not': {'$nin': ['H', 'N', 'O', 'C']}}}, {'nmols': {'$gt': 1}}]}
        >>> q.parse_query("molecular_charge eq -1")
        {'molecular_charge': {'$eq': -1}}
        >>> q.parse_query("not molecular_charge eq 0")
        {'molecular_charge': {'$not': {'$eq': 0}}}
        >>> q.parse_query("contains_elements only [H,N,O,C]")
        {'contains_elements': {'$not': {'$elemMatch': {'$nin': ['H', 'N', 'O', 'C']}}}}
        """
        tokens = self._TOKEN_PATTERN.findall(querystring)
        if not tokens:
            return {}
        return self._parse_tokens(tokens)

    def _parse_tokens(self, tokens: List[str]) -> Dict[str, Any]:
        """Translate a non-empty token list into a MongoDB filter dictionary."""
        for keyword, mongo_operator in self._LOGICAL_KEYWORDS:
            if keyword not in tokens:
                continue
            operands = self._split_tokens(tokens, keyword)
            if not all(operands):
                raise ValueError(
                    f"Missing operand for '{keyword}' in query: {' '.join(tokens)}"
                )
            return {mongo_operator: [self._parse_tokens(operand) for operand in operands]}
        if tokens[0] == "not":
            if len(tokens) == 1:
                raise ValueError("Missing condition after 'not'")
            return self._negate(self._parse_tokens(tokens[1:]))
        return self._parse_condition(tokens)

    @staticmethod
    def _split_tokens(tokens: List[str], keyword: str) -> List[List[str]]:
        """Split a token list into the sub-lists separated by ``keyword``."""
        groups = [[]]
        for token in tokens:
            if token == keyword:
                groups.append([])
            else:
                groups[-1].append(token)
        return groups

    def _parse_condition(self, tokens: List[str]) -> Dict[str, Any]:
        """Translate one ``field operator value`` or ``field value`` condition."""
        if len(tokens) == 3:
            field, operator, raw_value = tokens
            if operator in ("any", "only"):
                return self._membership_filter(field, operator, raw_value)
            if operator not in self._COMPARISON_OPERATORS:
                raise ValueError(f"Invalid operator: {operator}")
            value = self._interpret_value_type(raw_value)
            if operator == "all" and not isinstance(value, list):
                # $all takes an array operand, so wrap a lone value.
                value = [value]
            return {field: {f"${operator}": value}}
        if len(tokens) == 2:
            field, raw_value = tokens
            return {field: self._interpret_value_type(raw_value)}
        raise ValueError(f"Invalid query: {' '.join(tokens)}")

    def _membership_filter(self, field: str, keyword: str, raw_value: str) -> Dict[str, Any]:
        """Build the filter for the 'any' / 'only' operators."""
        values = self._interpret_value_type(raw_value)
        if not isinstance(values, list):
            # $nin takes an array operand, so wrap a lone value.
            values = [values]
        if keyword == "any":
            return {field: {"$not": {"$nin": values}}}
        return {field: {"$not": {"$elemMatch": {"$nin": values}}}}

    def _parse_membership(self, query: str, keyword: str) -> Dict[str, Any]:
        """Parse a standalone ``field <keyword> value`` string."""
        tokens = self._TOKEN_PATTERN.findall(query)
        if len(tokens) != 3 or tokens[1] != keyword:
            raise ValueError(f"Invalid '{keyword}' query: {query}")
        return self._membership_filter(tokens[0], keyword, tokens[2])

    def display_query(self):
        """Pretty-print the parsed query.

        Useful for debugging: ``pprint`` spreads a nested query over several
        lines so its structure is easier to read.

        Returns
        -------
        None
        """
        pprint(self.parsed_query)

    def apply(self, db, collection, verbose=True) -> List[Dict[str, Any]]:
        """Run the parsed query against a collection and return the results.

        Parameters
        ----------
        db : pymongo.database.Database
            The MongoDB database to query.
        collection : str
            The name of the collection to query.
        verbose : bool, optional
            If True, prints the number of matches and the theory levels found. Default is True.

        Returns
        -------
        list
            A list of documents matching the query.

        Raises
        ------
        ValueError
            If the query string has to be re-parsed and is invalid.

        Notes
        -----
        The results are stored in ``self.results`` and ``self.theorylevels`` is
        rebuilt to count how many result documents contain each theory level.
        An empty parsed query is passed to ``find`` unchanged.
        """
        if not self.parsed_query:
            self.parsed_query = self.parse_query(self.querystring)
        self.results = list(db[collection].find(self.parsed_query))
        if verbose:
            print("Found", len(self.results), "documents matching the query ", self.querystring)
        level_counts = {}
        for result in self.results:
            # Documents without a 'theorylevels' entry simply contribute nothing.
            for theorylevel in (result.get('theorylevels') or {}):
                level_counts[theorylevel] = level_counts.get(theorylevel, 0) + 1
        self.theorylevels = level_counts
        if verbose:
            print("Theory levels found in the query results:")
            for theorylevel, count in self.theorylevels.items():
                print(f"{theorylevel}: {count} molecules")
        return self.results

    def results_to_deepmdkit(self, hdf5_filename, level_of_theory=None, second_level_of_theory=None, verbose=False):
        """
        Create an HDF5 file from MongoDB query results.

        Parameters
        ----------
        hdf5_filename : str
            Path of the HDF5 file to create. If it does not end in ``.hdf5`` the
            suffix is replaced with (or set to) ``.hdf5``. Missing parent
            directories are created.
        level_of_theory : str
            The level of theory whose energies and forces are written. It must be
            one of the keys of ``self.theorylevels``.
        second_level_of_theory : str, optional
            If given, the energies and forces of this level are subtracted from
            those of ``level_of_theory`` before writing (delta MLP). Default is None.
        verbose : bool, optional
            If True, prints a line for every molecule added. Default is False.

        Returns
        -------
        None
            The function creates an HDF5 file and does not return any value.

        Raises
        ------
        ValueError
            If ``level_of_theory`` (or a given ``second_level_of_theory``) is not
            among the theory levels found by the last :meth:`apply`.

        Notes
        -----
        One group per molecule is created, named after its ``molecule_id``.
        Molecules that fail to convert are reported and skipped so that one bad
        document does not abort the whole export.
        As an example::

            query = "nmols gt 4 and nmols lt 7"
            q = Query(query)
            print("*****************************************")
            print(f"Query: {q.querystring}")
            print(f"Parsed query: {q.parsed_query}")
            q.display_query()
            results = q.apply(db, "ani_qdpi")
            for molecule in results:
                print(str(molecule.get('molecule_id')))
            level = next(iter(q.theorylevels))
            q.results_to_deepmdkit("saved_model.hdf5", level_of_theory=level)
        """
        available = ", ".join(map(str, self.theorylevels)) or "none"
        if level_of_theory not in self.theorylevels:
            raise ValueError(
                f"Level of theory {level_of_theory!r} not specified or not found in the "
                f"query results, available levels are: {available}"
            )
        if second_level_of_theory is not None and second_level_of_theory not in self.theorylevels:
            raise ValueError(
                f"Second level of theory {second_level_of_theory!r} not found in the "
                f"query results, available levels are: {available}"
            )

        output_path = Path(hdf5_filename)
        # Path.suffix includes the leading dot.
        if output_path.suffix != ".hdf5":
            output_path = output_path.with_suffix(".hdf5")
        output_path.parent.mkdir(parents=True, exist_ok=True)
        self.hdf5_filename = str(output_path)

        inserted_data = False  # Flag to track whether data is inserted
        with h5py.File(str(output_path), "w") as hdf_file:
            for molecule in self.results:
                raw_id = molecule.get('molecule_id')
                # Check the raw value: str(None) would be the truthy string "None".
                if raw_id is None or str(raw_id) == "":
                    print(f"Skipping molecule without molecule_id: {molecule}")
                    continue
                molecule_id = str(raw_id)
                try:
                    self._write_molecule(
                        hdf_file, molecule_id, molecule, level_of_theory, second_level_of_theory
                    )
                except Exception as e:  # deliberate best-effort: report and keep going
                    print(f"Error processing molecule {molecule_id}: {str(e)}")
                    continue
                inserted_data = True
                if verbose:
                    print(f"Added data for molecule: {molecule_id}")

        if not inserted_data:
            print("No data was added to the HDF5 file. Check the query or data.")
        else:
            print(f"HDF5 file '{output_path}' created successfully with data.")
        print(f"HDF5 file creation process completed for '{output_path}'.")

    @staticmethod
    def _write_molecule(hdf_file, molecule_id, molecule, level_of_theory, second_level_of_theory):
        """Write one molecule document as a group of ``hdf_file``."""
        # Build every array first so that a missing key raises before any
        # group is created and no half-written group is left in the file.
        levels = molecule['theorylevels']
        energies = np.array(levels[level_of_theory]["energies"])
        forces = np.array(levels[level_of_theory]["forces"])
        if second_level_of_theory is not None:
            energies = energies - np.array(levels[second_level_of_theory]["energies"])
            forces = forces - np.array(levels[second_level_of_theory]["forces"])
        coordinates = np.array(molecule["coordinates"])

        mol_group = hdf_file.create_group(molecule_id)
        if "nopbc" in molecule:
            mol_group.create_dataset("nopbc", data=np.array(molecule["nopbc"], dtype=bool))
        set000_group = mol_group.create_group("set.000")
        set000_group.create_dataset("coord.npy", data=coordinates)
        set000_group.create_dataset("energy.npy", data=energies)
        set000_group.create_dataset("force.npy", data=forces)
        if "atom_ener_coeff" in molecule:
            set000_group.create_dataset("atom_ener_coeff", data=np.array(molecule["atom_ener_coeff"]))
        if "type.raw" in molecule:
            mol_group.create_dataset("type.raw", data=np.array(molecule["type.raw"]))
        if "type_map.raw" in molecule:
            mol_group.create_dataset(
                "type_map.raw",
                data=np.array([t.encode("utf-8") for t in molecule["type_map.raw"]]),
            )

    def _interpret_value_type(self, value):
        """Convert a value token to a list, float, int or str.

        ``[a, b]`` becomes a list of stripped strings (``[]`` becomes an empty
        list). Otherwise a token containing '.' must parse as a float, and
        any other token is an int when possible and a string when not.

        Raises
        ------
        ValueError
            If a non-list token contains '.' but is not a valid float.
        """
        # Lists are checked first so that items such as "1.5" are not
        # mistaken for a malformed float.
        if value.startswith("[") and value.endswith("]"):
            inner = value[1:-1]
            if not inner.strip():
                return []
            return [item.strip() for item in inner.split(",")]
        if "." in value:
            try:
                return float(value)
            except ValueError as err:
                raise ValueError(f"Invalid value: {value}") from err
        try:
            return int(value)
        except ValueError:
            return str(value)

    def _interpret_any(self, query):
        """Interprets a query of the form ``field any value``.

        Returns ``{field: {"$not": {"$nin": values}}}``; a non-string
        argument yields an empty dictionary.

        Raises
        ------
        ValueError
            If the string is not exactly ``field any value``.
        """
        if not isinstance(query, str):
            return {}
        return self._parse_membership(query, "any")

    def _interpret_only(self, query):
        """Interprets a query of the form ``field only value``.

        Returns ``{field: {"$not": {"$elemMatch": {"$nin": values}}}}``; a
        non-string argument yields an empty dictionary.

        Raises
        ------
        ValueError
            If the string is not exactly ``field only value``.
        """
        if not isinstance(query, str):
            return {}
        return self._parse_membership(query, "only")

    def _negate(self, query):
        """Performs a logical not operation on the query.

        A single ``{field: {operator: value, ...}}`` condition becomes
        ``{field: {"$not": {operator: value, ...}}}``. A single
        ``{field: value}`` equality becomes ``{field: {"$ne": value}}``
        because ``$not`` does not accept a plain value. Anything else (several
        fields, or a top-level logical operator) is wrapped in ``$nor``.
        A non-dict or empty argument yields an empty dictionary.
        """
        if not isinstance(query, dict) or not query:
            return {}
        if len(query) == 1:
            (field, condition), = query.items()
            if not str(field).startswith("$"):
                if isinstance(condition, dict):
                    return {field: {"$not": dict(condition)}}
                return {field: {"$ne": condition}}
        return {"$nor": [query]}

    @staticmethod
    def result_to_ase(result):
        """Converts one result document to a list of ASE structures.

        Parameters
        ----------
        result : dict
            One document from the database query. It is read for the keys
            'type.raw', 'type_map.raw', 'coordinates', 'charge' and
            'atom_ener_coeff'.

        Returns
        -------
        ase_structures : list
            One ASE Atoms object per entry of ``result['coordinates']``
            (empty if the document has no coordinates).

        Raises
        ------
        ValueError
            If 'type.raw' or 'type_map.raw' is missing from the document.

        Notes
        -----
        Each coordinate entry is reshaped to ``(-1, 3)``. The charge (0 when
        absent) and ``atom_ener_coeff`` (ones when absent) are stored in the
        ``info`` dictionary of every Atoms object.
        """
        type_indices = result.get('type.raw')
        type_names = result.get('type_map.raw')
        if type_indices is None or type_names is None:
            raise ValueError(
                "Result must contain 'type.raw' and 'type_map.raw' to build ASE structures."
            )
        # One symbol per atom; a list avoids re-parsing a concatenated formula string.
        symbols = [str(type_names[int(index)]) for index in type_indices]

        # NOTE(review): the query examples use a 'molecular_charge' field while
        # this reads 'charge' - confirm which key the documents carry.
        charge = result.get('charge')
        if charge is None:
            charge = 0

        atom_ener_coeff = result.get('atom_ener_coeff')
        if atom_ener_coeff is not None:
            atom_ener_coeff = np.array(atom_ener_coeff)
        else:
            print("Defaulting atom_ener_coeff to ones")
            atom_ener_coeff = np.ones_like(np.array(type_indices))

        all_coords = result.get('coordinates')
        if all_coords is None:
            all_coords = []

        ase_structures = []
        for frame in all_coords:
            positions = np.array(frame).reshape(-1, 3)
            ase_structures.append(
                Atoms(
                    symbols,
                    positions,
                    info={"charge": charge, "atom_ener_coeff": atom_ener_coeff},
                )
            )
        return ase_structures