Source code for pharmaforge.queries.query

""" This module provides a class for querying the MongoDB database and accessing the data store in it. """

import re
from pathlib import Path
from pprint import pprint
from typing import Dict, Any, List

import h5py
import numpy as np
from ase import Atoms



class Query:
    """Parse a small query language into a MongoDB filter and run it.

    The query string is converted into a dictionary that can be passed to
    ``collection.find``. It may combine field/operator/value triples with
    the keywords ``and``, ``or``, ``not``, ``any`` and ``only``. The class
    also provides methods to display the parsed query, apply it to a
    database, export the results to an HDF5 file and convert a result to
    ASE structures.

    Parameters
    ----------
    querystring : str
        The query string to be parsed.

    Attributes
    ----------
    querystring : str
        The original query string.
    parsed_query : dict
        The parsed query in dictionary format.
    results : list
        The documents returned by the last call to :meth:`apply`
        (empty until then).
    theorylevels : dict
        Maps each theory level found in ``results`` to the number of
        documents that contain it.

    Notes
    -----
    The query string should be in the format:

        field operator value

    Keywords are recognised only as whole words, so a field such as
    ``forces`` is not mistaken for an ``or`` expression.

    Queries can be run using the apply method of the Query class, like
    this::

        query = "nmols eq 5"
        q = Query(query)
        results = q.apply(db, "ani_qdpi")
        for molecule in results:
            print(str(molecule.get('molecule_id')))

    See Also
    --------
    pymongo : The MongoDB driver for Python.
    h5py : The HDF5 file format library for Python.
    """

    # Keywords are matched as whole words: a plain substring test would
    # split field names that merely contain them ("forces" contains "or").
    _AND_RE = re.compile(r"\band\b")
    _OR_RE = re.compile(r"\bor\b")
    _NOT_RE = re.compile(r"\bnot\b")
    _ANY_RE = re.compile(r"\bany\b")
    _ONLY_RE = re.compile(r"\bonly\b")

    # Operators accepted in a plain "field operator value" triple. "any" and
    # "only" are handled by dedicated branches before the triple is parsed.
    _COMPARISON_OPERATORS = ("gt", "lt", "gte", "lte", "eq", "ne", "all")

    def __init__(self, querystring: str):
        self.querystring = querystring
        self.parsed_query = self.parse_query(self.querystring)
        self.results: List[Dict[str, Any]] = []
        self.theorylevels: Dict[str, int] = {}

    def parse_query(self, querystring: str) -> Dict[str, Any]:
        """Parse the query string and return it as a MongoDB filter.

        Parameters
        ----------
        querystring : str
            The query string to be parsed.

        Returns
        -------
        dict
            A dictionary representing the parsed query. An empty
            dictionary is returned when no supported form is recognised.

        Raises
        ------
        ValueError
            If a three-part query uses an unsupported operator, if an
            ``any``/``only`` clause is malformed, or if a value cannot be
            interpreted.

        Notes
        -----
        The query string should be in the format:

            field operator value

        where 'field' is the name of the field to query, 'operator' is the
        comparison operator (e.g., 'gt', 'lt', 'gte', 'lte', 'eq', 'ne',
        'any', 'all', 'only'), and 'value' is the value to compare against.

        Parentheses are not supported. A query is split on ``and`` first and
        on ``or`` second, so ``a or b and c`` is read as ``(a or b) and c``.

        Here are some examples::

            example_queries=[
                "nmols eq 5",
                "not nmols eq 5",
                "nmols eq 1",
                "contains_elements any [H,N,O,C]",
                "contains_elements any [H,O] and contains_elements any [C]",
                "contains_elements any [H,O] and not contains_elements any [C]",
                "contains_elements any [H,N,O,C] or nmols gt 1",
                "molecular_charge eq -1",
                "not molecular_charge eq 0",
            ]

        Examples
        --------
        >>> from pharmaforge.queries.query import Query
        >>> q = Query("nmols eq 5")
        >>> q.parse_query("nmols eq 5")
        {'nmols': {'$eq': 5}}
        >>> q.parse_query("not nmols eq 5")
        {'nmols': {'$not': {'$eq': 5}}}
        >>> q.parse_query("contains_elements any [H,N,O,C]")
        {'contains_elements': {'$not': {'$nin': ['H', 'N', 'O', 'C']}}}
        >>> q.parse_query("contains_elements any [H,O] and contains_elements any [C]")
        {'$and': [{'contains_elements': {'$not': {'$nin': ['H', 'O']}}}, {'contains_elements': {'$not': {'$nin': ['C']}}}]}
        >>> q.parse_query("contains_elements any [H,O] and not contains_elements any [C]")
        {'$and': [{'contains_elements': {'$not': {'$nin': ['H', 'O']}}}, {'contains_elements': {'$not': {'$not': {'$nin': ['C']}}}}]}
        >>> q.parse_query("contains_elements any [H,N,O,C] or nmols gt 1")
        {'$or': [{'contains_elements': {'$not': {'$nin': ['H', 'N', 'O', 'C']}}}, {'nmols': {'$gt': 1}}]}
        >>> q.parse_query("molecular_charge eq -1")
        {'molecular_charge': {'$eq': -1}}
        >>> q.parse_query("not molecular_charge eq 0")
        {'molecular_charge': {'$not': {'$eq': 0}}}
        >>> q.parse_query("contains_elements only [H,N,O,C]")
        {'contains_elements': {'$not': {'$elemMatch': {'$nin': ['H', 'N', 'O', 'C']}}}}
        >>> q.parse_query("forces eq 5")
        {'forces': {'$eq': 5}}
        """
        query = querystring.strip()

        # Compound queries: "and" is tried before "or"; each side is parsed
        # recursively, so a side may itself contain the other connective.
        for connective, mongo_operator in (
            (self._AND_RE, "$and"),
            (self._OR_RE, "$or"),
        ):
            if connective.search(query):
                clauses = [part.strip() for part in connective.split(query)]
                return {
                    mongo_operator: [self.parse_query(clause) for clause in clauses]
                }

        # Negation: drop the keyword, parse what is left, then negate it.
        if self._NOT_RE.search(query):
            positive = self._NOT_RE.sub(" ", query)
            return self._negate(self.parse_query(positive))

        if self._ANY_RE.search(query):
            return self._interpret_any(query)
        if self._ONLY_RE.search(query):
            return self._interpret_only(query)

        # split() without arguments tolerates repeated whitespace.
        tokens = query.split()
        if len(tokens) == 3:
            field, operator, value = tokens
            if operator not in self._COMPARISON_OPERATORS:
                raise ValueError(f"Invalid operator: {operator}")
            return {field: {f"${operator}": self._interpret_value_type(value)}}
        if len(tokens) == 2:
            field, value = tokens
            return {field: self._interpret_value_type(value)}

        # If no valid query is identified, return an empty dictionary
        return {}

    def display_query(self) -> None:
        """Pretty-print the parsed query.

        Useful for debugging: the nested filter is printed with ``pprint``
        so that long queries are spread over several lines.

        Returns
        -------
        None
        """
        pprint(self.parsed_query)

    def apply(self, db, collection, verbose=True):
        """Run the parsed query against a collection and return the matches.

        Parameters
        ----------
        db : pymongo.database.Database
            The MongoDB database to query.
        collection : str
            The name of the collection to query.
        verbose : bool, optional
            If True, prints the number of matches and the theory levels
            found. Default is True.

        Returns
        -------
        list
            A list of documents matching the query.

        Raises
        ------
        ValueError
            If the query string does not parse into a non-empty filter.

        Notes
        -----
        The matches are stored in ``self.results``. ``self.theorylevels`` is
        rebuilt on every call and counts, for each theory level, how many of
        the returned documents contain it.
        """
        if not self.parsed_query:
            # Re-parse in case querystring was changed after construction.
            self.parsed_query = self.parse_query(self.querystring)
        if not self.parsed_query:
            # An empty filter would silently match every document.
            raise ValueError(
                f"Query string {self.querystring!r} could not be parsed into a query"
            )

        self.results = list(db[collection].find(self.parsed_query))
        if verbose:
            print("Found", len(self.results), "documents matching the query ", self.querystring)

        level_counts: Dict[str, int] = {}
        for document in self.results:
            # Documents without a 'theorylevels' entry contribute nothing.
            for level in (document.get('theorylevels') or {}):
                level_counts[level] = level_counts.get(level, 0) + 1
        self.theorylevels = level_counts

        if verbose:
            print("Theory levels found in the query results:")
            for level, count in self.theorylevels.items():
                print(f"{level}: {count} molecules")
        return self.results

    def results_to_deepmdkit(self, hdf5_filename, level_of_theory=None, second_level_of_theory=None, verbose=False):
        """Create an HDF5 file from MongoDB query results.

        One group is written per molecule, named after its ``molecule_id``.
        Each group holds a ``set.000`` subgroup with ``coord.npy``,
        ``energy.npy`` and ``force.npy`` (plus ``atom_ener_coeff`` when the
        document has it), and the optional datasets ``nopbc``, ``type.raw``
        and ``type_map.raw`` when the document has them.

        Parameters
        ----------
        hdf5_filename : str
            Name of the HDF5 file to create. The suffix is forced to
            ``.hdf5``; missing parent directories are created.
        level_of_theory : str
            The level of theory whose energies and forces are written. It
            must be one of the levels found by :meth:`apply`.
        second_level_of_theory : str, optional
            If given, the energies and forces of this level are subtracted
            from those of ``level_of_theory`` before writing (for training a
            delta MLP). Default is None.
        verbose : bool, optional
            If True, prints a line for every molecule written. Default is
            False.

        Returns
        -------
        None
            The function creates an HDF5 file and does not return any value.

        Raises
        ------
        ValueError
            If ``level_of_theory`` (or a given ``second_level_of_theory``)
            is not among the theory levels of the current results.

        Notes
        -----
        Molecules that fail to process are reported on stdout and skipped;
        they do not abort the export.

        As an example::

            query = "nmols gt 4 and nmols lt 7"
            q = Query(query)
            print("*****************************************")
            print(f"Query: {q.querystring}")
            print(f"Parsed query: {q.parsed_query}")
            q.display_query()
            results = q.apply(db, "ani_qdpi")
            for molecule in results:
                print(str(molecule.get('molecule_id')))
            level = next(iter(q.theorylevels))
            q.results_to_deepmdkit("saved_model.hdf5", level_of_theory=level)
        """
        if level_of_theory not in self.theorylevels:
            raise ValueError(
                "Level of theory not specified or not found in the query results, "
                f"available levels are: {list(self.theorylevels)}"
            )
        if (
            second_level_of_theory is not None
            and second_level_of_theory not in self.theorylevels
        ):
            raise ValueError(
                "Second level of theory not found in the query results, "
                f"available levels are: {list(self.theorylevels)}"
            )

        target = Path(hdf5_filename)
        # Path.suffix includes the leading dot.
        if target.suffix != ".hdf5":
            target = target.with_suffix(".hdf5")
        target.parent.mkdir(parents=True, exist_ok=True)

        with h5py.File(f"{target}", "w") as hdf_file:
            inserted_data = False  # Flag to track whether data is inserted
            for molecule in self.results:
                try:
                    raw_id = molecule.get('molecule_id')
                    # Test the raw value: str(None) is the truthy "None".
                    if raw_id is None or raw_id == "":
                        print(f"Skipping molecule without molecule_id: {molecule}")
                        continue
                    molecule_id = str(raw_id)

                    # Extract everything that can fail on a malformed
                    # document before touching the file, so that no empty
                    # group is left behind.
                    coords, energy, force = self._training_arrays(
                        molecule, level_of_theory, second_level_of_theory
                    )

                    mol_group = hdf_file.create_group(molecule_id)
                    if "nopbc" in molecule:
                        mol_group.create_dataset(
                            "nopbc", data=np.array(molecule["nopbc"], dtype=bool)
                        )

                    set000_group = mol_group.create_group("set.000")
                    set000_group.create_dataset("coord.npy", data=coords)
                    set000_group.create_dataset("energy.npy", data=energy)
                    set000_group.create_dataset("force.npy", data=force)
                    if "atom_ener_coeff" in molecule:
                        set000_group.create_dataset(
                            "atom_ener_coeff", data=np.array(molecule["atom_ener_coeff"])
                        )

                    if "type.raw" in molecule:
                        mol_group.create_dataset(
                            "type.raw", data=np.array(molecule["type.raw"])
                        )
                    if "type_map.raw" in molecule:
                        # Stored as byte strings.
                        mol_group.create_dataset(
                            "type_map.raw",
                            data=np.array(
                                [t.encode("utf-8") for t in molecule["type_map.raw"]]
                            ),
                        )

                    inserted_data = True
                    if verbose:
                        print(f"Added data for molecule: {molecule_id}")
                except Exception as e:
                    # Deliberate best effort: report and continue with the
                    # next molecule.
                    print(f"Error processing molecule {molecule.get('molecule_id', 'Unknown')}: {str(e)}")

            if not inserted_data:
                print("No data was added to the HDF5 file. Check the query or data.")
            else:
                print(f"HDF5 file '{target}' created successfully with data.")

        print(f"HDF5 file creation process completed for '{target}'.")

    @staticmethod
    def _training_arrays(molecule, level_of_theory, second_level_of_theory=None):
        """Return ``(coordinates, energies, forces)`` arrays for one document.

        When ``second_level_of_theory`` is given, the energies and forces
        are the difference ``level_of_theory - second_level_of_theory``.
        """
        upper = molecule['theorylevels'][level_of_theory]
        energy = np.array(upper["energies"])
        force = np.array(upper["forces"])
        if second_level_of_theory is not None:
            lower = molecule['theorylevels'][second_level_of_theory]
            energy = energy - np.array(lower["energies"])
            force = force - np.array(lower["forces"])
        coords = np.array(molecule["coordinates"])
        return coords, energy, force

    def _interpret_value_type(self, value):
        """Convert the textual value of a query into a Python value.

        * ``[a,b,c]`` becomes a list of stripped strings (``[]`` becomes an
          empty list); list items are not converted to numbers.
        * Text containing a ``.`` must be a float, otherwise ``ValueError``
          is raised.
        * Anything else becomes an ``int`` if possible, else stays a string.
        """
        # Lists are checked first so that dotted items do not send the whole
        # text down the float branch.
        if value.startswith("[") and value.endswith("]"):
            inner = value[1:-1].strip()
            if not inner:
                return []
            return [item.strip() for item in inner.split(",")]
        if "." in value:
            try:
                return float(value)
            except ValueError:
                raise ValueError(f"Invalid value: {value}") from None
        try:
            return int(value)
        except ValueError:
            return str(value)

    @staticmethod
    def _split_field_value(query, keyword_re, keyword):
        """Split ``"field <keyword> value"`` into ``(field, value_text)``.

        Only the first whole-word occurrence of the keyword is used as the
        separator. Raises ``ValueError`` if either side is missing.
        """
        parts = keyword_re.split(query, maxsplit=1)
        if len(parts) != 2:
            raise ValueError(f"Invalid '{keyword}' query: {query!r}")
        field, value_text = parts[0].strip(), parts[1].strip()
        if not field or not value_text:
            raise ValueError(f"Invalid '{keyword}' query: {query!r}")
        return field, value_text

    def _membership_values(self, value_text):
        """Interpret ``value_text`` and make sure the result is a list."""
        values = self._interpret_value_type(value_text)
        # $nin requires an array, so a lone scalar is wrapped.
        return values if isinstance(values, list) else [values]

    def _interpret_any(self, query):
        """Translate ``field any value`` into a MongoDB filter.

        Returns ``{field: {"$not": {"$nin": value}}}``; a non-string query
        yields an empty dictionary.
        """
        if not isinstance(query, str):
            return {}
        field, value_text = self._split_field_value(query, self._ANY_RE, "any")
        return {field: {"$not": {"$nin": self._membership_values(value_text)}}}

    def _interpret_only(self, query):
        """Translate ``field only value`` into a MongoDB filter.

        Returns ``{field: {"$not": {"$elemMatch": {"$nin": value}}}}``; a
        non-string query yields an empty dictionary.
        """
        if not isinstance(query, str):
            return {}
        field, value_text = self._split_field_value(query, self._ONLY_RE, "only")
        return {
            field: {
                "$not": {"$elemMatch": {"$nin": self._membership_values(value_text)}}
            }
        }

    def _negate(self, query):
        """Return the logical negation of a parsed query.

        * ``{field: {operator: value}}`` becomes
          ``{field: {"$not": {operator: value}}}``.
        * ``{field: value}`` (plain equality) becomes
          ``{field: {"$ne": value}}``, because ``$not`` does not accept a
          bare value.
        * Compound or multi-field filters are wrapped as
          ``{"$nor": [query]}``.
        * An empty or non-dict query yields an empty dictionary.
        """
        if not isinstance(query, dict) or not query:
            return {}
        if len(query) > 1 or any(str(key).startswith("$") for key in query):
            return {"$nor": [query]}
        ((field, condition),) = query.items()
        if isinstance(condition, dict):
            return {field: {"$not": condition}}
        return {field: {"$ne": condition}}

    @staticmethod
    def result_to_ase(result):
        """Convert one query result into a list of ASE structures.

        One ``Atoms`` object is built for every entry of
        ``result['coordinates']``, each reshaped to ``(-1, 3)``. Element
        symbols are obtained by indexing ``result['type_map.raw']`` with
        ``result['type.raw']``.

        Parameters
        ----------
        result : dict
            The result from the database query.

        Returns
        -------
        ase_structures : list
            A list of ASE Atoms objects. Each carries ``charge`` and
            ``atom_ener_coeff`` in its ``info`` dictionary.

        Notes
        -----
        ``charge`` defaults to 0 and ``atom_ener_coeff`` to an array of ones
        when the document does not provide them.
        """
        type_indices = np.array(result.get('type.raw'))
        type_symbols = np.array(result.get('type_map.raw'))
        # One element symbol per atom, as plain Python strings.
        symbols = type_symbols[type_indices].tolist()

        # NOTE(review): reads 'charge', while the query examples use a
        # 'molecular_charge' field - confirm which key the documents carry.
        charge = result.get('charge')
        if charge is None:
            charge = 0

        atom_ener_coeff = result.get('atom_ener_coeff')
        if atom_ener_coeff is not None:
            atom_ener_coeff = np.array(atom_ener_coeff)
        else:
            print("Defaulting atom_ener_coeff to ones")
            atom_ener_coeff = np.ones_like(type_indices)

        ase_structures = []
        for frame in result.get('coordinates') or []:
            positions = np.array(frame).reshape(-1, 3)
            ase_structures.append(
                Atoms(
                    symbols=symbols,
                    positions=positions,
                    info={"charge": charge, "atom_ener_coeff": atom_ener_coeff},
                )
            )
        return ase_structures