import h5py
import numpy as np
from pymongo import MongoClient
from pathlib import Path
class DataBase:
    """ A collection of HDF5 molecule files opened for read-only inspection

    Every file added is opened with ``h5py`` and kept in ``self.data`` under
    the file's stem (``ani.hdf5`` -> ``"ani"``). The query methods index each
    file as ``file[molecule_key][feature_key]``, so they assume every
    top-level entry of a file exposes ``keys()``.

    Attributes
    ----------
    data : dict
        Maps a file stem to the open file handle created by ``add_data``
    """

    def __init__(self):
        self.data = {}

    @classmethod
    def __construct__(cls, data_names, data_path=None):
        """ Construct a database from a list of data names

        Parameters
        ----------
        data_names : list of str, or a single str / Path
            The data file names to construct the database from. A single
            name is treated as a one-element list.
        data_path : str or Path, optional
            The directory holding the data files. If None, the names are
            resolved relative to the current directory.

        Returns
        -------
        db : DataBase
            A database object containing the data

        Raises
        ------
        ValueError : If a name does not refer to an hdf5 file
        FileNotFoundError : If a file does not exist
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(data_names, (str, Path)):
            data_names = [data_names]
        # Loop-invariant, so resolve the base directory once.
        base_dir = Path('.') if data_path is None else Path(data_path)

        db = cls()
        try:
            for name in data_names:
                if "hdf5" not in str(name):
                    raise ValueError('File type must be hdf5, found: ' + str(name))
                db.add_data(base_dir / name)
        except Exception:
            # Do not leave already-opened files dangling on a failed build.
            db.close()
            raise
        return db

    def add_data(self, filepath):
        """ Add data to the database

        The file is opened read-only and stays open; it is stored in
        ``self.data`` under its stem. A second file with the same stem
        replaces the earlier entry.

        Parameters
        ----------
        filepath : str or Path
            The name or path to the data file to add. The file must be in
            hdf5 format.

        Returns
        -------
        None

        Raises
        ------
        FileNotFoundError : If the file is not found, or the path does not exist
        ValueError : If the file suffix is not ``.hdf5``
        """
        filepath = Path(filepath)
        if not filepath.exists():
            raise FileNotFoundError('File does not exist: ' + str(filepath))
        if filepath.suffix != '.hdf5':
            raise ValueError('File type must be hdf5, found: ' + filepath.suffix)
        # Path.stem is already the bare file name without directory or suffix.
        self.data[filepath.stem] = h5py.File(filepath, 'r')

    def close(self):
        """ Close every open file handle and empty the database

        Returns
        -------
        None
        """
        for handle in self.data.values():
            handle.close()
        self.data.clear()

    def obtain_feature(self, feature_key, db_key=None):
        """ Obtain a feature from the database

        Parameters
        ----------
        feature_key : str
            The key of the feature to obtain
        db_key : str, optional
            The key of the database to search. If None, every database is
            searched in insertion order.

        Returns
        -------
        feature : list
            One entry per molecule that has the feature, exactly as returned
            by ``db[mol_key][feature_key]``. Molecules without the feature
            are skipped.

        Raises
        ------
        KeyError : If no molecule in any database has ``feature_key``, or if
            ``db_key`` is not a loaded database
        """
        # verbose=False: fetching a feature should not emit the diagnostic
        # molecule listing that find_unique prints.
        if feature_key not in self.find_all_unique(aslist=True, verbose=False):
            raise KeyError(f'Feature {feature_key} not found in database')

        if db_key is None:
            sources = list(self.data.values())
        else:
            sources = [self.data[db_key]]

        feature = []
        for db in sources:
            for mol_key in db.keys():
                try:
                    feature.append(db[mol_key][feature_key])
                except KeyError:
                    # This molecule simply lacks the feature.
                    continue
        return feature

    def find_empty_molecules(self):
        """ Find all empty molecules in the database

        A molecule is empty when it has no feature keys. A per-database
        count is printed as a summary.

        Returns
        -------
        empty : dictionary of lists
            Maps each database key to the list of its empty molecule keys
        """
        empty = {
            key: [mol_key for mol_key in db.keys() if len(db[mol_key].keys()) == 0]
            for key, db in self.data.items()
        }
        print("Empty molecules found in database:")
        print("Database: Count")
        for key, count in self._count(empty).items():
            print(key + ": " + str(count))
        return empty

    def find_molecule(self, mol_key, verbose=True):
        """ Find a molecule in the database

        Parameters
        ----------
        mol_key : str
            The key of the molecule to search
        verbose : bool
            Whether to print the key of the database holding the molecule

        Returns
        -------
        mol : object or None
            The entry of the first database containing ``mol_key``, or None
            if no database contains it
        """
        for key, db in self.data.items():
            if mol_key in db:
                if verbose:
                    print("Found molecule in database: ", key)
                return db[mol_key]
        return None

    def find_shared_molecules(self, db_key1, db_key2):
        """ Find all shared molecules between two databases

        Parameters
        ----------
        db_key1 : str
            The key of the first database to search
        db_key2 : str
            The key of the second database to search

        Returns
        -------
        shared : list
            The molecule keys present in both databases, in the order they
            appear in the first one
        """
        db1 = self.data[db_key1]
        db2 = self.data[db_key2]
        return [key for key in db1.keys() if key in db2]

    def find_unique(self, db_key, verbose=True):
        """ Find all unique feature keys in an individual database

        Parameters
        ----------
        db_key : str
            The key of the database to search
        verbose : bool
            Whether to print the key of every molecule that has exactly one
            feature

        Returns
        -------
        unique : list
            The distinct feature keys, in order of first appearance
        """
        db = self.data[db_key]
        # A dict keeps first-seen order while giving O(1) de-duplication.
        seen = {}
        for mol_key in db.keys():
            feature_keys = db[mol_key].keys()
            if verbose and len(feature_keys) == 1:
                print(mol_key)
            seen.update(dict.fromkeys(feature_keys))
        return list(seen)

    def find_all_unique(self, aslist=False, verbose=True):
        """ Find all unique feature keys in the entire database

        Parameters
        ----------
        aslist : bool
            If True, merge the keys of every database into one flat
            collection; if False, keep them grouped per database
        verbose : bool
            Passed through to ``find_unique``

        Returns
        -------
        unique : set or dictionary of lists
            With ``aslist=True`` a ``set`` of all feature keys; otherwise a
            dictionary mapping each database key to its list of feature keys
        """
        per_db = {key: self.find_unique(key, verbose=verbose) for key in self.data}
        if not aslist:
            return per_db
        merged = set()
        for keys in per_db.values():
            merged.update(keys)
        return merged

    def _count(self, input):
        """ Counts numbers of items in an object, while respecting the type

        Parameters
        ----------
        input : dict or list
            The object to count

        Returns
        -------
        count : dict or int
            For a dictionary, a dictionary mapping each key to the length of
            its value. For a list, its length.

        Raises
        ------
        TypeError : If the object is neither a dictionary nor a list
        """
        if isinstance(input, dict):
            return {key: len(value) for key, value in input.items()}
        if isinstance(input, list):
            return len(input)
        raise TypeError('Can only count a dict or a list, found: ' + type(input).__name__)
# Example usage:
# labels = ['ani.hdf5', 'comp6.hdf5', 'freesolvmd.hdf5', 'geom.hdf5', 'spice.hdf5', 're.hdf5', 'remd.hdf5']
# db = DataBase.__construct__(labels)
# db1 = DataBase.__construct__(['re_with_smiles.hdf5'])
class Loader:
    """ A class to access the database, once it has been put into the mongoDB

    Parameters
    ----------
    client_addr : str, optional
        The address of the client to open. Default is 'mongodb://localhost:27017/'
    database_name : str, optional
        The name of the database to open. Default is None
    collection_name : str, optional
        The name of the collection to open. Default is None

    Attributes
    ----------
    client : pymongo.MongoClient
        The MongoDB client, or None if it could not be created
    db : pymongo.database.Database
        The database object, or None until one is selected
    collection : pymongo.collection.Collection
        The collection object, or None until one is selected
    collections : list
        The collection names found by the last ``list_collections`` call
    """

    def __init__(self, client_addr='mongodb://localhost:27017/', database_name=None, collection_name=None):
        self.client = None
        self.db = None
        self.collection = None
        self.collections = []
        # Resolve the fallback address before the client is opened.
        if client_addr is None:
            client_addr = 'mongodb://localhost:27017/'
        # Only list the available databases when the caller did not pick one.
        self._open_client(client_addr, verbose=database_name is None)
        if database_name is not None:
            self.select_db(database_name, verbose=False)
        if collection_name is not None:
            self.select_collection(collection_name)

    def _open_client(self, client_addr, verbose=True):
        """ Open a client to the database

        If creating the client raises, the error is printed and
        ``self.client`` stays None.

        Parameters
        ----------
        client_addr : str
            The address of the client to open
        verbose : bool
            Whether to list the database names after opening the client

        Returns
        -------
        None
        """
        try:
            self.client = MongoClient(client_addr)
        except Exception as e:
            print(f"Error connecting to MongoDB client: {e}")
            return
        # NOTE(review): the listing below runs outside the try, so an error
        # raised while talking to the server propagates to the caller.
        if verbose:
            self.list_db_names()

    def list_db_names(self):
        """ List all database names in the client

        Returns
        -------
        db_names : list
            A list of all database names in the client, or an empty list if
            no client is open
        """
        if self.client is None:
            print("No client connected")
            return []
        db_names = self.client.list_database_names()
        print("Databases in client:")
        for db_name in db_names:
            print(db_name)
        return db_names

    def select_db(self, db_name, verbose=True):
        """ Select a database from the client

        The collection names of the selected database are refreshed as well.

        Parameters
        ----------
        db_name : str
            The name of the database to select
        verbose : bool
            Whether to print the selection and the collection names

        Returns
        -------
        db : pymongo.database.Database or None
            The selected database, or None if no client is open
        """
        if self.client is None:
            print("No client connected")
            return None
        self.db = self.client[db_name]
        if verbose:
            print(f"Selected database: {db_name}")
        self.list_collections(verbose=verbose)
        return self.db

    def list_collections(self, verbose=True):
        """ List all collections in the selected database

        Parameters
        ----------
        verbose : bool
            Whether to print the collection names or not

        Returns
        -------
        collection_names : list
            A list of all collection names in the database, or an empty
            list if no database is selected
        """
        if self.db is None:
            print("No database selected")
            return []
        collection_names = self.db.list_collection_names()
        if verbose:
            print("Collections in database:")
            for collection_name in collection_names:
                print(collection_name)
        self.collections = collection_names
        return collection_names

    def select_collection(self, collection_name):
        """ Select a collection from the database

        Parameters
        ----------
        collection_name : str
            The name of the collection to select

        Returns
        -------
        None
        """
        if self.db is None:
            print("No database selected")
            return
        self.collection = self.db[collection_name]
        print(f"Selected collection: {collection_name}")

    def list_collection_entry(self, entry=None):
        """ Print a summary of one entry of the selected collection

        Nested dictionaries are expanded two levels deep. Lists are shown by
        their shape rather than their content; everything else is printed
        as-is.

        Parameters
        ----------
        entry : int
            The position of the entry in ``self.collection.find()``

        Returns
        -------
        None

        Raises
        ------
        ValueError : If ``entry`` is None or not an integer
        """
        if self.collection is None:
            print("No collection selected")
            return
        if entry is None:
            raise ValueError("No entry selected")
        if not isinstance(entry, int):
            raise ValueError(f"Entry must be an integer index, found: {type(entry).__name__}")
        listing = self.collection.find()[entry]
        print("***************************************")
        print("Listing of entry: ", entry)
        print("***************************************")
        self._print_fields(listing)
        print("***************************************")

    @staticmethod
    def _summarize(value):
        """ Return the shape of a list, or the plain string form of anything else """
        if isinstance(value, list):
            try:
                return str(np.shape(value))
            except ValueError:
                # np.shape rejected the list (ragged nesting); report the outer length.
                return f"({len(value)},)"
        return str(value)

    def _print_fields(self, fields, depth=0):
        """ Print ``key: summary`` lines, expanding dictionaries up to two levels deep """
        indent = " " * depth
        for key in fields.keys():
            value = fields[key]
            if isinstance(value, dict) and depth < 2:
                print(indent + key + ":")
                self._print_fields(value, depth + 1)
            else:
                print(indent + key + ": " + self._summarize(value))
            # NOTE(review): nesting of this check was ambiguous in the
            # original; it is applied to every top-level key here - confirm.
            if depth == 0 and key == "atom_ener_coeff":
                print(value)