Source code for alchemlyb.convergence.convergence

"""Functions for assessing convergence of free energy estimates and raw data."""

from typing import Any, List, Tuple

import numpy as np
import pandas as pd
from loguru import logger
from sklearn.base import BaseEstimator

from .. import concat
from ..estimators import BAR, TI, MBAR, FEP_ESTIMATORS, TI_ESTIMATORS
from ..postprocessors.units import to_kT

estimators_dispatch = {"BAR": BAR, "TI": TI, "MBAR": MBAR}


[docs] def forward_backward_convergence( df_list: List[pd.DataFrame], estimator: str = "MBAR", num: int = 10, error_tol: float = 3, **kwargs: Any, ) -> pd.DataFrame: """Forward and backward convergence of the free energy estimate. Generate the free energy estimate as a function of time in both directions, with the specified number of equally spaced points in the time [Klimovich2015]_. For example, setting `num` to 10 would give the forward convergence which is the free energy estimate from the first 10%, 20%, 30%, ... of the data. The Backward would give the estimate from the last 10%, 20%, 30%, ... of the data. Parameters ---------- df_list : list List of DataFrame of either dHdl or u_nk, where each represents a different value of lambda. estimator : {'MBAR', 'BAR', 'TI'} Name of the estimators. .. deprecated:: 1.0.0 Lower case input is also accepted until release 2.0.0. num : int The number of blocks used to divide *each* DataFrame and progressively add to assess convergence. Note that if the DataFrames are different lengths, the number of samples contributed with each block will be different. error_tol : float The maximum error tolerated for analytic error. If the analytic error is bigger than the error tolerance, the bootstrap error will be used. .. versionadded:: 2.3.0 .. versionchanged:: 2.4.0 Clarified docstring, removed incorrect estimation of std for cumulative result in bar and added check that only a single lambda state is represented in the indices of each df in df_list. kwargs : dict Keyword arguments to be passed to the estimator. Returns ------- :class:`pandas.DataFrame` The DataFrame with convergence data. :: Forward Forward_Error Backward Backward_Error data_fraction 0 3.016442 0.052748 3.065176 0.051036 0.1 1 3.078106 0.037170 3.078567 0.036640 0.2 2 3.072561 0.030186 3.047357 0.029775 0.3 3 3.048325 0.026070 3.057527 0.025743 0.4 4 3.049769 0.023359 3.037454 0.023001 0.5 5 3.034078 0.021260 3.040484 0.021075 0.6 6 3.043274 0.019642 3.032495 0.019517 0.7 7 3.035460 0.018340 3.036670 0.018261 0.8 8 3.042032 0.017319 3.046597 0.017233 0.9 9 3.044149 0.016405 3.044385 0.016402 1.0 .. versionadded:: 0.6.0 .. versionchanged:: 1.0.0 The ``estimator`` accepts uppercase input. The default for using ``estimator='MBAR'`` was changed from :class:`~alchemlyb.estimators.MBAR` to :class:`~alchemlyb.estimators.AutoMBAR`. .. versionchanged:: 2.0.0 Use pymbar.MBAR instead of the AutoMBAR option. """ logger.info("Start convergence analysis.") logger.info("Check data availability.") if estimator not in (FEP_ESTIMATORS + TI_ESTIMATORS): msg = f"Estimator {estimator} is not available in {FEP_ESTIMATORS + TI_ESTIMATORS}." logger.error(msg) raise ValueError(msg) else: # select estimator class by name my_estimator = estimators_dispatch[estimator](**kwargs) logger.info(f"Use {estimator} estimator for convergence analysis.") # Check that each df in the list has only one value of lambda for i, df in enumerate(df_list): lambda_values = list(set([x[1:] for x in df.index.to_numpy()])) if len(lambda_values) > 1: ind = [ j for j in range(len(lambda_values[0])) if len(list(set([x[j] for x in lambda_values]))) > 1 ][0] raise ValueError( "Provided DataFrame, df_list[{}] has more than one lambda value in df.index[{}]".format( i, ind ) ) logger.info("Begin forward analysis") forward_list = [] forward_error_list = [] for i in range(1, num + 1): logger.info("Forward analysis: {:.2f}%".format(100 * i / num)) sample = [] for data in df_list: sample.append(data[: len(data) // num * i]) mean, error = _forward_backward_convergence_estimate( sample, estimator, my_estimator, error_tol, **kwargs ) forward_list.append(mean) forward_error_list.append(error) logger.info( "{:.2f} +/- {:.2f} kT".format(forward_list[-1], forward_error_list[-1]) ) logger.info("Begin backward analysis") backward_list = [] backward_error_list = [] for i in range(1, num + 1): logger.info("Backward analysis: {:.2f}%".format(100 * i / num)) sample = [] for data in df_list: sample.append(data[-len(data) // num * i :]) mean, error = _forward_backward_convergence_estimate( sample, estimator, my_estimator, error_tol, **kwargs ) backward_list.append(mean) backward_error_list.append(error) logger.info( "{:.2f} +/- {:.2f} kT".format(backward_list[-1], backward_error_list[-1]) ) convergence = pd.DataFrame( { "Forward": forward_list, "Forward_Error": forward_error_list, "Backward": backward_list, "Backward_Error": backward_error_list, "data_fraction": [i / num for i in range(1, num + 1)], } ) convergence.attrs = df_list[0].attrs return convergence
def _forward_backward_convergence_estimate( sample_list: List[pd.DataFrame], estimator: str, my_estimator: BaseEstimator, error_tol: float, **kwargs: Any, ) -> Tuple[float, float]: """Use estimator to run the estimation and return the mean and error. Parameters ---------- sample_list: A list of samples as pandas Dataframe. estimator: The string of the estimator (upper case) my_estimator: The estimator object. error_tol: The error tolerance. kwargs Returns ------- mean: The delta_f between 0 and 1 error: The d_delta_f between 0 and 1 """ sample = concat(sample_list) result = my_estimator.fit(sample) if estimator == "MBAR": my_estimator.initial_f_k = result.delta_f_.iloc[0, :] mean = result.delta_f_.iloc[0, -1] if estimator == "BAR": # See https://github.com/alchemistry/alchemlyb/pull/60#issuecomment-430720742 # Error estimate generated by BAR ARE correlated error = np.nan else: error = result.d_delta_f_.iloc[0, -1] if estimator == "MBAR" and error > error_tol: logger.warning( f"Statistical Error ({error}) bigger than error tolerance ({error_tol}), use bootstrap error instead." ) bootstraps_estimator = estimators_dispatch[estimator]( n_bootstraps=50, initial_f_k=result.delta_f_.iloc[0, :], **kwargs ) bootstraps_estimator.fit(sample) error = bootstraps_estimator.d_delta_f_.iloc[0, -1] return mean, error def _cummean(vals: np.ndarray, out_length: int) -> np.ndarray: """The cumulative mean of an array. This function computes the cumulative mean and shapes the result to the desired length. Parameters ---------- vals : numpy.array The one-dimensional input array. out_length : int The length of the output array. Returns ------- numpy.array The one-dimensional input array with length of total. Note ---- If the length of the input series is smaller than the ``out_length``, the length of the output array is the same as the input series. .. versionadded:: 1.0.0 """ in_length = len(vals) if in_length < out_length: out_length = in_length block = in_length // out_length reshape = vals[: block * out_length].reshape(block, out_length) mean = np.mean(reshape, axis=0) result = np.cumsum(mean) / np.arange(1, out_length + 1) return np.array(result)
[docs] def fwdrev_cumavg_Rc( series: pd.Series, precision: float = 0.01, tol: float = 2 ) -> Tuple[float, pd.DataFrame]: r"""Generate the convergence criteria :math:`R_c` for a single simulation. The input will be :class:`pandas.Series` generated by :func:`~alchemlyb.preprocessing.subsampling.decorrelate_u_nk` or :func:`~alchemlyb.preprocessing.subsampling.decorrelate_dhdl`. The output will be the float :math:`R_c` [Fan2020]_ [Fan2021]_ and a :class:`pandas.DataFrame` with the forward and backward cumulative average at `precision` fractional increments, as described below. :math:`R_c = 0` indicates that the system is well equilibrated right from the beginning while :math:`R_c = 1` signifies that the whole trajectory is not equilibrated. Parameters ---------- series : pandas.Series The input energy array. precision : float The precision of the output :math:`R_c`. To speed the calculation up, the data has been block-averaged before doing the calculation, the size of the block is controlled by the desired precision. tol : float Tolerance (or convergence threshold :math:`\epsilon` in [Fan2021]_) in :math:`kT`. Returns ------- float Convergence time fraction :math:`R_c` [Fan2021]_ :class:`pandas.DataFrame` The DataFrame with block average. :: Forward Backward data_fraction 0 3.016442 3.065176 0.1 1 3.078106 3.078567 0.2 2 3.072561 3.047357 0.3 3 3.048325 3.057527 0.4 4 3.049769 3.037454 0.5 5 3.034078 3.040484 0.6 6 3.043274 3.032495 0.7 7 3.035460 3.036670 0.8 8 3.042032 3.046597 0.9 9 3.044149 3.044385 1.0 Notes ----- This function computes :math:`R_c` from `equation 16`_ from [Fan2021]_. The code is modified based on Shujie Fan's (@VOD555) work. Zhiyi Wu (@xiki-tempula) improved the performance of the original algorithm. Please cite [Fan2021]_ when using this function. See also -------- A_c .. versionadded:: 1.0.0 .. _`equation 16`: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC8397498/#FD16 """ series = to_kT(series) # type: ignore[assignment] array = series.to_numpy() out_length = int(1 / precision) g_forward = _cummean(array, out_length) g_backward = _cummean(array[::-1], out_length) length = len(g_forward) convergence = pd.DataFrame( { "Forward": g_forward, "Backward": g_backward, "data_fraction": [i / length for i in range(1, length + 1)], } ) convergence.attrs = series.attrs # Final value g = g_forward[-1] conv = np.logical_and(np.abs(g_forward - g) < tol, np.abs(g_backward - g) < tol) for i in range(out_length): if all(conv[i:]): return i / length, convergence else: # This branch exists as we are truncating the dataset to speed the # calculation up. For example if the dataset has 21 points and the # precision is 0.05. The last point of g_forward is computed using # data[0:20] while the last point of g_backward is computed using # data[1:21]. Thus, the last point of g_forward and g_backward are not # the same as this branch will be triggered. return 1.0, convergence
[docs] def A_c(series_list: List[pd.Series], precision: float = 0.01, tol: float = 2) -> float: r"""Generate the ensemble convergence criteria :math:`A_c` for a set of simulations. The input is a :class:`list` of :class:`pandas.Series` generated by :func:`~alchemlyb.preprocessing.subsampling.decorrelate_u_nk` or :func:`~alchemlyb.preprocessing.subsampling.decorrelate_dhdl`. The output will the float :math:`A_c` [Fan2020]_ [Fan2021]_. :math:`A_c` is a number between 0 and 1 that can be interpreted as the ratio of the total equilibrated simulation time to the whole simulation time for a full set of simulations. :math:`A_c = 1` means that all simulation time frames in all windows can be considered equilibrated, while :math:`A_c = 0` indicates that nothing is equilibrated. Parameters ---------- series_list : list A list of :class:`pandas.Series` energy array. precision : float The precision of the output :math:`A_c`. To speed the calculation up, the data has been block-averaged before doing the calculation, the size of the block is controlled by the desired precision. tol : float Tolerance (or convergence threshold :math:`\epsilon` in [Fan2021]_) in :math:`kT`. Returns ------- float The area :math:`A_c` under curve for convergence time fraction. Notes ----- This function computes :math:`A_c` from `equation 18`_ from [Fan2021]_. Please cite [Fan2021]_ when using this function. See also -------- fwdrev_cumavg_Rc .. versionadded:: 1.0.0 .. _`equation 18`: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC8397498/#FD18 """ n_R_c = len(series_list) R_c_list = [fwdrev_cumavg_Rc(series, precision, tol)[0] for series in series_list] logger.info(f"R_c list: {R_c_list}") # Integrate the R_c_list <= R_c over the range of 0 to 1 array_01 = np.hstack((R_c_list, [0, 1])) sorted_array = np.sort(np.unique(array_01)) result = 0 for i, element in enumerate(sorted_array[::-1]): if i == 0: continue else: d_R_c = sorted_array[-i] - sorted_array[-i - 1] result += d_R_c * sum(R_c_list <= element) / n_R_c return result
[docs] def block_average( df_list: List[pd.DataFrame], estimator: str = "MBAR", num: int = 10, **kwargs: Any ) -> pd.DataFrame: """Free energy estimate for portions of the trajectory. Generate the free energy estimate for a series of blocks in time, with the specified number of equally spaced points. For example, setting `num` to 10 would give the block averages which is the free energy estimate from the first 10% alone, then the next 10% ... of the data. Parameters ---------- df_list : list List of DataFrame of either dHdl or u_nk, where each represents a different value of lambda. estimator : {'MBAR', 'BAR', 'TI'} Name of the estimators. num : int The number of blocks used to divide *each* DataFrame. Note that if the DataFrames are different lengths, the number of samples contributed to each block will be different. kwargs : dict Keyword arguments to be passed to the estimator. Returns ------- :class:`pandas.DataFrame` The DataFrame with estimate data. :: FE FE_Error 0 3.016442 0.052748 1 3.078106 0.037170 2 3.072561 0.030186 3 3.048325 0.026070 4 3.049769 0.023359 5 3.034078 0.021260 6 3.043274 0.019642 7 3.035460 0.018340 8 3.042032 0.017319 9 3.044149 0.016405 .. versionadded:: 2.4.0 """ logger.info("Start block averaging analysis.") logger.info("Check data availability.") if estimator not in (FEP_ESTIMATORS + TI_ESTIMATORS): msg = f"Estimator {estimator} is not available in {FEP_ESTIMATORS + TI_ESTIMATORS}." logger.error(msg) raise ValueError(msg) else: # select estimator class by name estimator_fit = estimators_dispatch[estimator](**kwargs).fit logger.info(f"Use {estimator} estimator for convergence analysis.") # Check that each df in the list has only one value of lambda for i, df in enumerate(df_list): lambda_values = list(set([x[1:] for x in df.index.to_numpy()])) if len(lambda_values) > 1: ind = [ j for j in range(len(lambda_values[0])) if len(list(set([x[j] for x in lambda_values]))) > 1 ][0] raise ValueError( "Provided DataFrame, df_list[{}] has more than one lambda value in df.index[{}]".format( i, ind ) ) if estimator in ["BAR"] and len(df_list) > 2: raise ValueError( "Restrict to two DataFrames, one with a fep-lambda value and one its forward adjacent state for a " "meaningful result." ) logger.info("Begin Moving Average Analysis") average_list = [] average_error_list = [] for i in range(1, num): logger.info("Moving Average Analysis: {:.2f}%".format(100 * i / num)) sample = [] for data in df_list: ind1, ind2 = len(data) // num * (i - 1), len(data) // num * i sample.append(data[ind1:ind2]) sample = concat(sample) # type: ignore[assignment] result = estimator_fit(sample) average_list.append(result.delta_f_.iloc[0, -1]) if estimator.lower() == "bar": # See https://github.com/alchemistry/alchemlyb/pull/60#issuecomment-430720742 # Error estimate generated by BAR ARE correlated average_error_list.append(np.nan) else: average_error_list.append(result.d_delta_f_.iloc[0, -1]) logger.info( "{:.2f} +/- {:.2f} kT".format(average_list[-1], average_error_list[-1]) ) convergence = pd.DataFrame( { "FE": average_list, "FE_Error": average_error_list, } ) convergence.attrs = df_list[0].attrs return convergence