"""Functions for subsampling datasets."""
import warnings
from typing import Any
import pandas as pd
from pymbar.timeseries import detect_equilibration as _detect_equilibration
from pymbar.timeseries import statistical_inefficiency as _statistical_inefficiency
from pymbar.timeseries import subsample_correlated_data as _subsample_correlated_data
from loguru import logger
from .. import pass_attrs
[docs]
def decorrelate_u_nk(
df: pd.DataFrame,
method: str = "dE",
drop_duplicates: bool = True,
sort: bool = True,
remove_burnin: bool = False,
**kwargs: Any,
) -> pd.DataFrame:
"""Subsample an u_nk DataFrame based on the selected method.
The method can be either 'all' (obtained as a sum over all energy
components) or 'dE'. In the latter case the energy differences
:math:`dE_{i,i+1}` (:math:`dE_{i,i-1}` for the last lambda) are used. This
is a wrapper function around the function
:func:`~alchemlyb.preprocessing.subsampling.statistical_inefficiency` or
:func:`~alchemlyb.preprocessing.subsampling.equilibrium_detection`.
Parameters
----------
df : DataFrame
DataFrame to be subsampled according to the selected method.
method : {'all', 'dE'}
Method for decorrelating the data.
drop_duplicates : bool
Drop the duplicated lines based on time.
sort : bool
Sort the Dataframe based on the time column.
remove_burnin : bool
Whether to perform equilibrium detection (``True``) or just do
statistical inefficiency (``False``).
.. versionadded:: 1.0.0
**kwargs :
Additional keyword arguments for
:func:`~alchemlyb.preprocessing.subsampling.statistical_inefficiency`
or :func:`~alchemlyb.preprocessing.subsampling.equilibrium_detection`.
Returns
-------
:class:`pandas.DataFrame`
`df` subsampled according to selected `method`.
Note
----
The default of ``True`` for `drop_duplicates` and `sort` should result in robust decorrelation
but can lose data.
.. versionadded:: 0.6.0
.. versionchanged:: 1.0.0
Add the `remove_burnin` keyword to allow unequilibrated frames
to be removed. Rename `method` value 'dhdl_all' to 'all' and
deprecate the 'dhdl'.
"""
kwargs["drop_duplicates"] = drop_duplicates
kwargs["sort"] = sort
series = u_nk2series(df, method)
if remove_burnin:
return equilibrium_detection(df, series, **kwargs) # type: ignore[return-value,arg-type]
else:
return statistical_inefficiency(df, series, **kwargs) # type: ignore[return-value,arg-type]
[docs]
def decorrelate_dhdl(
df: pd.DataFrame,
drop_duplicates: bool = True,
sort: bool = True,
remove_burnin: bool = False,
**kwargs: Any,
) -> pd.DataFrame:
"""Subsample a dhdl DataFrame.
This is a wrapper function around the function
:func:`~alchemlyb.preprocessing.subsampling.statistical_inefficiency` and
:func:`~alchemlyb.preprocessing.subsampling.equilibrium_detection`.
Parameters
----------
df : DataFrame
DataFrame to subsample according to the selected method.
drop_duplicates : bool
Drop the duplicated lines based on time.
sort : bool
Sort the Dataframe based on the time column.
remove_burnin : bool
Whether to perform equilibrium detection (``True``) or just do
statistical inefficiency (``False``).
.. versionadded:: 1.0.0
**kwargs :
Additional keyword arguments for
:func:`~alchemlyb.preprocessing.subsampling.statistical_inefficiency`
or :func:`~alchemlyb.preprocessing.subsampling.equilibrium_detection`.
Returns
-------
:class:`pandas.DataFrame`
`df` subsampled.
Note
----
The default of ``True`` for `drop_duplicates` and `sort` should result in
robust decorrelation but can loose data.
.. versionadded:: 0.6.0
.. versionchanged:: 1.0.0
Add the `remove_burnin` keyword to allow unequilibrated frames to be
removed.
"""
kwargs["drop_duplicates"] = drop_duplicates
kwargs["sort"] = sort
series = dhdl2series(df)
if remove_burnin:
return equilibrium_detection(df, series, **kwargs) # type: ignore[return-value,arg-type]
else:
return statistical_inefficiency(df, series, **kwargs) # type: ignore[return-value,arg-type]
[docs]
@pass_attrs
def u_nk2series(df: pd.DataFrame, method: str = "dE") -> pd.Series:
"""Convert an u_nk DataFrame into a series based on the selected method
for subsampling.
The method can be either 'all' (obtained as a sum over all energy
components) or 'dE'. In the latter case the energy differences
:math:`dE_{i,i+1}` (:math:`dE_{i,i-1}` for the last lambda) are used.
Parameters
----------
df : DataFrame
DataFrame to be converted according to the selected method.
method : {'all', 'dE'}
Method for converting the data.
Returns
-------
:class:`pandas.Series`
`series` to be used as input for
:func:`~alchemlyb.preprocessing.subsampling.statistical_inefficiency`
or
:func:`~alchemlyb.preprocessing.subsampling.equilibrium_detection`.
.. versionadded:: 1.0.0
.. versionchanged:: 2.0.1
The `dE` method computes the difference between the current lambda
and the next lambda (previous lambda for the last window), instead
of using the next lambda or the previous lambda for the last window.
"""
# deprecation: remove in 3.0.0
# (the deprecations should show up in the calling functions)
if method == "dhdl":
warnings.warn(
"Method 'dhdl' has been deprecated, using 'dE' instead. "
"'dhdl' will be removed in alchemlyb 3.0.0.",
category=DeprecationWarning,
stacklevel=2,
)
method = "dE"
elif method == "dhdl_all":
warnings.warn(
"Method 'dhdl_all' has been deprecated, using 'all' instead. "
"'dhdl_all' will be removed in alchemlyb 3.0.0.",
category=DeprecationWarning,
stacklevel=2,
)
method = "all"
# Check if the input is u_nk
try:
key = df.index.values[0][1:]
if len(key) == 1:
key = key[0]
df[key]
except KeyError:
raise ValueError("The input should be u_nk")
if method == "all":
series = df.sum(axis=1)
elif method == "dE":
# Using the same logic as alchemical-analysis
key = df.index.values[0][1:]
if len(key) == 1:
# For the case where there is a single lambda
index = df.columns.values.tolist().index(key[0])
else:
# For the case of more than 1 lambda
index = df.columns.values.tolist().index(key)
# for the state that is not the last state, take the state+1
current_lambda = df.iloc[:, index]
if index + 1 < len(df.columns):
new_lambda = df.iloc[:, index + 1]
# for the state that is the last state, take the state-1
else:
new_lambda = df.iloc[:, index - 1]
series = new_lambda - current_lambda
else:
raise ValueError("Decorrelation method {} not found.".format(method))
return series
[docs]
@pass_attrs
def dhdl2series(df: pd.DataFrame, method: str = "all") -> pd.Series:
"""Convert a dhdl DataFrame to a series for subsampling.
The series is generated by summing over all energy components (axis 1 of
`df`), as for ``method='all'`` in :func:`u_nk2series`. Commonly, `df` only
contains a single energy component but in some cases (such as using a split
protocol in GROMACS), it can contain multiple columns for different energy
terms.
Parameters
----------
df : DataFrame
DataFrame to subsample according to the selected method.
method : 'all'
Only 'all' is available; the keyword is provided for compatibility with
:func:`u_nk2series`.
Returns
-------
:class:`pandas.Series`
`series` to be used as input for
:func:`~alchemlyb.preprocessing.subsampling.statistical_inefficiency`
or
:func:`~alchemlyb.preprocessing.subsampling.equilibrium_detection`.
.. versionadded:: 1.0.0
"""
if method != "all":
raise ValueError("Only method='all' is supported for dhdl2series().")
series = df.sum(axis=1)
return series
def _check_multiple_times(df: pd.DataFrame | pd.Series) -> bool:
if isinstance(df, pd.Series):
return (
df.sort_index(axis=0).reset_index("time", name="").duplicated("time").any() # type: ignore[return-value]
)
else:
return df.sort_index(axis=0).reset_index("time").duplicated("time").any() # type: ignore[return-value]
def _check_sorted(df: pd.DataFrame | pd.Series) -> bool:
return df.reset_index(0)["time"].is_monotonic_increasing
def _drop_duplicates(
df: pd.DataFrame | pd.Series, series: None | pd.Series = None
) -> tuple[pd.DataFrame | pd.Series, None | pd.Series]:
"""Drop the duplication in the ``df`` which could be Dataframe or
Series, if series is provided, format the series such that it has the
same length as ``df``.
Parameters
----------
df : DataFrame or Series
DataFrame or Series where duplication will be dropped.
series : Series
series to be formatted in the same way as df.
Returns
-------
df : DataFrame or Series
Formatted DataFrame or Series.
series : Series
Formatted Series.
"""
if isinstance(df, pd.Series):
# remove the duplicate based on time
drop_duplicates_series = df.reset_index("time", name="").drop_duplicates("time")
# Rest the time index
lambda_names = [
"time",
]
lambda_names.extend(drop_duplicates_series.index.names) # type: ignore[arg-type]
df = drop_duplicates_series.set_index("time", append=True).reorder_levels(
lambda_names
)
else:
# remove the duplicate based on time
drop_duplicates_df = df.reset_index("time").drop_duplicates("time")
# Rest the time index
lambda_names = [
"time",
]
lambda_names.extend(drop_duplicates_df.index.names) # type: ignore[arg-type]
df = drop_duplicates_df.set_index("time", append=True).reorder_levels(
lambda_names
)
# Do the same withing with the series
if series is not None:
# remove the duplicate based on time
drop_duplicates_series = series.reset_index("time", name="").drop_duplicates(
"time"
)
# Rest the time index
lambda_names = [
"time",
]
lambda_names.extend(drop_duplicates_series.index.names) # type: ignore[arg-type]
series = drop_duplicates_series.set_index("time", append=True).reorder_levels(
lambda_names
) # type: ignore[assignment]
return df, series
def _sort_by_time(
df: pd.DataFrame | pd.Series, series: None | pd.Series = None
) -> tuple[pd.DataFrame | pd.Series, None | pd.Series]:
"""Sort the ``df`` by time which could be Dataframe or
Series, if series is provided, sort the series as well.
Parameters
----------
df : DataFrame or Series
DataFrame or Series to be sorted by time.
series : Series
series to be sorted by time.
Returns
-------
df : DataFrame or Series
Formatted DataFrame or Series.
series : Series
Formatted Series.
"""
df = df.sort_index(level="time")
if series is not None:
series = series.sort_index(level="time")
return df, series
def _prepare_input(
df: pd.DataFrame | pd.Series,
series: None | pd.Series,
drop_duplicates: bool,
sort: bool,
) -> tuple[pd.DataFrame | pd.Series, None | pd.Series]:
"""Prepare and check the input to be used for statistical_inefficiency or equilibrium_detection.
Parameters
----------
df : DataFrame or Series
DataFrame or Series to be Prepared and checked.
series : Series
series to be Prepared and checked.
Returns
-------
df : DataFrame or Series
Formatted DataFrame or Series.
series : Series
Formatted Series.
"""
if series is None:
warnings.warn(
"The series input is `None`, would not subsample according to statistical inefficiency."
)
elif len(df) != len(series):
raise ValueError(
f"The length of df ({len(df)}) should be same as the length of series ({len(series)})."
)
if _check_multiple_times(df):
if drop_duplicates:
df, series = _drop_duplicates(df, series)
else:
raise KeyError(
"Duplicate time values found; statistical inefficiency "
"only works on a single, contiguous, "
"and sorted timeseries."
)
if not _check_sorted(df):
if sort:
df, series = _sort_by_time(df, series)
else:
raise KeyError(
"Statistical inefficiency only works as expected if "
"values are sorted by time, increasing."
)
if series is not None:
if len(series) != len(df) or not all(
series.reset_index()["time"] == df.reset_index()["time"]
):
raise ValueError("series and data must be sampled at the same times")
return df, series
[docs]
def slicing(
df: pd.DataFrame | pd.Series,
lower: None | float = None,
upper: None | float = None,
step: None | int = None,
force: bool = False,
) -> pd.DataFrame | pd.Series:
"""Subsample a DataFrame using simple slicing.
Parameters
----------
df : :class:`pandas.DataFrame`
DataFrame to subsample.
lower : float
Lower time to slice from.
upper : float
Upper time to slice to (inclusive).
step : int
Step between rows to slice by.
force : bool
Ignore checks that DataFrame is in proper form for expected behavior.
Returns
-------
:class:`pandas.DataFrame`
`df` subsampled.
.. versionchanged:: 1.0.1
The rows with NaN values are not dropped by default.
"""
try:
df = df.loc[lower:upper:step] # type: ignore[misc]
except KeyError:
raise KeyError("DataFrame rows must be sorted by time, increasing.")
if not force and _check_multiple_times(df):
raise KeyError(
"Duplicate time values found; it's generally advised "
"to use slicing on DataFrames with unique time values "
"for each row. Use `force=True` to ignore this error."
)
return df
[docs]
def statistical_inefficiency(
df: pd.DataFrame | pd.Series,
series: None | pd.Series = None,
lower: None | float = None,
upper: None | float = None,
step: None | int = None,
conservative: bool = True,
drop_duplicates: bool = False,
sort: bool = False,
) -> pd.DataFrame | pd.Series:
"""Subsample a DataFrame based on the calculated statistical inefficiency
of a timeseries.
If `series` is ``None``, then this function will behave the same as
:func:`slicing`.
Parameters
----------
df : :class:`pandas.DataFrame`
DataFrame to subsample according statistical inefficiency of `series`.
series : :class:`pandas.Series`
Series to use for calculating statistical inefficiency. If ``None``,
no statistical inefficiency-based subsampling will be performed.
lower : float
Lower bound to pre-slice `series` data from.
upper : float
Upper bound to pre-slice `series` to (inclusive).
step : int
Step between `series` items to pre-slice by.
conservative : bool
``True`` use ``ceil(statistical_inefficiency)`` to slice the data in uniform
intervals (the default). ``False`` will sample at non-uniform intervals to
closely match the (fractional) statistical_inefficieny, as implemented
in :func:`pymbar.timeseries.subsample_correlated_data`.
drop_duplicates : bool
Drop the duplicated lines based on time.
sort : bool
Sort the Dataframe based on the time column.
Returns
-------
:class:`pandas.DataFrame`
`df` subsampled according to subsampled `series`.
Warning
-------
The `series` and the data to be sliced, `df`, need to have the same number
of elements because the statistical inefficiency is calculated based on
the index of the series (and not an associated time). At the moment there is
no automatic conversion from a time to an index.
Note
----
For a non-integer statistical ineffciency :math:`g`, the default value
``conservative=True`` will provide _fewer_ data points than allowed by
:math:`g` and thus error estimates will be _higher_. For large numbers of
data points and converged free energies, the choice should not make a
difference. For small numbers of data points, ``conservative=True``
decreases a false sense of accuracy and is deemed the more careful and
conservative approach.
See Also
--------
pymbar.timeseries.statistical_inefficiency : detailed background
pymbar.timeseries.subsample_correlated_data : used for subsampling
.. versionchanged:: 0.2.0
The ``conservative`` keyword was added and the method is now using
``pymbar.timeseries.statistical_inefficiency()``; previously, the statistical
inefficiency was _rounded_ (instead of ``ceil()``) and thus one could
end up with correlated data.
.. versionchanged:: 1.0.0
Fixed a bug that would effectively ignore the ``lower`` and ``step``
keywords when returning the subsampled DataFrame object. See
`issue #198 <https://github.com/alchemistry/alchemlyb/issues/198>`_ for
more details.
"""
df, series = _prepare_input(df, series, drop_duplicates, sort)
if series is not None:
series = slicing(series, lower=lower, upper=upper, step=step) # type: ignore[assignment]
df = slicing(df, lower=lower, upper=upper, step=step)
# calculate statistical inefficiency of series (could use fft=True but needs test)
logger.debug("Running statistical inefficiency analysis.")
statinef = _statistical_inefficiency(series)
logger.debug("Statistical inefficiency: {:.2f}.", statinef)
# use the subsample_correlated_data function to get the subsample index
indices = _subsample_correlated_data(
series, g=statinef, conservative=conservative
)
logger.debug("Number of uncorrelated samples: {}.", len(indices))
df = df.iloc[indices]
else:
df = slicing(df, lower=lower, upper=upper, step=step)
return df
[docs]
def equilibrium_detection(
df: pd.DataFrame | pd.Series,
series: None | pd.Series = None,
lower: None | float = None,
upper: None | float = None,
step: None | int = None,
drop_duplicates: bool = False,
sort: bool = False,
) -> pd.DataFrame | pd.Series:
"""Subsample a DataFrame using automated equilibrium detection on a timeseries.
This function uses the :mod:`pymbar` implementation of the *simple
automated equilibrium detection* algorithm in [Chodera2016]_.
If `series` is ``None``, then this function will behave the same as
:func:`slicing`.
Parameters
----------
df : :class:`pandas.DataFrame`
DataFrame to subsample according to equilibrium detection on `series`.
series : :class:`pandas.Series`
Series to detect equilibration on. If ``None``, no equilibrium
detection-based subsampling will be performed.
lower : float
Lower bound to pre-slice `series` data from.
upper : float
Upper bound to pre-slice `series` to (inclusive).
step : int
Step between `series` items to pre-slice by.
drop_duplicates : bool
Drop the duplicated lines based on time.
sort : bool
Sort the Dataframe based on the time column.
Returns
-------
:class:`pandas.DataFrame`
`df` subsampled according to subsampled `series`.
Notes
-----
Please cite [Chodera2016]_ when you use this function in published work.
See Also
--------
pymbar.timeseries.detect_equilibration : detailed background
pymbar.timeseries.subsample_correlated_data : used for subsampling
.. versionchanged:: 1.0.0
Add the drop_duplicates and sort keyword to unify the behaviour between
:func:`~alchemlyb.preprocessing.subsampling.statistical_inefficiency` or
:func:`~alchemlyb.preprocessing.subsampling.equilibrium_detection`.
"""
df, series = _prepare_input(df, series, drop_duplicates, sort)
if series is not None:
series = slicing(series, lower=lower, upper=upper, step=step) # type: ignore[assignment]
df = slicing(df, lower=lower, upper=upper, step=step)
# calculate statistical inefficiency of series, with equilibrium detection
logger.debug("Running equilibration detection.")
t, statinef, Neff_max = _detect_equilibration(series.values)
logger.debug("Start index: {}.", t)
logger.debug("Statistical inefficiency: {:.2f}.", statinef)
series_equil = series[t:]
df_equil = df[t:]
indices = _subsample_correlated_data(series_equil, g=statinef)
logger.debug("Number of uncorrelated samples: {}.", len(indices))
df = df_equil.iloc[indices]
else:
df = slicing(df, lower=lower, upper=upper, step=step)
return df