"""
Functions to allow parallelization to be performed easily.
NB: Uses ``multiprocess`` instead of :mod:`multiprocessing` under the hood for
broadest support across platforms (e.g. Jupyter notebooks).
Highlights:
- :func:`sc.parallelize() <parallelize>`: as-easy-as-possible parallelization
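
**Quick start** (a minimal sketch of typical usage; ``my_func`` is just an illustrative placeholder)::

    import sciris as sc

    def my_func(x):
        return x**2

    results = sc.parallelize(my_func, iterarg=range(5))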
"""
import warnings
import numpy as np
import multiprocess as mp
import multiprocessing as mpi
import concurrent.futures as cf
from functools import partial
from . import sc_utils as scu
from . import sc_odict as sco
from . import sc_printing as scp
from . import sc_datetime as scd
from . import sc_profiling as scpro
##############################################################################
#%% Parallelization class
##############################################################################
__all__ = ['Parallel', 'parallelize']
# Define a common error message
freeze_support_error = '''
Uh oh! It appears you are trying to run with multiprocessing on Windows outside
of the __main__ block; please see https://docs.python.org/3/library/multiprocessing.html
for more information. The correct syntax to use is e.g.
import sciris as sc
def my_func(x):
return
if __name__ == '__main__':
sc.parallelize(my_func)
'''
def _jobkey(index):
""" Convert a job index to a key """
return f'_job{index}'
def _progressbar(globaldict, njobs, started, **kwargs):
""" Define a progress bar based on the global dictionary """
try:
done = sum([globaldict[k] for k in globaldict.keys() if str(k).startswith('_job')])
except Exception as E:
done = '<unknown>' + str(E)
elapsed = (scd.now() - started).total_seconds()
scp.progressbar(done, njobs, label=f'Job {done}/{njobs} ({elapsed:.1f} s)', **kwargs)
return
class Parallel:
"""
Parallelization manager
For arguments and usage documentation, see :func:`sc.parallelize() <parallelize>`. Briefly,
this class validates input arguments, sets the number of CPUs, creates a
process (or thread) pool, starts the jobs running, retrieves the results from
each job, and processes them into outputs.
Useful methods:
reset(): reset the Parallel object to its initial pre-run state
        run_async(): the method that actually executes the parallelization (NB: used for every parallelizer, not only asynchronous ones)
        monitor(): monitor the progress of an asynchronous run
        finalize(): get the results from each job and process them
run(): shortcut to calling run_async() followed by finalize()
Useful attributes and properties:
running (bool): whether or not the jobs are running
ready (bool): whether or not the jobs are ready
status (str): a string description of the current state (not run, running, or done)
jobs (list): a list of jobs to run or being run (empty prior to run)
results (list): list of all results (the output from the jobs; empty prior to run)
success (list): whether each job completed successfully (true/false)
        exceptions (list): the exceptions raised by any jobs that did not succeed
times (dict): timing information on when the jobs were started, when they finished, and how long each job took
**Example**::
import sciris as sc
def slowfunc(i):
sc.randsleep(seed=i)
return i**2
# Standard usage
P = sc.Parallel(slowfunc, iterarg=range(10), parallelizer='multiprocess-async')
P.run_async()
P.monitor()
P.finalize()
print(P.times)
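
        # Equivalent synchronous usage (a sketch): run() simply wraps run_async() plus finalize()
        P2 = sc.Parallel(slowfunc, iterarg=range(10))
        P2.run()
        print(P2.results)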
| *New in version 3.0.0.*
| *New in version 3.1.0:* "globaldict" argument
"""
def __init__(self, func, iterarg=None, iterkwargs=None, args=None, kwargs=None, ncpus=None,
maxcpu=None, maxmem=None, interval=None, parallelizer=None, serial=False,
progress=False, callback=None, globaldict=None, label=None, die=True, **func_kwargs):
# Store input arguments
self.func = func
self.iterarg = iterarg
self.iterkwargs = iterkwargs
self.args = args
self.kwargs = scu.mergedicts(kwargs, func_kwargs)
        self._ncpus = ncpus # Stored with an underscore prefix since the final value is calculated dynamically later
self.maxcpu = maxcpu
self.maxmem = maxmem
self.interval = interval
self.parallelizer = parallelizer
self.serial = serial
self.progress = progress
self.callback = callback
        self.inputdict = globaldict # The user-supplied global dict; renamed to avoid clashing with the shared globaldict created in make_pool()
self.label = label
self.die = die
# Additional initialization
self.init()
return
def init(self):
"""
Perform all remaining initialization steps; this can safely be called after object creation
"""
self.reset()
self.set_defaults()
self.validate_args()
self.set_ncpus()
self.set_method()
return
def reset(self):
""" Reset to the pre-run state """
self.ncpus = None
self.njobs = None
self.embarrassing = None
self.argslist = None
self.method = None
self.pool = None
self.manager = None
self.globaldict = None
self.map_func = None
self.is_async = None
self.jobs = None
self.results = None
self.success = None
self.exceptions = None
self.times = sco.objdict(started=None, finished=None, elapsed=None, jobs=None)
        self._running = False # Used internally; see self.running for the dynamically updated property
self.already_run = False
return
def __repr__(self):
""" Brief representation of the object """
        labelstr = f'"{self.label}"; ' if self.label else ''
string = f'Parallel({labelstr}jobs={self.njobs}; cpus={self.ncpus}; method={self.method}; status={self.status})'
return string
def disp(self):
""" Display the full representation of the object """
return scp.pr(self)
def set_defaults(self):
""" Define defaults for parallelization """
self.defaults = sco.objdict()
# Define default parallelizers
self.defaults.fast = 'concurrent.futures'
self.defaults.robust = 'multiprocess'
# Map parallelizer to consistent choices
self.defaults.mapping = {
'default' : self.defaults.robust,
'robust' : self.defaults.robust,
'fast' : self.defaults.fast,
'serial' : 'serial',
'concurrent.futures' : 'concurrent.futures',
'concurrent' : 'concurrent.futures',
'futures' : 'concurrent.futures',
'multiprocess' : 'multiprocess',
'multiprocessing' : 'multiprocessing',
'thread' : 'thread',
'threadpool' : 'thread',
}
return
def validate_args(self):
""" Validate iterarg and iterkwargs """
iterarg = self.iterarg
iterkwargs = self.iterkwargs
njobs = 0
# Check that only one was provided
if iterarg is not None and iterkwargs is not None: # pragma: no cover
errormsg = 'You can only use one of iterarg or iterkwargs as your iterable, not both'
raise ValueError(errormsg)
# Validate iterarg
if iterarg is not None:
if not(scu.isiterable(iterarg)):
try:
iterarg = np.arange(iterarg) # This is duplicated in make_argslist, but kept here so as to not modify user inputs
self.embarrassing = True
except Exception as E: # pragma: no cover
errormsg = f'Could not understand iterarg "{iterarg}": not iterable and not an integer: {str(E)}'
raise TypeError(errormsg)
njobs = len(iterarg)
# Validate iterkwargs
if iterkwargs is not None:
if isinstance(iterkwargs, dict): # It's a dict of lists, e.g. {'x':[1,2,3], 'y':[2,3,4]}
for key,val in iterkwargs.items():
if not scu.isiterable(val): # pragma: no cover
errormsg = f'iterkwargs entries must be iterable, not {type(val)}'
raise TypeError(errormsg)
if not njobs:
njobs = len(val)
else:
if len(val) != njobs: # pragma: no cover
errormsg = f'All iterkwargs iterables must be the same length, not {njobs} vs. {len(val)}'
raise ValueError(errormsg)
elif isinstance(iterkwargs, list): # It's a list of dicts, e.g. [{'x':1, 'y':2}, {'x':2, 'y':3}, {'x':3, 'y':4}]
njobs = len(iterkwargs)
for item in iterkwargs:
if not isinstance(item, dict): # pragma: no cover
errormsg = f'If iterkwargs is a list, each entry must be a dict, not {type(item)}'
raise TypeError(errormsg)
else: # pragma: no cover
errormsg = f'iterkwargs must be a dict of lists, a list of dicts, or None, not {type(iterkwargs)}'
raise TypeError(errormsg)
# Final error checking
if njobs == 0:
            errormsg = 'Nothing found to parallelize: please supply either iterarg or iterkwargs'
raise ValueError(errormsg)
else:
self.njobs = njobs
return
def set_ncpus(self):
""" Configure number of CPUs """
# Handle maxload deprecation
maxload = self.kwargs.pop('maxload', None)
if maxload is not None: # pragma: no cover
self.maxcpu = maxload
warnmsg = 'sc.loadbalancer() argument "maxload" has been renamed "maxcpu" as of v2.0.0'
warnings.warn(warnmsg, category=FutureWarning, stacklevel=2)
# Handle number of CPUs
ncpus = self._ncpus # Copy, then process
sys_cpus = scpro.cpu_count()
if not ncpus: # Nothing is supplied (None or 0), calculate dynamically
ncpus = sys_cpus
elif 0 < ncpus < 1: # Less than one, treat as a fraction of total
ncpus = int(np.ceil(sys_cpus*ncpus))
ncpus = min(ncpus, self.njobs) # Don't use more CPUs than there are things to process
# Check and set CPUs
if not ncpus > 0: # pragma: no cover
errormsg = f'No CPUs to run on with inputs ncpus={ncpus}, system CPUs={sys_cpus}, and/or number of jobs={self.njobs}'
raise ValueError(errormsg)
self.ncpus = ncpus
return
def set_method(self):
""" Choose which method to use for parallelization """
# Handle defaults for the parallelizer
parallelizer = self.parallelizer
if parallelizer is None or parallelizer == 'async':
parallelizer = 'default'
if self.serial:
parallelizer = 'serial'
# Handle the choice of parallelizer
if scu.isstring(parallelizer):
            parallelizer = parallelizer.replace('copy', '').replace('async', '').replace('-', '') # Strip the 'copy'/'async' modifiers (and hyphens) to recover the base parallelizer name
try:
self.method = self.defaults.mapping[parallelizer]
except:
errormsg = f'Parallelizer "{parallelizer}" not found: must be one of {scu.strjoin(self.defaults.mapping.keys())}'
raise scu.KeyNotFoundError(errormsg)
else: # pragma: no cover
self.method = 'custom' # If a custom parallelizer is provided
# Handle async
is_async = False
supports_async = ['multiprocess', 'multiprocessing']
if scu.isstring(self.parallelizer) and 'async' in self.parallelizer:
if self.method in supports_async:
is_async = True
else:
errormsg = f'You have specified to use async with "{self.method}", but async is only supported for: {scu.strjoin(supports_async)}.'
raise ValueError(errormsg)
self.is_async = is_async
return
def make_pool(self):
""" Make the pool and map function """
# Shorten variables
ncpus = self.ncpus
method = self.method
is_async = self.is_async
def make_async_func(pool):
if is_async:
map_func = partial(pool.map_async, callback=self._time_finished)
else:
map_func = pool.map
return map_func
# Choose parallelizer and map function
if method == 'serial':
pool = None
map_func = lambda task,argslist: list(map(task, argslist))
elif method == 'multiprocess': # Main use case
pool = mp.Pool(processes=ncpus)
map_func = make_async_func(pool)
elif method == 'multiprocessing':
pool = mpi.Pool(processes=ncpus)
map_func = make_async_func(pool)
elif method == 'concurrent.futures':
pool = cf.ProcessPoolExecutor(max_workers=ncpus)
map_func = pool.map
elif method == 'thread':
pool = cf.ThreadPoolExecutor(max_workers=ncpus)
map_func = pool.map
elif method == 'custom':
pool = None
map_func = self.parallelizer
else: # Should be unreachable; exception should have already been caught # pragma: no cover
errormsg = f'Invalid parallelizer "{self.parallelizer}"'
raise ValueError(errormsg)
# Create a manager for sharing resources across jobs
if method in ['serial', 'thread', 'custom']:
manager = None
globaldict = dict() # For serial and thread, don't need anything fancy to share global variables
else:
if method == 'multiprocess': # Special case: can't share multiprocessing managers with multiprocess
manager = mp.Manager()
else:
manager = mpi.Manager() # Note "mpi" instead of "mp"
globaldict = manager.dict() # Create a dict for sharing progress of each job
# Handle any supplied input
if self.inputdict:
if method == 'custom': # For something custom, use the inputdict directly, in case it's something special
globaldict = self.inputdict
else:
globaldict.update(self.inputdict)
# Reset
self.pool = pool
self.manager = manager
self.globaldict = globaldict
self.map_func = map_func
self.is_async = is_async
self.jobs = None
self.rawresults = None
self.results = None
return
def make_argslist(self):
""" Construct argument list """
# Initialize
iterarg = self.iterarg
iterkwargs = self.iterkwargs
argslist = []
# Check for embarrassingly parallel run -- should already be validated
if self.embarrassing:
iterarg = np.arange(iterarg)
# Check for additional global arguments
        useglobal = self.inputdict is not None
# Construct the argument list for each job
for index in range(self.njobs):
if iterarg is None:
iterval = None
else:
iterval = iterarg[index]
if iterkwargs is None:
iterdict = None
else:
if isinstance(iterkwargs, dict): # Dict of lists
iterdict = {}
for key,val in iterkwargs.items():
iterdict[key] = val[index]
elif isinstance(iterkwargs, list): # List of dicts
iterdict = iterkwargs[index]
else: # pragma: no cover # Should be caught by previous checking, so shouldn't happen
errormsg = f'iterkwargs type not understood ({type(iterkwargs)})'
raise TypeError(errormsg)
taskargs = TaskArgs(func=self.func, index=index, njobs=self.njobs, iterval=iterval, iterdict=iterdict,
args=self.args, kwargs=self.kwargs, maxcpu=self.maxcpu, maxmem=self.maxmem,
interval=self.interval, embarrassing=self.embarrassing, callback=self.callback,
progress=self.progress, globaldict=self.globaldict, useglobal=useglobal,
started=self.times.started, die=self.die)
argslist.append(taskargs)
self.argslist = argslist
return
def run_async(self):
""" Choose how to run in parallel, and do it """
# Shorten variables
method = self.method
needs_copy = ['serial', 'thread', 'custom']
# Make the pool
self.make_pool()
# Construct the argument list (has to be after the pool is made)
self.times.started = scd.now()
self.make_argslist()
# Handle optional deepcopy
if scu.isstring(self.parallelizer) and '-copy' in self.parallelizer and method in needs_copy: # Don't deepcopy if we're going to pickle anyway
argslist = [scu.dcp(arg, die=self.die) for arg in self.argslist]
else:
argslist = self.argslist
# Run it!
output = self.map_func(_task, argslist)
        # Store the output: keep the pending jobs if async, otherwise convert to the raw results list
if self.is_async:
self.jobs = output
else:
self.rawresults = list(output)
self._time_finished()
self.already_run = True
return
@property
def running(self):
if not self.is_async:
return self._running
else:
if self.jobs is not None and not self.jobs.ready():
running = True
else:
running = False
return running
@property
def ready(self):
if not self.is_async:
ready = self.results is not None
else:
if self.jobs is None:
ready = False
else:
if self.jobs.ready():
ready = True
else:
ready = False
return ready
@property
def status(self):
output = 'not run'
if self.running:
output = 'running'
elif self.already_run:
output = 'done'
return output
def _time_finished(self, *args, **kwargs):
self.times.finished = scd.now()
self.times.elapsed = (self.times.finished - self.times.started).total_seconds()
return
def monitor(self, interval=0.1, **kwargs):
""" Monitor progress -- only usable with async """
        final_iter = True # Ensure the progress bar updates one final time after the jobs finish
while self.running or final_iter:
if not self.running and final_iter:
final_iter = False
_progressbar(self.globaldict, njobs=self.njobs, started=self.times.started, **kwargs)
scd.timedsleep(interval)
return
def finalize(self, get_results=True, close_pool=True, process_results=True):
""" Get results from the jobs and close the pool """
if get_results and self.jobs:
self.rawresults = list(self.jobs.get())
if close_pool and self.pool:
try:
self.pool.__exit__(None, None, None) # Handle as if in a with block
except Exception as E: # pragma: no cover
warnmsg = f'Could not close pool {self.pool}, please close manually: {str(E)}'
warnings.warn(warnmsg, category=RuntimeWarning, stacklevel=2)
if process_results:
self.process_results()
return
def process_results(self):
""" Parse the returned results dict into separate lists """
# Do not proceed if no results
if self.rawresults is None: # pragma: no cover
errormsg = 'Cannot process results: results not ready yet'
raise ValueError(errormsg)
# Otherwise, process results
else:
self.results = list()
self.success = list()
self.exceptions = list()
self.times.jobs = list()
for raw in self.rawresults:
self.results.append(raw['result'])
self.success.append(raw['success'])
self.exceptions.append(raw['exception'])
self.times.jobs.append(raw['elapsed'])
if not all(self.success): # pragma: no cover
warnmsg = f'Only {sum(self.success)} of {len(self.success)} jobs succeeded; see exceptions attribute for details'
warnings.warn(warnmsg, category=RuntimeWarning, stacklevel=2)
return
def run(self):
""" Actually run the parallelization """
try:
self.run_async()
self.finalize()
# Handle if run outside of __main__ on Windows
except RuntimeError as E: # pragma: no cover
if 'freeze_support' in E.args[0]: # For this error, add additional information
raise RuntimeError(freeze_support_error) from E
else: # For all other runtime errors, raise the original exception
raise E
# Tidy up
return self
def parallelize(func, iterarg=None, iterkwargs=None, args=None, kwargs=None, ncpus=None,
maxcpu=None, maxmem=None, interval=None, parallelizer=None, serial=False,
progress=False, callback=None, globaldict=None, die=True, **func_kwargs):
"""
Execute a function in parallel.
Most simply, :func:`sc.parallelize() <parallelize>` acts as a shortcut for using :meth:`pool.map <multiprocessing.pool.Pool.map>`.
However, it also provides flexibility in how arguments are passed to the function,
load balancing, etc.
    Either ``iterarg`` or ``iterkwargs`` can be used as the iterable (but not both). ``iterarg`` can
    be an iterable or an integer; if the latter, it will run the function that number
    of times and not pass the argument to the function (which may be useful for
    running "embarrassingly parallel" simulations). ``iterkwargs`` is either a dict of
    iterables (each of the same length) or a list of dicts; each dict key will be used
    as a kwarg to the called function.
Any other kwargs passed to :func:`sc.parallelize() <parallelize>` will also be passed to the function.
This function can either use a fixed number of CPUs or allocate dynamically
based on load. If ``ncpus`` is ``None``, then it will allocate the number of
CPUs dynamically. Memory (``maxmem``) and CPU load (``maxcpu``) limits can also
be specified.
Args:
func (func) : the function to parallelize
iterarg (list) : the variable(s) to provide to each process (see examples below)
iterkwargs (dict) : another way of providing variables to each process (see examples below)
args (list) : positional arguments for each process, the same for all processes
kwargs (dict) : keyword arguments for each process, the same for all processes
ncpus (int/float) : number of CPUs to use (if <1, treat as a fraction of the total available; if None, use loadbalancer)
        maxcpu (float) : maximum CPU load; if exceeded, delay the start of the next process (not used if ``ncpus`` is specified)
        maxmem (float) : maximum fraction of virtual memory (RAM); if exceeded, delay the start of the next process
interval (float) : number of seconds to pause between starting processes for checking load
parallelizer (str/func) : parallelization function; default 'multiprocess' (see below for details)
serial (bool) : whether to skip parallelization and run in serial (useful for debugging; equivalent to ``parallelizer='serial'``)
progress (bool) : whether to show a progress bar
callback (func) : an optional function to call from each worker
globaldict (dict) : an optional global dictionary to pass to each worker via the kwarg "globaldict" (note: may not update properly with low task latency)
die (bool) : whether to stop immediately if an exception is encountered (otherwise, store the exception as the result)
func_kwargs (dict) : merged with kwargs (see above)
Returns:
List of outputs from each process
**Example 1 -- simple usage as a shortcut to multiprocess.map()**::
def f(x):
return x*x
results = sc.parallelize(f, [1,2,3])
**Example 2 -- simple usage for "embarrassingly parallel" processing**::
import numpy as np
def rnd():
np.random.seed()
return np.random.random()
results = sc.parallelize(rnd, 10, ncpus=4)
**Example 3 -- three different equivalent ways to use multiple arguments**::
def f(x,y):
return x*y
results1 = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)])
results2 = sc.parallelize(func=f, iterkwargs={'x':[1,2,3], 'y':[2,3,4]})
results3 = sc.parallelize(func=f, iterkwargs=[{'x':1, 'y':2}, {'x':2, 'y':3}, {'x':3, 'y':4}])
assert results1 == results2 == results3
    **Example 4 -- using non-iterated arguments and dynamic load balancing**::

        import numpy as np
        import pylab as pl

        def myfunc(i, x, y):
np.random.seed()
xy = [x+i*np.random.randn(100), y+i*np.random.randn(100)]
return xy
xylist1 = sc.parallelize(myfunc, iterarg=range(5), kwargs={'x':3, 'y':8}, maxcpu=0.8, interval=0.2) # Use kwargs dict
xylist2 = sc.parallelize(myfunc, x=5, y=10, iterarg=[0,1,2], parallelizer='multiprocessing') # Supply kwargs directly and use a different parallelizer
for p,xylist in enumerate([xylist1, xylist2]):
pl.subplot(2,1,p+1)
for i,xy in enumerate(reversed(xylist)):
pl.scatter(xy[0], xy[1], label='Run %i'%i)
pl.legend()
**Example 5 -- using a custom parallelization function**::
def f(x,y):
return [x]*y
import multiprocessing as mp
pool = mp.Pool(processes=2)
results = sc.parallelize(f, iterkwargs=dict(x=[1,2,3], y=[4,5,6]), parallelizer=pool.map) # Note: parallelizer is pool.map, not pool
**Example 6 -- using Sciris as an interface to Dask**::
def f(x,y):
return [x]*y
def dask_map(task, argslist):
import dask
queued = [dask.delayed(task)(args) for args in argslist]
return list(dask.compute(*queued))
results = sc.parallelize(f, iterkwargs=dict(x=[1,2,3], y=[4,5,6]), parallelizer=dask_map)
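
    **Example 7 -- sharing data with each worker via "globaldict"** (a minimal sketch;
    the ``scale`` key is purely illustrative)::

        def f(i, globaldict):
            return i * globaldict['scale']

        results = sc.parallelize(f, iterarg=[1,2,3], globaldict=dict(scale=10))

    **Example 8 -- reporting on each finished job via a callback** (a sketch; the callback
    is called by each worker with a single dict of job metadata, including "index" and "njobs")::

        def report(data):
            print(f"Job {data['index']} finished ({data['njobs']} jobs total)")

        def f(x):
            return x**2

        results = sc.parallelize(f, iterarg=range(4), callback=report)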
**Note 1**: the default parallelizer ``"multiprocess"`` uses ``dill`` for pickling, so
is the most versatile (e.g., it can pickle non-top-level functions). However,
it is also the slowest for passing large amounts of data. You can switch between
these with ``parallelizer='fast'`` (``concurrent.futures``) and ``parallelizer='robust'``
(``multiprocess``).
The ``parallelizer`` argument allows a wide range of different parallelizers
(including different aliases for each), and also supports user-supplied ones.
Note that in most cases, the default parallelizer will suffice. However, the
full list of options is:
- ``None``, ``'default'``, ``'robust'``, ``'multiprocess'``: the slow but robust dill-based parallelizer ``multiprocess``
- ``'fast'``, ``'concurrent'``, ``'concurrent.futures'``: the faster but more fragile pickle-based Python-default parallelizer :mod:`concurrent.futures`
- ``'multiprocessing'``: the previous pickle-based Python default parallelizer, :mod:`multiprocessing`
- ``'serial'``, ``'serial-copy'``: no parallelization (single-threaded); with "-copy", force pickling
    - ``'thread'``, ``'threadpool'``, ``'thread-copy'``: thread- rather than process-based parallelization ("-copy" as above)
- User supplied: any :func:`map`-like function that takes in a function and an argument list
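
    For example, the following two calls are equivalent (a sketch, assuming ``f`` is a
    picklable top-level function)::

        def f(x):
            return x + 1

        r1 = sc.parallelize(f, [1,2,3], parallelizer='fast')
        r2 = sc.parallelize(f, [1,2,3], parallelizer='concurrent.futures')
        assert r1 == r2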
**Note 2**: If parallelizing figure generation, use a non-interactive backend,
    or make sure (a) the figure is closed inside the function call, and (b) the figure
object is not returned. Otherwise, parallelization won't increase speed (and
might even be slower than serial!).
    **Note 3**: to use on Windows, parallel calls must be contained within an ``if __name__ == '__main__'`` block.
For example::
import sciris as sc
def f(x,y):
return x*y
if __name__ == '__main__':
results = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)])
print(results)
| *New in version 1.1.1:* "serial" argument
| *New in version 2.0.0:* changed default parallelizer from ``multiprocess.Pool`` to ``concurrent.futures.ProcessPoolExecutor``; replaced ``maxload`` with ``maxcpu``/``maxmem``; added ``returnpool`` argument
| *New in version 2.0.4:* added "die" argument; changed exception handling
| *New in version 3.0.0:* new Parallel class; propagated "die" to jobs
| *New in version 3.1.0:* new "globaldict" argument
"""
# Create the parallel instance
P = Parallel(func, iterarg=iterarg, iterkwargs=iterkwargs, args=args, kwargs=kwargs,
ncpus=ncpus, maxcpu=maxcpu, maxmem=maxmem, interval=interval,
parallelizer=parallelizer, serial=serial, progress=progress,
callback=callback, globaldict=globaldict, die=die, **func_kwargs)
# Run it
P.run()
return P.results
##############################################################################
#%% Helper functions/classes
##############################################################################
class TaskArgs(scp.prettyobj):
"""
A class to hold the arguments for the parallel task -- not to be invoked by the user.
Arguments must match both :func:`sc.parallelize() <parallelize>` and ``sc._task()``
"""
def __init__(self, func, index, njobs, iterval, iterdict, args, kwargs, maxcpu, maxmem,
interval, embarrassing, callback, progress, globaldict, useglobal, started,
die=True):
self.func = func # The function being called
self.index = index # The place in the queue
self.njobs = njobs # The total number of iterations
self.iterval = iterval # The value being iterated (may be None if iterdict is not None)
self.iterdict = iterdict # A dictionary of values being iterated (may be None if iterval is not None)
self.args = args # Arguments passed directly to the function
self.kwargs = kwargs # Keyword arguments passed directly to the function
self.maxcpu = maxcpu # Maximum CPU load (ignored if ncpus is not None in sc.parallelize())
self.maxmem = maxmem # Maximum memory
self.interval = interval # Interval to check load (only used with maxcpu/maxmem)
        self.embarrassing = embarrassing # Whether this is an "embarrassingly parallel" run (if so, the iterarg is not passed to the function)
self.callback = callback # A function to call after each task finishes
self.progress = progress # Whether to print progress after each job
self.globaldict = globaldict # A global dictionary for sharing progress on each task
self.useglobal = useglobal # Whether to pass the global dictionary to each task
self.started = started # The time when the parallelization was started
self.die = die # Whether to raise an exception if the child task encounters one
return
def _task(taskargs):
"""
Task called by parallelize() -- not to be called directly.
*New in version 3.0.0:* renamed from "_parallel_task" to "_task"; return output dict with metadata
"""
# Handle inputs
func = taskargs.func
index = taskargs.index
args = taskargs.args
kwargs = taskargs.kwargs
if args is None: args = ()
if kwargs is None: kwargs = {}
if taskargs.iterval is not None:
if not isinstance(taskargs.iterval, tuple): # Ensure it's a tuple
taskargs.iterval = (taskargs.iterval,)
if not taskargs.embarrassing:
args = taskargs.iterval + args # If variable name is not supplied, prepend it to args
kwargs = scu.mergedicts(kwargs, taskargs.iterdict) # Merge this iterdict, overwriting kwargs if there are conflicts
# Handle load balancing
maxcpu = taskargs.maxcpu
maxmem = taskargs.maxmem
if maxcpu or maxmem:
scpro.loadbalancer(maxcpu=maxcpu, maxmem=maxmem, index=index, interval=taskargs.interval)
# Set up input and output arguments
globaldict = taskargs.globaldict
start = scd.time()
result = None
success = False
exception = None
try: # Try to update the globaldict, but don't worry if we can't
globaldict[_jobkey(index)] = 0
if taskargs.useglobal:
kwargs['globaldict'] = taskargs.globaldict
except:
pass
# Call the function!
try:
result = func(*args, **kwargs) # Call the function!
success = True
try: # Likewise, try to update the task progress
globaldict[_jobkey(index)] = 1
except:
pass
except Exception as E: # pragma: no cover
if taskargs.die: # Usual case, raise an exception and stop
errormsg = f'Task {index} failed: set die=False to keep going instead; see above for error details'
try: # Try to preserve the original exception type ...
exctype = type(E)
exc = exctype(errormsg)
except: # ... but don't worry if it fails
exc = Exception(errormsg)
raise exc from E
else: # Alternatively, keep going and just let this trial fail
warnmsg = f'sc.parallelize(): Task {index} failed, but die=False so continuing.\n{scu.traceback()}'
warnings.warn(warnmsg, category=RuntimeWarning, stacklevel=2)
exception = E
end = scd.time()
elapsed = end - start
if taskargs.progress:
_progressbar(globaldict, njobs=taskargs.njobs, started=taskargs.started, flush=True)
# Generate output
outdict = dict(
result = result,
success = success,
exception = exception,
elapsed = elapsed,
)
# Handle callback, if present
if taskargs.callback: # pragma: no cover
data = dict(index=index, njobs=taskargs.njobs, args=args, kwargs=kwargs, globaldict=globaldict, outdict=outdict)
taskargs.callback(data)
# Handle output
return outdict