Source code for alchemlyb.parsing.namd

"""Parsers for extracting alchemical data from `NAMD <http://www.ks.uiuc.edu/Research/namd/>`_ output files."""

from os.path import basename
from re import split

import numpy as np
import pandas as pd
from loguru import logger

from . import _init_attrs
from .util import anyopen
from ..postprocessors.units import R_kJmol, kJ2kcal

k_b = R_kJmol * kJ2kcal


def _filename_sort_key(s: str) -> list[int | str]:
    """Key for natural-sorting filenames, ignoring the path.

    This means that unlike with the standard Python sorted() function, "foo9" < "foo10".
    """

    return [int(t) if t.isdigit() else t.lower() for t in split(r"(\d+)", basename(s))]


def _get_lambdas(fep_files: str | list[str]) -> None | list[float]:
    """Retrieves all lambda values included in the FEP files provided.

    We have to do this in order to tolerate truncated and restarted fepout files.
    The IDWS lambda is not present at the termination of the window, presumably
    for backwards compatibility with ParseFEP and probably other things.

    For a given lambda1, there can be only one lambda2 and at most one lambda_idws.

    Parameters
    ----------
    fep_files: str or list of str
        Path(s) to fepout files to extract data from.

    Returns
    -------
    List of floats, or None if there is more than one lambda_idws for each lambda1.
    """

    lambda_fwd_map: dict[float, float] = {}
    lambda_bwd_map: dict[float, float] = {}
    is_ascending = set()
    endpoint_windows = []

    for fep_file in sorted(fep_files, key=_filename_sort_key):
        with anyopen(fep_file, "r") as f:  # type: ignore[arg-type]
            for line in f:
                line_split = line.strip().split()

                # We might not have a #NEW line so make the best guess
                if line_split[0] == "#NEW":
                    lambda1, lambda2 = float(line_split[6]), float(line_split[8])
                    lambda_idws = (
                        float(line_split[10]) if "LAMBDA_IDWS" in line_split else None
                    )
                elif line_split[0] == "#Free":
                    lambda1, lambda2, lambda_idws = (
                        float(line_split[7]),
                        float(line_split[8]),
                        None,
                    )
                else:
                    # We only care about lines with lambda values. No need to
                    # do all that other processing below for every line
                    continue  # pragma: no cover

                # Keep track of whether the lambda values are increasing or decreasing, so we can return
                # a sorted list of the lambdas in the correct order.
                # If it changes during parsing of this set of fepout files, then we know something is wrong

                # Keep track of endpoints separately since in IDWS runs there must be one of opposite direction
                if 0.0 in (lambda1, lambda2) or 1.0 in (lambda1, lambda2):
                    endpoint_windows.append((lambda1, lambda2))
                else:
                    # If the lambdas are equal then this doesn't represent an ascending window
                    if lambda2 != lambda1:
                        is_ascending.add(lambda2 > lambda1)
                    if lambda_idws is not None and lambda1 != lambda_idws:
                        is_ascending.add(lambda1 > lambda_idws)

                if len(is_ascending) > 1:
                    raise ValueError(
                        f"Lambda values change direction in {fep_file}, relative to the other files: {lambda1} -> {lambda2} (IDWS: {lambda_idws})"
                    )

                # Make sure the lambda2 values are consistent
                if lambda1 in lambda_fwd_map and lambda_fwd_map[lambda1] != lambda2:
                    logger.error(
                        f"fwd: lambda1 {lambda1} has lambda2 {lambda_fwd_map[lambda1]} in {fep_file} but it has already been {lambda2}"
                    )
                    raise ValueError(
                        "More than one lambda2 value for a particular lambda1"
                    )

                lambda_fwd_map[lambda1] = lambda2

                # Make sure the lambda_idws values are consistent
                if lambda_idws is not None:
                    if (
                        lambda1 in lambda_bwd_map
                        and lambda_bwd_map[lambda1] != lambda_idws
                    ):
                        logger.error(
                            f"bwd: lambda1 {lambda1} has lambda_idws {lambda_bwd_map[lambda1]} but it has already been {lambda_idws}"
                        )
                        raise ValueError(
                            "More than one lambda_idws value for a particular lambda1"
                        )
                    lambda_bwd_map[lambda1] = lambda_idws

    is_ascending = next(iter(is_ascending))  # type: ignore[arg-type]

    all_lambdas: set[float] = set()
    all_lambdas.update(lambda_fwd_map.keys())
    all_lambdas.update(lambda_fwd_map.values())
    all_lambdas.update(lambda_bwd_map.keys())
    all_lambdas.update(lambda_bwd_map.values())
    return list(sorted(all_lambdas, reverse=not is_ascending))


[docs] @_init_attrs def extract_u_nk( fep_files: str | list[str], T: float, ignore_equilibration: bool = False ) -> pd.DataFrame: """Return reduced potentials `u_nk` from NAMD fepout file(s). Parameters ---------- fep_files : str or list of str Path to fepout file(s) to extract data from. These are sorted by filename, not including the path, prior to processing, using natural-sort. This way, filenames including numbers without leading zeros are handled intuitively. Windows may be split across files, or more than one window may be present in a given file. Windows without footer lines (which may be in a different file than the respective header lines) will raise an error. This means that while windows may have been interrupted and restarted, they must be complete. Lambda values are expected to increase or decrease monotonically, and match between header and footer of each window. T : float Temperature in Kelvin at which the simulation was sampled. ignore_equilibration : bool, optional If True, the parser will begin collecting energy data immediately from the start of a window, ignoring the "#STARTING COLLECTION..." line. This effectively includes the equilibration steps (defined by NAMD's alchEquilSteps) in the resulting DataFrame. Default is False. Returns ------- u_nk : :class:`pandas.DataFrame` Potential energy for each alchemical state (k) for each frame (n). Notes ----- Only samples collected after alchEquilSteps steps in each window are read. If post-hoc equilibrium detection is used, alchEquilSteps can be set to 0 in NAMD. If the number of forward and backward samples in a given window are different, the extra sample(s) will be discarded. This is typically zero or one sample. .. versionchanged:: 0.5.0 The :mod:`scipy.constants` is used for parsers instead of the constants used by the corresponding MD engine. .. versionchanged:: 0.6.0 Support for Interleaved Double-Wide Sampling files added, with various robustness checks. :param:`fep_files` can now be a list of filenames. .. versionchanged:: 2.6.0 Added parameter ignore_equilibration. """ beta = 1 / (k_b * T) # lists to get times and work values of each window win_ts: list[float] = [] win_de: list[float] = [] win_ts_back: list[float] = [] win_de_back: list[float] = [] # create dataframe for results u_nk = pd.DataFrame(columns=["time", "fep-lambda"]) # Flag to detect inconsistencies like truncated windows in_window = False if type(fep_files) is str: fep_files = [fep_files] # Extract the lambda values only from the fepouts all_lambdas = _get_lambdas(fep_files) # open and get data from fep file. # We sort the list of fep files in case some of them represent restarted windows. # The assumption is that they make sense in lexicographic order. # We keep track of which lambda window we're in, but since it can span multiple files, # only reset these variables here and after the end of each window lambda1_at_start, lambda2_at_start, lambda_idws_at_start = None, None, None for fep_file in sorted(fep_files, key=_filename_sort_key): # Note we have not set parsing=False because we could be continuing one window across # more than one fepout file with anyopen(fep_file, "r") as f: # type: ignore[arg-type] has_idws = False for line in f: line_split = line.strip().split() # We don't know if IDWS was enabled just from the #Free line, and we might not have # a #NEW line in this file, so we have to check for the existence of FepE_back lines # We rely on short-circuit evaluation to avoid the string comparison most of the time if has_idws is False and line_split[0] == "FepE_back:": has_idws = True # New window, get IDWS lambda if any # We keep track of lambdas from the #NEW line and if they disagree with the #Free line # within the same file, then complain. This can happen if truncated fepout files # are presented in the wrong order. if line_split[0] == "#NEW": if in_window: logger.error( f"Window with lambda1: {lambda1_at_start} lambda2: {lambda2_at_start} lambda_idws: {lambda_idws_at_start} appears truncated" ) logger.error( f"because a new window was encountered in {fep_file} before the previous one finished." ) raise ValueError("New window begun after truncated window") in_window = True lambda1_at_start, lambda2_at_start = ( float(line_split[6]), float(line_split[8]), ) lambda_idws_at_start = ( float(line_split[10]) if "LAMBDA_IDWS" in line_split else None ) has_idws = True if lambda_idws_at_start is not None else False # this line marks end of window; dump data into dataframe if line_split[0] == "#Free": # Note: in_window might already be false if this window was restarted without another "#NEW" line in_window = False # extract lambda values for finished window # lambda1 = sampling lambda (row), lambda2 = comparison lambda (col) lambda1 = float(line_split[7]) lambda2 = float(line_split[8]) # If the lambdas are not what we thought they would be, raise an exception to ensure the calculation # fails. This can happen if fepouts where one window spans multiple fepouts are processed out of order # NB: There is no way to tell if lambda_idws changed because it isn't in the '#Free' line that ends a window if lambda1_at_start is not None and (lambda1, lambda2) != ( lambda1_at_start, lambda2_at_start, ): logger.error( f"Lambdas changed unexpectedly while processing {fep_file}" ) logger.error( f"l1, l2: {lambda1_at_start}, {lambda2_at_start} changed to {lambda1}, {lambda2}" ) logger.error(line) raise ValueError( "Inconsistent lambda values within the same window" ) # As we are at the end of a window, convert last window's work and times values to np arrays # (with energy unit kT since they were kcal/mol in the fepouts) win_de_arr = beta * np.asarray(win_de) # dE values win_ts_arr = np.asarray(win_ts) # timesteps # This handles the special case where there are IDWS energies but no lambda_idws value in the # current .fepout file. This can happen when the NAMD firsttimestep is not 0, because NAMD only emits # the '#NEW' line on timestep 0 for some reason. Perhaps the user ran minimize before dynamics, # or this is a restarted run. # We infer lambda_idws_at_start if it wasn't explictly included in this fepout. # If lambdas are in ascending order, choose the one before it all_lambdas, and if descending, choose # the one after. This happens "automatically" because the lambdas were returned already sorted # in the correct direction by _get_lambdas(). # The "else" case is handled by the rest of this block, by default. if has_idws and lambda_idws_at_start is None: l1_idx = all_lambdas.index(lambda1) # type: ignore[union-attr] # Test for the highly pathological case where the first window is both incomplete and has IDWS # data but no lambda_idws value. if l1_idx == 0: raise ValueError( "IDWS data present in first window but lambda_idws not included; no way to infer the correct lambda_idws" ) lambda_idws_at_start = all_lambdas[l1_idx - 1] # type: ignore[index] logger.warning( f"Warning: {fep_file} has IDWS data but lambda_idws not included." ) logger.warning( f" lambda1 = {lambda1}, lambda2 = {lambda2}; inferring lambda_idws to be {lambda_idws_at_start}" ) if lambda_idws_at_start is not None: # Mimic classic DWS data # Arbitrarily match up fwd and bwd comparison energies on the same times # truncate extra samples from whichever array is longer win_de_back_arr = beta * np.asarray(win_de_back) n = min(len(win_de_back_arr), len(win_de_arr)) tempDF = pd.DataFrame( { "time": win_ts_arr[:n], "fep-lambda": np.full(n, lambda1), lambda1: 0, lambda2: win_de_arr[:n], lambda_idws_at_start: win_de_back_arr[:n], } ) # print(f"{fep_file}: IDWS window {lambda1} {lambda2} {lambda_idws_at_start}") else: # print(f"{fep_file}: Forward-only window {lambda1} {lambda2}") # create dataframe of times and work values # this window's data goes in row LAMBDA1 and column LAMBDA2 tempDF = pd.DataFrame( { "time": win_ts_arr, "fep-lambda": np.full(len(win_de_arr), lambda1), lambda1: 0, lambda2: win_de_arr, } ) # join the new window's df to existing df u_nk = pd.concat([u_nk, tempDF], sort=False) # reset values for next window of fepout file win_de = [] win_ts = [] win_de_back = [] win_ts_back = [] has_idws = False lambda1_at_start, lambda2_at_start, lambda_idws_at_start = ( None, None, None, ) # end of "#Free" line processing # append work value from 'dE' column of fepout file if line_split[0] == "FepEnergy:": win_de.append(float(line_split[6])) win_ts.append(float(line_split[1])) elif line_split[0] == "FepE_back:": win_de_back.append(float(line_split[6])) win_ts_back.append(float(line_split[1])) # Forget previous data for this point after line 'STARTING COLLECTION OF ENSEMBLE AVERAGE' if "#STARTING" in line_split and not ignore_equilibration: win_de = [] win_ts = [] win_de_back = [] win_ts_back = [] if in_window or len(win_de) != 0 or len(win_de_back) != 0: # pragma: no cover logger.warning( 'Trailing data without footer line ("#Free energy..."). Interrupted run?' ) raise ValueError("Last window is truncated") if lambda2 in (0.0, 1.0): # this excludes the IDWS case where a dataframe already exists for both endpoints # create last dataframe for fep-lambda at last LAMBDA2 tempDF = pd.DataFrame({"time": win_ts_arr, "fep-lambda": lambda2}) u_nk = pd.concat([u_nk, tempDF], sort=True) u_nk.set_index(["time", "fep-lambda"], inplace=True) return u_nk
[docs] def extract( fep_files: str | list[str], T: float, ignore_equilibration: bool = False ) -> dict[str, pd.DataFrame | None]: r"""Return reduced potentials `u_nk` and gradients `dH/dl` from NAMD fepout file(s). Parameters ---------- fep_file : str or list of str Path to fepout file(s) to extract data from. These are sorted by filename, not including the path, prior to processing, using natural-sort. This way, filenames including numbers without leading zeros are handled intuitively. Windows may be split across files, or more than one window may be present in a given file. Windows without footer lines (which may be in a different file than the respective header lines) will raise an error. This means that while windows may have been interrupted and restarted, they must be complete. Lambda values are expected to increase or decrease monotonically, and match between header and footer of each window. T : float Temperature in Kelvin at which the simulation was sampled. Returns ------- Dict A dictionary with keys of 'u_nk', which is a pandas DataFrame of potential energy for each alchemical state (k) for each frame (n). Note ---- If the number of forward and backward samples in a given window are different, the extra sample(s) will be discarded. This is typically zero or one sample. .. versionadded:: 1.0.0 """ return { "u_nk": extract_u_nk(fep_files, T, ignore_equilibration) } # NOTE: maybe we should also have 'dHdl': None