Source code for pharmaforge.database

import h5py
import numpy as np
from pymongo import MongoClient

from pathlib import Path

class DataBase:
    """
    A collection of opened HDF5 molecule files, keyed by file stem.

    Attributes
    ----------
    data : dict
        Maps a database key (the file name without its extension) to the
        opened, read-only ``h5py.File`` object for that file.
    """

    def __init__(self):
        self.data = {}

    @classmethod
    def __construct__(cls, data_names, data_path=None):
        """
        Construct a database from one or more data file names.

        Parameters
        ----------
        data_names : str, Path, or list of str
            The data file name(s) to load. A single name is accepted and
            treated as a one-element list.
        data_path : str or Path, optional
            The directory containing the data files. If None, the current
            working directory is used.

        Returns
        -------
        db : DataBase
            A database object containing the data

        Raises
        ------
        ValueError :
            If a file name does not contain "hdf5"
        FileNotFoundError :
            If a file does not exist (raised by ``add_data``)
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(data_names, (str, Path)):
            data_names = [data_names]
        base_dir = Path('.') if data_path is None else Path(data_path)

        db = cls()
        for name in data_names:
            if "hdf5" not in str(name):
                raise ValueError('File type must be hdf5, found: ' + str(name))
            db.add_data(base_dir / name)
        return db

    def add_data(self, filepath):
        """
        Open an HDF5 file read-only and register it in the database.

        The file is stored under its stem (file name without extension) and
        is left open so later queries can read from it.

        Parameters
        ----------
        filepath : str or Path
            The name or path to the data file to add. The file must have
            the ``.hdf5`` extension.

        Returns
        -------
        None

        Raises
        ------
        ValueError :
            If the file extension is not ``.hdf5``
        FileNotFoundError :
            If the file is not found, or the path does not exist
        """
        filepath = Path(filepath)
        if not filepath.exists():
            raise FileNotFoundError('File does not exist: ' + str(filepath))
        ftype = filepath.suffix
        if ftype != '.hdf5':
            raise ValueError('File type must be hdf5, found: ' + ftype)
        self.data[filepath.stem] = h5py.File(filepath, 'r')

    def obtain_feature(self, feature_key, db_key=None):
        """
        Collect one feature from every molecule that has it.

        Parameters
        ----------
        feature_key : str
            The key of the feature to obtain
        db_key : str, optional
            The key of the database to search. If None, every database is
            searched.

        Returns
        -------
        feature : list
            The feature values, one per molecule that contains the feature

        Raises
        ------
        KeyError :
            If no molecule in any database has ``feature_key``, or if
            ``db_key`` is not a loaded database
        """
        # verbose=False: validating the key should not print molecule names.
        if feature_key not in self.find_all_unique(aslist=True, verbose=False):
            raise KeyError(f'Feature {feature_key} not found in database')

        if db_key is None:
            sources = list(self.data.values())
        else:
            sources = [self.data[db_key]]

        feature = []
        for db in sources:
            for mol_key in db.keys():
                try:
                    feature.append(db[mol_key][feature_key])
                except KeyError:
                    # This molecule simply does not carry the feature.
                    continue
        return feature

    def find_empty_molecules(self):
        """
        Find all molecules that contain no entries and print a summary.

        Returns
        -------
        empty : dictionary of lists
            Maps each database key to the list of its empty molecule keys
        """
        empty = {
            key: [mol_key for mol_key in db.keys() if len(db[mol_key].keys()) == 0]
            for key, db in self.data.items()
        }
        emp_count = self._count(empty)
        print("Empty molecules found in database:")
        print("Database: Count")
        for key, count in emp_count.items():
            print(key + ": " + str(count))
        return empty

    def find_molecule(self, mol_key, verbose=True):
        """
        Find a molecule in the database.

        Databases are searched in insertion order and the first match wins.

        Parameters
        ----------
        mol_key : str
            The key of the molecule to search
        verbose : bool
            Whether to print the name of the database the molecule was
            found in

        Returns
        -------
        mol : object or None
            The molecule entry, or None if no database contains it
        """
        for key, db in self.data.items():
            if mol_key in db.keys():
                if verbose:
                    print("Found molecule in database: ", key)
                return db[mol_key]
        return None

    def find_shared_molecules(self, db_key1, db_key2):
        """
        Find all molecules shared between two databases.

        Parameters
        ----------
        db_key1 : str
            The key of the first database to search
        db_key2 : str
            The key of the second database to search

        Returns
        -------
        shared : list
            The molecule keys present in both databases, in the order they
            appear in the first database
        """
        db1 = self.data[db_key1]
        db2 = self.data[db_key2]
        other_keys = db2.keys()
        return [key for key in db1.keys() if key in other_keys]

    def find_unique(self, db_key, verbose=True):
        """
        Find all unique feature keys in an individual database.

        Parameters
        ----------
        db_key : str
            The key of the database to search
        verbose : bool
            Whether to print the key of every molecule that has exactly one
            feature

        Returns
        -------
        unique : list
            The distinct feature keys, in order of first appearance
        """
        db = self.data[db_key]
        unique = []
        seen = set()  # O(1) membership; `unique` keeps first-seen order
        for key in db.keys():
            sub_keys = db[key].keys()
            if verbose and len(sub_keys) == 1:
                print(key)
            for k in sub_keys:
                if k not in seen:
                    seen.add(k)
                    unique.append(k)
        return unique

    def find_all_unique(self, aslist=False, verbose=True):
        """
        Find all unique feature keys in the entire database.

        Parameters
        ----------
        aslist : bool
            If True, merge the keys of every database into one flat
            collection; otherwise report them per database.
        verbose : bool
            Passed through to ``find_unique``

        Returns
        -------
        unique : set or dictionary of lists
            With ``aslist=True``, a set of every feature key found in any
            database. Otherwise a dictionary mapping each database key to
            its list of unique feature keys.
        """
        if aslist:
            merged = set()
            for key in self.data:
                merged.update(self.find_unique(key, verbose=verbose))
            return merged
        return {key: self.find_unique(key, verbose=verbose) for key in self.data}

    def _count(self, input):
        """
        Count the items in an object, while respecting its type.

        Parameters
        ----------
        input : dict or list
            The object to count

        Returns
        -------
        count : dict or int
            For a dictionary, a dictionary mapping each key to the length
            of its value. For a list, the length of the list.

        Raises
        ------
        TypeError :
            If ``input`` is neither a dictionary nor a list
        """
        if isinstance(input, dict):
            return {key: len(value) for key, value in input.items()}
        if isinstance(input, list):
            return len(input)
        raise TypeError('Cannot count object of type ' + type(input).__name__)
# labels = ['ani.hdf5', 'comp6.hdf5', 'freesolvmd.hdf5','geom.hdf5','spice.hdf5', 're.hdf5', 'remd.hdf5'] # db = DataBase.__construct__(labels) # db1 = DataBase.__construct__('re_with_smiles.hdf5')
class Loader:
    """
    A class to access the database, once it has been put into MongoDB.

    Parameters
    ----------
    client_addr : str, optional
        The address of the client to open. Default is
        'mongodb://localhost:27017/'. None selects the same default.
    database_name : str, optional
        The name of the database to open. Default is None
    collection_name : str, optional
        The name of the collection to open. Default is None

    Attributes
    ----------
    client : pymongo.MongoClient or None
        The MongoDB client, or None if the client could not be created
    db : pymongo.database.Database or None
        The selected database object
    collection : pymongo.collection.Collection or None
        The selected collection object
    collections : list
        The collection names found by the last call to ``list_collections``
    """

    def __init__(self, client_addr='mongodb://localhost:27017/', database_name=None, collection_name=None):
        self.client = None
        self.db = None
        self.collection = None
        self.collections = []

        # Resolve the default address before the client is opened.
        if client_addr is None:
            client_addr = 'mongodb://localhost:27017/'

        # Only list the available databases when none was requested.
        self._open_client(client_addr, verbose=database_name is None)

        if database_name is not None:
            self.select_db(database_name, verbose=False)
        if collection_name is not None:
            self.select_collection(collection_name)

    def _open_client(self, client_addr, verbose=True):
        """
        Open a client to the database.

        If the client cannot be created, the error is printed and
        ``self.client`` is left unchanged.

        Parameters
        ----------
        client_addr : str
            The address of the client to open
        verbose : bool
            Whether to list the database names after connecting

        Returns
        -------
        None
        """
        try:
            self.client = MongoClient(client_addr)
        except Exception as e:
            print(f"Error connecting to MongoDB client: {e}")
            return
        if verbose:
            self.list_db_names()

    def list_db_names(self):
        """
        Print and return all database names in the client.

        Returns
        -------
        db_names : list
            A list of all database names in the client. Empty if no client
            is connected.
        """
        if self.client is None:
            print("No client connected")
            return []
        db_names = self.client.list_database_names()
        print("Databases in client:")
        for db_name in db_names:
            print(db_name)
        return db_names

    def select_db(self, db_name, verbose=True):
        """
        Select a database from the client.

        Parameters
        ----------
        db_name : str
            The name of the database to select
        verbose : bool
            Whether to print the selection and the collection names

        Returns
        -------
        db : pymongo.database.Database or None
            The selected database, or None if no client is connected
        """
        if self.client is None:
            print("No client connected")
            return None
        self.db = self.client[db_name]
        if verbose:
            print(f"Selected database: {db_name}")
        # Also refreshes self.collections for the newly selected database.
        self.list_collections(verbose=verbose)
        return self.db

    def list_collections(self, verbose=True):
        """
        List all collections in the selected database.

        Parameters
        ----------
        verbose : bool
            Whether to print the collection names or not

        Returns
        -------
        collection_names : list
            A list of all collection names in the database. Empty if no
            database is selected.
        """
        if self.db is None:
            print("No database selected")
            return []
        collection_names = self.db.list_collection_names()
        if verbose:
            print("Collections in database:")
            for collection_name in collection_names:
                print(collection_name)
        self.collections = collection_names
        return collection_names

    def select_collection(self, collection_name):
        """
        Select a collection from the database.

        Parameters
        ----------
        collection_name : str
            The name of the collection to select

        Returns
        -------
        None
        """
        if self.db is None:
            print("No database selected")
            return
        self.collection = self.db[collection_name]
        print(f"Selected collection: {collection_name}")

    def list_collection_entry(self, entry=None):
        """
        Print a summary of one document of the selected collection.

        Nested dictionaries are expanded up to two levels below the top
        level. Lists are summarised by their shape instead of being printed
        in full.

        Parameters
        ----------
        entry : int
            The position of the document in the collection's ``find()``
            results.

        Returns
        -------
        None

        Raises
        ------
        ValueError :
            If ``entry`` is None or is not an integer
        """
        if self.collection is None:
            print("No collection selected")
            return
        if entry is None or not isinstance(entry, int):
            raise ValueError("No entry selected")

        listing = self.collection.find()[entry]
        banner = "***************************************"
        print(banner)
        print("Listing of entry: ", entry)
        print(banner)
        for key, value in listing.items():
            if isinstance(value, dict):
                print(key + ":")
                self._print_nested(value, depth=1)
            else:
                print(key + ": " + self._format_value(value))
            # This one field is always dumped in full.
            if key == "atom_ener_coeff":
                print(value)
        print(banner)

    def _print_nested(self, fields, depth):
        """Print a nested dictionary, expanding sub-dictionaries only at depth 1."""
        pad = " " * depth
        for key, value in fields.items():
            if isinstance(value, dict) and depth < 2:
                print(pad + key + ":")
                self._print_nested(value, depth + 1)
            else:
                print(pad + key + ": " + self._format_value(value))

    @staticmethod
    def _format_value(value):
        """Return the shape of a list as text, or ``str(value)`` for anything else."""
        if isinstance(value, list):
            try:
                return str(np.shape(value))
            except ValueError:
                # Ragged lists cannot form an array; report the outer length.
                return str((len(value),))
        return str(value)