Source code for mdsuite.database.simulation_database

"""
MDSuite: A Zincwarecode package.

License
-------
This program and the accompanying materials are made available under the terms
of the Eclipse Public License v2.0 which accompanies this distribution, and is
available at https://www.eclipse.org/legal/epl-v20.html

SPDX-License-Identifier: EPL-2.0

Copyright Contributors to the Zincwarecode Project.

Contact Information
-------------------
email: zincwarecode@gmail.com
github: https://github.com/zincware
web: https://zincwarecode.com/

Citation
--------
If you use this module please cite us with:

Summary
-------
"""
import dataclasses
import logging
import pathlib
import time
import typing
from typing import List

import h5py as hf
import numpy as np
import tensorflow as tf

from mdsuite.utils.meta_functions import join_path

log = logging.getLogger(__name__)


[docs]@dataclasses.dataclass(frozen=True) class PropertyInfo: """ Information of a trajectory property. example: pos_info = PropertyInfo('Positions', 3) vel_info = PropertyInfo('Velocities', 3) Attributes ---------- name: The name of the property n_dims: The dimensionality of the property """ name: str n_dims: int
[docs]@dataclasses.dataclass class SpeciesInfo: """ Information of a species Attributes ---------- name Name of the species (e.g. 'Na') n_particles Number of particles of that species properties: list of PropertyInfo List of the properties that were recorded for the species mass and charge are optional """ name: str n_particles: int properties: List[PropertyInfo] mass: float = None charge: float = 0 def __eq__(self, other): same = ( self.name == other.name and self.n_particles == other.n_particles and self.mass == other.mass and self.charge == other.charge ) if len(self.properties) != len(other.properties): return False for prop_s, prop_o in zip(self.properties, other.properties): same = same and prop_s == prop_o return same
[docs]@dataclasses.dataclass class MoleculeInfo(SpeciesInfo): """Information about a Molecule All the information of a species + groups Attributes ---------- groups: dict A molecule specific dictionary for mapping the molecule to the particles. The keys of this dict are index references to a specific molecule, i.e. molecule 1 and the values are a dict of atom species and their indices belonging to that specific molecule. e.g water = {"groups": {"0": {"H": [0, 1], "O": [0]}} This tells us that the 0th water molecule consists of the 0th and 1st hydrogen atoms in the database as well as the 0th oxygen atom. """ groups: dict = None def __eq__(self, other): """Add a check to see if the groups are identical""" if self.groups != other.groups: return False return super(MoleculeInfo, self).__eq__(other)
[docs]@dataclasses.dataclass class TrajectoryMetadata: """ This metadata must be extracted from trajectory files to build the database into which the trajectory will be stored Attributes ---------- n_configurations : int Number of configurations of the whole trajectory. species_list: list of SpeciesInfo The information about all species in the system. box_l: list of float The simulation box size in three dimensions sample_rate : int optional The number of timesteps between consecutive samples # todo remove in favour of sample_step sample_step : int optional The time between consecutive configurations. E.g. for a simulation with time step 0.1 where the trajectory is written every 5 steps: sample_step = 0.5. Does not have to be specified (e.g. configurations from Monte Carlo scheme), but is needed for all dynamic observables. temperature : float optional The set temperature of the system. Optional because only applicable for MD simulations with thermostat. Needed for certain observables. simulation_data : str|Path, optional All other simulation data that can be extracted from the trajectory metadata. E.g. software version, pressure in NPT simulations, time step, ... """ n_configurations: int species_list: List[SpeciesInfo] box_l: list = None sample_rate: int = 1 sample_step: float = None temperature: float = None simulation_data: dict = dataclasses.field(default_factory=dict)
[docs]class TrajectoryChunkData: """ Class to specify the data format for transfer from the file to the database """ def __init__(self, species_list: List[SpeciesInfo], chunk_size: int): """ Parameters ---------- species_list : List[SpeciesInfo] List of SpeciesInfo. It contains the information which species are there and which properties are recorded for each chunk_size : int The number of configurations to be stored in this chunk """ self.chunk_size = chunk_size self.species_list = species_list self._data = dict() for sp_info in species_list: self._data[sp_info.name] = dict() for prop_info in sp_info.properties: self._data[sp_info.name][prop_info.name] = np.zeros( (chunk_size, sp_info.n_particles, prop_info.n_dims) )
[docs] def add_data(self, data: np.ndarray, config_idx, species_name, property_name): """ Add configuration data to the chunk Parameters ---------- data: The data to be added, with shape (n_configs, n_particles, n_dims). n_particles and n_dims relates to the species and the property that is being added config_idx: Start index of the configs that are being added. species_name Name of the species to which the data belongs property_name Name of the property being added example: Storing velocity Information for 42 Na atoms in the 17th iteration of a loop that reads 5 configs per loop: add_data(vel_array, 16*5, 'Na', 'Velocities') where vel.data.shape == (5,42,3) """ n_configs = len(data) self._data[species_name][property_name][ config_idx : config_idx + n_configs, :, : ] = data
[docs] def get_data(self): return self._data
[docs]class Database: """ Database class Databases make up a large part of the functionality of MDSuite and are kept fairly consistent in structure. Therefore, the database_path structure we are using has a separate class with commonly used methods which act as wrappers for the hdf5 database_path. Attributes ---------- path : str|Path The name of the database_path in question. """ def __init__(self, path: typing.Union[str, pathlib.Path] = "database"): """ Constructor for the database_path class. Parameters ---------- path : str|Path The name of the database_path in question. """ if isinstance(path, pathlib.Path): self.path = path.as_posix() elif isinstance(path, str): self.path = path # name of the database_path else: # TODO fix this! log.debug(f"Expected str|Path but found {type(path)}") self.path = path @staticmethod def _update_indices( data: np.array, reference: np.array, batch_size: int, n_atoms: int ): """ Update the indices key of the structure dictionary if the tensor_values must be sorted. Parameters ---------- data : np.ndarray reference : np.ndarray batch_size : int n_atoms : int Returns ------- """ ids = np.reshape(np.array(data[:, 0]).astype(int), (-1, n_atoms)) ref_ids = np.argsort(ids, axis=1) n_batches = ids.shape[0] return ( ref_ids[:, reference - 1] + (np.arange(n_batches) * n_atoms)[None].T ).flatten() @staticmethod def _build_path_input(structure: dict) -> dict: """ Build an input to a hdf5 database_path from a dictionary In many cases, whilst a dict can be passed on to a method, it is not ideal for use in the hdf5 database_path. This method takes a dictionary and return a new dictionary with the relevant file path. Parameters ---------- structure : dict General structure of the dictionary with relevant dataset sizes. e.g. {'Na': {'Forces': (200, 5000, 3)}, 'Pressure': (5000, 6), 'Temperature': (5000, 1)} In this case, the last value in the tuple corresponds to the number of components that wil be parsed to the database_path. The value in the dict can also be an integer corresponding to a resize operation such as {'Na': {'velocities' 100}}. In any case, the deepest portion of the dict must be a non-dict object and will be returned as the value of the path to it in the new dictionary. Returns ------- architecture : dict Corrected path in the hdf5 database_path. e.g. {'/Na/Velocities': 100}, or {'/Na/Forces': (200, 5000, 3)} """ # Build file paths for the addition. architecture = {} for group in structure: if type(structure[group]) is not dict: architecture[group] = structure[group] else: for subgroup in structure[group]: db_path = join_path(group, subgroup) architecture[db_path] = structure[group][subgroup] return architecture
[docs] def add_data(self, chunk: TrajectoryChunkData, start_idx: int): """ Add new data to the dataset. Parameters ---------- chunk: a data chunk start_idx: Configuration at which to start writing """ workaround_time_in_axis_1 = True chunk_data = chunk.get_data() with hf.File(self.path, "r+") as database: stop_index = start_idx + chunk.chunk_size for sp_info in chunk.species_list: for prop_info in sp_info.properties: dataset_name = f"{sp_info.name}/{prop_info.name}" write_data = chunk_data[sp_info.name][prop_info.name] dataset_shape = database[dataset_name].shape if len(dataset_shape) == 2: # only one particle database[dataset_name][start_idx:stop_index, :] = write_data[ :, 0, : ] elif len(dataset_shape) == 3: if workaround_time_in_axis_1: database[dataset_name][ :, start_idx:stop_index, : ] = np.swapaxes(write_data, 0, 1) else: database[dataset_name][start_idx:stop_index, ...] = write_data else: raise ValueError( "dataset shape must be either (n_part,n_config,n_dim) or" " (n_config, n_dim)" )
[docs] def resize_datasets(self, structure: dict): """ Resize a dataset so more tensor_values can be added Parameters ---------- structure : dict path to the dataset that needs to be resized. e.g. {'Na': {'velocities': (32, 100, 3)}} will resize all 'x', 'y', and 'z' datasets by 100 entries. Returns ------- """ with hf.File(self.path, "r+") as db: # construct the architecture dict architecture = self._build_path_input(structure=structure) # Check for a type error in the dataset information for identifier in architecture: dataset_information = architecture[identifier] if not isinstance(dataset_information, tuple): raise TypeError("Invalid input for dataset generation") # get the correct maximum shape for the dataset -- changes if an # experiment property or an atomic property if len(dataset_information[:-1]) == 1: axis = 0 expansion = dataset_information[0] + db[identifier].shape[0] else: axis = 1 expansion = dataset_information[1] + db[identifier].shape[1] db[identifier].resize(expansion, axis)
[docs] def initialize_database(self, structure: dict): """ Build a database_path with a general structure. Note, this method WILL overwrite a pre-existing database_path. This is because it is only to be called on the initial construction of an experiment class and the first addition of tensor_values to it. Parameters ---------- structure : dict General structure of the dictionary with relevant dataset sizes. e.g. {'Na': {'Forces': (200, 5000, 3)}, 'Pressure': (5000, 6), 'Temperature': (5000, 1)} In this case, the last value in the tuple corresponds to the number of components that wil be parsed to the database_path. Returns ------- """ self.add_dataset(structure) # add a dataset to the groups
[docs] def database_exists(self) -> bool: """ Check if the database file already exists """ return pathlib.Path(self.path).exists()
[docs] def add_dataset(self, structure: dict): """ Add a dataset of the necessary size to the database_path Just as a separate method exists for building the group structure of the hdf5 database_path, so too do we include a separate method for adding a dataset. This is so datasets can be added not just upon the initial construction of the database_path, but also if tensor_values is added in the future that should also be stored. This method will assume that a group has already been built, although this is not necessary for HDF5, the separation of the actions is good practice. Parameters ---------- structure : dict Structure of a single property to be added to the database_path. e.g. {'Na': {'Forces': (200, 5000, 3)}} Returns ------- Updates the database_path directly. """ with hf.File(self.path, "a") as database: architecture = self._build_path_input(structure) # get the correct file path for item in architecture: dataset_information = architecture[item] # get the tuple information dataset_path = item # get the dataset path in the database_path # Check for a type error in the dataset information try: if type(dataset_information) is not tuple: raise TypeError("Invalid input for dataset generation") except TypeError: raise TypeError if len(dataset_information[:-1]) == 1: vector_length = dataset_information[-1] max_shape = (None, vector_length) else: max_shape = list(dataset_information) max_shape[1] = None max_shape = tuple(max_shape) database.create_dataset( dataset_path, dataset_information, maxshape=max_shape, scaleoffset=5, chunks=True, )
def _add_group_structure(self, structure: dict): """ Add a simple group structure to a database_path. This method will take an input structure and build the required group structure in the hdf5 database_path. This will NOT however instantiate the dataset for the structure, only the group hierarchy. Parameters ---------- structure : dict Structure of a single property to be added to the database_path. e.g. {'Na': {'Forces': (200, 5000, 3)}} Returns ------- Updates the database_path directly. """ with hf.File(self.path, "a") as database: # Build file paths for the addition. architecture = self._build_path_input(structure=structure) for item in list(architecture): if item in database: log.info("Group structure already exists") else: database.create_group(item)
[docs] def get_memory_information(self) -> dict: """ Get memory information from the database_path Returns ------- memory_database : dict A dictionary of the memory information of the groups in the database_path """ with hf.File(self.path, "r") as database: memory_database = {} for item in database: for ds in database[item]: memory_database[join_path(item, ds)] = database[item][ds].nbytes return memory_database
[docs] def check_existence(self, path: str) -> bool: """ Check to see if a dataset is in the database_path Parameters ---------- path : str Path to the desired dataset Returns ------- response : bool If true, the path exists, else, it does not. """ with hf.File(self.path, "r") as database_object: keys = [] database_object.visit( lambda item: keys.append(database_object[item].name) if isinstance(database_object[item], hf.Dataset) else None ) path = f"/{path}" # add the / to avoid name overlapping response = any(list(item.endswith(path) for item in keys)) return response
[docs] def change_key_names(self, mapping: dict): """ Change the name of database_path keys Parameters ---------- mapping : dict Mapping for the change of names Returns ------- Updates the database_path """ with hf.File(self.path, "r+") as db: groups = list(db.keys()) for item in groups: if item in mapping: db.move(item, mapping[item])
[docs] def load_data( self, path_list: list = None, select_slice: np.s_ = np.s_[:], dictionary: bool = False, scaling: list = None, d_size: int = None, ): """ Load tensor_values from the database_path for some operation. Should be called by the tensor_values fetch class as this will ensure correct loading and pre-loading. Returns ------- """ if scaling is None: scaling = [1 for _ in range(len(path_list))] with hf.File(self.path, "r") as database: data = {} for i, item in enumerate(path_list): if type(select_slice) is dict: # index is the particle species name in the full item as a str. slice_index = item.decode().split("/")[0] my_slice = select_slice[slice_index] else: my_slice = select_slice try: data[item] = ( tf.convert_to_tensor(database[item][my_slice], dtype=tf.float64) * scaling[i] ) except TypeError: data[item] = ( tf.convert_to_tensor( database[item][my_slice[0]][:, my_slice[1], :], dtype=tf.float64, ) * scaling[i] ) data[str.encode("data_size")] = d_size return data
[docs] def get_load_time(self, database_path: str = None): """ Calculate the open/close time of the database_path. Parameters ---------- database_path : str Database path on which to test the time. Returns ------- opening time : float Time taken to open and close the database_path """ if database_path is None: start = time.time() database_path = hf.File(self.path, "r") database_path.close() stop = time.time() else: start = time.time() database_path = hf.File(database_path, "r") database_path.close() stop = time.time() return stop - start
[docs] def get_data_size(self, data_path: str) -> tuple: """ Return the size of a dataset as a tuple (n_rows, n_columns, n_bytes) Parameters ---------- data_path : str path to the tensor_values in the hdf5 database_path. Returns ------- dataset_properties : tuple Tuple of tensor_values about the dataset, e.g. (n_rows, n_columns, n_bytes) """ with hf.File(self.path, "r") as db: data_tuple = ( db[data_path].shape[0], db[data_path].shape[1], db[data_path].nbytes, ) return data_tuple
[docs] def get_database_summary(self): """ Get a summary of the database properties. Returns ------- summary : list A list of properties that are in the database """ with hf.File(self.path, "r") as db: return list(db.keys())