sciris.sc_parallel

Parallelization functions, allowing multiprocessing to be used simply.

NB: Uses multiprocess instead of multiprocessing under the hood for broadest support across platforms (e.g. Jupyter notebooks).

Functions

parallel_progress

Run a function in parallel with an optional single progress bar

parallelcmd

A function to parallelize any block of code.

parallelize

Execute a function in parallel.

Classes

TaskArgs

A class to hold the arguments for the parallel task -- not to be invoked by the user.

parallelize(func, iterarg=None, iterkwargs=None, args=None, kwargs=None, ncpus=None, maxcpu=None, maxmem=None, interval=None, parallelizer=None, serial=False, returnpool=False, die=False, **func_kwargs)[source]

Execute a function in parallel.

Most simply, sc.parallelize() acts as a shortcut for using ProcessPoolExecutor(). However, it also provides flexibility in how arguments are passed to the function, in load balancing, etc.

Either or both of iterarg and iterkwargs can be used. iterarg can be an iterable or an integer; if the latter, the function will be run that number of times without the argument being passed to it (which may be useful for running “embarrassingly parallel” simulations). iterkwargs is a dict of iterables (or, equivalently, a list of dicts; see Example 3 below); each iterable must be the same length (and the same length as iterarg, if it exists), and each dict key will be used as a kwarg to the called function. Any other kwargs passed to sc.parallelize() will also be passed to the function.

This function can use either a fixed number of CPUs or allocate them dynamically based on load. If both ncpus and maxcpu are None, it will use the number of CPUs reported by multiprocessing (i.e., multiprocessing.cpu_count()); if ncpus is not None, it will use the specified number of CPUs; and if ncpus is None but maxcpu is not None, it will allocate the number of CPUs dynamically.
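For instance, a minimal sketch of the three allocation modes described above (the toy function and settings are illustrative):

import sciris as sc

def f(x):
    return x**2

results1 = sc.parallelize(f, iterarg=range(8), ncpus=4)    # Fixed: use exactly 4 CPUs
results2 = sc.parallelize(f, iterarg=range(8), ncpus=0.5)  # Fraction: use half of the available CPUs
results3 = sc.parallelize(f, iterarg=range(8), maxcpu=0.9) # Dynamic: delay new processes while CPU load exceeds 90%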

Note: the default parallelizer "multiprocess" uses dill for pickling, making it the most versatile (e.g., it can pickle non-top-level functions). However, it is also the slowest for passing large amounts of data. You can switch between these with parallelizer='fast' (concurrent.futures) and parallelizer='robust' (multiprocess).
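For example, a minimal sketch of switching between the two (the toy function is illustrative):

import sciris as sc

def f(x):
    return x + 1

results_fast   = sc.parallelize(f, iterarg=[1,2,3], parallelizer='fast')   # concurrent.futures: fastest for large data
results_robust = sc.parallelize(f, iterarg=[1,2,3], parallelizer='robust') # multiprocess: dill pickling, most versatile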

Parameters
  • func (func) – the function to parallelize

  • iterarg (list) – the variable(s) to provide to each process (see examples below)

  • iterkwargs (dict) – another way of providing variables to each process (see examples below)

  • args (list) – positional arguments for each process, the same for all processes

  • kwargs (dict) – keyword arguments for each process, the same for all processes

  • ncpus (int/float) – number of CPUs to use (if <1, treat as a fraction of the total available; if None, use loadbalancer)

  • maxcpu (float) – maximum CPU load; above this, delay the start of the next process (not used if ncpus is specified)

  • maxmem (float) – maximum fraction of virtual memory (RAM); above this, delay the start of the next process

  • interval (float) – number of seconds to pause between starting processes for checking load

  • parallelizer (str/func) – parallelization function; default ‘multiprocess’ (other choices are ‘concurrent.futures’, ‘multiprocessing’, or user-supplied; see example below)

  • serial (bool) – whether to skip parallelization and run in serial (useful for debugging)

  • returnpool (bool) – whether to return the process pool as well as the results

  • die (bool) – whether to stop immediately if an exception is encountered (otherwise, try ‘multiprocess’)

  • func_kwargs (dict) – merged with kwargs (see above)

Returns

List of outputs from each process

Example 1 – simple usage as a shortcut to multiprocessing.map():

def f(x):
    return x*x

results = sc.parallelize(f, [1,2,3])

Example 2 – simple usage for “embarrassingly parallel” processing:

import numpy as np

def rnd():
    np.random.seed()
    return np.random.random()

results = sc.parallelize(rnd, 10, ncpus=4)

Example 3 – three different equivalent ways to use multiple arguments:

def f(x,y):
    return x*y

results1 = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)])
results2 = sc.parallelize(func=f, iterkwargs={'x':[1,2,3], 'y':[2,3,4]})
results3 = sc.parallelize(func=f, iterkwargs=[{'x':1, 'y':2}, {'x':2, 'y':3}, {'x':3, 'y':4}])
assert results1 == results2 == results3

Example 4 – using non-iterated arguments and dynamic load balancing:

import numpy as np
import pylab as pl

def myfunc(i, x, y):
    np.random.seed()
    xy = [x+i*np.random.randn(100), y+i*np.random.randn(100)]
    return xy

xylist1 = sc.parallelize(myfunc, kwargs={'x':3, 'y':8}, iterarg=range(5), maxcpu=0.8, interval=0.2) # Use kwargs dict
xylist2 = sc.parallelize(myfunc, x=5, y=10, iterarg=[0,1,2], parallelizer='multiprocessing') # Supply kwargs directly and use a different parallelizer

for p,xylist in enumerate([xylist1, xylist2]):
    pl.subplot(2,1,p+1)
    for i,xy in enumerate(reversed(xylist)):
        pl.scatter(xy[0], xy[1], label='Run %i'%i)
    pl.legend()

Example 5 – using a custom parallelization function:

def f(x,y):
    return [x]*y

import multiprocessing as mp
pool = mp.Pool(processes=2)
results = sc.parallelize(f, iterkwargs=dict(x=[1,2,3], y=[4,5,6]), parallelizer=pool.map) # Note: parallelizer is pool.map, not pool
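
Note that when supplying your own pool, you are responsible for cleaning it up once the results have been collected (e.g., pool.close() followed by pool.join()).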

Note: to use on Windows, parallel calls must be contained within an if __name__ == '__main__' block.

For example:

import sciris as sc

def f(x,y):
    return x*y

if __name__ == '__main__':
    results = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)])
    print(results)

New in version 1.1.1: “serial” argument.
New in version 2.0.0: changed default parallelizer from multiprocess.Pool to concurrent.futures.ProcessPoolExecutor; replaced maxload with maxcpu/maxmem; added returnpool argument.
New in version 2.0.4: added “die” argument; changed exception handling.

parallelcmd(cmd=None, parfor=None, returnval=None, maxcpu=None, maxmem=None, interval=None, **kwargs)[source]

A function to parallelize any block of code. Note: this is intended for quick prototyping; since it uses exec(), it is not recommended for use in production code.

Parameters
  • cmd (str) – a string representation of the code to be run in parallel

  • parfor (dict) – a dictionary of lists of the variables to loop over

  • returnval (str) – the name of the output variable

  • maxcpu (float) – maximum CPU load; used by sc.loadbalancer()

  • maxmem (float) – maximum fraction of virtual memory (RAM); used by sc.loadbalancer()

  • interval (float) – how long to wait between polls to check whether the load is OK; used by sc.loadbalancer()

  • kwargs (dict) – variables to pass into the code

Example:

const = 4
parfor = {'val':[3,5,9]}
returnval = 'result'
cmd = """
newval = val+const
result = newval**2
"""
results = sc.parallelcmd(cmd=cmd, parfor=parfor, returnval=returnval, const=const)

New in version 2.0.0: replaced maxload with maxcpu/maxmem; automatically de-indent the command

parallel_progress(fcn, inputs, num_workers=None, show_progress=True, initializer=None)[source]

Run a function in parallel with an optional single progress bar

The result is essentially equivalent to:

>>> list(map(fcn, inputs))

But with execution in parallel and a single progress bar shown.

Parameters
  • fcn (function) – Function object to call, accepting one argument; OR a function with zero arguments, in which case inputs should be an integer

  • inputs (list) – A collection of inputs that will each be passed to the function; OR a number, if fcn() has no input arguments

  • num_workers (int) – Number of processes, defaults to the number of CPUs

  • show_progress (bool) – Whether to show a progress bar

  • initializer (func) – A function that each worker process will call when it starts

Returns

A list of outputs
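
For example, a minimal sketch of both calling conventions (the function names here are illustrative):

import sciris as sc
import numpy as np

def square(x):
    return x**2

def rnd():
    np.random.seed()
    return np.random.random()

results1 = sc.parallel_progress(square, [1,2,3,4]) # One input passed per call
results2 = sc.parallel_progress(rnd, 10)           # Zero-argument function, called 10 times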

New in version 1.0.0.