Source code for yank.repex

#!/usr/local/bin/env python

# ==============================================================================
# MODULE DOCSTRING
# ==============================================================================

"""
Repex
=====

Replica-exchange simulation algorithms and specific variants.

This module provides a general facility for running replica-exchange simulations,
as well as derived classes for special cases such as parallel tempering (in which
the states differ only in temperature).

Provided classes include:

- :class:`ReplicaExchange`
    Base class for general replica-exchange simulations.
- :class:`ParallelTempering`
    Convenience subclass of ReplicaExchange for parallel tempering simulations
    (one System object, many temperatures).
- :class:`Reporter`
    Replica Exchange reporter class to store all variables and data

COPYRIGHT

Current version by Andrea Rizzi <andrea.rizzi@choderalab.org>, Levi N. Naden <levi.naden@choderalab.org> and
John D. Chodera<john.chodera@choderalab.org> while at Memorial Sloan-Kettering Cancer Center.

Original version by John D. Chodera <jchodera@gmail.com> while at the University of
California Berkeley.

LICENSE

This code is licensed under the latest available version of the MIT License.

"""

# ==============================================================================
# GLOBAL IMPORTS
# ==============================================================================

from simtk import openmm
from simtk import unit

import os
import math
import copy
import time
import uuid
import yaml
import inspect
import logging
import datetime
import collections

import numpy as np
import mdtraj as md
import netCDF4 as netcdf

import openmmtools as mmtools
from openmmtools.states import ThermodynamicState

from . import utils, mpi, version
from pymbar.utils import ParameterError

logger = logging.getLogger(__name__)


# ==============================================================================
# REPLICA EXCHANGE REPORTER
# ==============================================================================

[docs]class Reporter(object): """Handle storage write/read operations and different format conventions. You can use this object to programmatically inspect the data generated by ReplicaExchange. Parameters ---------- storage : str The path to the storage file for analysis. A second checkpoint file will be determined from either ``checkpoint_storage`` or automatically based on the storage option In the future this will be able to take Storage classes as well. open_mode : str or None The mode of the file between 'r', 'w', and 'a' (or equivalently 'r+'). If None, the storage file won't be open on construction, and a call to :func:`Reporter.open` will be needed before attempting read/write operations. checkpoint_interval : int >= 1, Default: 10 The frequency at which checkpointing information is written relative to analysis information. This is a multiple of the iteration at which energies is written, hence why it must be greater than or equal to 1. Checkpoint information cannot be written on iterations which where ``iteration % checkpoint_interval != 0``. Attempting to read checkpointing information results in a masked array where only entries which were written are unmasked checkpoint_storage : str or None, optional Optional name of the checkpoint point file. This file is used to save trajectory information and other less frequently accessed data. This should NOT be a full path, and instead just a filename If None: the derived checkpoint name is the same as storage, less any extension, then "_checkpoint.nc" is added. The reporter internally tracks what data goes into which file, so its transparent to all other classes In the future, this will be able to take Storage classes as well Attributes ---------- filepath """ def __init__(self, storage, open_mode=None, checkpoint_interval=10, checkpoint_storage=None): # Handle checkpointing if type(checkpoint_interval) != int: raise ValueError("checkpoint_interval must be an integer!") dirname, filename = os.path.split(storage) if checkpoint_storage is None: basename, ext = os.path.splitext(filename) addon = "_checkpoint" checkpoint_storage = os.path.join(dirname, basename + addon + ext) logger.debug("Initial checkpoint file automatically chosen as {}".format(checkpoint_storage)) else: checkpoint_storage = os.path.join(dirname, checkpoint_storage) self._storage_file_analysis = storage self._storage_file_checkpoint = checkpoint_storage self._storage_checkpoint = None self._storage_analysis = None self._checkpoint_interval = checkpoint_interval if open_mode is not None: self.open(open_mode) @property def filepath(self): """ Returns the string file name of the primary storage file Classes outside the Reporter can access the file string for error messages and such. """ return self._storage_file_analysis @property def _storage(self): """ Return an iterable of the storage objects, avoids having the [list, of, storage, objects] everywhere Object 0 is always the primary file, all others are subfiles """ return self._storage_analysis, self._storage_checkpoint @property def _storage_paths(self): """ Return an iterable of paths to the storage files Object 0 is always the primary file, all others are subfiles """ return self._storage_file_analysis, self._storage_file_checkpoint @property def _storage_dict(self): """Return an iterable dictionary of the self._storage_X objects""" return {'checkpoint': self._storage_checkpoint, 'analysis': self._storage_analysis}
[docs] def storage_exists(self, skip_size=False): """ Check if the storage files exist on disk. Reads information on the primary file to see existence of others Parameters ---------- skip_size : bool, Optional, Default: False Skip the check of the file size. Helpful if you have just initialized a storage file but written nothing to it yet and/or its still entirely in memory (e.g. just opened NetCDF files) Returns ------- files_exist : bool If the primary storage file and its related subfiles exist, returns True. If the primary file or any subfiles do not exist, returns False """ # This function serves as a way to mask the subfiles from everything outside the reporter for file_path in self._storage_paths: if not os.path.exists(file_path): return False # Return if any files do not exist elif not os.path.getsize(file_path) > 0 and not skip_size: return False # File is 0 size return True
[docs] def is_open(self): """Return True if the Reporter is ready to read/write.""" if self._storage[0] is None: return False else: return self._storage[0].isopen()
def _are_subfiles_open(self): """Internal function to check if subfiles are open""" open_check_list = [] for storage in self._storage[1:]: if storage is None: return False else: open_check_list.append(storage.isopen()) return np.all(open_check_list)
[docs] def open(self, mode='r'): """ Open the storage file for reading/writing. Creates and pre-formats the required files if they don't exist. This is not necessary if you have indicated in the constructor to open. Parameters ---------- mode : str, Optional, Default: 'r' The mode of the file between 'r', 'w', and 'a' (or equivalently 'r+'). """ # Ensure we don't have already another file # open (possibly in a different mode). for storage in self._storage: if storage is not None: self.close() # No need to execute this code more than once break # Create directory if we want to write. if mode != 'r': for storage_path in self._storage_paths: storage_dir = os.path.dirname(storage_path) # When storage_dir == '', os.path.exists() returns False. if not os.path.exists(storage_dir) and storage_dir != '': os.makedirs(storage_dir) # Open NetCDF 4 file for writing. # If no file exists, this appropriately throws an error on 'r' mode primary_ncfiles = {} sub_ncfiles = {} # TODO: Figure out how to move around the analysis file without the checkpoint file # Cast this to a common name space for operation below primary_ncfiles['analysis'] = netcdf.Dataset(self._storage_file_analysis, mode, version='NETCDF4') if mode == 'w': sub_ncfiles['checkpoint'] = netcdf.Dataset(self._storage_file_checkpoint, mode, version='NETCDF4') else: # Read/append mode # Try to open the files in read mode, its okay if they are not there try: sub_ncfiles['checkpoint'] = netcdf.Dataset(self._storage_file_checkpoint, mode, version='NETCDF4') primary_uuid = primary_ncfiles['analysis'].UUID assert primary_uuid == sub_ncfiles['checkpoint'].UUID except IOError: # Trap the "not on disk" warning logger.debug('Could not locate checkpoint subfile. This is okay for certain append and read operations, ' 'but not for production simulation!') except AttributeError: raise AttributeError("Checkpoint file is missing its linked primary file UUID!" "No way to ensure this checkpoint file contains data matching the analysis file!") except AssertionError: primary_uuid = primary_ncfiles['analysis'].UUID check_uuid = sub_ncfiles['checkpoint'].UUID raise IOError("Checkpoint UUID does not match analysis UUID! This checkpoint file came from another " "simulation!\n" "Analysis UUID: {}" "Checkpoint UUID: {}".format(primary_uuid, check_uuid)) def check_and_build_storage_file(ncfile, nc_name, checkpoint_interval): """ Helper function to build default file settings Returns a bool depending on if it ran through the setup to indicate file needs additional data or not """ if 'scalar' not in ncfile.dimensions: # Create common dimensions. ncfile.createDimension('scalar', 1) # Scalar dimension. ncfile.createDimension('iteration', 0) # Unlimited number of iterations. ncfile.createDimension('spatial', 3) # Number of spatial dimensions. # Set global attributes. ncfile.application = 'YANK' ncfile.program = 'yank.py' ncfile.programVersion = version.short_version ncfile.Conventions = 'ReplicaExchange' ncfile.ConventionVersion = '0.2' ncfile.DataUsedFor = nc_name ncfile.CheckpointInterval = checkpoint_interval # Create and initilize the global variables nc_last_good_iter = ncfile.createVariable('last_iteration', int, 'scalar') nc_last_good_iter[0] = 0 return True else: return False # Setup the primary file if needed # Generate a unique UUID in case we need to assign it # primary and subfiles are linked by UUID # Uses uuid4 to avoid assigning hostname information primary_uuid = str(uuid.uuid4()) ncfile_ever_built = False for nc_name, ncfile in utils.dictiter(primary_ncfiles): ncfile_built = check_and_build_storage_file(ncfile, nc_name, self._checkpoint_interval) if ncfile_built: ncfile_ever_built = True # Assign ncfile to class property setattr(self, '_storage_' + nc_name, ncfile) if ncfile_ever_built: # Assign the same UUID to all primary files for _, ncfile in utils.dictiter(primary_ncfiles): ncfile.UUID = primary_uuid else: primary_uuid = primary_ncfiles['analysis'].UUID # Handle Subfiles for nc_name, ncfile in utils.dictiter(sub_ncfiles): # If they are in the sub_ncfiles dict, they exist and are open ncfile_built = check_and_build_storage_file(ncfile, nc_name, self._checkpoint_interval) if ncfile_built: # Assign the UUID to the subfile ncfile.UUID = primary_uuid setattr(self, '_storage_' + nc_name, ncfile) if self._storage_analysis is not None: # The same number will be on checkpoint file as well, but its not guaranteed to be present on_file_interval = self._storage_analysis.CheckpointInterval if on_file_interval != self._checkpoint_interval: logger.debug("checkpoint_interval != on-file checkpoint interval! " "Using on file analysis interval of {}.".format(on_file_interval)) self._checkpoint_interval = on_file_interval
[docs] def close(self): """Close the storage files""" for storage_name, storage in utils.dictiter(self._storage_dict): if storage is not None: if storage.isopen(): storage.sync() storage.close() setattr(self, '_storage' + storage_name, None)
[docs] def sync(self): """Force any buffer to be flushed to the file""" for storage in self._storage: if storage is not None: storage.sync()
def __del__(self): """Synchronize and close the storage.""" for storage in self._storage: if storage is not None: self.close() break @mmtools.utils.with_timer('Reading thermodynamic states from storage')
[docs] def read_thermodynamic_states(self): """Retrieve the stored thermodynamic states from the checkpoint file. Returns ------- thermodynamic_states : list of ThermodynamicStates The previously stored thermodynamic states. During the simulation these are swapped among replicas. unsampled_states : list of ThermodynamicState The previously stored unsampled thermodynamic states. See Also -------- read_replica_thermodynamic_states """ # We have to parse the thermodynamic states first because the # unsampled states may refer to them for the serialized system. states = collections.OrderedDict([('thermodynamic_states', list()), ('unsampled_states', list())]) # Caches standard_system_name: serialized_standard_system states_serializations = dict() # Read state information. for state_type, state_list in states.items(): # There may not be unsampled states. if state_type not in self._storage_analysis.groups: assert state_type == 'unsampled_states' continue # We keep looking for states until we can't find them anymore. n_states = len(self._storage_analysis.groups[state_type].variables) for state_id in range(n_states): serialized_state = self.read_dict('{}/state{}'.format(state_type, state_id)) # Find the thermodynamic state representation. serialized_thermodynamic_state = serialized_state while 'thermodynamic_state' in serialized_thermodynamic_state: # The while loop is necessary for nested CompoundThermodynamicStates. serialized_thermodynamic_state = serialized_thermodynamic_state['thermodynamic_state'] # Check if the standard state is in a previous state. try: standard_system_name = serialized_thermodynamic_state.pop('_Reporter__compatible_state') except KeyError: # Cache the standard system serialization for future usage. standard_system_name = '{}/{}'.format(state_type, state_id) states_serializations[standard_system_name] = serialized_thermodynamic_state['standard_system'] else: # The system serialization can be retrieved from another state. serialized_standard_system = states_serializations[standard_system_name] serialized_thermodynamic_state['standard_system'] = serialized_standard_system # Create ThermodynamicState object. states[state_type].append(mmtools.utils.deserialize(serialized_state)) state_id += 1 return [states['thermodynamic_states'], states['unsampled_states']]
@mmtools.utils.with_timer('Storing thermodynamic states')
[docs] def write_thermodynamic_states(self, thermodynamic_states, unsampled_states): """Store all the ThermodynamicStates to the checkpoint file. Parameters ---------- thermodynamic_states : list of ThermodynamicState The thermodynamic states to store. unsampled_states : list of ThermodynamicState The unsampled thermodynamic states to store. See Also -------- write_replica_thermodynamic_states """ # Store all thermodynamic states as serialized dictionaries. stored_states = dict() def unnest_thermodynamic_state(serialized): while 'thermodynamic_state' in serialized: serialized = serialized['thermodynamic_state'] return serialized for state_type, states in [('thermodynamic_states', thermodynamic_states), ('unsampled_states', unsampled_states)]: for state_id, state in enumerate(states): # Check if any compatible state has been found found_compatible_state = False for compare_state in stored_states: if compare_state.is_state_compatible(state): serialized_state = mmtools.utils.serialize(state, skip_system=True) serialized_thermodynamic_state = unnest_thermodynamic_state(serialized_state) serialized_thermodynamic_state.pop('standard_system') # Remove the unneeded system object reference_state_name = stored_states[compare_state] serialized_thermodynamic_state['_Reporter__compatible_state'] = reference_state_name found_compatible_state = True break # If no compatible state is found, do full serialization if not found_compatible_state: serialized_state = mmtools.utils.serialize(state) serialized_thermodynamic_state = unnest_thermodynamic_state(serialized_state) serialized_standard_system = serialized_thermodynamic_state['standard_system'] reference_state_name = '{}/{}'.format(state_type, state_id) len_serialization = len(serialized_standard_system) # Store new compatibility data stored_states[state] = reference_state_name logger.debug("Serialized state {} is {}B | {:.3f}KB | {:.3f}MB".format( reference_state_name, len_serialization, len_serialization/1024.0, len_serialization/1024.0/1024.0)) # Finally write the dictionary self.write_dict('{}/state{}'.format(state_type, state_id), serialized_state, fixed_dimension=True)
[docs] def read_sampler_states(self, iteration): """Retrieve the stored sampler states on the checkpoint file If the iteration is not on the checkpoint interval, None is returned Parameters ---------- iteration : int The iteration at which to read the data. Returns ------- sampler_states : list of SamplerStates or None The previously stored sampler states for each replica. If the iteration is not on the checkpoint_interval, None is returned """ iteration = self._calculate_last_iteration(iteration) checkpoint_iteration = self._calculate_checkpoint_iteration(iteration) if checkpoint_iteration is not None: n_states = self._storage_checkpoint.dimensions['replica'].size sampler_states = list() for replica_index in range(n_states): # Restore positions. x = self._storage_checkpoint.variables['positions'][checkpoint_iteration, replica_index, :, :].astype(np.float64) positions = unit.Quantity(x, unit.nanometers) # Restore box vectors. x = self._storage_checkpoint.variables['box_vectors'][checkpoint_iteration, replica_index, :, :].astype(np.float64) box_vectors = unit.Quantity(x, unit.nanometers) # Create SamplerState. sampler_states.append(mmtools.states.SamplerState(positions=positions, box_vectors=box_vectors)) return sampler_states else: return None
@mmtools.utils.with_timer('Storing sampler states')
[docs] def write_sampler_states(self, sampler_states, iteration): """Store all sampler states for a given iteration on the checkpoint file If the iteration is not on the checkpoint interval, no data is written Parameters ---------- sampler_states : list of SamplerStates The sampler states to store for each replica. iteration : int The iteration at which to store the data. """ # Check if the schema must be initialized, do this regardless of the checkpoint_interval for consistency if 'positions' not in self._storage_checkpoint.variables: n_atoms = sampler_states[0].n_particles n_states = len(sampler_states) # Create dimensions. Replica dimension could have been created before. self._storage_checkpoint.createDimension('atom', n_atoms) if 'replica' not in self._storage_checkpoint.dimensions: self._storage_checkpoint.createDimension('replica', n_states) # Create variables. ncvar_positions = self._storage_checkpoint.createVariable('positions', 'f4', ('iteration', 'replica', 'atom', 'spatial'), zlib=True, chunksizes=(1, n_states, n_atoms, 3)) ncvar_box_vectors = self._storage_checkpoint.createVariable('box_vectors', 'f4', ('iteration', 'replica', 'spatial', 'spatial'), zlib=False, chunksizes=(1, n_states, 3, 3)) ncvar_volumes = self._storage_checkpoint.createVariable('volumes', 'f8', ('iteration', 'replica'), zlib=False, chunksizes=(1, n_states)) # Define units for variables. setattr(ncvar_positions, 'units', 'nm') setattr(ncvar_box_vectors, 'units', 'nm') setattr(ncvar_volumes, 'units', 'nm**3') # Define long (human-readable) names for variables. setattr(ncvar_positions, "long_name", ("positions[iteration][replica][atom][spatial] is position of " "coordinate 'spatial' of atom 'atom' from replica 'replica' for " "iteration 'iteration'.")) setattr(ncvar_box_vectors, "long_name", ("box_vectors[iteration][replica][i][j] is dimension j of box " "vector i for replica 'replica' from iteration 'iteration-1'.")) setattr(ncvar_volumes, "long_name", ("volume[iteration][replica] is the box volume for replica 'replica' " "from iteration 'iteration-1'.")) checkpoint_iteration = self._calculate_checkpoint_iteration(iteration) if checkpoint_iteration is not None: # Store sampler states. for replica_index, sampler_state in enumerate(sampler_states): # Store positions x = sampler_state.positions / unit.nanometers self._storage_checkpoint.variables['positions'][checkpoint_iteration, replica_index, :, :] = x[:, :] # Store box vectors and volume. for i in range(3): vector_i = sampler_state.box_vectors[i] / unit.nanometers self._storage_checkpoint.variables['box_vectors'][checkpoint_iteration, replica_index, i, :] = \ vector_i self._storage_checkpoint.variables['volumes'][checkpoint_iteration, replica_index] = \ sampler_state.volume / unit.nanometers**3 else: logger.debug("Iteration {} not on the Checkpoint Interval of {}. " "Sampler State not written.".format(iteration, self._checkpoint_interval))
[docs] def read_replica_thermodynamic_states(self, iteration=slice(None)): """Retrieve the indices of the ThermodynamicStates for each replica on the analysis file Parameters ---------- iteration : int or slice The iteration(s) at which to read the data. The slice(None) allows fetching all iterations at once. Returns ------- state_indices : list of int At the given iteration, replica ``i`` propagated the system in SamplerState ``sampler_states[i]`` and ThermodynamicState ``thermodynamic_states[states_indices[i]]``. """ iteration = self._calculate_last_iteration(iteration) return self._storage_analysis.variables['states'][iteration].astype(np.int64)
[docs] def write_replica_thermodynamic_states(self, state_indices, iteration): """Store the indices of the ThermodynamicStates for each replica on the analysis file Parameters ---------- state_indices : list of int At the given iteration, replica ``i`` propagated the system in SamplerState ``sampler_states[i]`` and ThermodynamicState ``thermodynamic_states[replica_thermodynamic_states[i]]``. iteration : int The iteration at which to store the data. """ # Initialize schema if needed. if 'states' not in self._storage_analysis.variables: n_states = len(state_indices) # Create dimension if they don't exist. if 'replica' not in self._storage_analysis.dimensions: self._storage_analysis.createDimension('replica', n_states) # Create variables and attach units and description. ncvar_states = self._storage_analysis.createVariable('states', 'i4', ('iteration', 'replica'), zlib=False, chunksizes=(1, n_states)) setattr(ncvar_states, 'units', 'none') setattr(ncvar_states, "long_name", ("states[iteration][replica] is the thermodynamic state index " "(0..nstates-1) of replica 'replica' of iteration 'iteration'.")) # Store thermodynamic states indices. self._storage_analysis.variables['states'][iteration, :] = state_indices[:]
[docs] def read_mcmc_moves(self): """Return the MCMCMoves of the :class:`yank.repex.ReplicaExchange` simulation on the checkpoint Returns ------- mcmc_moves : list of MCMCMove The MCMCMoves used to propagate the simulation. """ n_moves = len(self._storage_analysis.groups['mcmc_moves'].variables) # Retrieve all moves in order. mcmc_moves = list() for move_id in range(n_moves): serialized_move = self.read_dict('mcmc_moves/move{}'.format(move_id)) mcmc_moves.append(mmtools.utils.deserialize(serialized_move)) return mcmc_moves
[docs] def write_mcmc_moves(self, mcmc_moves): """Store the MCMCMoves of the :class:`yank.repex.ReplicaExchange` simulation on the checkpoint Parameters ---------- mcmc_moves : list of MCMCMove The MCMCMoves used to propagate the simulation. """ for move_id, move in enumerate(mcmc_moves): serialized_move = mmtools.utils.serialize(move) self.write_dict('mcmc_moves/move{}'.format(move_id), serialized_move)
[docs] def read_energies(self, iteration=slice(None)): """Retrieve the energy matrix at the given iteration on the analysis file Parameters ---------- iteration : int or slice The iteration(s) at which to read the data. The slice(None) allows fetching all iterations at once. Returns ------- energy_thermodynamic_states : n_replicas x n_replicas numpy.ndarray ``energy_thermodynamic_states[iteration, i, j]`` is the reduced potential computed at SamplerState ``sampler_states[iteration, i]`` and ThermodynamicState ``thermodynamic_states[iteration, j]``. energy_unsampled_states : n_replicas x n_unsampled_states numpy.ndarray ``energy_unsampled_states[iteration, i, j]`` is the reduced potential computed at SamplerState ``sampler_states[iteration, i]`` and ThermodynamicState ``unsampled_thermodynamic_states[iteration, j]``. """ iteration = self._calculate_last_iteration(iteration) energy_thermodynamic_states = self._storage_analysis.variables['energies'][iteration, :, :] try: energy_unsampled_states = self._storage_analysis.variables['unsampled_energies'][iteration, :, :] except KeyError: # There are no unsampled thermodynamic states. unsampled_shape = energy_thermodynamic_states.shape[:-1] + (0,) energy_unsampled_states = np.zeros(unsampled_shape) return energy_thermodynamic_states, energy_unsampled_states
[docs] def write_energies(self, energy_thermodynamic_states, energy_unsampled_states, iteration): """Store the energy matrix at the given iteration on the analysis file Parameters ---------- energy_thermodynamic_states : n_replicas x n_replicas numpy.ndarray ``energy_thermodynamic_states[i][j]`` is the reduced potential computed at SamplerState ``sampler_states[i]`` and ThermodynamicState ``thermodynamic_states[j]``. energy_unsampled_states : n_replicas x n_unsampled_states numpy.ndarray ``energy_unsampled_states[i][j]`` is the reduced potential computed at SamplerState ``sampler_states[i]`` and ThermodynamicState ``unsampled_thermodynamic_states[j]``. iteration : int The iteration at which to store the data. """ # Initialize schema if needed. if 'energies' not in self._storage_analysis.variables: n_replicas = len(energy_thermodynamic_states) # Create replica dimension if it wasn't created by other functions. if 'replica' not in self._storage_analysis.dimensions: self._storage_analysis.createDimension('replica', n_replicas) # Create variable for thermodynamic state energies with units and descriptions. ncvar_energies = self._storage_analysis.createVariable('energies', 'f8', ('iteration', 'replica', 'replica'), zlib=False, chunksizes=(1, n_replicas, n_replicas)) ncvar_energies.units = 'kT' ncvar_energies.long_name = ("energies[iteration][replica][state] is the reduced (unitless) " "energy of replica 'replica' from iteration 'iteration' evaluated " "at the thermodynamic state 'state'.") # Check if we have unsampled states. if energy_unsampled_states.shape[1] > 0: if 'unsampled_energies' not in self._storage_analysis.variables: n_unsampled_states = len(energy_unsampled_states[0]) # Create replica dimension if it wasn't created by other functions. if 'unsampled' not in self._storage_analysis.dimensions: self._storage_analysis.createDimension('unsampled', n_unsampled_states) # Create variable for thermodynamic state energies with units and descriptions. ncvar_unsampled = self._storage_analysis.createVariable('unsampled_energies', 'f8', ('iteration', 'replica', 'unsampled'), zlib=False, chunksizes=(1, n_replicas, n_unsampled_states) ) ncvar_unsampled.units = 'kT' ncvar_unsampled.long_name = ("unsampled_energies[iteration][replica][state] is the reduced " "(unitless) energy of replica 'replica' from iteration 'iteration' " "evaluated at unsampled thermodynamic state 'state'.") # Store states energy. self._storage_analysis.variables['energies'][iteration, :, :] = energy_thermodynamic_states[:, :] if energy_unsampled_states.shape[1] > 0: self._storage_analysis.variables['unsampled_energies'][iteration, :, :] = energy_unsampled_states[:, :]
[docs] def read_mixing_statistics(self, iteration=slice(None)): """Retrieve the mixing statistics for the given iteration on the analysis file Parameters ---------- iteration : int or slice The iteration(s) at which to read the data. Returns ------- n_accepted_matrix : kxk numpy.ndarray ``n_accepted_matrix[i][j]`` is the number of accepted moves from state ``thermodynamic_states[i]`` to ``thermodynamic_states[j]`` going from ``iteration-1`` to ``iteration`` (not cumulative). n_proposed_matrix : kxk numpy.ndarray ``n_proposed_matrix[i][j]`` is the number of proposed moves from state ``thermodynamic_states[i]`` to ``thermodynamic_states[j]`` going from ``iteration-1`` to ``iteration`` (not cumulative). """ iteration = self._calculate_last_iteration(iteration) n_accepted_matrix = self._storage_analysis.variables['accepted'][iteration, :, :].astype(np.int64) n_proposed_matrix = self._storage_analysis.variables['proposed'][iteration, :, :].astype(np.int64) return n_accepted_matrix, n_proposed_matrix
[docs] def write_mixing_statistics(self, n_accepted_matrix, n_proposed_matrix, iteration): """Store the mixing statistics for the given iteration on the analysis file Parameters ---------- n_accepted_matrix : kxk numpy.ndarray ``n_accepted_matrix[i][j]`` is the number of accepted moves from state ``thermodynamic_states[i]`` to ``thermodynamic_states[j]`` going from iteration-1 to iteration (not cumulative). n_proposed_matrix : kxk numpy.ndarray ``n_proposed_matrix[i][j]`` is the number of proposed moves from state ``thermodynamic_states[i]`` to ``thermodynamic_states[j]`` going from ``iteration-1`` to ``iteration`` (not cumulative). iteration : int The iteration at which to store the data. """ # Create schema if necessary. if 'accepted' not in self._storage_analysis.variables: n_states = len(n_accepted_matrix) # Create replica dimension if it wasn't already created. if 'replica' not in self._storage_analysis.dimensions: self._storage_analysis.createDimension('replica', n_states) # Create variables with units and descriptions. ncvar_accepted = self._storage_analysis.createVariable('accepted', 'i4', ('iteration', 'replica', 'replica'), zlib=False, chunksizes=(1, n_states, n_states) ) ncvar_proposed = self._storage_analysis.createVariable('proposed', 'i4', ('iteration', 'replica', 'replica'), zlib=False, chunksizes=(1, n_states, n_states) ) setattr(ncvar_accepted, 'units', 'none') setattr(ncvar_proposed, 'units', 'none') setattr(ncvar_accepted, 'long_name', ("accepted[iteration][i][j] is the number of proposed transitions " "between states i and j from iteration 'iteration-1'.")) setattr(ncvar_proposed, 'long_name', ("proposed[iteration][i][j] is the number of proposed transitions " "between states i and j from iteration 'iteration-1'.")) # Store statistics. self._storage_analysis.variables['accepted'][iteration, :, :] = n_accepted_matrix[:, :] self._storage_analysis.variables['proposed'][iteration, :, :] = n_proposed_matrix[:, :]
[docs] def read_timestamp(self, iteration=slice(None)): """Return the timestamp for the given iteration. Read from the analysis file, although there is a paired timestamp on the checkpoint file as well Parameters ---------- iteration : int or slice The iteration(s) at which to read the data. Returns ------- timestamp : str The timestamp at which the iteration was stored. """ iteration = self._calculate_last_iteration(iteration) return self._storage_analysis.variables['timestamp'][iteration]
[docs] def write_timestamp(self, iteration): """Store a timestamp for the given iteration on both analysis and checkpoint file. If the iteration is not on the ``checkpoint_interval``, no timestamp is written on the checkpoint file Parameters ---------- iteration : int The iteration at which to read the data. """ # Create variable if needed. for storage in self._storage: if 'timestamp' not in storage.variables: storage.createVariable('timestamp', str, ('iteration',), zlib=False, chunksizes=(1,)) timestamp = time.ctime() self._storage_analysis.variables['timestamp'][iteration] = timestamp checkpoint_iteration = self._calculate_checkpoint_iteration(iteration) if checkpoint_iteration is not None: self._storage_checkpoint.variables['timestamp'][checkpoint_iteration] = timestamp
[docs] def read_dict(self, name): """Restore a dictionary from the storage file. Parameters ---------- name : str The identifier of the dictionary used to stored the data. Returns ------- data : dict The restored data as a dict. """ # Leaving the skeleton to extend this in for now storage = 'analysis' if storage not in ['analysis', 'checkpoint']: raise ValueError("storage must be either 'analysis' or 'checkpoint'!") # Get NC variable. nc_variable = self._resolve_variable_path(name, storage) if nc_variable.dtype == 'S1': # Handle variables stored in fixed_dimensions data_chars = nc_variable[:] data_str = data_chars.tostring().decode() else: data_str = str(nc_variable[0]) data = yaml.load(data_str, Loader=_DictYamlLoader) # Restore the title in the metadata. if name == 'metadata': data['title'] = self._storage_dict[storage].title return data
[docs] def write_dict(self, name, data, fixed_dimension=False): """Store the contents of a dict. Parameters ---------- name : str The identifier of the dictionary in the storage file. data : dict The dict to store. fixed_dimension: bool, Defautlt: False Use a fixed length dimension instead of variable length one. A unique dimension name (sharing a name with "name") will be created and its length will be set equal to ``"fixedL{}".format(len(data_string))`` This method seems to allow NetCDF to actually compress strings. Do NOT use this flag if you expect to constantly be changing the length of the data fed in, use only for static data """ # Leaving the skeleton to extend this in for now storage = 'analysis' if storage not in ['analysis', 'checkpoint']: raise ValueError("storage must be either 'analysis' or 'checkpoint'!") storage_nc = self._storage_dict[storage] # General NetCDF conventions assume the title of the dataset to be # specified as a global attribute, but the user can specify their # own titles only in metadata. if name == 'metadata': data = copy.deepcopy(data) self._storage_dict[storage].title = data.pop('title') # Activate flow style to save space. data_str = yaml.dump(data, Dumper=_DictYamlDumper)#, default_flow_style=True) # Check if we are updating the dictionary or creating it. try: nc_variable = self._resolve_variable_path(name, storage) except KeyError: if fixed_dimension: len_dim = len(data_str) dim_str = "fixedL{}".format(len_dim) if dim_str not in storage_nc.dimensions: storage_nc.createDimension(dim_str, len_dim) nc_variable = storage_nc.createVariable(name, 'S1', dim_str, zlib=True) else: nc_variable = storage_nc.createVariable(name, str, 'scalar', zlib=True) if fixed_dimension: data_char_array = np.array(list(data_str), dtype='S1') nc_variable[:] = data_char_array else: packed_data = np.empty(1, 'O') packed_data[0] = data_str nc_variable[:] = packed_data
[docs] def read_last_iteration(self, full_iteration=True): """ Read the last iteration from file which was written in sequential order. Parameters ---------- full_iteration : bool, optional If True, returns the last checkpoint iteration (default is True). Returns ------- last_iteration : int Last iteration which was sequentially written. """ # Make sure this is returned as Python int. last_iteration = int(self._storage_analysis.variables['last_iteration'][0]) # Get last checkpoint. if full_iteration: # -1 for stop ensures the 0th index is searched. for i in range(last_iteration, -1, -1): if self._calculate_checkpoint_iteration(i) is not None: return i raise RuntimeError("Could not find a checkpoint! This should not happen " "as the 0th iteration should always be written!") return last_iteration
[docs] def write_last_iteration(self, iteration): """ Tell the reporter what the last iteration which was written in sequential order was to allow resuming and analysis only on valid data. Call this as the last step of any ``write_iteration``-like routine to ensure analysis will not use junk data left over from an interrupted simulation past the last checkpoint. Parameters ---------- iteration : int Iteration at which the last good data point was written. """ self._storage_analysis.variables['last_iteration'][0] = iteration
[docs] def read_mbar_free_energies(self, iteration): """ Read the MBAR dimensionless free energy at a given iteration from file. These free energies are relative to the first state. These are computed through self-consistent iterations from an initial guess. The initial guess is often 0 for all states, so any state not written is returned as zeros for f_k, and infinity for Used primarily in online analysis, and should be used in tandem with an :class:`yank.analyze.YankPhaseAnalyzer` object from the :mod:`yank.analyze` module Parameters ---------- iteration : int iteration to read the free energies from if the iteration was not written at a the given iteration, then the free_energies are all 0 Returns ------- f_k : 1-D ndarray of floats MBAR free_energies from the iteration. free_energy : tuple of two Floats Free energy estimate at the iteration between the end states """ online_group = self._storage_analysis.groups['online_analysis'] f_k = online_group.variables['f_k'][iteration] free_energy = online_group.variables['free_energy'][iteration, :] return f_k, free_energy
[docs] def write_mbar_free_energies(self, iteration, f_k, free_energy): """ Write the mbar free energies at the current iteration. See :func:`read_mbar_free_energies` for more information about pymbar's f_k free energies Used primarily in online analysis, and should be used in tandem with an :class:`yank.analyze.YankPhaseAnalyzer` object from the :mod:`analyze` module. Parameters ---------- iteration : int, Iteration at which to save the free energy. Reads the current energy up to this value and stores it in the analysis reporter f_k : 1D array of floats, Dimensionless free energies you wish to store. This should come from pymbar.MBAR.f_k from an initialized MBAR object. free_energy : tuple of two floats Current estimate of the free energy difference of the phase and its error. This should be of the form (free_energy, error_in_free_energy) Typically output of the ``pymbar.MBAR.getFreeEnergyDifferences()[i,j]`` """ analysis_nc = self._storage_analysis if 'f_k_length' not in analysis_nc.dimensions: analysis_nc.createDimension('f_k_length', len(f_k)) analysis_nc.createDimension('Df', 2) online_group = analysis_nc.createGroup('online_analysis') # larger chunks, faster operations, small matrix anyways online_group.createVariable('f_k', float, dimensions=('iteration', 'f_k_length'), zlib=True, chunksizes=(1, len(f_k)), fill_value=np.inf) online_group.createVariable('free_energy', float, dimensions=('iteration', 'Df'), zlib=True, chunksizes=(1, 2), fill_value=np.inf) online_group = analysis_nc.groups['online_analysis'] online_group.variables['f_k'][iteration] = f_k online_group.variables['free_energy'][iteration, :] = free_energy
# ------------------------------------------------------------------------- # Internal-usage. # ------------------------------------------------------------------------- def _resolve_variable_path(self, path, storage): """Return the NC variable at the end of the path. This can be used to retrieve variables inside one or more groups. """ path_split = path.split('/') nc_group = self._storage_dict[storage] for group_name in path_split[:-1]: nc_group = nc_group.groups[group_name] return nc_group.variables[path_split[-1]] def _calculate_checkpoint_iteration(self, iteration): """Compute the iteration on disk of the checkpoint file matching the iteration linked on the analysis iteration. Although this is a simple function, it provides a common function for calculation Returns either the integer index, or None if there is no matched index """ checkpoint_index = float(iteration) / self._checkpoint_interval if checkpoint_index.is_integer(): return int(checkpoint_index) return None def _calculate_last_iteration(self, iteration): """ Convert the iteration in 'read_X' functions which take a iteration=slice(None) to avoid returning a slice of data which goes past the last_iteration Parameters ---------- iteration : int or Slice Iteration to feed into the check Returns ------- cast_iteration : int or Slice of type iteration Iteration, converted as needed to only access certain ranges of data """ last_good = self.read_last_iteration(full_iteration=False) max_iter = len(self._storage_analysis.dimensions['iteration']) def convert_negative_iteration(iteration): return iteration - (max_iter - last_good - 1) if last_good == max_iter - 1: cast_iteration = iteration else: if type(iteration) is int: throw = False if iteration > last_good: # check positive integer throw = True elif iteration < 0: # Operation on negative integer, recast # Convert negative integer to its mapped values cast_iteration = convert_negative_iteration(iteration) if cast_iteration < -max_iter: # check against absolute size throw = True else: cast_iteration = iteration if throw: raise IndexError("Index out of range!") elif type(iteration) is slice: out_start = None out_stop = None # Condition the start, start must not exceed last good (positive) and must be greater than if iteration.start is not None: if iteration.start > last_good: out_start = last_good elif iteration.start < 0: out_start = convert_negative_iteration(iteration.start) # Condition end iteration if iteration.stop is not None: if iteration.stop > last_good: out_stop = last_good + 1 elif iteration.stop < 0: out_stop = convert_negative_iteration(iteration.stop) - 1 else: # Trap special None cases for the "stop" # Trap the case of -step size, its easier to do all the conditions on which it is not that if iteration.step is None or (iteration.step is not None and iteration.step > 0): out_stop = last_good + 1 cast_iteration = slice(out_start, out_stop, iteration.step) else: raise ValueError("Iteration must be either an int or a slice!") return cast_iteration
# ============================================================================== # REPLICA-EXCHANGE SIMULATION # ==============================================================================
[docs]class ReplicaExchange(object): """Replica-exchange simulation facility. This base class provides a general replica-exchange simulation facility, allowing any set of thermodynamic states to be specified, along with a set of initial positions to be assigned to the replicas in a round-robin fashion. No distinction is made between one-dimensional and multidimensional replica layout. By default, the replica mixing scheme attempts to mix *all* replicas to minimize slow diffusion normally found in multidimensional replica exchange simulations (Modification of the 'replica_mixing_scheme' setting will allow the traditional 'neighbor swaps only' scheme to be used.) Stored configurations, energies, swaps, and restart information are all written to a single output file using the platform portable, robust, and efficient NetCDF4 library. Parameters ---------- mcmc_moves : MCMCMove or list of MCMCMove, optional The MCMCMove used to propagate the states. If a list of MCMCMoves, they will be assigned to the correspondent thermodynamic state on creation. If None is provided, Langevin dynamics with 2fm timestep, 5.0/ps collision rate, and 500 steps per iteration will be used. number_of_iterations : int or None, Optional, Default: 1 The number of iterations to perform If None, an unlimited number of iterations is run replica_mixing_scheme : 'swap-all', 'swap-neighbors' or None, Default: 'swap-all' The scheme used to swap thermodynamic states between replicas. online_analysis_interval : None or Int >= 1, optional, default None Choose the interval at which to perform online analysis of the free energy. After every interval, the simulation will be stopped and the free energy estimated. If the error in the free energy estimate is at or below ``online_analysis_target_error``, then the simulation will be considered completed. online_analysis_target_error : float >= 0, optional, default 0.2 The target error for the online analysis measured in kT per phase. Once the free energy is at or below this value, the phase will be considered complete. If ``online_analysis_interval`` is None, this option does nothing. online_analysis_minimum_iterations : int >= 0, optional, default 50 Set the minimum number of iterations which must pass before online analysis is carried out. Since the initial samples likely not to yield a good estimate of free energy, save time and just skip them If ``online_analysis_interval`` is None, this does nothing Attributes ---------- n_replicas iteration mcmc_moves sampler_states metadata is_completed Examples -------- Parallel tempering simulation of alanine dipeptide in implicit solvent (replica exchange among temperatures). This is just an illustrative example; use :class:`ParallelTempering` class for actual production parallel tempering simulations. Create the system. >>> import math >>> from simtk import unit >>> from openmmtools import testsystems, states, mcmc >>> testsystem = testsystems.AlanineDipeptideImplicit() Create thermodynamic states for parallel tempering with exponentially-spaced schedule. >>> n_replicas = 3 # Number of temperature replicas. >>> T_min = 298.0 * unit.kelvin # Minimum temperature. >>> T_max = 600.0 * unit.kelvin # Maximum temperature. >>> temperatures = [T_min + (T_max - T_min) * (math.exp(float(i) / float(nreplicas-1)) - 1.0) / (math.e - 1.0) ... for i in range(n_replicas)] >>> thermodynamic_states = [states.ThermodynamicState(system=testsystem.system, temperature=T) ... for T in temperatures] Initialize simulation object with options. Run with a GHMC integrator. >>> move = mcmc.GHMCMove(timestep=2.0*unit.femtoseconds, n_steps=50) >>> simulation = ReplicaExchange(mcmc_moves=move, number_of_iterations=2) Create simulation with its storage file (in a temporary directory) and run. >>> storage_path = tempfile.NamedTemporaryFile(delete=False).name + '.nc' >>> reporter = Reporter(storage_path, checkpoint_interval=1) >>> simulation.create(thermodynamic_states=thermodynamic_states, >>> sampler_states=states.SamplerState(testsystem.positions), >>> storage=reporter) >>> simulation.run() # This runs for a maximum of 2 iterations. >>> simulation.iteration 2 >>> simulation.run(n_iterations=1) >>> simulation.iteration 2 To resume a simulation from an existing storage file and extend it beyond the original number of iterations. >>> del simulation >>> simulation = ReplicaExchange.from_storage(reporter) >>> simulation.extend(n_iterations=1) >>> simulation.iteration 3 You can extract several information from the NetCDF file using the Reporter class while the simulation is running. This reads the SamplerStates of every run iteration. >>> reporter = Reporter(storage=storage_path, open_mode='r', checkpoint_interval=1) >>> sampler_states = reporter.read_sampler_states(iteration=range(1, 4)) >>> len(sampler_states) 3 >>> sampler_states[-1].positions.shape # Alanine dipeptide has 22 atoms. (22, 3) Clean up. >>> os.remove(storage_path) :param number_of_iterations: Maximum number of integer iterations that will be run :param replica_mixing_scheme: Scheme which describes how replicas are exchanged each iteration as string :param online_analysis_interval: How frequently to carry out online analysis in number of iterations :param online_analysis_target_error: Target free energy difference error float at which simulation will be stopped during online analysis, in dimensionless energy :param online_analysis_minimum_iterations: Minimum number of iterations needed before online analysis is run as int """ # ------------------------------------------------------------------------- # Constructors. # ------------------------------------------------------------------------- def __init__(self, mcmc_moves=None, number_of_iterations=1, replica_mixing_scheme='swap-all', online_analysis_interval=None, online_analysis_target_error=0.2, online_analysis_minimum_iterations=50): # These will be set on initialization. See function # create() for explanation of single variables. self._thermodynamic_states = None self._unsampled_states = None self._sampler_states = None self._replica_thermodynamic_states = None self._iteration = None self._energy_thermodynamic_states = None self._energy_unsampled_states = None self._n_accepted_matrix = None self._n_proposed_matrix = None self._reporter = None self._metadata = None # Handling default propagator. if mcmc_moves is None: # This will be converted to a list in create(). self._mcmc_moves = mmtools.mcmc.LangevinDynamicsMove(timestep=2.0*unit.femtosecond, collision_rate=5.0/unit.picosecond, n_steps=500, reassign_velocities=True, n_restart_attempts=6) else: self._mcmc_moves = copy.deepcopy(mcmc_moves) # Store constructor parameters. Everything is marked for internal # usage because any change to these attribute implies a change # in the storage file as well. Use properties for checks. self.number_of_iterations = number_of_iterations self.replica_mixing_scheme = replica_mixing_scheme # Online analysis options. self.online_analysis_interval = online_analysis_interval self.online_analysis_target_error = online_analysis_target_error self.online_analysis_minimum_iterations = online_analysis_minimum_iterations self._online_error_trap_counter = 0 # Counter for errors in the online estimate self._online_error_bank = [] self._last_mbar_f_k = None self._last_err_free_energy = None self._have_displayed_citations_before = False # Check convergence. if self.number_of_iterations == np.inf: if self.online_analysis_target_error == 0.0: logger.warning("WARNING! You have specified an unlimited number of iterations and a target error " "for online analysis of 0.0! Your simulation may never reach 'completed' state!") elif self.online_analysis_interval is None: logger.warning("WARNING! This simulation will never be considered 'complete' since there is no " "specified maximum number of iterations!") @classmethod
[docs] def from_storage(cls, storage): """Constructor from an existing storage file. Parameters ---------- storage : str or Reporter If str: The path to the storage file. If :class:`Reporter`: uses the :class:`Reporter` options In the future this will be able to take a Storage class as well. Returns ------- repex : ReplicaExchange A new instance of ReplicaExchange in the same state of the last stored iteration. """ # Check if netcdf file exists, open data for read if it does if type(storage) is str: # Open a reporter to read the data. reporter = Reporter(storage, open_mode='r') file_name = storage else: reporter = storage reporter.open(mode='r') file_name = reporter.filepath if not reporter.storage_exists(): reporter.close() raise RuntimeError('Storage file {} or its subfiles do not exist; cannot resume.'.format(file_name)) # Retrieve options and create new simulation. options = reporter.read_dict('options') options['mcmc_moves'] = reporter.read_mcmc_moves() repex = cls(**options) # Display papers to be cited. repex._display_citations() # Read the last iteration reported to ensure we don't include junk # data written just before a crash. iteration = reporter.read_last_iteration() # Retrieve other attributes. logger.debug("Reading storage file {}...".format(file_name)) thermodynamic_states, unsampled_states = reporter.read_thermodynamic_states() sampler_states = reporter.read_sampler_states(iteration=iteration) state_indices = reporter.read_replica_thermodynamic_states(iteration=iteration) energy_thermodynamic_states, energy_unsampled_states = reporter.read_energies(iteration=iteration) n_accepted_matrix, n_proposed_matrix = reporter.read_mixing_statistics(iteration=iteration) metadata = reporter.read_dict('metadata') # Search for last cached free energies only if online analysis is activated. if repex._online_analysis_interval is not None: online_anaysis_info = repex._get_last_written_free_energy(reporter, iteration) last_mbar_f_k, (_, last_err_free_energy) = online_anaysis_info else: last_mbar_f_k, (_, last_err_free_energy) = None, (None, None) # Close reading reporter. reporter.close() # Assign attributes. repex._iteration = iteration repex._thermodynamic_states = thermodynamic_states repex._unsampled_states = unsampled_states repex._sampler_states = sampler_states repex._replica_thermodynamic_states = state_indices repex._energy_thermodynamic_states = energy_thermodynamic_states repex._energy_unsampled_states = energy_unsampled_states repex._n_accepted_matrix = n_accepted_matrix repex._n_proposed_matrix = n_proposed_matrix repex._metadata = metadata repex._last_mbar_f_k = last_mbar_f_k repex._last_err_free_energy = last_err_free_energy # We open the reporter only in node 0. repex._reporter = reporter mpi.run_single_node(0, repex._reporter.open, mode='a', broadcast_result=False, sync_nodes=False) # Don't write the new last iteration, we have not technically written anything yet, so there is no "junk" return repex
# ------------------------------------------------------------------------- # Public properties. # ------------------------------------------------------------------------- @property def n_replicas(self): """The integer number of replicas (read-only).""" if self._thermodynamic_states is None: return 0 else: return len(self._thermodynamic_states) @property def iteration(self): """The integer current iteration of the simulation (read-only). If the simulation has not been created yet, this is None. """ return self._iteration @property def mcmc_moves(self): """A copy of the MCMCMoves list used to propagate the simulation. This can be set only before creation. """ return copy.deepcopy(self._mcmc_moves) @mcmc_moves.setter def mcmc_moves(self, new_value): if self._thermodynamic_states is not None: # We can't modify representation of the MCMCMoves because it's # impossible to delete groups/variables from an NetCDF file. We # could support this by JSONizing the dict serialization and # store it as a string instead, if we needed this. raise RuntimeError('Cannot modify MCMCMoves after creation.') # If this is a single MCMCMove, it'll be transformed to a list in create(). self._mcmc_moves = copy.deepcopy(new_value) @property def sampler_states(self): """A copy of the sampler states list at the current iteration. This can be set only before running. """ return copy.deepcopy(self._sampler_states) @sampler_states.setter def sampler_states(self, value): if self._iteration != 0: raise RuntimeError('Sampler states can be assigned only between ' 'create() and run().') if len(value) != self.n_replicas: raise ValueError('Passed {} sampler states for {} replicas'.format( len(value), self.n_replicas)) # Update sampler state in the object and on storage. self._sampler_states = copy.deepcopy(value) mpi.run_single_node(0, self._reporter.write_sampler_states, self._sampler_states, self._iteration) class _StoredProperty(object): """ Descriptor of a property stored as an option. validate_function is a simple function for checking things like "X > 0", but exposes both the ReplicaExchange instance and the new value for the variable, in that order. More complex checks which relies on the ReplicaExchange instance, like "if Y == True, then check X" can be accessed through the instance object of the function """ def __init__(self, option_name, validate_function=None): self._option_name = option_name self._validate_function = validate_function def __get__(self, instance, owner_class=None): return getattr(instance, '_' + self._option_name) def __set__(self, instance, new_value): if self._validate_function is not None: new_value = self._validate_function(instance, new_value) setattr(instance, '_' + self._option_name, new_value) # Update storage if we ReplicaExchange is initialized. if instance._thermodynamic_states is not None: mpi.run_single_node(0, instance._store_options) # ---------------------------------- # Value Validation of the properties # Should be @staticmethod with arguments of (instance, value) in that order, even if instance is not used # ---------------------------------- @staticmethod def _number_of_iterations_validator(instance, number_of_iterations): # Support infinite number of iterations. if number_of_iterations is None: number_of_iterations = np.inf return number_of_iterations @staticmethod def _repex_mixing_scheme_validator(instance, replica_mixing_scheme): supported_schemes = ['swap-all', 'swap-neighbors', None] if replica_mixing_scheme not in supported_schemes: raise ValueError("Unknown replica mixing scheme '{}'. Supported values " "are {}.".format(replica_mixing_scheme, supported_schemes)) return replica_mixing_scheme @staticmethod def _oa_interval_validator(instance, online_analysis_interval): """Check the online_analysis_interval value for consistency""" if online_analysis_interval is not None and ( type(online_analysis_interval) != int or online_analysis_interval < 1): raise ValueError("online_analysis_interval must be an integer 1 or greater, or None") return online_analysis_interval @staticmethod def _oa_target_error_validator(instance, online_analysis_target_error): if instance.online_analysis_interval is not None: if online_analysis_target_error < 0: raise ValueError("online_analysis_target_error must be a float >= 0") elif online_analysis_target_error == 0: logger.warning("online_analysis_target_error of 0 may never converge.") return online_analysis_target_error @staticmethod def _oa_min_iter_validator(instance, online_analysis_minimum_iterations): if (instance.online_analysis_interval is not None and (type(online_analysis_minimum_iterations) is not int or online_analysis_minimum_iterations < 0)): raise ValueError("online_analysis_minimum_iterations must be an integer >= 0") return online_analysis_minimum_iterations number_of_iterations = _StoredProperty('number_of_iterations', validate_function=_StoredProperty._number_of_iterations_validator) replica_mixing_scheme = _StoredProperty('replica_mixing_scheme', validate_function=_StoredProperty._repex_mixing_scheme_validator) online_analysis_interval = _StoredProperty('online_analysis_interval', validate_function=_StoredProperty._oa_interval_validator) #:interval to carry out online analysis online_analysis_target_error = _StoredProperty('online_analysis_target_error', validate_function=_StoredProperty._oa_target_error_validator) online_analysis_minimum_iterations = _StoredProperty('online_analysis_minimum_iterations', validate_function=_StoredProperty._oa_min_iter_validator) @property def metadata(self): """A copy of the metadata dictionary passed on creation (read-only).""" return copy.deepcopy(self._metadata) @property def is_completed(self): """Check if we have reached any of the stop target criteria (read-only)""" return self._is_completed() # ------------------------------------------------------------------------- # Main public interface. # -------------------------------------------------------------------------
[docs] def create(self, thermodynamic_states, sampler_states, storage, unsampled_thermodynamic_states=None, metadata=None): """Create new replica-exchange simulation. Parameters ---------- thermodynamic_states : list of openmmtools.states.ThermodynamicState Thermodynamic states to simulate, where one replica is allocated per state. Each state must have a system with the same number of atoms. sampler_states : openmmtools.states.SamplerState or list One or more sets of initial sampler states. If a list of SamplerStates, they will be assigned to replicas in a round-robin fashion. storage : str or instanced Reporter If str: the path to the storage file. Default checkpoint options from Reporter class are used If Reporter: Uses the reporter options and storage path In the future this will be able to take a Storage class as well. unsampled_thermodynamic_states : list of openmmtools.states.ThermodynamicState, optional These are ThermodynamicStates that are not propagated, but their reduced potential is computed at each iteration for each replica. These energy can be used as data for reweighting schemes (default is None). metadata : dict, optional Simulation metadata to be stored in the file. """ # Check if netcdf files exist. This is run only on MPI node 0 and # broadcasted. This is to avoid the case where the other nodes # arrive to this line after node 0 has already created the storage # file, causing an error. files_exist = False # Create temporary reporter if type(storage) is str: reporter = Reporter(storage, open_mode=None) file_string = storage else: reporter = storage file_string = reporter.filepath if mpi.run_single_node(0, reporter.storage_exists, broadcast_result=True): files_exist = True if files_exist: raise RuntimeError("Storage file {} already exists; cowardly " "refusing to overwrite.".format(file_string)) # Make sure sampler_states is an iterable of SamplerStates for later. if isinstance(sampler_states, mmtools.states.SamplerState): sampler_states = [sampler_states] # Make sure there are no more sampler states than thermodynamic states. if len(sampler_states) > len(thermodynamic_states): raise ValueError('Passed {} SamplerStates but only {} ThermodynamicStates'.format( len(sampler_states), len(thermodynamic_states))) # Make sure all states have same number of particles. We don't # currently support writing storage with different n_particles n_particles = thermodynamic_states[0].n_particles for states in [thermodynamic_states, sampler_states]: for state in states: if state.n_particles != n_particles: raise ValueError('All ThermodynamicStates and SamplerStates must ' 'have the same number of particles') # Handle default argument for metadata and add default simulation title. default_title = ('Replica-exchange simulation created using ReplicaExchange class ' 'of yank.repex.py on {}'.format(time.asctime(time.localtime()))) if metadata is None: metadata = dict(title=default_title) elif 'title' not in metadata: metadata['title'] = default_title self._metadata = metadata # Save thermodynamic states. This sets n_replicas. self._thermodynamic_states = copy.deepcopy(thermodynamic_states) # Handle default unsampled thermodynamic states. if unsampled_thermodynamic_states is None: self._unsampled_states = [] else: self._unsampled_states = copy.deepcopy(unsampled_thermodynamic_states) # Distribute sampler states to replicas in a round-robin fashion. self._sampler_states = [copy.deepcopy(sampler_states[i % len(sampler_states)]) for i in range(self.n_replicas)] # Map each thermodynamic state to a replica. Replica i at each iteration is # in ThermodynamicState thermodynamic_states[replica_thermodynamic_states[i]] # and SamplerState sampler_states[i]. During mixing, we exchange the indices # of the ThermodynamicState, but we keep the SamplerStates (i.e. positions, # velocities, box_vectors) bound to the same replica. self._replica_thermodynamic_states = np.array([i for i in range(self.n_replicas)], np.int64) # Assign default system box vectors if None has been specified. for replica_id, thermodynamic_state_id in enumerate(self._replica_thermodynamic_states): sampler_state = self._sampler_states[replica_id] if sampler_state.box_vectors is not None: continue thermodynamic_state = self._thermodynamic_states[thermodynamic_state_id] sampler_state.box_vectors = thermodynamic_state.system.getDefaultPeriodicBoxVectors() # Ensure there is an MCMCMove for each thermodynamic state. if isinstance(self._mcmc_moves, mmtools.mcmc.MCMCMove): self._mcmc_moves = [copy.deepcopy(self._mcmc_moves) for _ in range(self.n_replicas)] elif len(self._mcmc_moves) != self.n_replicas: raise RuntimeError('The number of MCMCMoves ({}) and ThermodynamicStates ({}) must ' 'be the same.'.format(len(self._mcmc_moves), self.n_replicas)) # Reset iteration counter. self._iteration = 0 # Reset statistics. # _n_accepted_matrix[i][j] is the number of swaps proposed between thermodynamic states i and j. # _n_proposed_matrix[i][j] is the number of swaps proposed between thermodynamic states i and j. self._n_accepted_matrix = np.zeros([self.n_replicas, self.n_replicas], np.int64) self._n_proposed_matrix = np.zeros([self.n_replicas, self.n_replicas], np.int64) # Allocate memory for energy matrix. energy_thermodynamic/unsampled_states[k][l] # is the reduced potential computed at the positions of SamplerState sampler_states[k] # and ThermodynamicState thermodynamic/unsampled_states[l]. self._energy_thermodynamic_states = np.zeros([self.n_replicas, self.n_replicas], np.float64) self._energy_unsampled_states = np.zeros([self.n_replicas, len(self._unsampled_states)], np.float64) # Display papers to be cited. self._display_citations() # Close the reporter file so its ready for use reporter.close() self._reporter = reporter self._initialize_reporter()
@mmtools.utils.with_timer('Minimizing all replicas')
[docs] def minimize(self, tolerance=1.0*unit.kilojoules_per_mole/unit.nanometers, max_iterations=0): """Minimize all replicas. Minimized positions are stored at the end. Parameters ---------- tolerance : simtk.unit.Quantity, optional Minimization tolerance (units of energy/mole/length, default is ``1.0 * unit.kilojoules_per_mole / unit.nanometers``). max_iterations : int, optional Maximum number of iterations for minimization. If 0, minimization continues until converged. """ # Check that simulation has been created. if self.n_replicas == 0: raise RuntimeError('Cannot minimize replicas. The simulation must be created first.') logger.debug("Minimizing all replicas...") # Distribute minimization across nodes. Only node 0 will get all positions. # The other nodes, only need the positions that they use for propagation and # computation of the energy matrix entries. minimized_positions, sampler_state_ids = mpi.distribute(self._minimize_replica, range(self.n_replicas), tolerance, max_iterations, send_results_to=0) # Update all sampler states. For non-0 nodes, this will update only the # sampler states associated to the replicas propagated by this node. for sampler_state_id, minimized_pos in zip(sampler_state_ids, minimized_positions): self._sampler_states[sampler_state_id].positions = minimized_pos # Save the stored positions in the storage mpi.run_single_node(0, self._reporter.write_sampler_states, self._sampler_states, self._iteration)
[docs] def equilibrate(self, n_iterations, mcmc_moves=None): """Equilibrate all replicas. This does not increase the iteration counter. The equilibrated positions are stored at the end. Parameters ---------- n_iterations : int Number of equilibration iterations. mcmc_moves : MCMCMove or list of MCMCMove, optional Optionally, the MCMCMoves to use for equilibration can be different from the ones used in production. """ # Check that simulation has been created. if self.n_replicas == 0: raise RuntimeError('Cannot minimize replicas. The simulation must be created first.') # If no MCMCMove is specified, use the ones for production. if mcmc_moves is None: mcmc_moves = self._mcmc_moves # Make sure there is one MCMCMove per thermodynamic state. if isinstance(mcmc_moves, mmtools.mcmc.MCMCMove): mcmc_moves = [copy.deepcopy(mcmc_moves) for _ in range(self.n_replicas)] elif len(mcmc_moves) != self.n_replicas: raise RuntimeError('The number of MCMCMoves ({}) and ThermodynamicStates ({}) for equilibration' ' must be the same.'.format(len(self._mcmc_moves), self.n_replicas)) # Temporarily set the equilibration MCMCMoves. production_mcmc_moves = self._mcmc_moves self._mcmc_moves = mcmc_moves for iteration in range(n_iterations): logger.debug("Equilibration iteration {}/{}".format(iteration, n_iterations)) self._propagate_replicas() # Restore production MCMCMoves. self._mcmc_moves = production_mcmc_moves # Update stored positions. mpi.run_single_node(0, self._reporter.write_sampler_states, self._sampler_states, self._iteration)
[docs] def run(self, n_iterations=None): """Run the replica-exchange simulation. This runs at most ``number_of_iterations`` iterations. Use :func:`extend` to pass the limit. Parameters ---------- n_iterations : int, optional If specified, only at most the specified number of iterations will be run (default is None). """ # If this is the first iteration, compute and store the # starting energies of the minimized/equilibrated structures. if self._iteration == 0: self._compute_energies() mpi.run_single_node(0, self._reporter.write_energies, self._energy_thermodynamic_states, self._energy_unsampled_states, self._iteration) self._check_nan_energy() timer = mmtools.utils.Timer() timer.start('Run ReplicaExchange') run_initial_iteration = self._iteration # Handle default argument and determine number of iterations to run. if n_iterations is None: iteration_limit = self._number_of_iterations - self._iteration else: iteration_limit = min(self._iteration + n_iterations, self._number_of_iterations) # Main loop. while not self._is_completed(iteration_limit): # Increment iteration counter. self._iteration += 1 logger.debug('Iteration {}/{}'.format(self._iteration, iteration_limit)) timer.start('Iteration') # Attempt replica swaps to sample from equilibrium permuation of # states associated with replicas. This step synchronizes replicas. self._replica_thermodynamic_states = self._mix_replicas() # Propagate replicas. self._propagate_replicas() # Compute energies of all replicas at all states. self._compute_energies() # Write iteration to storage file. self._report_iteration() # Compute online free energy if (self._online_analysis_interval is not None and self._iteration > self._online_analysis_minimum_iterations and self._iteration % self._online_analysis_interval == 0): self._run_online_analysis() # Show timing statistics if debug level is activated. if logger.isEnabledFor(logging.DEBUG): iteration_time = timer.stop('Iteration') partial_total_time = timer.partial('Run ReplicaExchange') time_per_iteration = partial_total_time / (self._iteration - run_initial_iteration) estimated_time_remaining = time_per_iteration * (iteration_limit - self._iteration) estimated_total_time = time_per_iteration * iteration_limit estimated_finish_time = time.time() + estimated_time_remaining logger.debug("Iteration took {:.3f}s.".format(iteration_time)) logger.debug("Estimated completion in {}, at {} (consuming total wall clock time {}).".format( str(datetime.timedelta(seconds=estimated_time_remaining)), time.ctime(estimated_finish_time), str(datetime.timedelta(seconds=estimated_total_time)))) # Perform sanity checks to see if we should terminate here. self._check_nan_energy()
[docs] def extend(self, n_iterations): """Extend the simulation by the given number of iterations. Contrarily to :func:`run`, this will extend the number of iterations past ``number_of_iteration`` if requested. Parameters ---------- n_iterations : int The number of iterations to run. """ if self._iteration + n_iterations > self._number_of_iterations: self._number_of_iterations = self._iteration + n_iterations self.run(n_iterations)
def __repr__(self): """Return a 'formal' representation that can be used to reconstruct the class, if possible.""" # TODO: Can we make this a more useful expression? return "<instance of ReplicaExchange>" # Disabled until we have a better answer to this # def __str__(self): # """ # Show an 'informal' human-readable representation of the replica-exchange simulation. # """ # r = "Replica-exchange simulation\n" # r += "\n" # r += "{:d} replicas\n".format(self.nreplicas) # r += "{:d} coordinate sets provided\n".format(len(self.provided_positions)) # r += "file store: {:s}\n".format(self.store_filename) # r += "initialized: {:s}\n".format(self._initialized) # r += "\n" # r += "PARAMETERS\n" # r += "collision rate: {:s}\n".format(self.collision_rate) # r += "relative constraint tolerance: {:s}\n".format(self.constraint_tolerance) # r += "timestep: {:s}\n".format(self.timestep) # r += "number of steps/iteration: {:d}\n".format(self.nsteps_per_iteration) # r += "number of iterations: {:d}\n".format(self._number_of_iterations) # if self.extend_simulation: # r += "Iterations extending existing data.\n" # r += "equilibration timestep: {:s}\n".format(self.equilibration_timestep) # r += "number of equilibration iterations: {:d}\n".format(self.number_of_equilibration_iterations) # r += "\n" # # return r def __del__(self): # The reporter could be None if ReplicaExchange was not created. if self._reporter is not None: mpi.run_single_node(0, self._reporter.close) # ------------------------------------------------------------------------- # Internal-usage. # ------------------------------------------------------------------------- def _check_nan_energy(self): """Checks that energies are finite and abort otherwise. Checks both sampled and unsampled thermodynamic states. """ # Check thermodynamic state energies. if np.any(np.isnan(self._energy_thermodynamic_states)) or np.any(np.isnan(self._energy_unsampled_states)): # Find faulty replicas to create error message. faulty_replicas = set() # Check sampled thermodynamic states first. state_type = 'thermodynamic state' for replica_id in range(self.n_replicas): if np.any(np.isnan(self._energy_thermodynamic_states[replica_id])): faulty_replicas.add(replica_id) # If there are no NaNs in energies, the problem is in the unsampled states. if len(faulty_replicas) == 0: state_type = 'unsampled thermodynamic state' for replica_id in range(self.n_replicas): if np.any(np.isnan(self._unsampled_states[replica_id])): faulty_replicas.add(replica_id) # Raise exception. err_msg = "NaN encountered in {} energies for replicas {}".format(state_type, faulty_replicas) logger.error(err_msg) raise RuntimeError(err_msg) @mpi.on_single_node(rank=0, broadcast_result=False, sync_nodes=False) def _display_citations(self, overwrite_global=False): """ Display papers to be cited. The overwrite_golbal command will force the citation to display even if the "have_citations_been_shown" variable is True """ # TODO Add original citations for various replica-exchange schemes. # TODO Show subset of OpenMM citations based on what features are being used. openmm_citations = """\ Friedrichs MS, Eastman P, Vaidyanathan V, Houston M, LeGrand S, Beberg AL, Ensign DL, Bruns CM, and Pande VS. Accelerating molecular dynamic simulations on graphics processing unit. J. Comput. Chem. 30:864, 2009. DOI: 10.1002/jcc.21209 Eastman P and Pande VS. OpenMM: A hardware-independent framework for molecular simulations. Comput. Sci. Eng. 12:34, 2010. DOI: 10.1109/MCSE.2010.27 Eastman P and Pande VS. Efficient nonbonded interactions for molecular dynamics on a graphics processing unit. J. Comput. Chem. 31:1268, 2010. DOI: 10.1002/jcc.21413 Eastman P and Pande VS. Constant constraint matrix approximation: A robust, parallelizable constraint method for molecular simulations. J. Chem. Theor. Comput. 6:434, 2010. DOI: 10.1021/ct900463w""" gibbs_citations = """\ Chodera JD and Shirts MR. Replica exchange and expanded ensemble simulations as Gibbs sampling: Simple improvements for enhanced mixing. J. Chem. Phys., 135:194110, 2011. DOI:10.1063/1.3660669""" mbar_citations = """\ Shirts MR and Chodera JD. Statistically optimal analysis of samples from multiple equilibrium states. J. Chem. Phys. 129:124105, 2008. DOI: 10.1063/1.2978177""" if overwrite_global or (not self._have_displayed_citations_before and not self._global_citation_silence): print("Please cite the following:") print("") print(openmm_citations) if self._replica_mixing_scheme == 'swap-all': print(gibbs_citations) self._have_displayed_citations_before = True # ------------------------------------------------------------------------- # Internal-usage: Initialization and storage utilities. # ------------------------------------------------------------------------- @staticmethod def _does_file_exist(file_path): """Check if there is a file at the given path.""" return os.path.exists(file_path) and os.path.getsize(file_path) > 0 @mpi.on_single_node(rank=0, broadcast_result=False, sync_nodes=True) def _initialize_reporter(self): """Initialize the reporter and store initial information. This is executed only on MPI node 0 and it is blocking. This is to avoid the case where the other nodes skip ahead and try to read from a file that hasn't been created yet. """ self._reporter.open(mode='w') self._reporter.write_thermodynamic_states(self._thermodynamic_states, self._unsampled_states) # Store run metadata and ReplicaExchange options. self._store_options() self._reporter.write_dict('metadata', self._metadata) # Store initial conditions. This forces the storage to be synchronized. self._report_iteration() @mpi.on_single_node(rank=0, broadcast_result=False, sync_nodes=False) @mpi.delayed_termination @mmtools.utils.with_timer('Writing iteration information to storage') def _report_iteration(self): """Store positions, states, and energies of current iteration. This is executed only on MPI node 0 and it's not blocking. The termination is delayed so that the file is not written only with partial data if the program gets interrupted. """ self._reporter.write_sampler_states(self._sampler_states, self._iteration) self._reporter.write_replica_thermodynamic_states(self._replica_thermodynamic_states, self._iteration) self._reporter.write_mcmc_moves(self._mcmc_moves) # MCMCMoves can store internal statistics. self._reporter.write_energies(self._energy_thermodynamic_states, self._energy_unsampled_states, self._iteration) self._reporter.write_mixing_statistics(self._n_accepted_matrix, self._n_proposed_matrix, self._iteration) self._reporter.write_timestamp(self._iteration) self._reporter.write_last_iteration(self._iteration) self._reporter.sync() def _store_options(self): """Store __init__ parameters (beside MCMCMoves) in storage file.""" logger.debug("Storing general ReplicaExchange options...") # Inspect __init__ parameters to store. parameter_names, _, _, defaults = inspect.getargspec(self.__init__) # Retrieve and store options. options_to_store = {parameter_name: getattr(self, '_' + parameter_name) for parameter_name in parameter_names[-len(defaults):]} # We store the MCMCMoves separately. options_to_store.pop('mcmc_moves') self._reporter.write_dict('options', options_to_store) # ------------------------------------------------------------------------- # Internal-usage: Distributed tasks. # ------------------------------------------------------------------------- @mmtools.utils.with_timer('Propagating all replicas') def _propagate_replicas(self): """Propagate all replicas.""" # TODO Report on efficiency of dyanmics (fraction of time wasted to overhead). logger.debug("Propagating all replicas...") # Distribute propagation across nodes. Only node 0 will get all positions # and box vectors. The other nodes, only need the positions that they use # for propagation and computation of the energy matrix entries. propagated_states, replica_ids = mpi.distribute(self._propagate_replica, range(self.n_replicas), send_results_to=0) # Update all sampler states. For non-0 nodes, this will update only the # sampler states associated to the replicas propagated by this node. for replica_id, propagated_state in zip(replica_ids, propagated_states): propagated_positions, propagated_box_vectors = propagated_state # Unpack. self._sampler_states[replica_id].positions = propagated_positions self._sampler_states[replica_id].box_vectors = propagated_box_vectors # Gather all MCMCMoves statistics. All nodes must have these up-to-date # since they are tied to the ThermodynamicState, not the replica. all_statistics = mpi.distribute(self._get_replica_move_statistics, range(self.n_replicas), send_results_to='all') for replica_id in range(self.n_replicas): if len(all_statistics[replica_id]) > 0: thermodynamic_state_id = self._replica_thermodynamic_states[replica_id] self._mcmc_moves[thermodynamic_state_id].statistics = all_statistics[replica_id] def _propagate_replica(self, replica_id): """Propagate thermodynamic state associated to the given replica.""" # Retrieve thermodynamic, sampler states, and MCMC move of this replica. thermodynamic_state_id = self._replica_thermodynamic_states[replica_id] thermodynamic_state = self._thermodynamic_states[thermodynamic_state_id] mcmc_move = self._mcmc_moves[thermodynamic_state_id] sampler_state = self._sampler_states[replica_id] # Apply MCMC move. try: mcmc_move.apply(thermodynamic_state, sampler_state) except mmtools.mcmc.IntegratorMoveError as e: # Save NaNnig context and MCMove before aborting. prefix = 'nan-error-logs/iteration{}-replica{}-state{}'.format( self._iteration, replica_id, thermodynamic_state_id) e.serialize_error(prefix) raise # Return new positions and box vectors. return sampler_state.positions, sampler_state.box_vectors def _get_replica_move_statistics(self, replica_id): """Return the statistics of the MCMCMove currently associated to this replica.""" thermodynamic_state_id = self._replica_thermodynamic_states[replica_id] mcmc_move = self._mcmc_moves[thermodynamic_state_id] try: move_statistics = mcmc_move.statistics except AttributeError: move_statistics = {} return move_statistics def _minimize_replica(self, replica_id, tolerance, max_iterations): """Minimize the specified replica.""" # Retrieve thermodynamic and sampler states. thermodynamic_state_id = self._replica_thermodynamic_states[replica_id] thermodynamic_state = self._thermodynamic_states[thermodynamic_state_id] sampler_state = self._sampler_states[replica_id] # Retrieve a context. Any Integrator works. context, integrator = mmtools.cache.global_context_cache.get_context(thermodynamic_state) # Set initial positions and box vectors. sampler_state.apply_to_context(context) # Compute the initial energy of the system for logging. initial_energy = thermodynamic_state.reduced_potential(context) logger.debug('Replica {}/{}: initial energy {:8.3f}kT'.format( replica_id + 1, self.n_replicas, initial_energy)) # Minimize energy. openmm.LocalEnergyMinimizer.minimize(context, tolerance, max_iterations) # Get the minimized positions. sampler_state.update_from_context(context) # Compute the final energy of the system for logging. final_energy = thermodynamic_state.reduced_potential(sampler_state) logger.debug('Replica {}/{}: final energy {:8.3f}kT'.format( replica_id + 1, self.n_replicas, final_energy)) # Return minimized positions. return sampler_state.positions @mmtools.utils.with_timer('Computing energy matrix') def _compute_energies(self): """Compute energies of all replicas at all states.""" # Distribute energy computation across nodes. Only node 0 receives # all the energies since it needs to store them and mix states. new_energies, replica_ids = mpi.distribute(self._compute_replica_energies, range(self.n_replicas), send_results_to=0) # Update energy matrices. Non-0 nodes update only the energies computed by this replica. for replica_id, energies in zip(replica_ids, new_energies): energy_thermodynamic_states, energy_unsampled_states = energies # Unpack. self._energy_thermodynamic_states[replica_id] = energy_thermodynamic_states self._energy_unsampled_states[replica_id] = energy_unsampled_states def _compute_replica_energies(self, replica_id): """Compute the energy for the replica in every ThermodynamicState.""" # Initialize replica energies for each thermodynamic state. energy_thermodynamic_states = np.zeros(self.n_replicas) energy_unsampled_states = np.zeros(len(self._unsampled_states)) # Retrieve sampler state associated to this replica. sampler_state = self._sampler_states[replica_id] # Compute energy for all thermodynamic states. for energies, states in [(energy_thermodynamic_states, self._thermodynamic_states), (energy_unsampled_states, self._unsampled_states)]: for i, state in enumerate(states): # Check if we need to request a new Context. if i == 0 or not state.is_state_compatible(context_state): context_state = state # Get the context, any Integrator works. context, integrator = mmtools.cache.global_context_cache.get_context(state) # Update positions and box vectors. We don't need # to set Context velocities for the potential. sampler_state.apply_to_context(context, ignore_velocities=True) else: # If this state is compatible with the context, just fix the # thermodynamic state as positions/box vectors are the same. state.apply_to_context(context) # Compute energy. energies[i] = state.reduced_potential(context) # Return the new energies. return energy_thermodynamic_states, energy_unsampled_states # ------------------------------------------------------------------------- # Internal-usage: Replicas mixing. # ------------------------------------------------------------------------- @mpi.on_single_node(0, broadcast_result=True) def _mix_replicas(self): """Attempt to swap replicas according to user-specified scheme.""" logger.debug("Mixing replicas...") # Reset storage to keep track of swap attempts this iteration. self._n_accepted_matrix[:, :] = 0 self._n_proposed_matrix[:, :] = 0 # Perform swap attempts according to requested scheme. with mmtools.utils.time_it('Mixing of replicas'): if self.replica_mixing_scheme == 'swap-neighbors': self._mix_neighboring_replicas() elif self.replica_mixing_scheme == 'swap-all': # Try to use cython-accelerated mixing code if possible, # otherwise fall back to Python-accelerated code. try: self._mix_all_replicas_cython() except ValueError as e: logger.warning(e.message) self._mix_all_replicas() else: assert self.replica_mixing_scheme is None # Determine fraction of swaps accepted this iteration. n_swaps_proposed = self._n_proposed_matrix.sum() n_swaps_accepted = self._n_accepted_matrix.sum() swap_fraction_accepted = 0.0 if n_swaps_proposed > 0: # TODO drop casting to float when dropping Python 2 support. swap_fraction_accepted = float(n_swaps_accepted) / n_swaps_proposed logger.debug("Accepted {}/{} attempted swaps ({:.1f}%)".format(n_swaps_accepted, n_swaps_proposed, swap_fraction_accepted * 100.0)) # Return new states indices for MPI broadcasting. return self._replica_thermodynamic_states def _mix_all_replicas_cython(self): """Exchange all replicas with Cython-accelerated code.""" from .mixing._mix_replicas import _mix_replicas_cython replica_states = md.utils.ensure_type(self._replica_thermodynamic_states, np.int64, 1, "Replica States") u_kl = md.utils.ensure_type(self._energy_thermodynamic_states, np.float64, 2, "Reduced Potentials") n_proposed_matrix = md.utils.ensure_type(self._n_proposed_matrix, np.int64, 2, "Nij Proposed Swaps") n_accepted_matrix = md.utils.ensure_type(self._n_accepted_matrix, np.int64, 2, "Nij Accepted Swaps") _mix_replicas_cython(self.n_replicas**4, self.n_replicas, replica_states, u_kl, n_proposed_matrix, n_accepted_matrix) self._replica_thermodynamic_states = replica_states self._n_proposed_matrix = n_proposed_matrix self._n_accepted_matrix = n_accepted_matrix def _mix_all_replicas(self): """Exchange all replicas with Python.""" # Determine number of swaps to attempt to ensure thorough mixing. # TODO: Replace this with analytical result computed to guarantee sufficient mixing, or # TODO: adjust it based on how many we can afford to do and not have mixing take a # TODO: substantial fraction of iteration time. nswap_attempts = self.n_replicas**5 # Number of swaps to attempt (ideal, but too slow!) nswap_attempts = self.n_replicas**3 # Best compromise for pure Python? logger.debug("Will attempt to swap all pairs of replicas, using a total of %d attempts." % nswap_attempts) # Attempt swaps to mix replicas. for swap_attempt in range(nswap_attempts): # Choose random replicas uniformly to attempt to swap. replica_i = np.random.randint(self.n_replicas) replica_j = np.random.randint(self.n_replicas) self._attempt_swap(replica_i, replica_j) def _mix_neighboring_replicas(self): """Attempt exchanges between neighboring replicas only.""" logger.debug("Will attempt to swap only neighboring replicas.") # Attempt swaps of pairs of replicas using traditional scheme (e.g. [0,1], [2,3], ...). offset = np.random.randint(2) # Offset is 0 or 1. for thermodynamic_state_i in range(offset, self.n_replicas-1, 2): thermodynamic_state_j = thermodynamic_state_i + 1 # Neighboring state. # Determine which replicas currently hold the thermodynamic states. replica_i = np.where(self._replica_thermodynamic_states == thermodynamic_state_i) replica_j = np.where(self._replica_thermodynamic_states == thermodynamic_state_j) self._attempt_swap(replica_i, replica_j) def _attempt_swap(self, replica_i, replica_j): """Attempt a single exchange between two replicas.""" # Determine the thermodynamic states associated to these replicas. thermodynamic_state_i = self._replica_thermodynamic_states[replica_i] thermodynamic_state_j = self._replica_thermodynamic_states[replica_j] # Compute log probability of swap. energy_ij = self._energy_thermodynamic_states[replica_i, thermodynamic_state_j] energy_ji = self._energy_thermodynamic_states[replica_j, thermodynamic_state_i] energy_ii = self._energy_thermodynamic_states[replica_i, thermodynamic_state_i] energy_jj = self._energy_thermodynamic_states[replica_j, thermodynamic_state_j] log_p_accept = - (energy_ij + energy_ji) + energy_ii + energy_jj # Record that this move has been proposed. self._n_proposed_matrix[thermodynamic_state_i, thermodynamic_state_j] += 1 self._n_proposed_matrix[thermodynamic_state_j, thermodynamic_state_i] += 1 # Accept or reject. if log_p_accept >= 0.0 or np.random.rand() < math.exp(log_p_accept): # Swap states in replica slots i and j. self._replica_thermodynamic_states[replica_i] = thermodynamic_state_j self._replica_thermodynamic_states[replica_j] = thermodynamic_state_i # Accumulate statistics. self._n_accepted_matrix[thermodynamic_state_i, thermodynamic_state_j] += 1 self._n_accepted_matrix[thermodynamic_state_j, thermodynamic_state_i] += 1 # ------------------------------------------------------------------------- # Internal-usage: Online Analysis. # ------------------------------------------------------------------------- @mpi.on_single_node(rank=0, broadcast_result=False, sync_nodes=False) @mpi.delayed_termination @mmtools.utils.with_timer('Computing online free energy estimate') def _run_online_analysis(self): """Compute the free energy during simulation run""" # This relative import is down here because having it at the top causes an ImportError. # __init__ pulls in repex, which pulls in analyze, which pulls in repex. Because the first repex never finished # importing, its not in the name space which causes relative analyze import of repex to crash as neither of them # are the __main__ package. # https://stackoverflow.com/questions/6351805/cyclic-module-dependencies-and-relative-imports-in-python from .analyze import ReplicaExchangeAnalyzer # Start the analysis bump_error_counter = False analysis = ReplicaExchangeAnalyzer(self._reporter, analysis_kwargs={'initial_f_k': self._last_mbar_f_k}) # Indices for online analysis, "i'th index, j'th index" idx, jdx = 0, -1 timer = mmtools.utils.Timer() timer.start("MBAR") logger.debug("Computing free energy with MBAR...") try: # Trap errors for MBAR being under sampled and the W_nk matrix not being normalized correctly mbar = analysis.mbar free_energy, err_free_energy = analysis.get_free_energy() except ParameterError as e: # We don't update self._last_err_free_energy here since if it # wasn't below the target threshold before, it won't stop repex now. bump_error_counter = True self._online_error_bank.append(e) else: self._last_mbar_f_k = mbar.f_k free_energy = free_energy[idx, jdx] self._last_err_free_energy = err_free_energy[idx, jdx] logger.debug("Current Free Energy Estimate is {} +- {} kT".format(free_energy, self._last_err_free_energy)) # Trap a case when errors don't converge (usually due to under sampling) if np.isnan(self._last_err_free_energy): self._last_err_free_energy = np.inf timer.stop("MBAR") # Raise an exception after 6 times MBAR gave an error. if bump_error_counter: self._online_error_trap_counter += 1 if self._online_error_trap_counter >= 6: logger.debug("Thrown MBAR Errors:") for err in self._online_error_bank: logger.debug(str(err)) raise RuntimeError("Online Analysis has failed too many times! Please " "check the latest logs to see the thrown errors!") # Don't write out the free energy in case of error. return # Write out the numbers self._reporter.write_mbar_free_energies(self._iteration, self._last_mbar_f_k, (free_energy, self._last_err_free_energy)) @staticmethod def _get_last_written_free_energy(reporter, iteration): """Get the last free energy computed from online analysis""" last_f_k = None last_free_energy = None try: for index in range(iteration, 0, -1): last_f_k, last_free_energy = reporter.read_mbar_free_energies(index) # Find an f_k that is not all zeros (or masked and empty) if not (np.ma.is_masked(last_f_k) or np.all(last_f_k == 0)): break # Don't need to continue the loop if we already found one except (IndexError, KeyError): # No such f_k written yet (or variable created), no need to do anything pass return last_f_k, last_free_energy def _is_completed(self, iteration_limit=None): """Check if we have completed the run previously""" if iteration_limit is None: iteration_limit = self._number_of_iterations # Return if we have reached the number of iterations # or the statistical error target required. if (self._iteration >= iteration_limit or (self._last_err_free_energy is not None and self._last_err_free_energy <= self._online_analysis_target_error)): return True return False # ------------------------------------------------------------------------- # Internal-usage: Test globals # ------------------------------------------------------------------------- _global_citation_silence = False
# ============================================================================== # PARALLEL TEMPERING # ==============================================================================
[docs]class ParallelTempering(ReplicaExchange): """Parallel tempering simulation facility. This class provides a facility for parallel tempering simulations. It is a subclass of :class:`ReplicaExchange`, but provides efficiency improvements for parallel tempering simulations, so should be preferred for this type of simulation. In particular, this makes use of the fact that the reduced potentials are linear in inverse temperature. Examples -------- Create the system. >>> from simtk import unit >>> from openmmtools import testsystems, states, mcmc >>> testsystem = testsystems.AlanineDipeptideImplicit() Create thermodynamic states for parallel tempering with exponentially-spaced schedule. >>> n_replicas = 3 # Number of temperature replicas. >>> T_min = 298.0 * unit.kelvin # Minimum temperature. >>> T_max = 600.0 * unit.kelvin # Maximum temperature. >>> reference_state = states.ThermodynamicState(system=testsystem.system, temperature=T_min) Initialize simulation object with options. Run with a GHMC integrator. >>> move = mcmc.GHMCMove(timestep=2.0*unit.femtoseconds, n_steps=50) >>> simulation = ParallelTempering(mcmc_moves=move, number_of_iterations=2) Create simulation with its storage file (in a temporary directory) and run. >>> storage_path = tempfile.NamedTemporaryFile(delete=False).name + '.nc' >>> reporter = Reporter(storage_path, checkpoint_interval=10) >>> simulation.create(thermodynamic_states=thermodynamic_states, >>> sampler_states=states.SamplerState(testsystem.positions), ... storage=reporter, min_temperature=T_min, ... max_temperature=T_max, n_temperatures=n_replicas) >>> simulation.run(n_iterations=1) Clean up. >>> os.remove(storage_path) See Also -------- ReplicaExchange """
[docs] def create(self, thermodynamic_state, sampler_states, storage, min_temperature=None, max_temperature=None, n_temperatures=None, temperatures=None, metadata=None): """Initialize a parallel tempering simulation object. Parameters ---------- thermodynamic_state : openmmtools.states.ThermodynamicState Reference thermodynamic state that will be simulated at the given temperatures. sampler_states : openmmtools.states.SamplerState or list One or more sets of initial sampler states. If a list of SamplerStates, they will be assigned to replicas in a round-robin fashion. storage : str or Reporter If str: path to the storage file, checkpoint options are default If Reporter: Instanced :class:`Reporter` class, checkpoint information is read from In the future this will be able to take a Storage class as well. min_temperature : simtk.unit.Quantity, optional Minimum temperature (units of temperature, default is None). max_temperature : simtk.unit.Quantity, optional Maximum temperature (units of temperature, default is None). n_temperatures : int, optional Number of exponentially-spaced temperatures between ``min_temperature`` and ``max_temperature`` (default is None). temperatures : list of simtk.unit.Quantity, optional If specified, this list of temperatures will be used instead of ``min_temperature``, ``max_temperature``, and ``n_temperatures`` (units of temperature, default is None). metadata : dict, optional Simulation metadata to be stored in the file. Notes ----- Either (``min_temperature``, ``max_temperature``, ``n_temperatures``) must all be specified or the list of '`temperatures`' must be specified. """ # Create thermodynamic states from temperatures. if temperatures is not None: logger.debug("Using provided temperatures") elif min_temperature is not None and max_temperature is not None and n_temperatures is not None: # TODO drop casting to float when dropping Python 2 support. temperatures = [min_temperature + (max_temperature - min_temperature) * (math.exp(i / float(n_temperatures-1)) - 1.0) / (math.e - 1.0) for i in range(n_temperatures)] logger.debug('using temperatures {}'.format(temperatures)) else: raise ValueError("Either 'temperatures' or 'min_temperature', 'max_temperature', " "and 'n_temperatures' must be provided.") thermodynamic_states = [copy.deepcopy(thermodynamic_state) for _ in range(n_temperatures)] for state, temperature in zip(thermodynamic_states, temperatures): state.temperature = temperature # Override default title. default_title = ('Parallel tempering simulation created using ParallelTempering ' 'class of yank.repex.py on {}'.format(time.asctime(time.localtime()))) if metadata is None: metadata = dict(title=default_title) elif 'title' not in metadata: metadata['title'] = default_title # Initialize replica-exchange simlulation. super(ParallelTempering, self).create(thermodynamic_states, sampler_states, storage=storage, metadata=metadata)
def _compute_replica_energies(self, replica_id): """Compute the energy for the replica at every temperature. Because only the temperatures differ among replicas, we replace the generic O(N^2) replica-exchange implementation with an O(N) implementation. """ # Initialize replica energies for each thermodynamic state. replica_energies = np.zeros(self.n_replicas) # Retrieve sampler states associated to this replica. sampler_state = self._sampler_states[replica_id] # Thermodynamic state differ only by temperatures. reference_thermodynamic_state = self._thermodynamic_states[0] # Get the context, any Integrator works. context, integrator = mmtools.cache.global_context_cache.get_context(reference_thermodynamic_state) # Update positions and box vectors. sampler_state.apply_to_context(context) # Compute energy. reference_reduced_potential = reference_thermodynamic_state.reduced_potential(context) # Strip reference potential of reference state's beta. reference_beta = 1.0 / (mmtools.constants.kB * reference_thermodynamic_state.temperature) reference_reduced_potential /= reference_beta # Update potential energy by temperature. for thermodynamic_state_id, thermodynamic_state in enumerate(self._thermodynamic_states): beta = 1.0 / (mmtools.constants.kB * thermodynamic_state.temperature) replica_energies[replica_id, thermodynamic_state_id] = beta * reference_reduced_potential # Return the new energies. return replica_energies
# ============================================================================== # MODULE INTERNAL USAGE UTILITIES # ============================================================================== class _DictYamlLoader(yaml.CLoader): """PyYAML Loader that reads !Quantity tags.""" def __init__(self, *args, **kwargs): super(_DictYamlLoader, self).__init__(*args, **kwargs) self.add_constructor(u'!Quantity', self.quantity_constructor) self.add_constructor(u'!ndarray', self.ndarray_constructor) @staticmethod def quantity_constructor(loader, node): loaded_mapping = loader.construct_mapping(node) data_unit = utils.quantity_from_string(loaded_mapping['unit']) data_value = loaded_mapping['value'] return data_value * data_unit @staticmethod def ndarray_constructor(loader, node): loaded_mapping = loader.construct_mapping(node, deep=True) data_type = np.dtype(loaded_mapping['type']) data_shape = loaded_mapping['shape'] data_values = loaded_mapping['values'] data = np.ndarray(shape=data_shape, dtype=data_type) if 0 not in data_shape: data[:] = data_values return data class _DictYamlDumper(yaml.CDumper): """PyYAML Dumper that handle simtk Quantities through !Quantity tags.""" def __init__(self, *args, **kwargs): super(_DictYamlDumper, self).__init__(*args, **kwargs) self.add_representer(unit.Quantity, self.quantity_representer) self.add_representer(np.ndarray, self.ndarray_representer) @staticmethod def quantity_representer(dumper, data): data_unit = data.unit data_value = data / data_unit data_dump = dict(unit=str(data_unit), value=data_value) return dumper.represent_mapping(u'!Quantity', data_dump) @staticmethod def ndarray_representer(dumper, data): """Convert a numpy array to native Python types.""" data_type = str(data.dtype) data_shape = data.shape data_values = data.tolist() data_dump = dict(type=data_type, shape=data_shape, values=data_values) return dumper.represent_mapping(u'!ndarray', data_dump) # ============================================================================== # MAIN AND TESTS # ============================================================================== if __name__ == "__main__": import doctest doctest.testmod()