Source code for viscid.parallel

"""common tools for parallel processing"""

from __future__ import print_function, division
from math import ceil
import threading
import multiprocessing as mp
import multiprocessing.pool
from contextlib import closing
from itertools import repeat
import sys

import numpy as np

import viscid
from viscid.compat import izip, futures, string_types


__all__ = ["chunk_list", "chunk_slices", "chunk_interslices", "chunk_sizes",
           "map", "map_async"]


# Non-daemonic processes are probably a really bad idea
class NoDaemonProcess(mp.Process):
    """Using this is probably a bad idea"""
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(multiprocessing.pool.Pool):  # pylint: disable=W0223
    """ I am vulnerable to armies of undead worker processes, chances
    are you don't actually want to use me
    """
    Process = NoDaemonProcess
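

# A minimal sketch (not part of the original module) of the restriction the
# two classes above exist to dodge: daemonic processes may not spawn children,
# so a daemonic worker that tries to fork raises AssertionError ("daemonic
# processes are not allowed to have children"), while a NoDaemonProcess
# reports daemon=False and is allowed to. The helper names below are
# hypothetical and only illustrate the failure mode.
def _example_spawn_grandchild():
    # this is exactly what a daemonic pool worker is forbidden from doing
    p = mp.Process(target=sorted, args=([3, 1, 2],))
    p.start()
    p.join()


def _example_daemon_restriction():
    child = mp.Process(target=_example_spawn_grandchild)
    child.daemon = True   # mark it like a normal Pool worker
    child.start()
    child.join()
    # nonzero because the AssertionError killed the child before it could
    # start its own grandchild; a NoDaemonProcess child should exit with 0
    return child.exitcode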


class _MapThread(threading.Thread):
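    """Store the target's return value in a shared results list"""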
    def __init__(self, result_container, index, **kwargs):
        self.results = result_container
        self.index = index
        self.target = kwargs.pop("target")
        self.args = kwargs.pop("args", [])
        self.kwargs = kwargs.pop("kwargs", {})
        super(_MapThread, self).__init__(**kwargs)

    def run(self):
        self.results[self.index] = self.target(*self.args, **self.kwargs)
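

# A minimal sketch (not part of the original module) of how _MapThread is
# meant to be used: each thread writes its return value into a shared list at
# its own index, so results come back in submission order. The helper name is
# hypothetical.
def _example_map_threads():
    results = [None] * 4
    threads = [_MapThread(results, i, target=pow, args=(i, 2))
               for i in range(4)]
    for thr in threads:
        thr.start()
    for thr in threads:
        thr.join()
    return results  # [0, 1, 4, 9]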


def chunk_list(seq, nchunks, size=None):
    """Slice seq into nchunks contiguous chunks

    seq can be anything sliceable, such as lists, numpy arrays, etc.
    The chunks are 'contiguous'; see :meth:`chunk_interslices` for
    picking every nth element instead.

    Parameters:
        size: if given, set nchunks such that chunks have about 'size'
            elements

    Returns:
        nchunks pieces of seq, each with (len(seq) // nchunks) elements
        or one more than that

    See Also:
        Use :meth:`chunk_iterator` to chunk up iterators

    Example:
        >>> it1, it2, it3 = chunk_list(range(8), 3)
        >>> it1 == range(0, 3)  # 3 vals
        True
        >>> it2 == range(3, 6)  # 3 vals
        True
        >>> it3 == range(6, 8)  # 2 vals
        True
    """
    nel = len(seq)
    if size is not None:
        nchunks = int(ceil(nel / size))
    ret = chunk_slices(nel, nchunks)
    for i in range(nchunks):
        ret[i] = seq[slice(*ret[i])]
    return ret


def chunk_slices(nel, nchunks, size=None):
    r"""Make contiguous chunks

    Get the slice info (which can be unpacked and passed to the slice
    builtin as in slice(\*ret[i])) for nchunks contiguous chunks in a
    list with nel elements

    Parameters:
        nel: how many elements are in one pass of the original list
        nchunks: how many chunks to make
        size: if given, set nchunks such that chunks have about 'size'
            elements

    Returns:
        a list of (start, stop) tuples with length nchunks

    Example:
        >>> sl1, sl2 = chunk_slices(5, 2)
        >>> sl1 == (0, 3)  # 3 vals
        True
        >>> sl2 == (3, 5)  # 2 vals
        True
    """
    if size is not None:
        nchunks = int(ceil(nel / size))
    nlong = nel % nchunks  # nlong is guaranteed < nchunks
    lenshort = nel // nchunks
    lenlong = lenshort + 1

    ret = [None] * nchunks
    start = 0
    for i in range(nlong):
        ret[i] = (start, start + lenlong)
        start += lenlong
    for i in range(nlong, nchunks):
        ret[i] = (start, start + lenshort)
        start += lenshort
    return ret


def chunk_interslices(nchunks):
    """Make staggered chunks

    Similar to chunk_slices, but pick every nth element instead of
    getting a contiguous patch for each chunk

    Parameters:
        nchunks: how many chunks to make

    Returns:
        a list of (start, stop, step) tuples with length nchunks

    Example:
        >>> chunk_interslices(2) == [(0, None, 2), (1, None, 2)]
        True
    """
    ret = [None] * nchunks
    for i in range(nchunks):
        ret[i] = (i, None, nchunks)
    return ret


def chunk_sizes(nel, nchunks, size=None):
    """For chunking up lists, how big is each chunk

    Parameters:
        nel: how many elements are in one pass of the original list
        nchunks: how many chunks to make
        size: if given, set nchunks such that chunks have about 'size'
            elements

    Returns:
        an ndarray with the number of elements in each chunk; these
        sizes are consistent with chunk_list, chunk_slices and
        chunk_interslices

    Example:
        >>> nel1, nel2 = chunk_sizes(5, 2)
        >>> nel1 == 3
        True
        >>> nel2 == 2
        True
    """
    if size is not None:
        nchunks = int(ceil(nel / size))
    nlong = nel % nchunks  # nlong is guaranteed < nchunks
    lenshort = nel // nchunks
    lenlong = lenshort + 1

    ret = np.empty((nchunks,), dtype="int")
    ret[:nlong] = lenlong
    ret[nlong:] = lenshort
    return ret


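# A quick cross-check (not part of the original module): the sizes reported by
# chunk_sizes agree with the chunks actually produced by chunk_list, and the
# staggered slices from chunk_interslices cover the same elements. The helper
# name is hypothetical.
def _example_chunk_consistency():
    seq = list(range(10))
    chunks = chunk_list(seq, 3)        # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
    sizes = chunk_sizes(len(seq), 3)   # array([4, 3, 3])
    stagger = [seq[slice(*sl)] for sl in chunk_interslices(3)]
    assert [len(c) for c in chunks] == list(sizes)
    assert sorted(sum(stagger, [])) == seq
    return chunks, sizes, stagger

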
def _star_passthrough(args):
    """this is so we can give a zipped iterable to func"""
    # args[0] is function, args[1] is positional args, and args[2] is kwargs
    return args[0](*(args[1]), **(args[2]))


def sanitize_nr_procs(nr_procs):
    if isinstance(nr_procs, string_types):
        nr_procs = nr_procs.strip().lower()
        if nr_procs == "all" or nr_procs == "auto":
            nr_procs = mp.cpu_count()
    return int(nr_procs)


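# A minimal sketch (not part of the original module): _star_passthrough turns
# a (func, args, kwargs) triple into a call, which is what lets map/map_async
# push a single zipped iterable through Pool.map_async, and sanitize_nr_procs
# accepts "all"/"auto" as well as plain integers. The helper name is
# hypothetical.
def _example_star_passthrough():
    nprocs = sanitize_nr_procs("all")   # same as mp.cpu_count()
    triples = izip(repeat(divmod), [(7, 2), (9, 4)], repeat({}))
    return nprocs, [_star_passthrough(t) for t in triples]  # [(3, 1), (2, 1)]

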
def map(nr_procs, func, args_iter, args_kw=None, timeout=1e8,
        daemonic=True, threads=False, pool=None, force_subprocess=False):
    """Parallel map, in the spirit of ``multiprocessing.Pool.map``

    Same as :meth:`map_async`, except it waits for the result to be
    ready and returns it.

    Note:
        When using threads, this is WAY faster than map_async since
        map_async uses the builtin python ThreadPool. I have no idea
        why that's slower than making threads by hand.
    """
    nr_procs = sanitize_nr_procs(nr_procs)
    if args_kw is None:
        args_kw = {}

    # don't waste time spinning up a new process
    if threads:
        args = [(func, ai, args_kw) for ai in args_iter]
        with futures.ThreadPoolExecutor(max_workers=nr_procs) as executor:
            ret = [val for val in executor.map(_star_passthrough, args)]
    elif pool is None and nr_procs == 1 and not force_subprocess:
        args_iter = izip(repeat(func), args_iter, repeat(args_kw))
        ret = [_star_passthrough(args) for args in args_iter]
    else:
        p, r = map_async(nr_procs, func, args_iter, args_kw=args_kw,
                         daemonic=daemonic, threads=threads, pool=pool)
        ret = r.get(int(timeout))
        # in principle this join should return almost immediately since
        # we already called r.get
        p.join()

    return ret


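# A minimal usage sketch (not part of the original module): run a picklable
# function over a few argument tuples, once serially and once with two worker
# processes; both paths return results in the original order. On spawn-based
# platforms (Windows, macOS) call this from under an
# ``if __name__ == "__main__"`` guard. The helper name is hypothetical.
def _example_map():
    serial = map(1, divmod, [(7, 2), (9, 4), (13, 5)])
    parallel = map(2, divmod, [(7, 2), (9, 4), (13, 5)])
    assert serial == parallel == [(3, 1), (2, 1), (2, 3)]
    return parallel

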
def map_async(nr_procs, func, args_iter, args_kw=None, daemonic=True,
              threads=False, pool=None):
    """Wrap python's ``map_async``

    This has some utility stuff like star passthrough.

    Run func on nr_procs workers with arguments given by args_iter.
    args_iter should be an iterable of argument lists (or tuples) that
    can be unpacked for each invocation. args_kw is passed to func as
    keyword arguments.

    Returns:
        (tuple) (pool, multiprocessing.pool.AsyncResult)

    Note:
        When using threads, this is WAY slower than map since map_async
        uses the builtin python ThreadPool. I have no idea why that's
        slower than making threads by hand.

    Note:
        daemonic can be set to False if one needs to spawn child
        processes in func, BUT this could be vulnerable to creating an
        undead army of worker processes; only use this if you really,
        really need it, and know what you're doing.

    Example:
        >>> def func(i, letter):
        ...     print(i, letter)
        >>> p, r = map_async(2, func, zip(itertools.count(), 'abc'))
        >>> r.get(1e8)
        >>> p.join()
        >>> # the following is printed from 2 processes
        0 a
        1 b
        2 c
    """
    nr_procs = sanitize_nr_procs(nr_procs)
    if args_kw is None:
        args_kw = {}

    if not threads and sys.platform == 'darwin' and ("mayavi.mlab" in sys.modules
                                                     or "mayavi" in sys.modules):
        import mayavi
        if mayavi.ETSConfig.toolkit == 'qt4':
            viscid.logger.critical("Using multiprocessing with Mayavi + Qt4 "
                                   "will cause segfaults on join.\n"
                                   "A workaround is to use the wx backend "
                                   "(`os.environ['ETS_TOOLKIT'] = 'wx'`).")

    args_iter = izip(repeat(func), args_iter, repeat(args_kw))

    # if given a pool, don't close it when we're done delegating tasks
    if pool is not None:
        return pool, pool.map_async(_star_passthrough, args_iter)
    else:
        if threads:
            pool = mp.pool.ThreadPool(nr_procs)
        elif daemonic:
            pool = mp.Pool(nr_procs)
        else:
            pool = NoDaemonPool(nr_procs)

        with closing(pool) as p:
            return p, p.map_async(_star_passthrough, args_iter)


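# A minimal sketch (not part of the original module) of reusing a caller
# supplied pool: when ``pool`` is given, map_async delegates to it and does
# not close it, so the same workers can service several batches. The helper
# name is hypothetical.
def _example_map_async_shared_pool():
    shared = mp.Pool(2)
    try:
        _, r1 = map_async(2, divmod, [(7, 2), (9, 4)], pool=shared)
        _, r2 = map_async(2, divmod, [(13, 5)], pool=shared)
        return r1.get(60), r2.get(60)  # ([(3, 1), (2, 1)], [(2, 3)])
    finally:
        shared.close()
        shared.join()

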
##
## EOF
##