"""
Execution Context
=================
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
__all__ = ['Context']
import importlib
import os
import warnings
import tempfile
from gmx import exceptions
from gmx import logging
from gmx import status
import gmx.core as gmxapi
# Module-level logger
logger = logging.getLogger(__name__)
logger.info('Importing gmx.context')
def _load_tpr(self, element):
"""Implement the gromacs.load_tpr operation.
Updates the minimum width of the workflow parallelism. Does not add any API object to the graph.
Arguments:
self: The Context in which this operation is being loaded.
element: WorkElement specifying the operation.
Returns:
A Director that the Context can use in launching the Session.
"""
class Builder(object):
def __init__(self, tpr_list):
logger.debug("Loading tpr builder for tpr_list {}".format(tpr_list))
self.tpr_list = tpr_list
self.subscribers = []
self.width = len(tpr_list)
def add_subscriber(self, builder):
builder.infile = self.tpr_list
self.subscribers.append(builder)
def build(self, dag):
width = len(self.tpr_list)
for builder in self.subscribers:
builder.width = width
if 'width' in dag.graph:
width = max(width, dag.graph['width'])
dag.graph['width'] = width
return Builder(element.params['input'])
def _md(context, element):
"""Implement the gmxapi.md operation by returning a builder that can populate a data flow graph for the element.
Inspects dependencies to set up the simulation runner.
The graph node created will have `launch` and `run` attributes with function references, and a `width`
attribute declaring the workflow parallelism requirement.
Arguments:
context: The Context in which this operation is being loaded.
element: WorkElement specifying the operation.
Returns:
A Director that the Context can use in launching the Session.
"""
class Builder(object):
"""Translate md work element to a node in the session's DAG."""
def __init__(self, element):
try:
self.name = element.name
# Note that currently the calling code is in charge of subscribing this builder to its dependencies.
# A list of tpr files will be set when the calling code subscribes this builder to a tpr provider.
self.infile = None
# Other dependencies in the element may register potentials when subscribed to.
self.potential = []
self.input_nodes = []
self.runtime_params = element.params
except AttributeError:
raise exceptions.ValueError("object provided does not seem to be a WorkElement.")
def add_subscriber(self, builder):
"""The md operation does not yet have any subscribeable facilities."""
pass
def build(self, dag):
"""Add a node to the graph that, when launched, will construct a simulation runner.
Complete the definition of appropriate graph edges for dependencies.
The launch() method of the added node creates the runner from the tpr file for the current rank and adds
modules from the incoming edges.
"""
if not (hasattr(dag, 'add_node')
and hasattr(dag, 'add_edge')
and hasattr(dag, 'graph')
and hasattr(dag, 'nodes')):
raise gmx.exceptions.TypeError("dag argument does not have a DiGraph interface.")
name = self.name
dag.add_node(name)
for neighbor in self.input_nodes:
dag.add_edge(neighbor, name)
infile = self.infile
assert not infile is None
potential_list = self.potential
assert dag.graph['width'] >= len(infile)
# Provide closure with which to execute tasks for this node.
def launch(rank=None):
assert not rank is None
# Copy and update, if required by `end_time` parameter.
temp_filename = None
if 'end_time' in self.runtime_params:
# Note that mkstemp returns a file descriptor as the first part of the tuple.
# We can make this cleaner in 0.0.7 with a separate node that manages the
# altered input.
_, temp_filename = tempfile.mkstemp(suffix='.tpr')
logger.debug('Updating input. Using temp file {}'.format(temp_filename))
gmxapi.copy_tprfile(source=infile[rank],
destination=temp_filename,
end_time=self.runtime_params['end_time'])
tpr_file = temp_filename
else:
tpr_file = infile[rank]
logger.info('Loading TPR file: {}'.format(tpr_file))
system = gmxapi.from_tpr(tpr_file)
dag.nodes[name]['system'] = system
mdargs = gmxapi.MDArgs()
mdargs.set(self.runtime_params)
# Workaround to give access to plugin potentials used in a context.
pycontext = element.workspec._context
pycontext.potentials = potential_list
context = pycontext._api_object
context.setMDArgs(mdargs)
for potential in potential_list:
context.add_mdmodule(potential)
dag.nodes[name]['session'] = system.launch(context)
dag.nodes[name]['close'] = dag.nodes[name]['session'].close
if 'end_time' in self.runtime_params:
def special_close():
dag.nodes[name]['session'].close()
logger.debug("Unlinking temporary TPR file {}.".format(temp_filename))
os.unlink(temp_filename)
dag.nodes[name]['close'] = special_close
else:
dag.nodes[name]['close'] = dag.nodes[name]['session'].close
def runner():
"""Currently we only support a single call to run."""
def done():
raise StopIteration()
# Replace the runner with a stop condition for subsequent passes.
dag.nodes[name]['run'] = done
return dag.nodes[name]['session'].run()
dag.nodes[name]['run'] = runner
return dag.nodes[name]['run']
dag.nodes[name]['launch'] = launch
return Builder(element)
def _get_mpi_ensemble_communicator(session_communicator, ensemble_size):
"""Get an ensemble communicator from an MPI communicator.
An ensemble communicator is an object that implements mpi4py.MPI.Comm methods
as described elsewhere in this documentation.
:param session_communicator: MPI communicator with the interface described by mpi4py.MPI.Comm
:param ensemble_size: number of ensemble members
:return: communicator of described size on participating ranks and null communicator on any others.
Must be called exactly once in every process in `communicator`. It is the
responsibility of the calling code to refrain from running ensemble operations
if not part of the ensemble. The calling code determines this by comparing its
session_communicator.Get_rank() to ensemble_size. This is not a good solution
because it assumes there is a single ensemble communicator and that ensemble
work is allocated to ranks serially from session_rank 0. Future work might
use process groups associated with specific operations in the work graph so
that processes can check for membership in a group to decide whether to use
their ensemble communicator. Another possibility would be to return None
rather than a null communicator in processes that aren't participating in
a given ensemble.
"""
from mpi4py import MPI
session_size = session_communicator.Get_size()
session_rank = session_communicator.Get_rank()
# Check the ensemble "width" against the available parallelism
if ensemble_size > session_size:
msg = 'ParallelArrayContext requires a work array that fits in the MPI communicator: '
msg += 'array width {} > size {}.'
msg = msg.format(ensemble_size, session_size)
raise exceptions.UsageError(msg)
if ensemble_size < session_size:
msg = 'MPI context is wider than necessary to run this work: array width {} vs. size {}.'
warnings.warn(msg.format(ensemble_size, session_size))
# Create an appropriate sub-communicator for the present work. Extra ranks will be in a
# sub-communicator with no work.
if session_rank < ensemble_size:
# The session launcher should maintain an inventory of the ensembles and
# provide an appropriate tag, but right now we just have a sort of
# Boolean: ensemble or not.
color = 0
else:
color = MPI.UNDEFINED
ensemble_communicator = session_communicator.Split(color, session_rank)
try:
ensemble_communicator_size = ensemble_communicator.Get_size()
ensemble_communicator_rank = ensemble_communicator.Get_rank()
except:
warnings.warn("Possible API programming error: ensemble_communicator does not provide required methods...")
ensemble_communicator_size = 0
ensemble_communicator_rank = None
logger.info("Session rank {} assigned to rank {} of subcommunicator {} of size {}".format(
session_rank,
ensemble_communicator_rank,
ensemble_communicator,
ensemble_communicator_size
))
# There isn't a good reason to worry about special handling for a null communicator,
# which we have to explicitly avoid "free"ing, so let's just get rid of it.
# To do: don't even get the null communicator in the first place. Use a group and create instead of split.
if ensemble_communicator == MPI.COMM_NULL:
ensemble_communicator = None
return ensemble_communicator
def _acquire_communicator(communicator=None):
"""Get a workflow level communicator for the session.
This function is intended to be called by the __enter__ method that creates
a session get a communicator instance. The `Free` method of the returned
instance must be called exactly once. This should be performed by the
corresponding __exit__ method.
Arguments:
communicator : a communicator to duplicate (optional)
Returns:
A communicator that must be explicitly freed by the caller.
Currently only supports MPI multi-simulation parallelism dependent on
mpi4py. The mpi4py package should be installed and built with compilers
that are compatible with the gmxapi installation.
If provided, `communicator` must provide the mpi4py.MPI.Comm interface.
Returns either a duplicate of `communicator` or of MPI_COMM_WORLD if mpi4py
is available. Otherwise, returns a mock communicator that can only manage
sessions and ensembles of size 0 or 1.
gmx behavior is undefined if launched with mpiexec and without mpi4py
"""
class MockSessionCommunicator(object):
def Dup(self):
return self
def Free(self):
return
def Get_size(self):
return 1
def Get_rank(self):
return 0
def __str__(self):
return 'Basic'
def __repr__(self):
return 'MockSessionCommunicator()'
if communicator is None:
try:
import mpi4py.MPI as MPI
communicator = MPI.COMM_WORLD
except ImportError:
logger.info("mpi4py is not available for default session communication.")
communicator = MockSessionCommunicator()
else:
communicator = communicator
try:
new_communicator = communicator.Dup()
except Exception as e:
message = "Exception when duplicating communicator: {}".format(e)
raise exceptions.ApiError(message)
return new_communicator
def _get_ensemble_communicator(communicator, ensemble_size):
"""Provide ensemble_communicator feature in active_context, if possible.
Must be called on all ranks in `communicator`. The communicator returned
must be freed by a call to its `Free()` instance method. This function is
best used in a context manager's `__enter__()` method so that the
corresponding `context.Free()` can be called in the `__exit__` method.
Arguments:
communicator : session communicator for the session with the ensemble.
ensemble_size : ensemble size of the requested ensemble communicator
The ensemble_communicator feature should be present if the Context can
provide communication between all ensemble members. The Context should
determine this at the launch of the session and set the
``_session_ensemble_communicator`` attribute to provide an object that
implements the same interface as an mpi4py.MPI.Comm object. Actually, this is
a temporary shim, so the only methods that need to be available are `Get_size`,
`Get_rank` and something that can be called as
Allreduce(send, recv) where send and recv are objects providing the Python
buffer interface.
Currently, only one ensemble can be managed in a session.
"""
ensemble_communicator = None
class TrivialEnsembleCommunicator(object):
def __init__(self):
import numpy
self._numpy = numpy
def Free(self):
return
def Allreduce(self, send, recv):
logger.debug("Faking an Allreduce for ensemble of size 1.")
send_buffer = self._numpy.array(send, copy=False)
recv_buffer = self._numpy.array(recv, copy=False)
recv_buffer[:] = send_buffer[:]
def Get_size(self):
return 1
def Get_rank(self):
return 0
# For trivial cases, don't bother trying to use MPI
# Note: all ranks in communicator must agree on the size of the work!
# Note: If running with a Mock session communicator in an MPI session (user error)
# every rank will think it is the only rank and will try to perform the
# same work.
if communicator.Get_size() <= 1 or ensemble_size <= 1:
message = "Getting TrivialEnsembleCommunicator for ensemble of size {}".format((ensemble_size))
message += " for session rank {} in session communicator of size {}".format(
communicator.Get_rank(),
communicator.Get_size())
logger.debug(message)
ensemble_communicator = TrivialEnsembleCommunicator()
else:
message = "Getting an MPI subcommunicator for ensemble of size {}".format(ensemble_size)
message += " for session rank {} in session communicator of size {}".format(
communicator.Get_rank(),
communicator.Get_size())
logger.debug(message)
ensemble_communicator = _get_mpi_ensemble_communicator(communicator, ensemble_size)
return ensemble_communicator
def _get_ensemble_update(context):
"""Set up a simple ensemble resource.
The context should call this function once per session to get an `ensemble_update`
function object.
This is a draft of a Context feature that may not be available in all
Context implementations. This factory function can be wrapped as a
``ensemble_update`` "property" in a Context instance method to produce a Python function
with the signature ``update(context, send, recv, tag=None)``.
This feature requires that the Context is capabable of providing the
ensemble_communicator feature and the numpy feature.
If both are available, the function object provided by
``ensemble_update`` provides
the ensemble reduce operation used by the restraint potential plugin in the
gmxapi sample_restraint repository. Otherwise, the provided function object
will log an error and then raise an exception.
gmxapi 0.0.5 and 0.0.6 MD plugin clients look for a member function named
``ensemble_update`` in the Context that launched them. In the future,
clients will use session resources to access ensemble reduce operations.
In the mean time, a transitional implementation can involve defining a
``ensemble_update`` property in the Context object that acts as a factory
function to produce the reducing operation, if possible with the given
resources.
"""
try:
import numpy
except ImportError:
message = "ensemble_update requires numpy, but numpy is not available."
logger.error(message)
raise exceptions.FeatureNotAvailableError(message)
def _ensemble_update(active_context, send, recv, tag=None):
assert not tag is None
assert str(tag) != ''
if not tag in active_context.part:
active_context.part[tag] = 0
logger.debug("Performing ensemble update.")
active_context._session_ensemble_communicator.Allreduce(send, recv)
buffer = numpy.array(recv, copy=False)
buffer /= active_context.work_width
suffix = '_{}.npz'.format(tag)
# These will end up in the working directory and each ensemble member will have one
filename = str("rank{}part{:04d}{}".format(active_context.rank, int(active_context.part[tag]), suffix))
numpy.savez(filename, recv=recv)
active_context.part[tag] += 1
def _no_ensemble_update(active_context, send, recv, tag=None):
message = "Attempt to call ensemble_update() in a Context that does not provide the operation."
# If we confirm effective exception handling, remove the extraneous log.
logger.error(message)
raise exceptions.FeatureNotAvailableError(message)
if context._session_ensemble_communicator is not None:
functor = _ensemble_update
else:
functor = _no_ensemble_update
context.part = {}
return functor
class _libgromacsContext(object):
"""Low level API to libgromacs library context provides Python context manager.
Binds to a workflow and manages computation resources.
Attributes:
workflow (:obj:`gmx.workflow.WorkSpec`): bound workflow to be executed.
Example:
>>> with _libgromacsContext(my_workflow) as session: # doctest: +SKIP
... session.run()
Things are still fluid, but what we might do is have all of the WorkSpec operations that are supported
by a given Context to correspond to member functions in the Context, a SessionBuilder, or Session. In
any case, the operations allow a Context implementation to transform a work specification into a
directed acyclic graph of schedulable work.
"""
# The Context is the appropriate entity to own or mediate access to an appropriate logging facility,
# but right now we are using the module-level Python logger.
# Reference https://github.com/kassonlab/gmxapi/issues/135
def __init__(self, workflow=None):
"""Create new context bound to the provided workflow, if any.
Args:
workflow (gmx.workflow.WorkSpec) work specification object to bind.
"""
self._session = None
self.__workflow = workflow
@property
def workflow(self):
return self.__workflow
@workflow.setter
def workflow(self, workflow):
"""Before accepting a workflow, the context must check whether it can interpret the work specification."""
self.__workflow = workflow
@classmethod
def check_workspec(cls, workspec, raises=False):
"""Check the validity of the work specification in this Context.
Args:
workspec: work specification to check
raises: Boolean (default False)
If raises == True, raises exceptions for problems found in the work specification.
Returns:
True if workspec is processable in this Context, else False.
"""
from gmx.workflow import workspec_version, get_source_elements, WorkElement
# initialize return value.
is_valid = True
# Check compatibility
if workspec.version != workspec_version:
is_valid = False
if raises:
raise exceptions.ApiError('Incompatible workspec version.')
# Check that Elements are uniquely identifiable.
elements = dict()
for element in workspec.elements:
if element.name is not None and element.name not in elements:
elements[element.name] = element
else:
is_valid = False
if raises:
raise exceptions.ApiError('WorkSpec must contain uniquely named elements.')
# Check that the specification is complete. There must be at least one source element and all
# dependencies must be fulfilled.
sources = set([element.name for element in get_source_elements(workspec)])
if len(sources) < 1:
is_valid = False
if raises:
raise exceptions.ApiError('WorkSpec must contain at least one source element')
return is_valid
def __enter__(self):
"""Implement Python context manager protocol.
Returns:
runnable session object.
"""
if self._session is not None:
raise exceptions.Error('Already running.')
# The API runner currently has an implicit context.
try:
# launch() with no arguments is deprecated.
# Ref: https://github.com/kassonlab/gmxapi/issues/124
self._session = self.workflow.launch()
except:
self._session = None
raise
return self._session
def __exit__(self, exception_type, exception_value, traceback):
"""Implement Python context manager protocol.
Closing a session should not produce Python exceptions. Instead, exit
state is accessible through API objects like Status.
For evolving design points, see
- https://github.com/kassonlab/gmxapi/issues/41
- https://github.com/kassonlab/gmxapi/issues/121
"""
self._session.close()
self._session = None
return False
class DefaultContext(_libgromacsContext):
""" Produce an appropriate context for the work and compute environment.
Deprecated:
Use gmx.context.get_context() to find an appropriate high-level API
context. For lower-level access to the library that does not employ the
full API Context abstraction, but instead explicitly uses a local
libgromacs instance, a replacement still needs to be devised. It may
have this same interface, but the name and scoping of DefaultContext is
misleading.
"""
def __init__(self, work):
# There is very little context abstraction at this point...
warnings.warn("Behavior of DefaultContext is unspecified starting in gmxapi 0.0.8.", DeprecationWarning)
super(DefaultContext, self).__init__(work)
[docs]class Context(object):
"""Manage an array of simulation work executing in parallel.
This is the first implementation of a new style of Context class that has some extra abstraction
and uses the new WorkSpec idea.
Additional facilities are available to elements of the array members.
* array element corresponding to work in the current sub-context
* "global" resources managed by the ParallelArrayContext
Attributes:
work :obj:`gmx.workflow.WorkSpec`: specification of work to be performed when a session is launched.
rank : numerical index of the current worker in a running session (None if not running)
work_width : Minimum width needed for the parallelism required by the array of work being executed.
elements : dictionary of references to elements of the workflow.
`rank`, `work_width`, and `elements` are empty or None until the work is processed, as during session launch.
Example:
Use ``mpiexec -n 2 python -m mpi4py myscript.py`` to run two jobs at the same time.
In this example the jobs are identical. In myscript.py:
>>> import gmx
>>> import gmx.core
>>> from gmx.data import tpr_filename # Get a test tpr filename
>>> work = gmx.workflow.from_tpr([tpr_filename, tpr_filename])
>>> gmx.run(work)
Example:
>>> import gmx
>>> import gmx.core
>>> from gmx.data import tpr_filename # Get a test tpr filename
>>> work = gmx.workflow.from_tpr([tpr_filename, tpr_filename])
>>> context = gmx.context.get_context(work)
>>> with context as session:
... session.run()
... # The session is one abstraction too low to know what rank it is. It lets the spawning context manage
... # such things.
... # rank = session.rank
... # The local context object knows where it fits in the global array.
... rank = context.rank
... output_path = os.path.join(context.workdir_list[rank], 'traj.trr')
... assert(os.path.exists(output_path))
... print('Worker {} produced {}'.format(rank, output_path))
Implementation notes:
To produce a running session, the Context __enter__() method is called, according to the Python context manager
protocol. At this time, the attached WorkSpec must be feasible on the available resources. To turn the specified
work into an executable directed acyclic graph (DAG), handle objects for the elements in the work spec are sequenced
in dependency-compatible order and the context creates a "builder" for each according to the element's operation.
Each builder is subscribed to the builders of its dependency elements. The DAG is then assembled by calling each
builder in sequence. A builder can add zero, one, or more nodes and edges to the DAG.
The Session is then launched from the DAG. What happens next is implementation-dependent, and it may take a while for
us to decide whether and how to standardize interfaces for the DAG nodes and edges and/or execution protocols. I
expect each node will at least have a `launch()` method, but will also probably have typed input and output ports as well as some signalling.
A sophisticated and abstract Session implementation could schedule work only to satisfy data dependencies of requested
output upon request. Our immediate implementation will use the following protocol.
Each node has a `launch()` method. When the session is entered, the `launch()` method is called for each node in
dependency order. The launch method returns either a callable (`run()` function) or None, raising an exception in
case of an error. The sequence of non-None callables is stored by the Session. When Session.run() is called, the
sequence of callables is called in order. If StopIteration is raised by the callable, it is removed from the sequence.
The sequence is processed repeatedly until there are no more callables.
Note that this does not rigorously handle races or deadlocks, or flexibility in automatically chasing dependencies. A more
thorough implementation could recursively call launch on dependencies (launch could be idempotent or involve some
signalling to dependents when complete), run calls could be entirely event driven, and/or nodes could "publish"
output (including just a completion message), blocking for acknowledgement before looking for the next set of subscribed inputs.
"""
def __init__(self, work=None, workdir_list=None, communicator=None):
"""Create manager for computing resources.
Does not initialize resources because Python objects by themselves do
not have a good way to deinitialize resources. Instead, resources are
initialized using the Python context manager protocol when sessions are
entered and exited.
Appropriate computing resources need to be knowable when the Context is created.
Keyword Arguments:
work : work specification with which to initialize this context
workdir_list : deprecated
communicator : non-owning reference to a multiprocessing communicator
If provided, communicator must implement the mpi4py.MPI.Comm interface. The
Context will use this communicator as the parent for subcommunicators
used when launching sessions. If provided, communicator is owned by the
caller, and must be freed by the caller after any sessions are closed.
By default, the Context will get a reference to MPI_COMM_WORLD, which
will be freed when the Python process ends and cleans up its resources.
The communicator stored by the Context instance will not be used directly,
but will be duplicated when launching sessions using ``with``.
"""
# self.__context_array = list([Context(work_element) for work_element in work])
from gmx.workflow import WorkSpec
# Until better Session abstraction exists at the Python level, a
# _session_communicator attribute will be added to and removed from the
# context at session entry and exit. If necessary, a _session_ensemble_communicator
# will be split from _session_communicator for simulation ensembles
# present in the specified work.
self.__communicator = communicator
self.__work = WorkSpec()
self.__workdir_list = workdir_list
self._session = None
# This may not belong here. Is it confusing for the Context to have both global and local properties?
# Alternatively, maybe a trivial `property` that gets the rank from a bound session, if any.
self.rank = None
# `work_width` notes the required width of an array of synchronous tasks to perform the specified work.
# As work elements are processed, self.work_width will be increased as appropriate.
self.work_width = None
# initialize the operations map. May be extended during the lifetime of a Context.
# Note that there may be a difference between built-in operations provided by this module and
# additional operations registered at run time.
self.__operations = dict()
# The map contains a builder for each operation. The builder is created by passing the element to the function
# in the map. The object returned must have the following methods:
#
# * add_subscriber(another_builder) : allow other builders to subscribe to this one.
# * build(dag) : Fulfill the builder responsibilities by adding an arbitrary number of nodes and edges to a Graph.
#
# The gmxapi namespace of operations should be consistent with a specified universal set of functionalities
self.__operations['gmxapi'] = {'md': lambda element : _md(self, element),
# 'global_data' : shared_data_maker,
}
# Even if TPR file loading were to become a common and stable enough operation to be specified in
# and API, it is unlikely to be implemented by any code outside of GROMACS, so let's not clutter
# a potentially more universal namespace.
self.__operations['gromacs'] = {'load_tpr': lambda element : _load_tpr(self, element),
}
# Right now we are treating workspec elements and work DAG nodes as equivalent, but they are
# emphatically not intended to be tightly coupled. The work specification is intended to be
# simple, user-friendly, general, and easy-to-implement. The DAG is an implementation detail
# and may differ across context types. It is likely to have stronger typing of nodes and/or
# edges. It is not yet specified whether we should translate the work into a graph before, after,
# or during processing of the elements, so it is not yet known whether we will need facilities
# to allow cross-referencing between the two graph-type structures. If we instantiate API objects
# as we process work elements, and the DAG in a context deviates from the work specification
# topology, we would need to use named dependencies to look up objects to bind to. Such facilities
# could be hidden in the WorkElement class(es), too, to delegate code away from the Context as a
# container class growing without bounds...
# In actuality, we will have to process the entire workspec to some degree to make sure we can
# run it on the available resources.
self.elements = None
# This setter must be called after the operations map has been populated.
self.work = work
self._api_object = gmxapi.Context()
@property
def work(self):
return self.__work
@work.setter
def work(self, work):
"""Set `work` attribute.
Raises:
gmx.exceptions.ApiError: work is not compatible with schema or
known operations.
gmx.exceptions.UsageError: Context can not access operations in
the name space given for an Element
gmx.exceptions.ValueError: assignment operation cannot be performed
for the provided object (rhs)
For discussion on error handling, see https://github.com/kassonlab/gmxapi/issues/125
"""
from gmx.workflow import WorkSpec, WorkElement
if work is None:
return
if isinstance(work, WorkSpec):
workspec = work
elif hasattr(work, 'workspec') and isinstance(work.workspec, WorkSpec):
workspec = work.workspec
else:
raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
workspec._context = self
# Make sure this context knows how to run the specified work.
for e in workspec.elements:
element = WorkElement.deserialize(workspec.elements[e])
# Note: Non-built-in namespaces (non-native) are treated as modules to import.
# Native namespaces may not be completely implemented in a particular version of a particular Context.
if element.namespace in {'gmxapi', 'gromacs'}:
assert element.namespace in self.__operations
if not element.operation in self.__operations[element.namespace]:
# The requested element is a built-in operation but not available in this Context.
# element.namespace should be mapped, but not all operations are necessarily implemented.
logger.error("Operation {} not found in map {}".format(element.operation,
str(self.__operations)))
# This check should be performed when deciding if the context is appropriate for the work.
# If we are just going to use a try/catch block for this test, then we should differentiate
# this exception from those raised due to incorrect usage.
# The exception thrown here may evolve with https://github.com/kassonlab/gmxapi/issues/125
raise exceptions.FeatureNotAvailableError(
'Specified work cannot be performed due to unimplemented operation {}.{}.'.format(
element.namespace,
element.operation))
else:
assert element.namespace not in {'gmxapi', 'gromacs'}
# Don't leave an empty nested dictionary if we couldn't map the operation.
if element.namespace in self.__operations:
namespace_map = self.__operations[element.namespace]
else:
namespace_map = dict()
# Set or update namespace map iff we have something new.
if element.operation not in namespace_map:
try:
element_module = importlib.import_module(element.namespace)
element_operation = getattr(element_module, element.operation)
except ImportError as e:
raise exceptions.UsageError(
'Cannot find implementation for namespace {}. ImportError: {}'.format(
element.namespace,
str(e)))
except AttributeError:
raise exceptions.UsageError(
'Cannot find factory for operation {}.{}'.format(
element.namespace,
element.operation
)
)
namespace_map[element.operation] = element_operation
self.__operations[element.namespace] = namespace_map
self.__work = workspec
[docs] def add_operation(self, namespace, operation, get_builder):
"""Add a builder factory to the operation map.
Extends the known operations of the Context by mapping an operation in a namespace to a function
that returns a builder to process a work element referencing the operation. Must be called before
the work specification is added, since the spec is inspected to confirm that the Context can run it.
It may be more appropriate to add this functionality to the Context constructor or as auxiliary
information in the workspec, or to remove it entirely; it is straight-forward to just add snippets
of code to additional files in the working directory and to make them available as modules for the
Context to import.
Example:
>>> # Import some custom extension code.
>>> import myplugin
>>> myelement = myplugin.new_element()
>>> workspec = gmx.workflow.WorkSpec()
>>> workspec.add_element(myelement)
>>> context = gmx.context.ParallelArrayContext()
>>> context.add_operation(myelement.namespace, myelement.operation, myplugin.element_translator)
>>> context.work = workspec
>>> with get_context() as session:
... session.run()
"""
if namespace not in self.__operations:
if namespace in {'gmxapi', 'gromacs'}:
raise exceptions.UsageError("Cannot add operations to built-in namespaces.")
else:
self.__operations[namespace] = dict()
else:
assert namespace in self.__operations
if operation in self.__operations[namespace]:
raise exceptions.UsageError("Operation {}.{} already defined in this context.".format(namespace, operation))
else:
self.__operations[namespace][operation] = get_builder
# Set up a simple ensemble resource
# This should be implemented for Session, not Context, and use an appropriate subcommunicator
# that is created and freed as the Session launches and exits.
[docs] def ensemble_update(self, send, recv, tag=None):
"""Implement the ensemble_update member function that gmxapi through 0.0.6 expects.
"""
# gmxapi through 0.0.6 expects to bind to this member function during "build".
# This behavior needs to be deprecated (bind during launch, instead), but this
# dispatching function should be an effective placeholder.
if tag is None or str(tag) == '':
raise exceptions.ApiError("ensemble_update must be called with a name tag.")
# __ensemble_update is an attribute, not an instance function, so we need to explicitly pass 'self'
return self.__ensemble_update(self, send, recv, tag)
def __enter__(self):
"""Implement Python context manager protocol, producing a Session for the specified work in this Context.
Returns:
Session object that can be run and/or inspected.
Additional API operations are possible while the Session is active. When used as a Python context manager,
the Context will close the Session at the end of the `with` block by calling `__exit__`.
Note: this is probably where we will have to process the work specification to determine whether we
have appropriate resources (such as sufficiently wide parallelism). Until we have a better Session
abstraction, this means the clean approach should take two passes to first build a DAG and then
instantiate objects to perform the work. In the first implementation, we kind of muddle things into
a single pass.
"""
try:
import networkx as nx
from networkx import DiGraph as _Graph
except ImportError:
raise exceptions.FeatureNotAvailableError("gmx requires the networkx package to execute work graphs.")
# Cache the working directory from which we were launched so that __exit__() can give us proper context
# management behavior.
self.__initial_cwd = os.getcwd()
logger.debug("Launching session from {}".format(self.__initial_cwd))
if self._session is not None:
raise exceptions.Error('Already running.')
if self.work is None:
raise exceptions.UsageError('No work to perform!')
# Set up the global and local context.
# Check the global MPI configuration
# Since the Context doesn't have a destructor, if we use an MPI communicator at this scope then
# it has to be owned and managed outside of Context.
self._session_communicator = _acquire_communicator(self.__communicator)
context_comm_size = self._session_communicator.Get_size()
context_rank = self._session_communicator.Get_rank()
self.rank = context_rank
# self._communicator = communicator
logger.debug("Context rank {} in context {} of size {}".format(context_rank,
self._session_communicator,
context_comm_size))
###
# Process the work specification.
###
logger.debug("Processing workspec:\n{}".format(str(self.work)))
# Get a builder for DAG components for each element
builders = {}
builder_sequence = []
for element in self.work:
# dispatch builders for operation implementations
try:
new_builder = self.__operations[element.namespace][element.operation](element)
assert hasattr(new_builder, 'add_subscriber')
assert hasattr(new_builder, 'build')
logger.info("Collected builder for {}".format(element.name))
except LookupError as e:
request = '.'.join([element.namespace, element.operation])
message = 'Could not find an implementation for the specified operation: {}. '.format(request)
message += str(e)
raise exceptions.ApiError(message)
# Subscribing builders is the Context's responsibility because otherwise the builders
# don't know about each other. Builders should not depend on the Context unless they
# are a facility provided by the Context, in which case they may be member functions
# of the Context. We will probably need to pass at least some
# of the Session to the `launch()` method, though...
dependencies = element.depends
for dependency in dependencies:
# If a dependency is a list, assume it is an "ensemble" of dependencies
# and pick the element for corresponding to the local rank.
if isinstance(dependency, (list, tuple)):
assert len(dependency) > context_rank
name = str(dependency[context_rank])
else:
name = dependency
logger.info("Subscribing {} to {}.".format(element.name, name))
builders[name].add_subscriber(new_builder)
builders[element.name] = new_builder
builder_sequence.append(element.name)
# Call the builders in dependency order
# Note: session_communicator is available, but ensemble_communicator has not been created yet.
graph = _Graph(width=1)
logger.info("Building sequence {}".format(builder_sequence))
for name in builder_sequence:
builder = builders[name]
logger.info("Building {}".format(builder))
logger.debug("Has build attribute {}.".format(builder.build))
builder.build(graph)
self.work_width = graph.graph['width']
# Prepare working directories. This should probably be moved to some aspect of the Session and either
# removed from here or made more explicit to the user.
workdir_list = self.__workdir_list
if workdir_list is None:
workdir_list = [os.path.join('.', str(i)) for i in range(self.work_width)]
self.__workdir_list = list([os.path.abspath(dir) for dir in workdir_list])
# For gmxapi 0.0.6, all ranks have a session_ensemble_communicator
self._session_ensemble_communicator = _get_ensemble_communicator(self._session_communicator, self.work_width)
self.__ensemble_update = _get_ensemble_update(self)
# launch() is currently a method of gmx.core.MDSystem and returns a gmxapi::Session.
# MDSystem objects are obtained from gmx.core.from_tpr(). They also provide add_potential().
# gmxapi::Session objects are exposed as gmx.core.MDSession and provide run() and close() methods.
#
# Here, I want to find the input appropriate for this rank and get an MDSession for it.
# E.g. Make a pass that allows meta-objects to bind (setting md_proxy._input_tpr and md_proxy._plugins,
# and then call a routine implemented by each object to run whatever protocol it needs, such
# as `system = gmx.core.from_tpr(md._input_tpr); system.add_potential(md._plugins)
# For future design plans, reference https://github.com/kassonlab/gmxapi/issues/65
#
# This `if` condition is currently the thing that ultimately determines whether the
# rank attempts to do work.
if context_rank < self.work_width:
# print(graph)
logger.debug(("Launching graph {}.".format(graph.graph)))
logger.debug("Graph nodes: {}".format(str(list(graph.nodes))))
logger.debug("Graph edges: {}".format(str(list(graph.edges))))
logger.info("Launching work on context rank {}, subcommunicator rank {}.".format(
self.rank,
self._session_ensemble_communicator.Get_rank()))
# Launch the work for this rank
self.workdir = self.__workdir_list[self.rank]
if os.path.exists(self.workdir):
if not os.path.isdir(self.workdir):
raise exceptions.FileError('{} is not a valid working directory.'.format(self.workdir))
else:
os.mkdir(self.workdir)
os.chdir(self.workdir)
logger.info('rank {} changed directory to {}'.format(self.rank, self.workdir))
sorted_nodes = nx.topological_sort(graph)
runners = []
closers = []
for name in sorted_nodes:
launcher = graph.nodes[name]['launch']
runner = launcher(self.rank)
if not runner is None:
runners.append(runner)
closers.append(graph.nodes[name]['close'])
# Get a session object to return. It must simply provide a `run()` function.
context = self # Part of workaround for bug gmxapi-214
class Session(object):
def __init__(self, runners, closers):
self.runners = list(runners)
self.closers = list(closers)
def run(self):
# Note we are not following the documented protocol of running repeatedly yet.
to_be_deleted = []
for i, runner in enumerate(self.runners):
try:
runner()
except StopIteration:
to_be_deleted.insert(0, i)
for i in to_be_deleted:
del self.runners[i]
return True
def close(self):
for close in self.closers:
logger.debug("Closing node: {}".format(close))
close()
# Workaround for bug gmxapi-214
if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
context._api_object = gmxapi.Context()
self._session = Session(runners, closers)
else:
logger.info("Context rank {} has no work to do".format(self.rank))
context = self # Part of workaround for bug gmxapi-214
class NullSession(object):
def run(self):
logger.info("Running null session on rank {}.".format(self.rank))
return status.Status()
def close(self):
logger.info("Closing null session.")
# Workaround for bug gmxapi-214
if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
context._api_object = gmxapi.Context()
return
self._session = NullSession()
self._session.rank = self.rank
# Make sure session has started on all ranks before continuing?
self._session.graph = graph
return self._session
def __exit__(self, exception_type, exception_value, traceback):
"""Implement Python context manager protocol."""
logger.info("Exiting session on context rank {}.".format(self.rank))
if self._session is not None:
logger.info("Calling session.close().")
self._session.close()
self._session = None
else:
# Note: we should not have a None session but rather an API-compliant Session that just has no work.
# Reference: https://github.com/kassonlab/gmxapi/issues/41
logger.info("No _session known to context or session already closed.")
if hasattr(self, '_session_ensemble_communicator'):
if self._session_communicator is not None:
logger.info("Freeing sub-communicator {} on rank {}".format(
self._session_ensemble_communicator,
self.rank))
self._session_ensemble_communicator.Free()
else:
logger.debug('"None" ensemble communicator does not need to be "Free"d.')
del self._session_ensemble_communicator
else:
logger.debug("No ensemble subcommunicator on context rank {}.".format(self.rank))
logger.debug("Freeing session communicator.")
self._session_communicator.Free()
logger.debug("Deleting session communicator reference.")
del self._session_communicator
os.chdir(self.__initial_cwd)
logger.info("Session closed on context rank {}.".format(self.rank))
# Note: Since sessions running in different processes can have different work, sessions have not necessarily
# ended on all ranks. As a result, starting another session on the same resources could block until the
# resources are available.
# Python context managers return False when there were no exceptions to handle.
return False
# The interface and functionality of ParallelArrayContext is the new generic
# Context behavior, but we need to keep the old name for compatibility for
# the moment.
ParallelArrayContext = Context
[docs]def get_context(work=None):
"""Get a concrete Context object.
Args:
work (gmx.workflow.WorkSpec): runnable work as a valid gmx.workflow.WorkSpec object
Returns:
An object implementing the :py:class:`gmx.context.Context` interface, if possible.
Raises:
gmx.exceptions.ValueError if an appropriate context for ``work`` could not be loaded.
If work is provided, return a Context object capable of running the provided work or produce an error.
The semantics for finding Context implementations needs more consideration, and a more informative exception
is likely possible.
A Context can run the provided work if
* the Context supports can resolve all operations specified in the elements
* the Context supports DAG topologies implied by the network of dependencies
* the Context supports features required by the elements with the specified parameters,
such as synchronous array jobs.
"""
# We need to define an interface for WorkSpec objects so that we don't need
# to rely on typing and inter-module dependencies.
from gmx import workflow
workspec = None
if work is not None:
if isinstance(work, workflow.WorkSpec):
workspec = work
elif hasattr(work, 'workspec') and isinstance(work.workspec,
workflow.WorkSpec):
workspec = work.workspec
else:
raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
if workspec is not None and \
hasattr(workspec, '_context') and \
workspec._context is not None:
context = workspec._context
else:
context = Context(work=workspec)
return context