parallelize#

parallelize(func, iterarg=None, iterkwargs=None, args=None, kwargs=None, ncpus=None, maxcpu=None, maxmem=None, interval=None, parallelizer=None, serial=False, progress=False, callback=None, globaldict=None, die=True, **func_kwargs)[source]#

Execute a function in parallel.

Most simply, sc.parallelize() acts as a shortcut for using pool.map. However, it also provides flexibility in how arguments are passed to the function, load balancing, etc.

Either or both of iterarg and iterkwargs can be used. iterarg can be an iterable or an integer; if the latter, the function will be run that number of times without the argument being passed to it (which may be useful for running “embarrassingly parallel” simulations). iterkwargs is a dict of iterables; each iterable must be the same length (and the same length as iterarg, if it exists), and each dict key will be used as a kwarg to the called function. Any other kwargs passed to sc.parallelize() will also be passed to the function.
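
For instance, a minimal sketch of the kwarg pass-through (the function and values here are illustrative):

import sciris as sc

def f(x, scale=1):
    return x*scale

results = sc.parallelize(f, [1,2,3], scale=10) # extra kwargs are passed to f, giving [10, 20, 30]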

This function can either use a fixed number of CPUs or allocate them dynamically based on load. If ncpus is None, the number of CPUs is allocated dynamically. Memory (maxmem) and CPU load (maxcpu) limits can also be specified.
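
For example (a minimal sketch; the worker function and limit values are illustrative):

import sciris as sc

def work(i):
    return i**2

results1 = sc.parallelize(work, range(20), ncpus=0.5) # use half of the available CPUs
results2 = sc.parallelize(work, range(20), maxcpu=0.8, maxmem=0.9, interval=0.5) # delay new processes if CPU load exceeds 80% or memory use exceeds 90%, checking every 0.5 s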

Parameters:
  • func (func) – the function to parallelize

  • iterarg (list) – the variable(s) to provide to each process (see examples below)

  • iterkwargs (dict) – another way of providing variables to each process (see examples below)

  • args (list) – positional arguments for each process, the same for all processes

  • kwargs (dict) – keyword arguments for each process, the same for all processes

  • ncpus (int/float) – number of CPUs to use (if <1, treat as a fraction of the total available; if None, use loadbalancer)

  • maxcpu (float) – maximum CPU load; otherwise, delay the start of the next process (not used if ncpus is specified)

  • maxmem (float) – maximum fraction of virtual memory (RAM); otherwise, delay the start of the next process

  • interval (float) – number of seconds to pause between starting processes for checking load

  • parallelizer (str/func) – parallelization function; default ‘multiprocess’ (see below for details)

  • serial (bool) – whether to skip parallelization and run in serial (useful for debugging; equivalent to parallelizer='serial')

  • progress (bool) – whether to show a progress bar

  • callback (func) – an optional function to call from each worker

  • globaldict (dict) – an optional global dictionary to pass to each worker via the kwarg “globaldict” (note: may not update properly with low task latency)

  • die (bool) – whether to stop immediately if an exception is encountered (otherwise, store the exception as the result; see the sketch below)

  • func_kwargs (dict) – merged with kwargs (see above)

Returns:

List of outputs from each process
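
As a sketch of the die argument (the failing function is illustrative; the behavior is as described above):

import sciris as sc

def fragile(x):
    if x == 2:
        raise ValueError('x cannot be 2')
    return x**2

results = sc.parallelize(fragile, [1,2,3], die=False) # e.g. [1, ValueError('x cannot be 2'), 9]: the exception is stored as the result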

Example 1 – simple usage as a shortcut to multiprocess.map():

def f(x):
    return x*x

results = sc.parallelize(f, [1,2,3])

Example 2 – simple usage for “embarrassingly parallel” processing:

import numpy as np

def rnd():
    np.random.seed()
    return np.random.random()

results = sc.parallelize(rnd, 10, ncpus=4)

Example 3 – three different equivalent ways to use multiple arguments:

def f(x,y):
    return x*y

results1 = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)])
results2 = sc.parallelize(func=f, iterkwargs={'x':[1,2,3], 'y':[2,3,4]})
results3 = sc.parallelize(func=f, iterkwargs=[{'x':1, 'y':2}, {'x':2, 'y':3}, {'x':3, 'y':4}])
assert results1 == results2 == results3

Example 4 – using non-iterated arguments and dynamic load balancing:

import numpy as np
import pylab as pl

def myfunc(i, x, y):
    np.random.seed()
    xy = [x+i*np.random.randn(100), y+i*np.random.randn(100)]
    return xy

xylist1 = sc.parallelize(myfunc, iterarg=range(5), kwargs={'x':3, 'y':8}, maxcpu=0.8, interval=0.2) # Use kwargs dict
xylist2 = sc.parallelize(myfunc, x=5, y=10, iterarg=[0,1,2], parallelizer='multiprocessing') # Supply kwargs directly and use a different parallelizer

for p,xylist in enumerate([xylist1, xylist2]):
    pl.subplot(2,1,p+1)
    for i,xy in enumerate(reversed(xylist)):
        pl.scatter(xy[0], xy[1], label='Run %i'%i)
    pl.legend()

Example 5 – using a custom parallelization function:

def f(x,y):
    return [x]*y

import multiprocessing as mp
pool = mp.Pool(processes=2)
results = sc.parallelize(f, iterkwargs=dict(x=[1,2,3], y=[4,5,6]), parallelizer=pool.map) # Note: parallelizer is pool.map, not pool

Example 6 – using Sciris as an interface to Dask:

def f(x,y):
    return [x]*y

def dask_map(task, argslist):
    import dask
    queued = [dask.delayed(task)(args) for args in argslist]
    return list(dask.compute(*queued))

results = sc.parallelize(f, iterkwargs=dict(x=[1,2,3], y=[4,5,6]), parallelizer=dask_map)

Note 1: the default parallelizer "multiprocess" uses dill for pickling, so is the most versatile (e.g., it can pickle non-top-level functions). However, it is also the slowest for passing large amounts of data. You can switch between these with parallelizer='fast' (concurrent.futures) and parallelizer='robust' (multiprocess).
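
For example, to switch between the two by name:

import sciris as sc

def f(x):
    return x + 1

results_fast   = sc.parallelize(f, [1,2,3], parallelizer='fast')   # concurrent.futures: faster, pickle-based
results_robust = sc.parallelize(f, [1,2,3], parallelizer='robust') # multiprocess: slower, but dill handles more object types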

The parallelizer argument allows a wide range of different parallelizers (including different aliases for each), and also supports user-supplied ones. Note that in most cases, the default parallelizer will suffice. However, the full list of options is:

  • None, 'default', 'robust', 'multiprocess': the slow but robust dill-based parallelizer multiprocess

  • 'fast', 'concurrent', 'concurrent.futures': the faster but more fragile pickle-based Python-default parallelizer concurrent.futures

  • 'multiprocessing': the previous pickle-based Python default parallelizer, multiprocessing

  • 'serial', 'serial-copy': no parallelization (single-threaded); with “-copy”, force pickling

  • 'thread', 'threadpool', 'thread-copy': thread- rather than process-based parallelization (“-copy” as above)

  • User supplied: any map()-like function that takes in a function and an argument list
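
For instance, a minimal sketch of selecting thread-based parallelization by name (often suitable for I/O-bound tasks):

import sciris as sc

def f(x):
    return x*2

results = sc.parallelize(f, [1,2,3], parallelizer='thread') # threads rather than processes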

Note 2: If parallelizing figure generation, use a non-interactive backend, or make sure that (a) the figure is closed inside the function call, and (b) the figure object is not returned. Otherwise, parallelization won’t increase speed (and might even be slower than serial!).
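
A minimal sketch following these guidelines (the backend choice and filenames are illustrative):

import matplotlib
matplotlib.use('Agg') # non-interactive backend
import pylab as pl
import sciris as sc

def make_fig(i):
    fig = pl.figure()
    pl.plot([0, 1], [0, i])
    fig.savefig(f'fig{i}.png') # save to disk rather than returning the figure object
    pl.close(fig) # close the figure inside the function call
    return f'fig{i}.png'

filenames = sc.parallelize(make_fig, range(4))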

Note 3: to use on Windows, parallel calls must be contained within an if __name__ == '__main__' block.

For example:

import sciris as sc

def f(x,y):
    return x*y

if __name__ == '__main__':
    results = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)])
    print(results)
New in version 1.1.1: “serial” argument
New in version 2.0.0: changed default parallelizer from multiprocess.Pool to concurrent.futures.ProcessPoolExecutor; replaced maxload with maxcpu/maxmem; added returnpool argument
New in version 2.0.4: added “die” argument; changed exception handling
New in version 3.0.0: new Parallel class; propagated “die” to jobs
New in version 3.1.0: new “globaldict” argument