Source code for mdsuite.calculators.einstein_diffusion_coefficients

"""
MDSuite: A Zincwarecode package.

License
-------
This program and the accompanying materials are made available under the terms
of the Eclipse Public License v2.0 which accompanies this distribution, and is
available at https://www.eclipse.org/legal/epl-v20.html

SPDX-License-Identifier: EPL-2.0

Copyright Contributors to the Zincwarecode Project.

Contact Information
-------------------
email: zincwarecode@gmail.com
github: https://github.com/zincware
web: https://zincwarecode.com/

Citation
--------
If you use this module please cite us with:

Summary
-------
Module for the computation of self-diffusion coefficients using the Einstein method.
"""
from __future__ import annotations

import logging
from abc import ABC
from dataclasses import dataclass
from typing import Any, List, Union

import numpy as np
import tensorflow as tf
from bokeh.models import HoverTool, LinearAxis, Span
from bokeh.models.ranges import Range1d
from bokeh.plotting import figure
from tqdm import tqdm

from mdsuite import utils
from mdsuite.calculators.calculator import call
from mdsuite.calculators.trajectory_calculator import TrajectoryCalculator
from mdsuite.database.mdsuite_properties import mdsuite_properties
from mdsuite.utils.calculator_helper_methods import fit_einstein_curve

log = logging.getLogger(__name__)


[docs]@dataclass class Args: """ Data class for the saved properties. """ data_range: int correlation_time: int atom_selection: np.s_ tau_values: np.s_ molecules: bool species: list fit_range: int
[docs]class EinsteinDiffusionCoefficients(TrajectoryCalculator, ABC): """ Class for the Einstein diffusion coefficient implementation Attributes ---------- msd_array : np.ndarray MSd data updated during each ensemble computation. See Also -------- mdsuite.calculators.calculator.Calculator class Examples -------- project.experiment.run.EinsteinDiffusionCoefficients(data_range=500, plot=True, correlation_time=10) """ def __init__(self, **kwargs): """ Parameters ---------- experiment : Experiment Experiment class to call from experiments : Experiment Experiment classes to call from """ super().__init__(**kwargs) self.scale_function = {"linear": {"scale_factor": 150}} self.loaded_property = mdsuite_properties.unwrapped_positions self.x_label = r"$$\text{Time} / s$$" self.y_label = r"$$\text{MSD} / m^{2}$$" self.result_keys = [ "diffusion_coefficient", "uncertainty", "gradient", "intercept", ] self.result_series_keys = ["time", "msd", "gradients", "gradient_errors"] self.analysis_name = "Einstein Self-Diffusion Coefficients" self._dtype = tf.float64 self.msd_array = None log.info("starting Einstein Diffusion Computation") @call def __call__( self, plot: bool = True, species: list = None, data_range: int = 100, correlation_time: int = 1, atom_selection: np.s_ = np.s_[:], molecules: bool = False, tau_values: Union[int, List, Any] = np.s_[:], fit_range: int = -1, ): """ Parameters ---------- plot : bool if true, plot the output. species : list List of species on which to operate. data_range : int Data range to use in the analysis. correlation_time : int Correlation time to use in the window sampling. atom_selection : np.s_ Selection of atoms to use within the HDF5 database. molecules : bool If true, molecules are used instead of atoms. tau_values : Union[int, list, np.s_] Selection of tau values to use in the window sliding. Returns ------- None """ if species is None: if molecules: species = list(self.experiment.molecules) else: species = list(self.experiment.species) if fit_range == -1: fit_range = int(data_range - 1) # set args that will affect the computation result self.args = Args( data_range=data_range, correlation_time=correlation_time, atom_selection=atom_selection, tau_values=tau_values, molecules=molecules, species=species, fit_range=fit_range, ) self.plot = plot self.system_property = False
[docs] def ensemble_operation(self, ensemble): """ Calculate and return the msd. Parameters ---------- ensemble : tf.Tensor An ensemble of data to be operated on. Returns ------- MSD of the tensor_values. """ msd = tf.math.squared_difference( tf.gather(ensemble, self.args.tau_values, axis=1), ensemble[:, None, 0] ) # average over particles, sum over dimensions msd = tf.reduce_sum(tf.reduce_mean(msd, axis=0), axis=-1) # sum up ensembles to average in post processing self.msd_array += np.array(msd)
[docs] def fit_diff_coeff(self): """ Apply unit conversion, fit line to the data, prepare for database storage """ self.msd_array /= int(self.n_batches) * self.ensemble_loop self.msd_array *= self.experiment.units.length**2 self.time *= self.experiment.units.time fit_values, covariance, gradients, gradient_errors = fit_einstein_curve( x_data=self.time, y_data=self.msd_array, fit_max_index=self.args.fit_range ) error = np.sqrt(np.diag(covariance))[0] data = { self.result_keys[0]: 1 / 6.0 * fit_values[0], self.result_keys[1]: 1 / 6.0 * error, self.result_keys[2]: fit_values[0], self.result_keys[3]: fit_values[1], self.result_series_keys[0]: self.time.tolist(), self.result_series_keys[1]: self.msd_array.tolist(), self.result_series_keys[2]: (np.array(gradients) / 6).tolist(), self.result_series_keys[3]: (np.array(gradient_errors) / 6).tolist(), } return data
[docs] def run_calculator(self): """ Run analysis. """ self._run_dependency_check() for species in self.args.species: # Here for now to avoid issues. Should be moved out when calculators become # species-wise self.time = None self.time = self._handle_tau_values() self.msd_array = np.zeros(self.data_resolution) dict_ref = str.encode("/".join([species, self.loaded_property.name])) batch_ds = self.get_batch_dataset([species]) # loop over batches to get MSD for batch in tqdm( batch_ds, ncols=70, desc=species, total=self.n_batches, disable=self.memory_manager.minibatch, ): ensemble_ds = self.get_ensemble_dataset(batch, species) for ensemble in ensemble_ds: self.ensemble_operation(ensemble[dict_ref]) fit_results = self.fit_diff_coeff() self.queue_data(data=fit_results, subjects=[species])
[docs] def plot_data(self, data): """ Plot the Einstein fits. Parameters ---------- data Returns ------- """ for selected_species, val in data.items(): fig = figure(x_axis_label=self.x_label, y_axis_label=self.y_label) gradients = np.array(val[self.result_series_keys[2]]) gradient_errors = np.array(val[self.result_series_keys[3]]) time = np.array(val[self.result_series_keys[0]]) msd = np.array(val[self.result_series_keys[1]]) fig.y_range = Range1d(-0.0, 1.1 * max(msd)) # Compute the span span = Span( location=time[self.args.fit_range], dimension="height", line_dash="dashed", ) # Compute msd and fit lines fig.line( time, msd, color=utils.Colour.ORANGE, legend_label=( f"{selected_species}: {val[self.result_keys[0]]: 0.3E} +-" f" {val[self.result_keys[1]]: 0.3E}" ), ) fit_data = val[self.result_keys[2]] * time + val[self.result_keys[3]] fig.line(time, fit_data, color=utils.Colour.PAUA, legend_label="Curve fit.") fig.extra_y_ranges = { "Diff_range": Range1d( start=0.999 * min(gradients), end=1.001 * max(gradients) ) } fig.add_layout( LinearAxis( y_range_name="Diff_range", axis_label=r"$$\text{Diffusion Coefficient} / m^{2}s^{-1}$$", ), "right", ) grad_time = time[-len(gradients) :] fig.line( grad_time, gradients, y_range_name="Diff_range", color=utils.Colour.MULBERRY, ) fig.varea( grad_time, gradients - gradient_errors, gradients + gradient_errors, alpha=0.3, color=utils.Colour.ORANGE, y_range_name="Diff_range", ) fig.add_tools(HoverTool()) fig.add_layout(span) self.plot_array.append(fig)