Source code for emcee.interruptible_pool

# -*- coding: utf-8 -*-

"""
Python's multiprocessing.Pool class doesn't interact well with
``KeyboardInterrupt`` signals, as documented in places such as:

* `<http://stackoverflow.com/questions/1408356/>`_
* `<http://stackoverflow.com/questions/11312525/>`_
* `<http://noswap.com/blog/python-multiprocessing-keyboardinterrupt>`_

Various workarounds have been shared. Here, we adapt the one proposed in the
last link above, by John Reese, and shared as

* `<https://github.com/jreese/multiprocessing-keyboardinterrupt/>`_

Our version is a drop-in replacement for multiprocessing.Pool ... as long as
the map() method is the only one that needs to be interrupt-friendly.

Contributed by Peter K. G. Williams <peter@newton.cx>.

*Added in version 2.1.0*

"""

from __future__ import (division, print_function, absolute_import,
                        unicode_literals)

__all__ = ["InterruptiblePool"]

import signal
import functools
from multiprocessing.pool import Pool
from multiprocessing import TimeoutError


def _initializer_wrapper(actual_initializer, *rest):
    """
    We ignore SIGINT. It's up to our parent to kill us in the typical
    condition of this arising from ``^C`` on a terminal. If someone is
    manually killing us with that signal, well... nothing will happen.

    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    if actual_initializer is not None:
        actual_initializer(*rest)


[docs]class InterruptiblePool(Pool): """ A modified version of :class:`multiprocessing.pool.Pool` that has better behavior with regard to ``KeyboardInterrupts`` in the :func:`map` method. :param processes: (optional) The number of worker processes to use; defaults to the number of CPUs. :param initializer: (optional) Either ``None``, or a callable that will be invoked by each worker process when it starts. :param initargs: (optional) Arguments for *initializer*; it will be called as ``initializer(*initargs)``. :param kwargs: (optional) Extra arguments. Python 2.7 supports a ``maxtasksperchild`` parameter. """ wait_timeout = 3600 def __init__(self, processes=None, initializer=None, initargs=(), **kwargs): new_initializer = functools.partial(_initializer_wrapper, initializer) super(InterruptiblePool, self).__init__(processes, new_initializer, initargs, **kwargs)
[docs] def map(self, func, iterable, chunksize=None): """ Equivalent of ``map()`` built-in, without swallowing ``KeyboardInterrupt``. :param func: The function to apply to the items. :param iterable: An iterable of items that will have `func` applied to them. """ # The key magic is that we must call r.get() with a timeout, because # a Condition.wait() without a timeout swallows KeyboardInterrupts. r = self.map_async(func, iterable, chunksize) while True: try: return r.get(self.wait_timeout) except TimeoutError: pass except KeyboardInterrupt: self.terminate() self.join() raise
# Other exceptions propagate up.