MPI¶
Utilities to run on MPI.
These functions and decorators simplify running the same code on multiple nodes. One benefit is that the same code runs unchanged whether it is executed serially or in parallel.
Global variables¶
- disable_mpi : bool
- Set this to True to force running serially.
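For example, to force serial execution even when the program is launched under an MPI runner (a sketch; set the flag before any other yank.mpi call):
>>> from yank import mpi
>>> mpi.disable_mpi = True  # all yank.mpi functions now run serially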
Routines¶
get_mpicomm()
- Automatically detect and configure MPI execution and return an MPI communicator.
run_single_node()
- Run a task on a single node.
on_single_node()
- Decorator version of run_single_node().
distribute()
- Map a task over a sequence of arguments on all the nodes.
delay_termination()
- A context manager to delay the response to termination signals.
delayed_termination()
- A decorator version of delay_termination().
- yank.mpi.get_mpicomm()[source]¶
Retrieve the MPI communicator for this execution.
The function automatically detects if the program is running under MPI by checking specific environment variables set by various MPI implementations. On first execution, it modifies sys.excepthook and registers a handler for SIGINT, SIGTERM, and SIGABRT that calls MPI's Abort() to correctly terminate all processes.
Returns: mpicomm : mpi4py communicator or None
The communicator for this node, None if the program doesn't run with MPI.
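A minimal usage sketch (assuming a serial run, so get_mpicomm() returns None; under MPI the communicator exposes the usual mpi4py rank and size attributes):
>>> from yank import mpi
>>> mpicomm = mpi.get_mpicomm()
>>> if mpicomm is None:
...     print('Running serially')
... else:
...     print('Running on rank {}/{}'.format(mpicomm.rank, mpicomm.size))
Running serially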
- yank.mpi.run_single_node(rank, task, *args, **kwargs)[source]¶
Run a task on a single node.
If MPI is not activated, this simply runs locally.
Parameters: task : callable
The task to run on node rank.
rank : int
The rank of the MPI communicator that must execute the task.
broadcast_result : bool, optional
If True, the result is broadcast to all nodes. If False, only the node executing the task will receive the return value of the task, and all other nodes will receive None (default is False).
sync_nodes : bool, optional
If True, the nodes will be synchronized at the end of the execution (i.e. the task will be blocking) even if the result is not broadcast (default is False).
Returns: result
The return value of the task. This will be None on all nodes other than rank unless broadcast_result is set to True.
Other Parameters: args
The ordered arguments to pass to task.
kwargs
The keyword arguments to pass to task.
Examples
>>> def add(a, b):
...     return a + b
>>> # Run 3+4 on node 0.
>>> run_single_node(0, task=add, a=3, b=4, broadcast_result=True)
7
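Positional arguments are forwarded through *args; in a serial run (no MPI) the task simply executes locally and its result is returned. A sketch reusing the add function above:
>>> run_single_node(0, add, 3, 4)
7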
- yank.mpi.on_single_node(rank, broadcast_result=False, sync_nodes=False)[source]¶
A decorator version of run_single_node().
Decorates a function so that it is always executed with run_single_node().
Parameters: rank : int
The rank of the MPI communicator that must execute the task.
broadcast_result : bool, optional
If True, the result is broadcast to all nodes. If False, only the node executing the function will receive its return value, and all other nodes will receive None (default is False).
sync_nodes : bool, optional
If True, the nodes will be synchronized at the end of the execution (i.e. the task will be blocking) even if the result is not broadcast (default is False).
See also
run_single_node()
Examples
>>> @on_single_node(rank=0, broadcast_result=True)
... def add(a, b):
...     return a + b
>>> add(3, 4)
7
- yank.mpi.distribute(task, distributed_args, *other_args, send_results_to='all', propagate_exceptions_to='all', sync_nodes=False, group_size=None, **kwargs)[source]¶
Map the task over a sequence of arguments to be executed on different nodes.
If MPI is not activated, this simply runs serially on this node. The algorithm guarantees that each node will be assigned the same job_id (i.e. the index of the argument in distributed_args) every time.
Parameters: task : callable
The task to be distributed among nodes. The task will be called as task(distributed_args[job_id], *other_args, **kwargs), so the parameter to be distributed must be the first one.
distributed_args : iterable
The sequence of the parameters to distribute among nodes.
send_results_to : int, 'all', or None, optional
If the string 'all', the result will be sent to all nodes. If an int, the result will be sent only to the node with rank send_results_to. The return value of distribute() depends on the value of this parameter (default is 'all').
propagate_exceptions_to : 'all', 'group', or None, optional
When one of the processes raises an exception during the task execution, this controls which other processes raise it (default is 'all'). This can be 'group' or None only if send_results_to is None.
sync_nodes : bool, optional
If True, the nodes will be synchronized at the end of the execution (i.e. the task will be blocking) even if the result is not shared (default is False).
group_size : None, int, or list of int, optional, default is None
If not None, the distributed_args are distributed among groups of nodes that are isolated from each other. This is particularly useful if task also calls distribute(), since normally that would result in unexpected behavior. If an integer, the nodes are split into equal groups of group_size nodes. If a list of integers, the nodes are split into possibly unequal groups (see example below).
Returns: all_results : list
All the return values for all the arguments if the results were sent to the node, or only the return values of the arguments processed by this node otherwise.
arg_indices : list of int, optional
This is returned as part of a tuple (all_results, arg_indices) only if send_results_to is set to an int or None. In this case all_results[i] is the return value of task(distributed_args[arg_indices[i]]).
Other Parameters: other_args
Other parameters to pass to task besides the assigned distributed parameters.
kwargs
Keyword arguments to pass to task besides the assigned distributed parameters.
Examples
>>> def square(x):
...     return x**2
>>> distribute(square, [1, 2, 3, 4], send_results_to='all')
[1, 4, 9, 16]
When send_results_to is not set to 'all', the return value also includes the indices of the arguments associated with the results.
>>> distribute(square, [1, 2, 3, 4], send_results_to=0)
([1, 4, 9, 16], [0, 1, 2, 3])
Divide the nodes into two groups of 2. The task, in turn, can distribute another task among the nodes in its own group.
>>> def supertask(list_of_bases):
...     return distribute(square, list_of_bases, send_results_to='all')
>>> list_of_supertask_args = [[1, 2, 3], [4], [5, 6]]
>>> distribute(supertask, distributed_args=list_of_supertask_args,
...            send_results_to='all', group_size=2)
[[1, 4, 9], [16], [25, 36]]
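group_size may also be given as a list of integers to form unequal groups, e.g. splitting three nodes into groups of 2 and 1. A sketch reusing the arguments above (in a serial run the grouping has no effect, so the output is identical):
>>> distribute(supertask, distributed_args=list_of_supertask_args,
...            send_results_to='all', group_size=[2, 1])
[[1, 4, 9], [16], [25, 36]]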
- yank.mpi.delay_termination()[source]¶
Context manager to delay handling of termination signals.
This makes it possible to avoid interrupting tasks such as writing to the file system, which could otherwise result in file corruption.
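A usage sketch (assuming a POSIX system; a termination signal delivered inside the block is handled only after the block exits):
>>> import tempfile
>>> with tempfile.NamedTemporaryFile(mode='w') as f:
...     with delay_termination():
...         _ = f.write('checkpoint data')  # a SIGTERM here is deferred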
- yank.mpi.delayed_termination(func)[source]¶
Decorator that runs the function with delay_termination().
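A decorator usage sketch (the decorated function runs entirely inside delay_termination(), so termination signals are handled only after it returns):
>>> @delayed_termination
... def save_data(path, data):
...     with open(path, 'w') as f:
...         f.write(data)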