diff --git a/openff/evaluator/datasets/taproom/taproom.py b/openff/evaluator/datasets/taproom/taproom.py index 06aebfe2a..af7bde814 100644 --- a/openff/evaluator/datasets/taproom/taproom.py +++ b/openff/evaluator/datasets/taproom/taproom.py @@ -398,6 +398,8 @@ def _build_metadata( "n_pull_windows": n_pull_windows, "release_windows_indices": [*range(len(attach_lambdas))], "release_lambdas": release_lambdas, + "bound_window_index": [[*range(n_pull_windows)][0]], + "unbound_window_index": [[*range(n_pull_windows)][-1]], } ) diff --git a/openff/evaluator/properties/binding.py b/openff/evaluator/properties/binding.py index 948cc444c..dec7cf5a4 100644 --- a/openff/evaluator/properties/binding.py +++ b/openff/evaluator/properties/binding.py @@ -18,6 +18,8 @@ ) from openff.evaluator.protocols.paprika.analysis import ( AnalyzeAPRPhase, + ComputeFreeEnergyGradient, + ComputePotentialEnergyGradient, ComputeReferenceWork, ComputeSymmetryCorrection, ) @@ -29,8 +31,10 @@ from openff.evaluator.protocols.paprika.restraints import ( ApplyRestraints, GenerateAttachRestraints, + GenerateBoundRestraints, GeneratePullRestraints, GenerateReleaseRestraints, + GenerateUnboundRestraints, ) from openff.evaluator.substances import Component from openff.evaluator.thermodynamics import Ensemble @@ -234,17 +238,13 @@ def _paprika_default_solvation_protocol( solvation_protocol.count_exact_amount = False solvation_protocol.box_aspect_ratio = [1.0, 1.0, 2.0] solvation_protocol.center_solute_in_box = False + return solvation_protocol @classmethod def _paprika_default_simulation_protocols( cls, - n_thermalization_steps: int, - n_equilibration_steps: int, - n_production_steps: int, - dt_thermalization: unit.Quantity, - dt_equilibration: unit.Quantity, - dt_production: unit.Quantity, + simulation_time_steps: dict, ) -> Tuple[ openmm.OpenMMEnergyMinimisation, openmm.OpenMMSimulation, @@ -256,22 +256,11 @@ def _paprika_default_simulation_protocols( Parameters ---------- - n_thermalization_steps - The number of thermalization simulations steps to perform. - Sample generated during this step will be discarded. - n_equilibration_steps - The number of equilibration simulations steps to perform. - Sample generated during this step will be discarded. - n_production_steps - The number of production simulations steps to perform. - Sample generated during this step will be used in the final - free energy calculation. - dt_thermalization - The integration timestep during thermalization - dt_equilibration - The integration timestep during equilibration - dt_production - The integration timestep during production + simulation_time_steps + A dictionary containing `n_xxx_steps`, `out_xxx_steps` and `dt_xxx`, corresponding + to number of steps, output frequency and integration time step, respectively. + The dictionary requires three sets for `thermalization`, `equilibration` and + `production` runs. Returns ------- @@ -281,19 +270,23 @@ def _paprika_default_simulation_protocols( energy_minimisation = openmm.OpenMMEnergyMinimisation("") thermalization = openmm.OpenMMSimulation("") - thermalization.steps_per_iteration = n_thermalization_steps - thermalization.output_frequency = 10000 - thermalization.timestep = dt_thermalization + thermalization.steps_per_iteration = simulation_time_steps[ + "n_thermalization_steps" + ] + thermalization.output_frequency = simulation_time_steps["out_thermalization"] + thermalization.timestep = simulation_time_steps["dt_thermalization"] equilibration = openmm.OpenMMSimulation("") - equilibration.steps_per_iteration = n_equilibration_steps - equilibration.output_frequency = 10000 - equilibration.timestep = dt_equilibration + equilibration.steps_per_iteration = simulation_time_steps[ + "n_equilibration_steps" + ] + equilibration.output_frequency = simulation_time_steps["out_equilibration"] + equilibration.timestep = simulation_time_steps["dt_equilibration"] production = openmm.OpenMMSimulation("") - production.steps_per_iteration = n_production_steps - production.output_frequency = 5000 - production.timestep = dt_production + production.steps_per_iteration = simulation_time_steps["n_production_steps"] + production.output_frequency = simulation_time_steps["out_production"] + production.timestep = simulation_time_steps["dt_production"] return energy_minimisation, thermalization, equilibration, production @@ -737,17 +730,362 @@ def _paprika_build_release_protocols( ProtocolPath("result", analyze_release_phase.id), ) + @classmethod + def _paprika_build_end_states_protocol( + cls, + orientation_replicator: ProtocolReplicator, + restraint_schemas: Dict[str, ProtocolPath], + solvation_template: coordinates.SolvateExistingStructure, + minimization_template: openmm.OpenMMEnergyMinimisation, + thermalization_template: openmm.OpenMMSimulation, + equilibration_template: openmm.OpenMMSimulation, + production_template: openmm.OpenMMSimulation, + ): + + # Define a replicator to set and solvate the coordinates for the bound and unbound system + orientation_placeholder = orientation_replicator.placeholder_id + + bound_replicator = ProtocolReplicator( + f"bound_replicator_{orientation_placeholder}" + ) + bound_replicator.template_values = ProtocolPath("bound_window_index", "global") + bound_replicator_id = ( + f"{bound_replicator.placeholder_id}_" f"{orientation_placeholder}" + ) + unbound_replicator = ProtocolReplicator( + f"unbound_replicator_{orientation_placeholder}" + ) + unbound_replicator.template_values = ProtocolPath( + "unbound_window_index", "global" + ) + unbound_replicator_id = ( + f"{unbound_replicator.placeholder_id}_" f"{orientation_placeholder}" + ) + + # Filter out only the solvent substance to help with the solvation step. + filter_solvent = miscellaneous.FilterSubstanceByRole( + "host-guest-filter_solvent" + ) + filter_solvent.input_substance = ProtocolPath("substance", "global") + filter_solvent.component_roles = [Component.Role.Solvent] + + # Align the bound complex + align_bound_coordinates = PreparePullCoordinates( + f"state_bound_align_coordinates_{bound_replicator_id}" + ) + align_bound_coordinates.substance = ProtocolPath("substance", "global") + align_bound_coordinates.complex_file_path = ProtocolPath( + f"guest_orientations[{orientation_placeholder}].coordinate_path", "global" + ) + align_bound_coordinates.guest_orientation_mask = ProtocolPath( + "guest_orientation_mask", "global" + ) + align_bound_coordinates.pull_window_index = ReplicatorValue(bound_replicator.id) + align_bound_coordinates.pull_distance = ProtocolPath("pull_distance", "global") + align_bound_coordinates.n_pull_windows = ProtocolPath( + "n_pull_windows", "global" + ) + + # Align the unbound complex + align_unbound_coordinates = PreparePullCoordinates( + f"state_unbound_align_coordinates_{unbound_replicator_id}" + ) + align_unbound_coordinates.substance = ProtocolPath("substance", "global") + align_unbound_coordinates.complex_file_path = ProtocolPath( + f"guest_orientations[{orientation_placeholder}].coordinate_path", "global" + ) + align_unbound_coordinates.guest_orientation_mask = ProtocolPath( + "guest_orientation_mask", "global" + ) + align_unbound_coordinates.pull_window_index = ReplicatorValue( + unbound_replicator.id + ) + align_unbound_coordinates.pull_distance = ProtocolPath( + "pull_distance", "global" + ) + align_unbound_coordinates.n_pull_windows = ProtocolPath( + "n_pull_windows", "global" + ) + + # Solvate the bound structure + solvate_bound_coordinates = copy.deepcopy(solvation_template) + solvate_bound_coordinates.id = ( + f"state_bound_solvate_coordinates_{bound_replicator_id}" + ) + solvate_bound_coordinates.substance = ProtocolPath( + "filtered_substance", filter_solvent.id + ) + solvate_bound_coordinates.solute_coordinate_file = ProtocolPath( + "output_coordinate_path", align_bound_coordinates.id + ) + + # Solvate the unbound structure + solvate_unbound_coordinates = copy.deepcopy(solvation_template) + solvate_unbound_coordinates.id = ( + f"state_unbound_solvate_coordinates_{unbound_replicator_id}" + ) + solvate_unbound_coordinates.substance = ProtocolPath( + "filtered_substance", filter_solvent.id + ) + solvate_unbound_coordinates.solute_coordinate_file = ProtocolPath( + "output_coordinate_path", align_unbound_coordinates.id + ) + + # Apply the force field parameters. This only needs to be done once. + apply_parameters = forcefield.BuildSmirnoffSystem( + f"state_bound_apply_parameters_{orientation_placeholder}" + ) + apply_parameters.force_field_path = ProtocolPath("force_field_path", "global") + apply_parameters.substance = ProtocolPath("substance", "global") + apply_parameters.coordinate_file_path = ProtocolPath( + "coordinate_file_path", + f"state_bound_solvate_coordinates_0_{orientation_placeholder}", + ) + + # Add dummy atoms to the bound system + add_bound_dummy_atoms = AddDummyAtoms( + f"state_bound_add_dummy_atoms_{bound_replicator_id}" + ) + add_bound_dummy_atoms.substance = ProtocolPath("substance", "global") + add_bound_dummy_atoms.input_coordinate_path = ProtocolPath( + "coordinate_file_path", solvate_bound_coordinates.id + ) + add_bound_dummy_atoms.input_system = ProtocolPath( + "parameterized_system", apply_parameters.id + ) + add_bound_dummy_atoms.offset = ProtocolPath("dummy_atom_offset", "global") + + bound_coordinate_path = ProtocolPath( + "output_coordinate_path", + f"state_bound_add_dummy_atoms_0_{orientation_placeholder}", + ) + bound_system = ProtocolPath( + "output_system", + f"state_bound_add_dummy_atoms_0_{orientation_placeholder}", + ) + + # Add dummy atoms to the unbound system + add_unbound_dummy_atoms = AddDummyAtoms( + f"state_unbound_add_dummy_atoms_{unbound_replicator_id}" + ) + add_unbound_dummy_atoms.substance = ProtocolPath("substance", "global") + add_unbound_dummy_atoms.input_coordinate_path = ProtocolPath( + "coordinate_file_path", solvate_unbound_coordinates.id + ) + add_unbound_dummy_atoms.input_system = ProtocolPath( + "parameterized_system", apply_parameters.id + ) + add_unbound_dummy_atoms.offset = ProtocolPath("dummy_atom_offset", "global") + + unbound_coordinate_path = ProtocolPath( + "output_coordinate_path", + f"state_unbound_add_dummy_atoms_0_{orientation_placeholder}", + ) + unbound_system = ProtocolPath( + "output_system", + f"state_unbound_add_dummy_atoms_0_{orientation_placeholder}", + ) + + # Generate restraints for the bound complex + generate_bound_restraints = GenerateBoundRestraints( + f"state_bound_generate_restraints_{orientation_placeholder}" + ) + generate_bound_restraints.complex_coordinate_path = bound_coordinate_path + generate_bound_restraints.attach_lambdas = ProtocolPath( + "attach_lambdas", "global" + ) + generate_bound_restraints.restraint_schemas = restraint_schemas + + # Generate restraints for the unbound complex + new_restraint_schemas = { + "static": ProtocolPath( + f"guest_orientations[{orientation_placeholder}].static_restraints", + "global", + ), + "guest": ProtocolPath("guest_restraints", "global"), + } + generate_unbound_restraints = GenerateUnboundRestraints( + f"state_unbound_generate_restraints_{orientation_placeholder}" + ) + generate_unbound_restraints.complex_coordinate_path = unbound_coordinate_path + generate_unbound_restraints.attach_lambdas = ProtocolPath( + "attach_lambdas", "global" + ) + generate_unbound_restraints.n_pull_windows = ProtocolPath( + "n_pull_windows", "global" + ) + generate_unbound_restraints.restraint_schemas = new_restraint_schemas + + # Apply restraints to the bound complex + apply_bound_restraints = ApplyRestraints( + f"state_bound_apply_restraints_{bound_replicator_id}" + ) + apply_bound_restraints.restraints_path = ProtocolPath( + "restraints_path", generate_bound_restraints.id + ) + apply_bound_restraints.phase = "attach" + apply_bound_restraints.window_index = 0 + apply_bound_restraints.input_coordinate_path = bound_coordinate_path + apply_bound_restraints.input_system = bound_system + + # Apply restraints to the unbound complex + apply_unbound_restraints = ApplyRestraints( + f"state_unbound_apply_restraints_{unbound_replicator_id}" + ) + apply_unbound_restraints.restraints_path = ProtocolPath( + "restraints_path", generate_unbound_restraints.id + ) + apply_unbound_restraints.phase = "pull" + apply_unbound_restraints.window_index = -1 + apply_unbound_restraints.input_coordinate_path = unbound_coordinate_path + apply_unbound_restraints.input_system = unbound_system + + # Setup the simulations for the bound complex + ( + bound_minimization, + bound_thermalization, + bound_equilibration, + bound_production, + ) = cls._paprika_build_simulation_protocols( + ProtocolPath("output_coordinate_path", add_bound_dummy_atoms.id), + ProtocolPath("output_system", apply_bound_restraints.id), + "state_bound", + bound_replicator_id, + minimization_template, + thermalization_template, + equilibration_template, + production_template, + ) + + # Setup the simulations for the unbound complex + ( + unbound_minimization, + unbound_thermalization, + unbound_equilibration, + unbound_production, + ) = cls._paprika_build_simulation_protocols( + ProtocolPath("output_coordinate_path", add_unbound_dummy_atoms.id), + ProtocolPath("output_system", apply_unbound_restraints.id), + "state_unbound", + unbound_replicator_id, + minimization_template, + thermalization_template, + equilibration_template, + production_template, + ) + + # Return the full list of the protocols which make up the bound and unbound parts + # for the gradient calculation. + protocols = [ + filter_solvent, + align_bound_coordinates, + align_unbound_coordinates, + solvate_bound_coordinates, + solvate_unbound_coordinates, + apply_parameters, + add_bound_dummy_atoms, + add_unbound_dummy_atoms, + generate_bound_restraints, + generate_unbound_restraints, + apply_bound_restraints, + apply_unbound_restraints, + bound_minimization, + bound_thermalization, + bound_equilibration, + bound_production, + unbound_minimization, + unbound_thermalization, + unbound_equilibration, + unbound_production, + ] + protocol_replicators = [bound_replicator, unbound_replicator] + + return ( + protocols, + protocol_replicators, + bound_replicator_id, + unbound_replicator_id, + ProtocolPath("parameterized_system", apply_parameters.id), + ProtocolPath("output_coordinate_path", add_bound_dummy_atoms.id), + ProtocolPath("trajectory_file_path", bound_production.id), + ProtocolPath("output_coordinate_path", add_unbound_dummy_atoms.id), + ProtocolPath("trajectory_file_path", unbound_production.id), + ) + + @classmethod + def _paprika_free_energy_gradient_protocol( + cls, + orientation_replicator: ProtocolReplicator, + bound_replicator_id: str, + unbound_replicator_id: str, + parameterized_system: ProtocolPath, + bound_topology: ProtocolPath, + bound_trajectory: ProtocolPath, + unbound_topology: ProtocolPath, + unbound_trajectory: ProtocolPath, + orientation_free_energy: ProtocolPath, + ): + + # Compute the potential energy gradients for the bound complex + bound_gradients = ComputePotentialEnergyGradient( + f"state_bound_gradient_{bound_replicator_id}" + ) + bound_gradients.topology_path = bound_topology + bound_gradients.trajectory_path = bound_trajectory + bound_gradients.input_system = parameterized_system + bound_gradients.gradient_parameters = ProtocolPath( + "parameter_gradient_keys", "global" + ) + bound_gradients.thermodynamic_state = ProtocolPath( + "thermodynamic_state", "global" + ) + + # Compute the potential energy gradients for the unbound complex + unbound_gradients = ComputePotentialEnergyGradient( + f"state_unbound_gradient_{unbound_replicator_id}" + ) + unbound_gradients.topology_path = unbound_topology + unbound_gradients.trajectory_path = unbound_trajectory + unbound_gradients.input_system = parameterized_system + unbound_gradients.gradient_parameters = ProtocolPath( + "parameter_gradient_keys", "global" + ) + unbound_gradients.thermodynamic_state = ProtocolPath( + "thermodynamic_state", "global" + ) + + # Compute the free energy gradients + free_energy_gradients = ComputeFreeEnergyGradient( + f"free_energy_gradient_{orientation_replicator.placeholder_id}" + ) + free_energy_gradients.orientation_free_energy = orientation_free_energy + free_energy_gradients.bound_state_gradients = ProtocolPath( + "potential_energy_gradients", bound_gradients.id + ) + free_energy_gradients.unbound_state_gradients = ProtocolPath( + "potential_energy_gradients", unbound_gradients.id + ) + + # Return the full list of the protocols which make up the bound and unbound parts + # for the gradient calculation. + protocols = [ + bound_gradients, + unbound_gradients, + free_energy_gradients, + ] + + return ( + protocols, + ProtocolPath("result", free_energy_gradients.id), + ) + @classmethod def default_paprika_schema( cls, existing_schema: SimulationSchema = None, n_solvent_molecules: int = 2500, - n_thermalization_steps: int = 50000, - n_equilibration_steps: int = 200000, - n_production_steps: int = 2500000, - dt_thermalization: unit.Quantity = 1.0 * unit.femtosecond, - dt_equilibration: unit.Quantity = 2.0 * unit.femtosecond, - dt_production: unit.Quantity = 2.0 * unit.femtosecond, + simulation_time_steps: dict = None, + end_states_time_steps: dict = None, debug: bool = False, ): """Returns the default calculation schema to use when estimating @@ -766,24 +1104,19 @@ def default_paprika_schema( An existing schema whose settings to use. If set, the schema's `workflow_schema` will be overwritten by this method. - n_solvent_molecules + n_solvent_molecules: int, optional The number of solvent molecules to add to the box. - n_thermalization_steps - The number of thermalization simulations steps to perform. - Sample generated during this step will be discarded. - n_equilibration_steps - The number of equilibration simulations steps to perform. - Sample generated during this step will be discarded. - n_production_steps - The number of production simulations steps to perform. - Sample generated during this step will be used in the final - free energy calculation. - dt_thermalization - The integration timestep during thermalization - dt_equilibration - The integration timestep during equilibration - dt_production - The integration timestep during production + simulation_time_steps: dict, optional + The integration timestep `dt_xxx`, number of steps to perform `n_xxx_steps` + and output frequency `out_xxx` stored in a dictionary for thermalization, + equilibration and production runs. The integration timesteps is a + pint.Quantity, and number of steps and output frequency are integers. + Sample generated during thermalization and equilibration runs will + be discarded while the production run will be used in the final + free energy. + end_states_time_steps: dict, optional + Same as ``simulation_time_steps`` but for simulating the end states + that will be used to estimate the free energy gradient. debug Whether to return a debug schema. This is nearly identical to the default schema, albeit with significantly less @@ -803,6 +1136,45 @@ def default_paprika_schema( assert isinstance(existing_schema, SimulationSchema) calculation_schema = copy.deepcopy(existing_schema) + # Check user input for simulation time steps + default_time_steps = { + "n_thermalization_steps": 50000, + "n_equilibration_steps": 500000, + "n_production_steps": 1000000, + "dt_thermalization": 1.0 * unit.femtosecond, + "dt_equilibration": 2.0 * unit.femtosecond, + "dt_production": 2.0 * unit.femtosecond, + "out_thermalization": 10000, + "out_equilibration": 10000, + "out_production": 5000, + } + if simulation_time_steps: + assert all( + [ + key in default_time_steps.keys() + for key in simulation_time_steps.keys() + ] + ) + + for key in default_time_steps: + if key not in simulation_time_steps: + simulation_time_steps[key] = default_time_steps[key] + else: + simulation_time_steps = default_time_steps + + # Check user input for end-states time steps + if end_states_time_steps: + assert all( + [ + key in default_time_steps.keys() + for key in end_states_time_steps.keys() + ] + ) + + for key in default_time_steps: + if key not in end_states_time_steps: + end_states_time_steps[key] = default_time_steps[key] + # Initialize the protocols which will serve as templates for those # used in the actual workflows. solvation_template = cls._paprika_default_solvation_protocol( @@ -812,14 +1184,13 @@ def default_paprika_schema( ( minimization_template, *simulation_templates, - ) = cls._paprika_default_simulation_protocols( - n_thermalization_steps=n_thermalization_steps, - n_equilibration_steps=n_equilibration_steps, - n_production_steps=n_production_steps, - dt_thermalization=dt_thermalization, - dt_equilibration=dt_equilibration, - dt_production=dt_production, - ) + ) = cls._paprika_default_simulation_protocols(simulation_time_steps) + + if end_states_time_steps: + ( + end_states_minimization_template, + *end_states_simulation_templates, + ) = cls._paprika_default_simulation_protocols(end_states_time_steps) if debug: @@ -883,6 +1254,26 @@ def default_paprika_schema( *simulation_templates, ) + # Build the protocols for the end-states (for gradient calculations) + if end_states_time_steps: + ( + end_states_protocols, + end_states_replicator, + bound_replicator_id, + unbound_replicator_id, + end_states_parameterized_system, + bound_topology, + bound_trajectory, + unbound_topology, + unbound_trajectory, + ) = cls._paprika_build_end_states_protocol( + orientation_replicator, + restraint_schemas, + solvation_template, + end_states_minimization_template, + *end_states_simulation_templates, + ) + # Compute the symmetry correction. symmetry_correction = ComputeSymmetryCorrection("symmetry_correction") symmetry_correction.n_microstates = ProtocolPath( @@ -904,27 +1295,67 @@ def default_paprika_schema( ProtocolPath("result", symmetry_correction.id), ] + # Free energy gradient + if end_states_time_steps: + ( + gradient_protocols, + orientation_free_energy_with_gradient, + ) = cls._paprika_free_energy_gradient_protocol( + orientation_replicator, + bound_replicator_id, + unbound_replicator_id, + end_states_parameterized_system, + bound_topology, + bound_trajectory, + unbound_topology, + unbound_trajectory, + ProtocolPath("result", orientation_free_energy.id), + ) + # Finally, combine all of the values together total_free_energy = analysis.AverageFreeEnergies("total_free_energy") - total_free_energy.values = ProtocolPath("result", orientation_free_energy.id) + if end_states_time_steps: + total_free_energy.values = orientation_free_energy_with_gradient + else: + total_free_energy.values = ProtocolPath( + "result", orientation_free_energy.id + ) + total_free_energy.thermodynamic_state = ProtocolPath( "thermodynamic_state", "global" ) calculation_schema.workflow_schema = WorkflowSchema() - calculation_schema.workflow_schema.protocol_schemas = [ - *(protocol.schema for protocol in attach_pull_protocols), - *(protocol.schema for protocol in release_protocols), - symmetry_correction.schema, - orientation_free_energy.schema, - total_free_energy.schema, - ] - calculation_schema.workflow_schema.protocol_replicators = [ - orientation_replicator, - *attach_pull_replicators, - release_replicator, - ] + if end_states_time_steps: + calculation_schema.workflow_schema.protocol_schemas = [ + *(protocol.schema for protocol in attach_pull_protocols), + *(protocol.schema for protocol in release_protocols), + *(protocol.schema for protocol in end_states_protocols), + symmetry_correction.schema, + orientation_free_energy.schema, + *(protocol.schema for protocol in gradient_protocols), + total_free_energy.schema, + ] + calculation_schema.workflow_schema.protocol_replicators = [ + orientation_replicator, + *attach_pull_replicators, + release_replicator, + *end_states_replicator, + ] + else: + calculation_schema.workflow_schema.protocol_schemas = [ + *(protocol.schema for protocol in attach_pull_protocols), + *(protocol.schema for protocol in release_protocols), + symmetry_correction.schema, + orientation_free_energy.schema, + total_free_energy.schema, + ] + calculation_schema.workflow_schema.protocol_replicators = [ + orientation_replicator, + *attach_pull_replicators, + release_replicator, + ] # Define where the final value comes from. calculation_schema.workflow_schema.final_value_source = ProtocolPath( diff --git a/openff/evaluator/protocols/paprika/analysis.py b/openff/evaluator/protocols/paprika/analysis.py index f51c34737..9fe593808 100644 --- a/openff/evaluator/protocols/paprika/analysis.py +++ b/openff/evaluator/protocols/paprika/analysis.py @@ -1,10 +1,20 @@ import os +import numpy as np + from openff.evaluator import unit from openff.evaluator.attributes import UNDEFINED +from openff.evaluator.forcefield import ParameterGradient +from openff.evaluator.forcefield.system import ParameterizedSystem +from openff.evaluator.protocols.openmm import _compute_gradients from openff.evaluator.protocols.paprika.restraints import ApplyRestraints from openff.evaluator.thermodynamics import ThermodynamicState -from openff.evaluator.utils.observables import Observable +from openff.evaluator.utils.observables import ( + Observable, + ObservableArray, + ObservableFrame, + ObservableType, +) from openff.evaluator.workflow import Protocol, workflow_protocol from openff.evaluator.workflow.attributes import InputAttribute, OutputAttribute @@ -49,7 +59,7 @@ class AnalyzeAPRPhase(Protocol): def _execute(self, directory, available_resources): - from paprika import analyze + from paprika.evaluator import Analyze # Set-up the expected directory structure. windows_directory = os.path.join(directory, "windows") @@ -90,7 +100,7 @@ def _execute(self, directory, available_resources): for restraint in restraints[restraint_type] ] - results = analyze.compute_phase_free_energy( + results = Analyze.compute_phase_free_energy( phase=self.phase, restraints=flat_restraints, windows_directory=windows_directory, @@ -111,6 +121,166 @@ def _execute(self, directory, available_resources): ) +@workflow_protocol() +class ComputePotentialEnergyGradient(Protocol): + """A protocol to calculate the gradient of the potential energy with + respect to a force field parameter(s) -> . + """ + + input_system = InputAttribute( + docstring="The file path to the force field parameters to assign to the system.", + type_hint=ParameterizedSystem, + default_value=UNDEFINED, + ) + topology_path = InputAttribute( + docstring="The path to the topology file to compute the gradient.", + type_hint=str, + default_value=UNDEFINED, + ) + trajectory_path = InputAttribute( + docstring="The path to the trajectory file to compute the gradient.", + type_hint=str, + default_value=UNDEFINED, + ) + thermodynamic_state = InputAttribute( + docstring="The thermodynamic state that the calculation was performed at.", + type_hint=ThermodynamicState, + default_value=UNDEFINED, + ) + enable_pbc = InputAttribute( + docstring="Whether PBC should be enabled when re-evaluating system energies.", + type_hint=bool, + default_value=True, + ) + gradient_parameters = InputAttribute( + docstring="An optional list of parameters to differentiate the estimated " + "free energy with respect to.", + type_hint=list, + default_value=lambda: list(), + ) + potential_energy_gradients = OutputAttribute( + docstring="The gradient of the potential energy.", + type_hint=list, + ) + + def _execute(self, directory, available_resources): + + assert len(self.gradient_parameters) != 0 + + import mdtraj + from simtk.openmm.app import Modeller, PDBFile + + # Set-up the expected directory structure. + windows_directory = os.path.join(directory, "window") + os.makedirs(windows_directory, exist_ok=True) + + destination_topology_path = os.path.join(windows_directory, "topology.pdb") + destination_trajectory_path = os.path.join(windows_directory, "trajectory.dcd") + + # Work around because dummy atoms are not support in OFF. + # Write a new PDB file without dummy atoms. + coords = PDBFile(self.topology_path) + new_topology = Modeller(coords.topology, coords.positions) + dummy_atoms = [r for r in new_topology.getTopology().atoms() if r.name == "DUM"] + new_topology.delete(dummy_atoms) + + with open(destination_topology_path, "w") as file: + PDBFile.writeFile( + new_topology.getTopology(), + new_topology.getPositions(), + file, + keepIds=True, + ) + + # Write a new DCD file without dummy atoms. + trajectory = mdtraj.load_dcd( + os.path.join(os.getcwd(), self.trajectory_path), + top=os.path.join(os.getcwd(), self.topology_path), + ) + trajectory.atom_slice( + [i for i in range(trajectory.n_atoms - len(dummy_atoms))] + ).save_dcd(destination_trajectory_path) + + # Load in the new trajectory + trajectory = mdtraj.load_dcd( + destination_trajectory_path, + top=destination_topology_path, + ) + + # Placeholder for gradient + observables = ObservableFrame( + { + ObservableType.PotentialEnergy: ObservableArray( + value=np.zeros((len(trajectory), 1)) * unit.kilojoule / unit.mole + ) + } + ) + + # Compute the gradient in the first solvent. + _compute_gradients( + self.gradient_parameters, + observables, + self.input_system.force_field.to_force_field(), + self.thermodynamic_state, + self.input_system.topology, + trajectory, + available_resources, + self.enable_pbc, + ) + + self.potential_energy_gradients = [ + ParameterGradient(key=gradient.key, value=gradient.value.mean().item()) + for gradient in observables[ObservableType.PotentialEnergy].gradients + ] + + +@workflow_protocol() +class ComputeFreeEnergyGradient(Protocol): + """A protocol to calculate the free-energy gradient of the binding free energy + respect to a force field parameter(s). d(ΔG°)/dθ = _bound - _unbound + """ + + bound_state_gradients = InputAttribute( + docstring="The gradient of the potential energy for the bound state.", + type_hint=list, + default_value=UNDEFINED, + ) + unbound_state_gradients = InputAttribute( + docstring="The gradient of the potential energy for the unbound state.", + type_hint=list, + default_value=UNDEFINED, + ) + orientation_free_energy = InputAttribute( + docstring="The total free energy for a particular binding orientation.", + type_hint=Observable, + default_value=UNDEFINED, + ) + result = OutputAttribute( + docstring="The free energy with the gradients stored as Observable.", + type_hint=Observable, + ) + + def _execute(self, directory, available_resources): + + bound_state = { + gradient.key: gradient for gradient in self.bound_state_gradients[0] + } + unbound_state = { + gradient.key: gradient for gradient in self.unbound_state_gradients[0] + } + + free_energy_gradients = [ + bound_state[key] - unbound_state[key] for key in bound_state + ] + + self.result = Observable( + self.orientation_free_energy.value.plus_minus( + self.orientation_free_energy.error + ), + gradients=free_energy_gradients, + ) + + @workflow_protocol() class ComputeSymmetryCorrection(Protocol): """Computes the symmetry correction for an APR calculation which involves @@ -132,7 +302,7 @@ class ComputeSymmetryCorrection(Protocol): def _execute(self, directory, available_resources): - from paprika.analyze import Analyze + from paprika.evaluator import Analyze self.result = Observable( unit.Measurement( @@ -171,7 +341,7 @@ class ComputeReferenceWork(Protocol): def _execute(self, directory, available_resources): - from paprika.analyze import Analyze + from paprika.evaluator import Analyze restraints = ApplyRestraints.load_restraints(self.restraints_path) guest_restraints = restraints["guest"] diff --git a/openff/evaluator/protocols/paprika/coordinates.py b/openff/evaluator/protocols/paprika/coordinates.py index fec6ac8f2..3e338db90 100644 --- a/openff/evaluator/protocols/paprika/coordinates.py +++ b/openff/evaluator/protocols/paprika/coordinates.py @@ -141,7 +141,7 @@ class PreparePullCoordinates(_PrepareAPRCoordinates): def _execute(self, directory, available_resources): - from paprika.setup import Setup + from paprika.evaluator import Setup from simtk.openmm import app atom_indices_by_role = _atom_indices_by_role( @@ -177,7 +177,7 @@ class PrepareReleaseCoordinates(_PrepareAPRCoordinates): def _execute(self, directory, available_resources): import mdtraj - from paprika.setup import Setup + from paprika.evaluator import Setup from simtk.openmm import app mdtraj_trajectory = mdtraj.load_pdb(self.complex_file_path) @@ -248,7 +248,7 @@ class AddDummyAtoms(Protocol): def _execute(self, directory, available_resources): import parmed.geometry - from paprika.setup import Setup + from paprika.evaluator import Setup from simtk.openmm import NonbondedForce, XmlSerializer, app # Extract the host atoms to determine the offset of the dummy atoms. @@ -273,8 +273,8 @@ def _execute(self, directory, available_resources): # Shift the structure to avoid issues with the PBC input_structure.coordinates += numpy.array( [ - input_structure.box[0] * 0.5, - input_structure.box[1] * 0.5, + input_structure.box[0] * 0.45, + input_structure.box[1] * 0.45, -input_structure.coordinates[-1, 2] + 1.0, ] ) diff --git a/openff/evaluator/protocols/paprika/restraints.py b/openff/evaluator/protocols/paprika/restraints.py index 050abc3cf..1e67ff471 100644 --- a/openff/evaluator/protocols/paprika/restraints.py +++ b/openff/evaluator/protocols/paprika/restraints.py @@ -40,13 +40,16 @@ def _save_restraints( self, directory: str, static_restraints, - conformational_restraints, + conformational_restraints=None, symmetry_restraints=None, wall_restraints=None, guest_restraints=None, ): """Saves the restraints to a convenient JSON file.""" + conformational_restraints = ( + [] if conformational_restraints is None else conformational_restraints + ) symmetry_restraints = [] if symmetry_restraints is None else symmetry_restraints wall_restraints = [] if wall_restraints is None else wall_restraints guest_restraints = [] if guest_restraints is None else guest_restraints @@ -87,7 +90,7 @@ class GenerateAttachRestraints(_GenerateRestraints): def _execute(self, directory, available_resources): - from paprika.setup import Setup + from paprika.evaluator import Setup # Construct the restraints to keep the host in place and # with an open cavity. @@ -149,7 +152,7 @@ class GeneratePullRestraints(GenerateAttachRestraints): def _execute(self, directory, available_resources): - from paprika.setup import Setup + from paprika.evaluator import Setup # Construct the restraints to keep the host in place and # with an open cavity. @@ -218,7 +221,7 @@ class GenerateReleaseRestraints(_GenerateRestraints): def _execute(self, directory, available_resources): - from paprika.setup import Setup + from paprika.evaluator import Setup # Construct the restraints to keep the host in place and # with an open cavity. @@ -244,6 +247,92 @@ def _execute(self, directory, available_resources): ) +@workflow_protocol() +class GenerateBoundRestraints(GenerateAttachRestraints): + """Generates the restraint values to apply to the bound host-guest system for the gradient + calculation from a set of restraint schema definitions and makes them easily accessible + for the protocols which will apply them to the parameterized system.""" + + def _execute(self, directory, available_resources): + + from paprika.evaluator import Setup + + # Construct the restraints to keep the host in place + static_restraints = Setup.build_static_restraints( + self.complex_coordinate_path, + len(self.attach_lambdas), + None, + None, + self.restraint_schemas.get("static", []), + ) + + # Construct the restraints to keep the guest at the correct + # distance and orientation relative to the host. + symmetry_restraints = Setup.build_symmetry_restraints( + self.complex_coordinate_path, + len(self.attach_lambdas), + self.restraint_schemas.get("symmetry", []), + ) + wall_restraints = Setup.build_wall_restraints( + self.complex_coordinate_path, + len(self.attach_lambdas), + self.restraint_schemas.get("wall", []), + ) + + self._save_restraints( + directory, + static_restraints, + None, + symmetry_restraints, + wall_restraints, + None, + ) + + +@workflow_protocol() +class GenerateUnboundRestraints(GeneratePullRestraints): + """Generates the restraint values to apply to the unbound host-guest system for the gradient + calculation from a set of restraint schema definitions and makes them easily accessible + for the protocols which will apply them to the parameterized system.""" + + def _execute(self, directory, available_resources): + from paprika.evaluator import Setup + + # Construct the restraints to keep the host in place + static_restraints = Setup.build_static_restraints( + self.complex_coordinate_path, + len(self.attach_lambdas), + self.n_pull_windows, + None, + self.restraint_schemas.get("static", []), + ) + + # Construct the restraints to keep the guest at the correct + # distance and orientation relative to the host. + guest_restraints = Setup.build_guest_restraints( + self.complex_coordinate_path, + self.attach_lambdas, + self.n_pull_windows, + self.restraint_schemas.get("guest", []), + ) + + # Remove the attach phases from the restraints as these restraints are + # only being used for the pull phase. + for restraint in static_restraints + guest_restraints: + + for key in restraint.phase["attach"]: + restraint.phase["attach"][key] = None + + self._save_restraints( + directory, + static_restraints, + None, + None, + None, + guest_restraints, + ) + + @workflow_protocol() class ApplyRestraints(Protocol): """A protocol which will apply the restraints defined in a restraints JSON diff --git a/openff/evaluator/tests/test_protocols/test_paprika.py b/openff/evaluator/tests/test_protocols/test_paprika.py index 5e7cf564a..48e313b4c 100644 --- a/openff/evaluator/tests/test_protocols/test_paprika.py +++ b/openff/evaluator/tests/test_protocols/test_paprika.py @@ -2,12 +2,21 @@ import numpy import pytest +from openff.toolkit.topology import Molecule, Topology +from openff.toolkit.typing.engines.smirnoff import ForceField from openff.evaluator import unit from openff.evaluator.attributes import UNDEFINED +from openff.evaluator.forcefield import ( + ParameterGradient, + ParameterGradientKey, + SmirnoffForceFieldSource, +) from openff.evaluator.forcefield.system import ParameterizedSystem from openff.evaluator.protocols.paprika.analysis import ( AnalyzeAPRPhase, + ComputeFreeEnergyGradient, + ComputePotentialEnergyGradient, ComputeReferenceWork, ComputeSymmetryCorrection, ) @@ -21,12 +30,15 @@ from openff.evaluator.protocols.paprika.restraints import ( ApplyRestraints, GenerateAttachRestraints, + GenerateBoundRestraints, GeneratePullRestraints, GenerateReleaseRestraints, + GenerateUnboundRestraints, ) from openff.evaluator.substances import Component, ExactAmount, Substance from openff.evaluator.thermodynamics import ThermodynamicState from openff.evaluator.utils import get_data_filename +from openff.evaluator.utils.observables import Observable @pytest.fixture(scope="module") @@ -48,7 +60,7 @@ def dummy_complex() -> Substance: def complex_file_path(tmp_path): import parmed.geometry - from paprika.setup import Setup + from paprika.evaluator import Setup complex_path = get_data_filename( os.path.join("test", "molecules", "methanol_methane.pdb") @@ -133,6 +145,31 @@ def release_restraints_path(tmp_path, complex_file_path, restraints_schema): return protocol.restraints_path +@pytest.fixture() +def bound_restraints_path(tmp_path, complex_file_path, restraints_schema): + + protocol = GenerateBoundRestraints("") + protocol.complex_coordinate_path = complex_file_path + protocol.attach_lambdas = [1.0] + protocol.restraint_schemas = restraints_schema + protocol.execute(str(tmp_path)) + + return protocol.restraints_path + + +@pytest.fixture() +def unbound_restraints_path(tmp_path, complex_file_path, restraints_schema): + + protocol = GenerateUnboundRestraints("") + protocol.complex_coordinate_path = complex_file_path + protocol.attach_lambdas = [0.0] + protocol.n_pull_windows = 1 + protocol.restraint_schemas = restraints_schema + protocol.execute(str(tmp_path)) + + return protocol.restraints_path + + def test_components_by_role(dummy_complex): components_by_role = _components_by_role(dummy_complex) @@ -249,11 +286,11 @@ def test_add_dummy_atoms(tmp_path, dummy_complex): trajectory = mdtraj.load_pdb(protocol.output_coordinate_path) assert trajectory.topology.n_atoms == 14 - assert numpy.allclose(trajectory.xyz[0][11:12, :2], 2.5) + assert numpy.allclose(trajectory.xyz[0][11:12, :2], 2.25) assert numpy.isclose(trajectory.xyz[0][11, 2], 0.62) assert numpy.isclose(trajectory.xyz[0][12, 2], 0.32) - assert numpy.isclose(trajectory.xyz[0][13, 0], 2.5) - assert numpy.isclose(trajectory.xyz[0][13, 1], 2.72) + assert numpy.isclose(trajectory.xyz[0][13, 0], 2.25) + assert numpy.isclose(trajectory.xyz[0][13, 1], 2.47) assert numpy.isclose(trajectory.xyz[0][13, 2], 0.1) # Validate the atom / residue names. @@ -366,6 +403,37 @@ def test_generate_release_restraints(tmp_path, complex_file_path, restraints_sch ) +def test_generate_bound_restraints(tmp_path, complex_file_path, restraints_schema): + + protocol = GenerateBoundRestraints("") + protocol.complex_coordinate_path = complex_file_path + protocol.attach_lambdas = [0.0, 1.0] + protocol.restraint_schemas = restraints_schema + + protocol.execute(str(tmp_path)) + + assert os.path.isfile(protocol.restraints_path) + + validate_generated_restraints( + protocol.restraints_path, {"static", "wall", "symmetry"}, "attach" + ) + + +def test_generate_unbound_restraints(tmp_path, complex_file_path, restraints_schema): + + protocol = GenerateUnboundRestraints("") + protocol.complex_coordinate_path = complex_file_path + protocol.attach_lambdas = [0.0, 1.0] + protocol.n_pull_windows = 2 + protocol.restraint_schemas = restraints_schema + + protocol.execute(str(tmp_path)) + + assert os.path.isfile(protocol.restraints_path) + + validate_generated_restraints(protocol.restraints_path, {"static", "guest"}, "pull") + + def test_apply_attach_restraints( tmp_path, dummy_complex, complex_file_path, attach_restraints_path ): @@ -441,6 +509,56 @@ def test_apply_release_restraints( validate_system_file(protocol.output_system.system_path, {10, 11}) +def test_apply_bound_restraints( + tmp_path, dummy_complex, complex_file_path, bound_restraints_path +): + + from simtk import openmm + + with open(os.path.join(tmp_path, "system.xml"), "w") as file: + file.write(openmm.XmlSerializer.serialize(openmm.System())) + + protocol = ApplyRestraints("") + protocol.restraints_path = bound_restraints_path + protocol.input_coordinate_path = complex_file_path + protocol.input_system = ParameterizedSystem( + substance=dummy_complex, + force_field=None, + topology_path=complex_file_path, + system_path=os.path.join(tmp_path, "system.xml"), + ) + protocol.phase = "attach" + protocol.window_index = 0 + protocol.execute(str(tmp_path)) + + validate_system_file(protocol.output_system.system_path, {10, 13, 14}) + + +def test_apply_unbound_restraints( + tmp_path, dummy_complex, complex_file_path, unbound_restraints_path +): + + from simtk import openmm + + with open(os.path.join(tmp_path, "system.xml"), "w") as file: + file.write(openmm.XmlSerializer.serialize(openmm.System())) + + protocol = ApplyRestraints("") + protocol.restraints_path = unbound_restraints_path + protocol.input_coordinate_path = complex_file_path + protocol.input_system = ParameterizedSystem( + substance=dummy_complex, + force_field=None, + topology_path=complex_file_path, + system_path=os.path.join(tmp_path, "system.xml"), + ) + protocol.phase = "pull" + protocol.window_index = 0 + protocol.execute(str(tmp_path)) + + validate_system_file(protocol.output_system.system_path, {10, 12}) + + def test_compute_reference_work(tmp_path, complex_file_path): # Generate a dummy set of pull restraints @@ -500,10 +618,103 @@ def test_compute_symmetry_correction(temperature, n_microstates): assert numpy.isclose(protocol.result.value, expected_value) +def test_compute_potential_energy_gradient(tmp_path): + + import mdtraj + from simtk import openmm + + smiles = "C" + substance = Substance() + substance.add_component( + Component(smiles=smiles, role=Component.Role.Ligand), ExactAmount(1) + ) + + # Create XML files + topology_path = os.path.join(tmp_path, "topology.pdb") + methane = Molecule.from_smiles(smiles) + methane.to_file(topology_path, "PDB") + + topology = Topology.from_openmm( + methane.to_topology().to_openmm(), unique_molecules=[methane] + ) + forcefield = ForceField("openff-1.2.0.offxml") + system = forcefield.create_openmm_system(topology) + system_path = os.path.join(tmp_path, "system.xml") + + with open(system_path, "w") as file: + file.write(openmm.XmlSerializer.serialize(system)) + + # Create a trajectory to load + trajectory_path = os.path.join(tmp_path, "trajectory.dcd") + trajectory: mdtraj.Trajectory = mdtraj.load_pdb(topology_path) + trajectory.save_dcd(trajectory_path) + + # Compute gradient of potential enery + protocol = ComputePotentialEnergyGradient("") + protocol.input_system = ParameterizedSystem( + substance=substance, + force_field=SmirnoffForceFieldSource.from_object(forcefield), + topology_path=topology_path, + system_path=system_path, + ) + protocol.topology_path = topology_path + protocol.trajectory_path = trajectory_path + protocol.thermodynamic_state = ThermodynamicState(temperature=298.15 * unit.kelvin) + protocol.enable_pbc = False + protocol.gradient_parameters = [ + ParameterGradientKey(tag="vdW", smirks="[#1:1]", attribute="rmin_half"), + ParameterGradientKey(tag="vdW", smirks="[#1:1]", attribute="epsilon"), + ] + protocol.execute(str(tmp_path)) + + assert ( + protocol.potential_energy_gradients[0].value + == 0.0 * unit.kilojoule / unit.mole / unit.angstrom + ) + assert ( + protocol.potential_energy_gradients[1].value + == 0.0 * unit.kilojoule / unit.kilocalorie + ) + + +def test_compute_free_energy_gradient(tmp_path): + + protocol = ComputeFreeEnergyGradient("") + protocol.bound_state_gradients = [ + [ + ParameterGradient( + key=ParameterGradientKey("vdW", "[#6:1]", "sigma"), + value=-10.0 * unit.kilocalorie / unit.mole / unit.angstrom, + ), + ] + ] + protocol.unbound_state_gradients = [ + [ + ParameterGradient( + key=ParameterGradientKey("vdW", "[#6:1]", "sigma"), + value=-5.0 * unit.kilocalorie / unit.mole / unit.angstrom, + ), + ] + ] + protocol.orientation_free_energy = Observable( + value=(-10.0 * unit.kilocalorie / unit.mole).plus_minus( + 1.0 * unit.kilocalorie / unit.mole + ), + gradients=[], + ) + protocol.execute() + + assert protocol.result.value == -10.0 * unit.kilocalorie / unit.mole + assert ( + protocol.result.gradients[0].value + == -5.0 * unit.kilocalorie / unit.mole / unit.angstrom + ) + + def test_analyse_apr(tmp_path, monkeypatch, complex_file_path): import mdtraj - from paprika import analyze + from paprika.evaluator import Analyze # Generate a dummy set of attach restraints restraints_protocol = GenerateAttachRestraints("") @@ -534,7 +745,7 @@ def mock_analyze_return(**_): # Application of the monkeypatch to replace Path.home # with the behavior of mockreturn defined above. - monkeypatch.setattr(analyze, "compute_phase_free_energy", mock_analyze_return) + monkeypatch.setattr(Analyze, "compute_phase_free_energy", mock_analyze_return) protocol = AnalyzeAPRPhase("analyze_release_phase") protocol.topology_path = complex_file_path