Source code for yank.yank

#!/usr/local/bin/env python

# ==============================================================================
# MODULE DOCSTRING
# ==============================================================================

"""
Yank
====

Interface for automated free energy calculations.

"""

# ==============================================================================
# GLOBAL IMPORTS
# ==============================================================================

import abc
import copy
import time
import logging
import importlib
import collections

import mdtraj
import pandas
import numpy as np
import openmmtools as mmtools
from simtk import unit, openmm

from . import utils, pipeline, repex, mpi
from .restraints import RestraintState, RestraintParameterError, V0

logger = logging.getLogger(__name__)


# ==============================================================================
# TOPOGRAPHY
# ==============================================================================

[docs]class Topography(object): """A class mapping and labelling the different components of a system. The object holds the topology of a system and offers convenience functions to identify its various parts such as solvent, receptor, ions and ligand atoms. A molecule should be labeled as a ligand, only if there is also a receptor. If there is only a single molecule its atom indices can be obtained from solute_atoms instead. In ligand-receptor system, solute_atoms provides the atom indices for both molecules. Parameters ---------- topology : mdtraj.Topology or simtk.openmm.app.Topology The topology object specifying the system. ligand_atoms : iterable of int or str, optional The atom indices of the ligand. A string is interpreted as an mdtraj DSL specification of the ligand atoms. solvent_atoms : iterable of int or str, optional The atom indices of the solvent. A string is interpreted as an mdtraj DSL specification of the solvent atoms. If 'auto', a list of common solvent residue names will be used to automatically detect solvent atoms (default is 'auto'). Attributes ---------- ligand_atoms receptor_atoms solute_atoms solvent_atoms ions_atoms """ def __init__(self, topology, ligand_atoms=None, solvent_atoms='auto'): # Determine if we need to convert the topology to mdtraj. if isinstance(topology, mdtraj.Topology): self._topology = topology else: self._topology = mdtraj.Topology.from_openmm(topology) # Handle default ligand atoms. if ligand_atoms is None: ligand_atoms = [] # Once ligand and solvent atoms are defined, every other region is implied. self.solvent_atoms = solvent_atoms self.ligand_atoms = ligand_atoms @property def topology(self): """mdtraj.Topology: A copy of the topology (read-only).""" return copy.deepcopy(self._topology) @property def ligand_atoms(self): """The atom indices of the ligand as list This can be empty if this :class:`Topography` doesn't represent a receptor-ligand system. Use solute_atoms to obtain the atom indices of the molecule if this is the case. If assigned to a string, it will be interpreted as an mdtraj DSL specification of the atom indices. """ return self._ligand_atoms @ligand_atoms.setter def ligand_atoms(self, value): self._ligand_atoms = self._resolve_atom_indices(value) # Safety check: with a ligand there should always be a receptor. if len(self._ligand_atoms) > 0 and len(self.receptor_atoms) == 0: raise ValueError('Specified ligand but cannot find ' 'receptor atoms. Ligand: {}'.format(value)) @property def receptor_atoms(self): """The atom indices of the receptor as list (read-only). This can be empty if this Topography doesn't represent a receptor-ligand system. Use solute_atoms to obtain the atom indices of the molecule if this is the case. """ # If there's no ligand, there's no receptor. if len(self._ligand_atoms) == 0: return [] # Create a set for fast searching. ligand_atomset = frozenset(self._ligand_atoms) # Receptor atoms are all solute atoms that are not ligand. return [i for i in self.solute_atoms if i not in ligand_atomset] @property def solute_atoms(self): """The atom indices of the non-solvent molecule(s) (read-only). Practically, this are all the indices of the atoms that are not considered solvent. In a receptor-ligand system, this includes the atom indices of both the receptor and the ligand. """ # Create a set for fast searching. solvent_atomset = frozenset(self._solvent_atoms) # The solute is everything that is not solvent. return [i for i in range(self._topology.n_atoms) if i not in solvent_atomset] @property def solvent_atoms(self): """The atom indices of the solvent molecules. This includes eventual ions. If assigned to a string, it will be interpreted as an mdtraj DSL specification of the atom indices. If assigned to 'auto', a set of solvent auto indices is automatically built from common solvent residue names. """ return self._solvent_atoms @solvent_atoms.setter def solvent_atoms(self, value): # If the user doesn't provide a solvent description, # we use a default set of resnames in mdtraj. if value == 'auto': solvent_resnames = mdtraj.core.residue_names._SOLVENT_TYPES self._solvent_atoms = [atom.index for atom in self._topology.atoms if atom.residue.name in solvent_resnames] else: self._solvent_atoms = self._resolve_atom_indices(value) @property def ions_atoms(self): """The indices of all ions atoms in the solvent (read-only).""" # Ions are all atoms of the solvent whose residue name show a charge. return [i for i in self._solvent_atoms if '-' in self._topology.atom(i).residue.name or '+' in self._topology.atom(i).residue.name] # ------------------------------------------------------------------------- # Serialization # ------------------------------------------------------------------------- def __getstate__(self): # We serialize the MDTraj Topology through a pandas dataframe because # it doesn't implement __getstate__ and __setstate__ methods that # guarantee future-compatibility. This serialization protocol will be # compatible at least until the Topology API is broken. atoms, bonds = self._topology.to_dataframe() serialized_topology = {'atoms': atoms.to_json(orient='records'), 'bonds': bonds.tolist()} return dict(topology=serialized_topology, ligand_atoms=self._ligand_atoms, solvent_atoms=self._solvent_atoms) def __setstate__(self, serialization): topology_dict = serialization['topology'] atoms = pandas.read_json(topology_dict['atoms'], orient='records') bonds = np.array(topology_dict['bonds']) self._topology = mdtraj.Topology.from_dataframe(atoms, bonds) self._ligand_atoms = serialization['ligand_atoms'] self._solvent_atoms = serialization['solvent_atoms'] # ------------------------------------------------------------------------- # Internal-usage # ------------------------------------------------------------------------- def _resolve_atom_indices(self, atoms_description): if isinstance(atoms_description, str): # Assume this is DSL selection. select() returns a numpy array # of int64 that we convert to python integers. atoms_description = self._topology.select(atoms_description).tolist() # Convert to a frozen set of indices. return atoms_description
# ============================================================================== # Class that define a single thermodynamic leg (phase) of the calculation # ==============================================================================
[docs]class IMultiStateSampler(mmtools.utils.SubhookedABCMeta): """A sampler for multiple thermodynamic states. This is the interface documents the properties and methods that need to be exposed by the sampler object to be compatible with the class :class:`AlchemicalPhase`. Attributes ---------- number_of_iterations iteration metadata sampler_states """ @property def number_of_iterations(self): """int: the total number of iterations to run.""" pass @abc.abstractproperty def iteration(self): """int: the current iteration.""" pass @abc.abstractproperty def metadata(self): """dict: a copy of the metadata dictionary passed on creation.""" pass @abc.abstractproperty def sampler_states(self): """list of SamplerState: the sampler states at the current iteration.""" pass @abc.abstractmethod
[docs] def create(self, thermodynamic_state, sampler_states, storage, unsampled_thermodynamic_states, metadata): """Create new simulation and initialize the storage. Parameters ---------- thermodynamic_state : list of openmmtools.states.ThermodynamicState The thermodynamic states for the simulation. sampler_states : openmmtools.states.SamplerState or list One or more sets of initial sampler states. If a list of SamplerStates, they will be assigned to thermodynamic states in a round-robin fashion. storage : str or Reporter If str: The path to the storage file. Reads defaults from the :class:`yank.repex.Reporter` class If :class:`yank.repex.Reporter`: Reads the reporter settings for files and options In the future this will be able to take a Storage class as well. unsampled_thermodynamic_states : list of openmmtools.states.ThermodynamicState These are ThermodynamicStates that are not propagated, but their reduced potential is computed at each iteration for each replica. These energy can be used as data for reweighting schemes. metadata : dict Simulation metadata to be stored in the file. """ pass
@abc.abstractmethod
[docs] def minimize(self, tolerance, max_iterations): """Minimize all states. Parameters ---------- tolerance : simtk.unit.Quantity Minimization tolerance (units of energy/mole/length, default is ``1.0 * unit.kilojoules_per_mole / unit.nanometers``). max_iterations : int Maximum number of iterations for minimization. If 0, minimization continues until converged. """ pass
@abc.abstractmethod
[docs] def equilibrate(self, n_iterations, mcmc_moves=None): """Equilibrate all states. Parameters ---------- n_iterations : int Number of equilibration iterations. mcmc_moves : MCMCMove or list of MCMCMove, optional Optionally, the MCMCMoves to use for equilibration can be different from the ones used in production (default is None). """ pass
@abc.abstractmethod
[docs] def run(self, n_iterations=None): """Run the simulation. This runs at most :attr:`number_of_iterations` iterations. Use :func:`extend` to pass the limit. Parameters ---------- n_iterations : int, optional If specified, only at most the specified number of iterations will be run (default is None). """ pass
@abc.abstractmethod
[docs] def extend(self, n_iterations): """Extend the simulation by the given number of iterations. Contrarily to :func:`run`, this will extend the number of iterations past :attr:`number_of_iteration` if requested. Parameters ---------- n_iterations : int The number of iterations to run. """ pass
[docs]class AlchemicalPhase(object): """A single thermodynamic leg (phase) of an alchemical free energy calculation. This class wraps around a general MultiStateSampler and handle the creation of an alchemical free energy calculation. Parameters ---------- sampler : MultiStateSampler The sampler instance implementing the :class:`IMultiStateSampler` interface. Attributes ---------- iteration number_of_iterations is_completed """ def __init__(self, sampler): self._sampler = sampler @staticmethod
[docs] def from_storage(storage): """Static constructor from an existing storage file. Parameters ---------- storage : str or Reporter If str: The path to the primary storage file. Default checkpointing options are stored in this case If :class:`yank.repex.Reporter`: loads from the reporter class, including checkpointing information In the future this will be able to take a Storage class as well. Returns ------- alchemical_phase : AlchemicalPhase A new instance of :class:`AlchemicalPhase` in the same state of the last stored iteration. """ # Check if netcdf file exists. if type(storage) is str: reporter = repex.Reporter(storage) else: reporter = storage if not reporter.storage_exists(): reporter.close() raise RuntimeError('Storage file at {} does not exists; cannot resume.'.format(reporter.filepath)) # TODO: this should skip the Reporter and use the Storage to read storage.metadata. # Open Reporter for reading and read metadata. reporter.open(mode='r') metadata = reporter.read_dict('metadata') reporter.close() # Retrieve the sampler class. sampler_full_name = metadata['sampler_full_name'] module_name, cls_name = sampler_full_name.rsplit('.', 1) module = importlib.import_module(module_name) cls = getattr(module, cls_name) # Resume sampler and return new AlchemicalPhase. sampler = cls.from_storage(reporter) return AlchemicalPhase(sampler)
@property def iteration(self): """int: the current iteration (read-only).""" return self._sampler.iteration @property def number_of_iterations(self): """int: the total number of iterations to run.""" return self._sampler.number_of_iterations @number_of_iterations.setter def number_of_iterations(self, value): self._sampler.number_of_iterations = value @property def is_completed(self): """ Boolean check if if the sampler has been completed by its own determination or if we have exceeded number of iterations """ try: return self._sampler.is_completed except AttributeError: return self._sampler.iteration >= self._sampler.number_of_iterations
[docs] def create(self, thermodynamic_state, sampler_states, topography, protocol, storage, restraint=None, anisotropic_dispersion_cutoff=None, alchemical_regions=None, alchemical_factory=None, metadata=None): """Create a new AlchemicalPhase calculation for a specified protocol. If ``anisotropic_dispersion_cutoff`` is different than ``None``. The end states of the phase will be reweighted. The fully interacting state accounts for: 1. The truncation of nonbonded interactions. 2. The reciprocal space which is not modeled in alchemical states if an Ewald method is used for long-range interactions. Parameters ---------- thermodynamic_state : openmmtools.states.ThermodynamicState Thermodynamic state holding the reference system, temperature and pressure. sampler_states : openmmtools.states.SamplerState or list One or more sets of initial sampler states. If a list of SamplerStates, they will be assigned to replicas in a round-robin fashion. topography : Topography The object holding the topology and labelling the different components of the system. This is used to discriminate between ligand-receptor and solvation systems. protocol : dict The dictionary ``{parameter_name: list_of_parameter_values}`` defining the protocol. All the parameter values list must have the same number of elements. storage : str or initialized Reporter class If str: Path to the storage file. The default checkpointing options (see the :class:`yank.repex.Reporter` class) will be used in this case If :class:`yank.repex.Reporter`: Uses files and checkpointing options of the reporter class passed in restraint : ReceptorLigandRestraint, optional Restraint to add between protein and ligand. This must be specified for ligand-receptor systems in non-periodic boxes. anisotropic_dispersion_cutoff : simtk.openmm.Quantity, 'auto', or None, optional, default None If specified, this is the cutoff at which to reweight long range interactions of the end states to correct for anisotropic dispersions. If `'auto'`, then the distance is automatically chosen based on the minimum possible size it can be given the box volume, then behaves as if a Quantity was passed in. If `None`, the correction won't be applied (units of length, default is None). alchemical_regions : openmmtools.alchemy.AlchemicalRegion, optional If specified, this is the ``AlchemicalRegion`` that will be passed to the ``AbsoluteAlchemicalFactory``, otherwise the ligand will be alchemically modified according to the given protocol. alchemical_factory : openmmtools.alchemy.AbsoluteAlchemicalFactory, optional If specified, this ``AbsoluteAlchemicalFactory`` will be used instead of the one created with default options. metadata : dict, optional Simulation metadata to be stored in the file. """ # Check that protocol has same number of states for each parameter. len_protocol_parameters = {par_name: len(path) for par_name, path in protocol.items()} if len(set(len_protocol_parameters.values())) != 1: raise ValueError('The protocol parameters have a different number ' 'of states: {}'.format(len_protocol_parameters)) # Do not modify passed thermodynamic state. reference_thermodynamic_state = copy.deepcopy(thermodynamic_state) thermodynamic_state = copy.deepcopy(thermodynamic_state) reference_system = thermodynamic_state.system is_periodic = thermodynamic_state.is_periodic is_complex = len(topography.receptor_atoms) > 0 # We currently don't support reaction field. nonbonded_method = mmtools.forces.find_nonbonded_force(reference_system).getNonbondedMethod() if nonbonded_method == openmm.NonbondedForce.CutoffPeriodic: raise RuntimeError('CutoffPeriodic is not supported yet. Use PME for explicit solvent.') # Make sure sampler_states is a list of SamplerStates. if isinstance(sampler_states, mmtools.states.SamplerState): sampler_states = [sampler_states] # Initialize metadata storage and handle default argument. # We'll use the sampler full name for resuming, the reference # thermodynamic state for minimization and the topography for # ligand randomization. if metadata is None: metadata = dict() sampler_full_name = utils.typename(self._sampler.__class__) metadata['sampler_full_name'] = sampler_full_name metadata['reference_state'] = mmtools.utils.serialize(thermodynamic_state) metadata['topography'] = mmtools.utils.serialize(topography) # Add default title if user hasn't specified. if 'title' not in metadata: default_title = ('Alchemical free energy calculation created ' 'using yank.AlchemicalPhase and {} on {}') metadata['title'] = default_title.format(sampler_full_name, time.asctime(time.localtime())) # Restraint and standard state correction. # ---------------------------------------- # Add receptor-ligand restraint and compute standard state corrections. restraint_state = None metadata['standard_state_correction'] = 0.0 if is_complex and restraint is not None: logger.debug("Creating receptor-ligand restraints...") try: restraint.restrain_state(thermodynamic_state) except RestraintParameterError: logger.debug('There are undefined restraint parameters. ' 'Trying automatic parametrization.') restraint.determine_missing_parameters(thermodynamic_state, sampler_states[0], topography) restraint.restrain_state(thermodynamic_state) correction = restraint.get_standard_state_correction(thermodynamic_state) # in kT metadata['standard_state_correction'] = correction # Create restraint state that will be part of composable states. restraint_state = RestraintState(lambda_restraints=1.0) # Raise error if we can't find a ligand-receptor to apply the restraint. elif restraint is not None: raise RuntimeError("Cannot apply the restraint. No receptor-ligand " "complex could be found. ") # For not-restrained ligand-receptor periodic systems, we must still # add a standard state correction for the box volume. elif is_complex and is_periodic: # TODO: What if the box volume fluctuates during the simulation? box_vectors = reference_system.getDefaultPeriodicBoxVectors() box_volume = mmtools.states._box_vectors_volume(box_vectors) metadata['standard_state_correction'] = - np.log(V0 / box_volume) # For implicit solvent/vacuum complex systems, we require a restraint # to keep the ligand from drifting too far away from receptor. elif is_complex and not is_periodic: raise ValueError('A receptor-ligand system in implicit solvent or ' 'vacuum requires a restraint.') # Create alchemical states. # ------------------------- # Handle default alchemical region. if alchemical_regions is None: alchemical_regions = self._build_default_alchemical_region( reference_system, topography, protocol) # Check that we have atoms to alchemically modify. if len(alchemical_regions.alchemical_atoms) == 0: raise ValueError("Couldn't find atoms to alchemically modify.") # Create alchemically-modified system using alchemical factory. logger.debug("Creating alchemically-modified states...") if alchemical_factory is None: alchemical_factory = mmtools.alchemy.AbsoluteAlchemicalFactory(disable_alchemical_dispersion_correction=True) alchemical_system = alchemical_factory.create_alchemical_system(thermodynamic_state.system, alchemical_regions) # Create compound alchemically modified state to pass to sampler. thermodynamic_state.system = alchemical_system alchemical_state = mmtools.alchemy.AlchemicalState.from_system(alchemical_system) if restraint_state is not None: composable_states = [alchemical_state, restraint_state] else: composable_states = [alchemical_state] compound_state = mmtools.states.CompoundThermodynamicState( thermodynamic_state=thermodynamic_state, composable_states=composable_states) # Create all compound states to pass to sampler.create() # following the requested protocol. compound_states = [] protocol_keys, protocol_values = zip(*protocol.items()) for state_id, state_values in enumerate(zip(*protocol_values)): compound_states.append(copy.deepcopy(compound_state)) for lambda_key, lambda_value in zip(protocol_keys, state_values): if hasattr(compound_state, lambda_key): setattr(compound_states[state_id], lambda_key, lambda_value) else: raise AttributeError('CompoundThermodynamicState object does not ' 'have protocol attribute {}'.format(lambda_key)) # Temperature and pressure at the end states should # be the same or the analysis won't make sense. for state_property in ['temperature', 'pressure']: if getattr(compound_states[0], state_property) != getattr(compound_states[-1], state_property): raise ValueError('The {}s of the end states must be the same.'.format(state_property)) # Expanded cutoff unsampled states. # --------------------------------- # TODO should we allow expanded states for non-periodic systems? logger.debug('Creating expanded cutoff states...') expanded_cutoff_states = [] if is_periodic and anisotropic_dispersion_cutoff is not None: # Create non-alchemically modified state with an expanded cutoff. reference_state_expanded = self._expand_state_cutoff(reference_thermodynamic_state, anisotropic_dispersion_cutoff) # Add the restraint if any. The free energy of removing the restraint # will be taken into account with the standard state correction. if restraint is not None: restraint.restrain_state(reference_state_expanded) # The value of lambda_restraints must be the same as the first state. # TODO: handle case with multiple restraints. restraint_state.lambda_restraints = compound_states[0].lambda_restraints reference_state_expanded = mmtools.states.CompoundThermodynamicState( thermodynamic_state=reference_state_expanded, composable_states=[restraint_state]) # Create the expanded cutoff decoupled state. last_state_expanded = self._expand_state_cutoff(compound_states[-1], anisotropic_dispersion_cutoff) expanded_cutoff_states = [reference_state_expanded, last_state_expanded] elif anisotropic_dispersion_cutoff is not None: logger.warning("The requested anisotropic dispersion correction " "won't be computed since the system is non-periodic.") # Create simulation. # ------------------ logger.debug("Creating sampler object...") self._sampler.create(compound_states, sampler_states, storage=storage, unsampled_thermodynamic_states=expanded_cutoff_states, metadata=metadata)
[docs] def minimize(self, tolerance=1.0*unit.kilojoules_per_mole/unit.nanometers, max_iterations=0): """Minimize all the states. The minimization is performed in two steps. In the first one, the positions are minimized in the reference thermodynamic state (i.e. non alchemically-modified). Only then, the positions are minimized in their alchemically softened state. Parameters ---------- tolerance : simtk.unit.Quantity, optional Minimization tolerance (units of energy/mole/length, default is ``1.0 * unit.kilojoules_per_mole / unit.nanometers``). max_iterations : int, optional Maximum number of iterations for minimization. If 0, minimization continues until converged. """ metadata = self._sampler.metadata serialized_reference_state = metadata['reference_state'] reference_state = mmtools.utils.deserialize(serialized_reference_state) sampler_states = self._sampler.sampler_states # We minimize only the sampler states that are in different positions. # This depends on how many sampler states have been passed in create() # and if the ligand has been randomized before calling minimize(). similar_sampler_states = self._find_similar_sampler_states(sampler_states) logger.debug('Minimizing {} sampler states in the reference ' 'thermodynamic state'.format(len(similar_sampler_states))) # Distribute minimization across nodes. minimized_sampler_states_ids = list(similar_sampler_states.keys()) minimized_positions = mpi.distribute(self._minimize_sampler_state, minimized_sampler_states_ids, sampler_states, reference_state, tolerance, max_iterations, send_results_to='all') # Update all sampler states. for sampler_state_id, minimized_pos in zip(minimized_sampler_states_ids, minimized_positions): sampler_states[sampler_state_id].positions = minimized_pos for similar_sampler_state_id in similar_sampler_states[sampler_state_id]: sampler_states[similar_sampler_state_id].positions = minimized_pos # Update sampler and perform second minimization in alchemically modified states. self._sampler.sampler_states = sampler_states self._sampler.minimize(tolerance=tolerance, max_iterations=max_iterations)
[docs] def randomize_ligand(self, sigma_multiplier=2.0, close_cutoff=1.5*unit.angstrom): """Randomize the ligand positions in every state. The position and orientation of the ligand in each state will be randomized. This works only if the system is a ligand-receptor system. If you call this before minimizing, each positions will be minimized separately in the reference state, so you may want to call it afterwards to speed up minimization. Parameters ---------- sigma_multiplier : float, optional The ligand will be placed close to a random receptor atom at a distance that is normally distributed with standard deviation ``sigma_multiplier * receptor_radius_of_gyration`` (default is 2.0). close_cutoff : simtk.unit.Quantity, optional Each random placement proposal will be rejected if the ligand ends up being closer to the receptor than this cutoff (units of length, default is ``1.5*unit.angstrom``). """ metadata = self._sampler.metadata serialized_topography = metadata['topography'] topography = mmtools.utils.deserialize(serialized_topography) # We can randomize the ligand only in implicit solvent. is_complex = len(topography.ligand_atoms) > 0 is_explicit = len(topography.solvent_atoms) > 0 if not is_complex: raise RuntimeError('Cannot find ligand atoms to randomize.') if is_explicit: raise RuntimeError('Cannot randomize ligand in explict solvent.') # Randomize all sampler states. sampler_states = self._sampler.sampler_states ligand_positions = mpi.distribute(self._randomize_ligand, sampler_states, topography, sigma_multiplier, close_cutoff, send_results_to='all') # Update sampler states with randomized positions. for sampler_state, ligand_pos in zip(sampler_states, ligand_positions): sampler_state.positions[topography.ligand_atoms] = ligand_pos self._sampler.sampler_states = sampler_states
[docs] def equilibrate(self, n_iterations, mcmc_moves=None): """Equilibrate all states. Parameters ---------- n_iterations : int Number of equilibration iterations. mcmc_moves : MCMCMove or list of MCMCMove, optional Optionally, the MCMCMoves to use for equilibration can be different from the ones used in production. """ self._sampler.equilibrate(n_iterations=n_iterations, mcmc_moves=mcmc_moves)
[docs] def run(self, n_iterations=None): """Run the alchemical phase simulation. Parameters ---------- n_iterations : int, optional If specified, only at most the specified number of iterations will be run (default is None). """ self._sampler.run(n_iterations=n_iterations)
[docs] def extend(self, n_iterations): """Extend the simulation by the given number of iterations. Parameters ---------- n_iterations : int The number of iterations to run. """ self._sampler.extend(n_iterations)
# ------------------------------------------------------------------------- # Internal-usage # ------------------------------------------------------------------------- @staticmethod def _expand_state_cutoff(thermodynamic_state, expanded_cutoff_distance, replace_reaction_field=False, switch_width=None): """Expand the thermodynamic state cutoff to the given one. If replace_reaction_field is True, the system will be modified to use an UnshiftedReactionFieldForce. In this case switch_width must be specified. """ # If we use a barostat we leave more room for volume fluctuations or # we risk fatal errors. This is how much we allow the box size to change. fluctuation_size = 0.8 # Do not modify passed thermodynamic state. thermodynamic_state = copy.deepcopy(thermodynamic_state) system = thermodynamic_state.system # Determine minimum box side dimension. The theoretical maximal allowed cutoff # is given by half the norm of the smallest vector, but OpenMM limits it to # the minimum diagonal element of the box vector matrix for efficiency. box_vectors = system.getDefaultPeriodicBoxVectors() min_box_dimension = min([vector[i] for i, vector in enumerate(box_vectors)]) # Determine cutoff automatically if requested. # We leave more space if the volume fluctuates. if expanded_cutoff_distance == 'auto': if thermodynamic_state.pressure is None: expanded_cutoff_distance = min_box_dimension * 0.99 / 2.0 else: expanded_cutoff_distance = min_box_dimension * fluctuation_size / 2.0 expanded_cutoff_distance = min(expanded_cutoff_distance, 16*unit.angstroms) # Otherwise check that requested cutoff is within fluctuation limits. If the # state is in NVT and the cutoff is too big, OpenMM will raise an exception # on Context creation. elif (thermodynamic_state.pressure is not None and min_box_dimension * fluctuation_size < 2.0 * expanded_cutoff_distance): raise RuntimeError('Barostated box sides must be at least {} Angstroms ' 'to correct for missing dispersion interactions. The ' 'minimum dimension of the provided box is {} Angstroms' ''.format(expanded_cutoff_distance/unit.angstrom * 2, min_box_dimension/unit.angstrom)) logger.debug('Setting cutoff for fully interacting system to {}. The minimum box ' 'dimension is {}.'.format(expanded_cutoff_distance, min_box_dimension)) # Expanded forces cutoff. for force in system.getForces(): try: force_cutoff = force.getCutoffDistance() except AttributeError: pass else: # We don't want to reduce the cutoff if it's already large. if force_cutoff < expanded_cutoff_distance: cutoff_diff = expanded_cutoff_distance - force_cutoff switching_distance = force.getSwitchingDistance() # Expand cutoff preserving the original switch width. # We don't need to check if we are using a switch since # there is a setting for that. force.setCutoffDistance(expanded_cutoff_distance) force.setSwitchingDistance(switching_distance + cutoff_diff) # Replace reaction field NonbondedForce to remove constant shift term. # AbsoluteAlchemicalFactory already does it for the other states. if replace_reaction_field: mmtools.forcefactories.replace_reaction_field(system, return_copy=False, switch_width=switch_width) # Return the new thermodynamic state with the expanded cutoff. thermodynamic_state.system = system return thermodynamic_state @staticmethod def _build_default_alchemical_region(system, topography, protocol): """Create a default AlchemicalRegion if the user hasn't provided one.""" # TODO: we should probably have a second region that annihilate sterics of counterions. alchemical_region_kwargs = {} # Modify ligand if this is a receptor-ligand phase, or # solute if this is a transfer free energy calculation. if len(topography.ligand_atoms) > 0: alchemical_region_name = 'ligand_atoms' else: alchemical_region_name = 'solute_atoms' alchemical_atoms = getattr(topography, alchemical_region_name) # In periodic systems, we alchemically modify the ligand/solute # counterions to make sure that the solvation box is always neutral. if system.usesPeriodicBoundaryConditions(): alchemical_counterions = pipeline.find_alchemical_counterions( system, topography, alchemical_region_name) alchemical_atoms += alchemical_counterions # Sort them by index for safety. We don't want to # accidentally exchange two atoms' positions. alchemical_atoms = sorted(alchemical_atoms) alchemical_region_kwargs['alchemical_atoms'] = alchemical_atoms # Check if we need to modify bonds/angles/torsions. for element_type in ['bonds', 'angles', 'torsions']: if 'lambda_' + element_type in protocol: modify_it = True else: modify_it = None alchemical_region_kwargs['alchemical_' + element_type] = modify_it # Create alchemical region. alchemical_region = mmtools.alchemy.AlchemicalRegion(**alchemical_region_kwargs) return alchemical_region @staticmethod def _find_similar_sampler_states(sampler_states): """Groups SamplerStates that have the same positions. Returns ------- similar_sampler_states : dict The dict sampler_state_index: list_of_sampler_state_indices with same positions. """ # similar_sampler_states is an ordered dict # sampler_state_index: list of sampler_state_indices with same positions # we run only 1 minimization for each of these entries. similar_sampler_states = collections.OrderedDict() # processed_sampler_states_ids is a set containing all the # sampler state indices that have been assigned a minimization. processed_sampler_states_ids = set() # Find minimum number of minimizations required. for state_id, sampler_state in enumerate(sampler_states): if state_id in processed_sampler_states_ids: continue similar_sampler_states[state_id] = [] processed_sampler_states_ids.add(state_id) for next_state_id in range(state_id+1, len(sampler_states)): next_sampler_state = sampler_states[next_state_id] if np.allclose(sampler_state.positions, next_sampler_state.positions): similar_sampler_states[state_id].append(next_state_id) processed_sampler_states_ids.add(next_state_id) return similar_sampler_states @staticmethod def _minimize_sampler_state(sampler_state_id, sampler_states, thermodynamic_state, tolerance, max_iterations): """Minimize the specified sampler state at the given thermodynamic state.""" sampler_state = sampler_states[sampler_state_id] # Retrieve a context. Any Integrator works. context, integrator = mmtools.cache.global_context_cache.get_context(thermodynamic_state) # Set initial positions and box vectors. sampler_state.apply_to_context(context) # Compute the initial energy of the system for logging. initial_energy = thermodynamic_state.reduced_potential(context) logger.debug('Sampler state {}/{}: initial energy {:8.3f}kT'.format( sampler_state_id + 1, len(sampler_states), initial_energy)) # Minimize energy. openmm.LocalEnergyMinimizer.minimize(context, tolerance, max_iterations) # Get the minimized positions. sampler_state.update_from_context(context) # Compute the final energy of the system for logging. final_energy = thermodynamic_state.reduced_potential(sampler_state) logger.debug('Sampler state {}/{}: final energy {:8.3f}kT'.format( sampler_state_id + 1, len(sampler_states), final_energy)) # Return minimized positions. return sampler_state.positions @staticmethod def _randomize_ligand(sampler_state, topography, sigma_multiplier, close_cutoff): """Randomize ligand positions of the given sampler state.""" # Shortcut variables. ligand_atoms = topography.ligand_atoms receptor_atoms = topography.receptor_atoms # We set the standard deviation of the displacement # proportional to the receptor radius of gyration. radius_of_gyration = pipeline.compute_radius_of_gyration(sampler_state.positions[receptor_atoms]) sigma = sigma_multiplier * radius_of_gyration # Convert to dimensionless positions. positions_unit = sampler_state.positions.unit x = sampler_state.positions / positions_unit close_cutoff = close_cutoff / positions_unit # We work with Quantity only for ligand atoms for readability. ligand_positions = x[ligand_atoms] * positions_unit # Try until we have a non-overlapping ligand conformation. max_n_attempts = 5000 n_attempts = 0 while n_attempts <= max_n_attempts: # Center ligand on a random receptor atom. ligand_positions_mean = ligand_positions.mean(0) receptor_atom_index = receptor_atoms[np.random.randint(0, len(receptor_atoms))] ligand_positions[:] += sampler_state.positions[receptor_atom_index] - ligand_positions_mean # Randomize ligand orientation and displace. ligand_positions = mmtools.mcmc.MCRotationMove.rotate_positions(ligand_positions) ligand_positions = mmtools.mcmc.MCDisplacementMove.displace_positions(ligand_positions, sigma) # Update array to compute distances. x[ligand_atoms, :] = ligand_positions / positions_unit # Check if there's overlap. min_dist = pipeline.compute_min_dist(x[ligand_atoms], x[receptor_atoms]) if min_dist >= close_cutoff: break n_attempts += 1 # Check if we could find a working configuration. if n_attempts > max_n_attempts: raise RuntimeError('Could not randomize ligand after {} attempts'.format(max_n_attempts)) # We return only the randomized ligand positions to minimize MPI traffic. return ligand_positions