parallelize
- class parallelize(func, iterarg=None, iterkwargs=None, args=None, kwargs=None, ncpus=None, maxcpu=None, maxmem=None, interval=None, parallelizer=None, serial=False, progress=False, callback=None, die=True, **func_kwargs)
Execute a function in parallel.
Most simply, `sc.parallelize()` acts as a shortcut for using `pool.map`. However, it also provides flexibility in how arguments are passed to the function, load balancing, etc.

Either or both of `iterarg` and `iterkwargs` can be used. `iterarg` can be an iterable or an integer; if the latter, it will run the function that number of times and not pass the argument to the function (which may be useful for running "embarrassingly parallel" simulations). `iterkwargs` is a dict of iterables; each iterable must be the same length (and the same length as `iterarg`, if it exists), and each dict key will be used as a kwarg to the called function. Any other kwargs passed to `sc.parallelize()` will also be passed to the function.

This function can either use a fixed number of CPUs or allocate them dynamically based on load. If `ncpus` is `None`, the number of CPUs is allocated dynamically. Memory (`maxmem`) and CPU load (`maxcpu`) limits can also be specified.
- Parameters:
func (func) – the function to parallelize
iterarg (list) – the variable(s) to provide to each process (see examples below)
iterkwargs (dict) – another way of providing variables to each process (see examples below)
args (list) – positional arguments for each process, the same for all processes
kwargs (dict) – keyword arguments for each process, the same for all processes
ncpus (int/float) – number of CPUs to use (if <1, treat as a fraction of the total available; if None, use loadbalancer)
maxcpu (float) – maximum CPU load; if exceeded, delay the start of the next process (not used if `ncpus` is specified)
maxmem (float) – maximum fraction of virtual memory (RAM); if exceeded, delay the start of the next process
interval (float) – number of seconds to pause between starting processes for checking load
parallelizer (str/func) – parallelization function; default ‘multiprocess’ (see below for details)
serial (bool) – whether to skip parallelization and run in serial (useful for debugging; equivalent to `parallelizer='serial'`)
progress (bool) – whether to show a progress bar
callback (func) – an optional function to call from each worker
die (bool) – whether to stop immediately if an exception is encountered (otherwise, store the exception as the result; see the sketch after the examples below)
func_kwargs (dict) – merged with kwargs (see above)
- Returns:
List of outputs from each process
Example 1 – simple usage as a shortcut to multiprocess.map():
```python
import sciris as sc

def f(x):
    return x*x

results = sc.parallelize(f, [1,2,3])
```
Example 2 – simple usage for “embarrassingly parallel” processing:
```python
import numpy as np
import sciris as sc

def rnd():
    np.random.seed()
    return np.random.random()

results = sc.parallelize(rnd, 10, ncpus=4)
```
Example 3 – three different equivalent ways to use multiple arguments:
```python
import sciris as sc

def f(x,y):
    return x*y

results1 = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)])
results2 = sc.parallelize(func=f, iterkwargs={'x':[1,2,3], 'y':[2,3,4]})
results3 = sc.parallelize(func=f, iterkwargs=[{'x':1, 'y':2}, {'x':2, 'y':3}, {'x':3, 'y':4}])
assert results1 == results2 == results3
```
Example 4 – using non-iterated arguments and dynamic load balancing:
```python
import numpy as np
import pylab as pl
import sciris as sc

def myfunc(i, x, y):
    np.random.seed()
    xy = [x+i*np.random.randn(100), y+i*np.random.randn(100)]
    return xy

xylist1 = sc.parallelize(myfunc, iterarg=range(5), kwargs={'x':3, 'y':8}, maxcpu=0.8, interval=0.2) # Use a kwargs dict
xylist2 = sc.parallelize(myfunc, x=5, y=10, iterarg=[0,1,2], parallelizer='multiprocessing') # Supply kwargs directly and use a different parallelizer

for p,xylist in enumerate([xylist1, xylist2]):
    pl.subplot(2,1,p+1)
    for i,xy in enumerate(reversed(xylist)):
        pl.scatter(xy[0], xy[1], label='Run %i'%i)
    pl.legend()
```
Example 5 – using a custom parallelization function:
```python
import multiprocessing as mp
import sciris as sc

def f(x,y):
    return [x]*y

pool = mp.Pool(processes=2)
results = sc.parallelize(f, iterkwargs=dict(x=[1,2,3], y=[4,5,6]), parallelizer=pool.map) # Note: parallelizer is pool.map, not pool
```
Example 6 – using Sciris as an interface to Dask:
```python
import sciris as sc

def f(x,y):
    return [x]*y

def dask_map(task, argslist):
    import dask
    queued = [dask.delayed(task)(args) for args in argslist]
    return list(dask.compute(*queued))

results = sc.parallelize(f, iterkwargs=dict(x=[1,2,3], y=[4,5,6]), parallelizer=dask_map)
```
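As a supplement to the examples above, the sketch below illustrates the `die` argument; it assumes (per the parameter description) that with `die=False`, a failed run's exception object is stored in place of its result rather than being raised:

```python
import sciris as sc

def f(x):
    if x == 2:
        raise ValueError('Simulated failure') # Hypothetical error, for illustration only
    return x*x

results = sc.parallelize(f, [1,2,3], die=False) # die=False: store exceptions instead of raising
print(results) # Expected (assumption): [1, ValueError('Simulated failure'), 9]
```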
Note 1: the default parallelizer `"multiprocess"` uses `dill` for pickling, so it is the most versatile (e.g., it can pickle non-top-level functions). However, it is also the slowest for passing large amounts of data. You can switch between these with `parallelizer='fast'` (`concurrent.futures`) and `parallelizer='robust'` (`multiprocess`).

The `parallelizer` argument allows a wide range of different parallelizers (including different aliases for each), and also supports user-supplied ones. Note that in most cases, the default parallelizer will suffice. The full list of options is below, followed by a short illustrative sketch:

- `None`, `'default'`, `'robust'`, `'multiprocess'`: the slow but robust dill-based parallelizer `multiprocess`
- `'fast'`, `'concurrent'`, `'concurrent.futures'`: the faster but more fragile pickle-based Python-default parallelizer `concurrent.futures`
- `'multiprocessing'`: the previous pickle-based Python default parallelizer, `multiprocessing`
- `'serial'`, `'serial-copy'`: no parallelization (single-threaded); with "-copy", force pickling
- `'thread'`, `'threadpool'`, `'thread-copy'`: thread- rather than process-based parallelization ("-copy" as above)
- User-supplied: any `map()`-like function that takes in a function and an argument list
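A minimal sketch of switching between the built-in options above (all option strings are taken from the list; results should be identical across backends):

```python
import sciris as sc

def f(x):
    return x**2

# The same task run via different built-in parallelizers; only the
# backend (and hence the overhead and pickling behavior) differs
r1 = sc.parallelize(f, [1,2,3], parallelizer='robust') # dill-based multiprocess
r2 = sc.parallelize(f, [1,2,3], parallelizer='fast')   # pickle-based concurrent.futures
r3 = sc.parallelize(f, [1,2,3], parallelizer='thread') # thread- rather than process-based
r4 = sc.parallelize(f, [1,2,3], parallelizer='serial') # single-threaded, for debugging
assert r1 == r2 == r3 == r4 == [1, 4, 9]
```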
Note 2: If parallelizing figure generation, use a non-interactive backend, or make sure that (a) the figure is closed inside the function call, and (b) the figure object is not returned. Otherwise, parallelization won't increase speed (and might even be slower than serial!).
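For instance, a pattern along these lines (a sketch with a hypothetical plotting task; the key points are the non-interactive backend, closing the figure in the worker, and returning a filename rather than the figure object):

```python
import matplotlib
matplotlib.use('Agg') # Non-interactive backend, safe for parallel figure generation
import pylab as pl
import sciris as sc

def plot_and_save(i):
    fig = pl.figure()
    pl.plot([0, 1], [0, i])
    filename = f'figure_{i}.png' # Hypothetical output path
    fig.savefig(filename) # Save to disk instead of returning the figure
    pl.close(fig) # Close the figure inside the function call
    return filename # Return the filename, not the figure object

filenames = sc.parallelize(plot_and_save, range(3))
```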
Note 3: to use on Windows, parallel calls must be contained within an `if __name__ == '__main__'` block. For example:

```python
import sciris as sc

def f(x,y):
    return x*y

if __name__ == '__main__':
    results = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)])
    print(results)
```
New in version 1.1.1: "serial" argument.
New in version 2.0.0: changed default parallelizer from `multiprocess.Pool` to `concurrent.futures.ProcessPoolExecutor`; replaced `maxload` with `maxcpu`/`maxmem`; added `returnpool` argument.
New in version 2.0.4: added "die" argument; changed exception handling.
New in version 3.0.0: new `Parallel` class; propagated "die" to jobs.