diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -49,7 +49,7 @@ import atexit
from concurrent.futures import _base
import queue
import multiprocessing
-from multiprocessing.queues import SimpleQueue
+from multiprocessing.queues import SimpleQueue, SentinelReady
import threading
import weakref
@@ -194,29 +194,54 @@ def _queue_manangement_worker(executor_r
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
"""
- nb_shutdown_processes = 0
- def shutdown_one_process():
- """Tell a worker to terminate, which will in turn wake us again"""
- nonlocal nb_shutdown_processes
- call_queue.put(None)
- nb_shutdown_processes += 1
+
+ def shutdown_worker():
+ """Tell every worker process to exit, then wait for each of them."""
+ # Queue one None per worker process; None is the "you can exit" item.
+ for _ in processes:
+ call_queue.put(None)
+ # If .join() is not called on the created processes then
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
+ for worker in processes.values():
+ worker.join()
+
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
- result_item = result_queue.get()
+ sentinels = list(processes.keys())
+ assert sentinels
+ try:
+ result_item = result_queue.get(sentinels=sentinels)
+ except SentinelReady as e:
+ # No result was read on this pass.  Bind result_item explicitly so
+ # the check below neither hits an unbound name (a worker died before
+ # the first result) nor re-reads the previous iteration's result.
+ result_item = None
+ # Clear terminated processes
+ for sentinel in e.sentinels:
+ processes.pop(sentinel).join()
+ # All in-flight futures must be marked failed preemptively
+ for work_id, work_item in list(pending_work_items.items()):
+ if work_item.future.running():
+ work_item.future.set_exception(
+ ProcessExited(
+ "A process in the process pool was "
+ "terminated abruptly while the future was "
+ "running."
+ ))
+ del pending_work_items[work_id]
+ executor = executor_reference()
+ if executor is not None:
+ # Launch other workers to compensate
+ if not _shutdown and not executor._shutdown_thread:
+ executor._adjust_process_count()
+ del executor
if result_item is not None:
- work_item = pending_work_items[result_item.work_id]
- del pending_work_items[result_item.work_id]
-
- if result_item.exception:
- work_item.future.set_exception(result_item.exception)
- else:
- work_item.future.set_result(result_item.result)
- continue
- # If we come here, we either got a timeout or were explicitly woken up.
- # In either case, check whether we should start shutting down.
+ work_item = pending_work_items.pop(result_item.work_id, None)
+ # work_item can be None if another process terminated (see above)
+ if work_item is not None:
+ if result_item.exception:
+ work_item.future.set_exception(result_item.exception)
+ else:
+ work_item.future.set_result(result_item.result)
+ # Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
@@ -226,17 +251,11 @@ def _queue_manangement_worker(executor_r
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
- while nb_shutdown_processes < len(processes):
- shutdown_one_process()
- # If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS
- # X.
- for p in processes:
- p.join()
+ shutdown_worker()
return
else:
# Start shutting down by telling a process it can exit.
- shutdown_one_process()
+ call_queue.put(None)
del executor
_system_limits_checked = False
@@ -264,6 +283,14 @@ def _check_system_limits():
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
raise NotImplementedError(_system_limited)
+
+class ProcessExited(RuntimeError):
+ """
+ Set as the exception of every future that was in the running state
+ when a worker process of a ProcessPoolExecutor terminated abruptly.
+ """
+
+
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
@@ -288,7 +315,8 @@ class ProcessPoolExecutor(_base.Executor
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
- self._processes = set()
+ # Map of sentinels to processes
+ self._processes = {}
# Shutdown is a two-step process.
self._shutdown_thread = False
@@ -302,6 +330,8 @@ class ProcessPoolExecutor(_base.Executor
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None:
+ # Start the processes so that their sentinels are known.
+ self._adjust_process_count()
self._queue_management_thread = threading.Thread(
target=_queue_manangement_worker,
args=(weakref.ref(self, weakref_cb),
@@ -319,9 +349,10 @@ class ProcessPoolExecutor(_base.Executor
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
- self._result_queue))
+ self._result_queue),
+ create_sentinel=True)
p.start()
- self._processes.add(p)
+ self._processes[p._sentinel] = p
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
@@ -338,7 +369,6 @@ class ProcessPoolExecutor(_base.Executor
self._result_queue.put(None)
self._start_queue_management_thread()
- self._adjust_process_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -118,6 +118,15 @@ def address_type(address):
else:
raise ValueError('address type of %r unrecognized' % address)
+
+class SentinelReady(Exception):
+ """
+ Raised by a poll when one or more sentinels are ready and no data is
+ available on the polled connection itself.
+
+ The first positional argument is exposed as the ``sentinels``
+ attribute; it is an empty list when no argument is given.
+ """
+ def __init__(self, *args):
+ Exception.__init__(self, *args)
+ # Tolerate argument-less construction (e.g. a bare
+ # ``raise SentinelReady``) instead of failing with IndexError.
+ self.sentinels = args[0] if args else []
+
#
# Connection classes
#
@@ -253,10 +262,12 @@ class _ConnectionBase:
(offset + size) // itemsize])
return size
- def recv(self):
+ def recv(self, sentinels=None):
"""Receive a (picklable) object"""
self._check_closed()
self._check_readable()
+ if sentinels:
+ self._poll(None, sentinels)
buf = self._recv_bytes()
return pickle.loads(buf.getbuffer())
@@ -306,7 +317,7 @@ if win32:
buf.write(lastchunk)
return buf
- def _poll(self, timeout):
+ def _poll(self, timeout, sentinels=()):
navail, nleft = win32.PeekNamedPipe(self._handle)
if navail > 0:
return True
@@ -384,9 +395,14 @@ class Connection(_ConnectionBase):
return None
return self._recv(size)
- def _poll(self, timeout):
- r = select.select([self._handle], [], [], timeout)[0]
- return bool(r)
+ def _poll(self, timeout, sentinels=()):
+ """Wait for data on this connection or for a ready sentinel.
+
+ *timeout* is passed to select.select() unchanged (None blocks).
+ Return True if this connection's handle is readable and False if
+ nothing became readable.  Raise SentinelReady, carrying the ready
+ handles, if only handles from *sentinels* became readable.
+ """
+ # Accept any iterable of handles; the default is an immutable empty
+ # tuple, matching the win32 variant of this method.
+ handles = [self._handle] + list(sentinels)
+ r = select.select(handles, [], [], timeout)[0]
+ # Pending data takes precedence over a ready sentinel.
+ if self._handle in r:
+ return True
+ if r:
+ raise SentinelReady(r)
+ return False
#
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -118,8 +118,16 @@ if sys.platform != 'win32':
sys.stderr.flush()
self.returncode = None
+ if process_obj._create_sentinel:
+ r, w = os.pipe()
+ self.sentinel = r
+ else:
+ r = w = self.sentinel = None
+
self.pid = os.fork()
if self.pid == 0:
+ if r is not None:
+ os.close(r)
if 'random' in sys.modules:
import random
random.seed()
@@ -128,6 +136,10 @@ if sys.platform != 'win32':
sys.stderr.flush()
os._exit(code)
+ if w is not None:
+ os.close(w)
+ util.Finalize(self, os.close, (r,))
+
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -92,7 +92,7 @@ class Process(object):
_Popen = None
def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
- *, daemon=None):
+ *, daemon=None, create_sentinel=False):
assert group is None, 'group argument must be None for now'
count = next(_current_process._counter)
self._identity = _current_process._identity + (count,)
@@ -109,6 +109,7 @@ class Process(object):
self._kwargs = dict(kwargs)
self._name = name or type(self).__name__ + '-' + \
':'.join(str(i) for i in self._identity)
+ self._create_sentinel = create_sentinel
def run(self):
'''
@@ -132,6 +133,7 @@ class Process(object):
else:
from .forking import Popen
self._popen = Popen(self)
+ self._sentinel = self._popen.sentinel
_current_process._children.add(self)
def terminate(self):
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -44,7 +44,7 @@ import weakref
from queue import Empty, Full
import _multiprocessing
-from multiprocessing import Pipe
+from multiprocessing.connection import Pipe, SentinelReady
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
@@ -372,10 +372,10 @@ class SimpleQueue(object):
def _make_methods(self):
recv = self._reader.recv
racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
+ def get(*, sentinels=None):
racquire()
try:
- return recv()
+ return recv(sentinels)
finally:
rrelease()
self.get = get
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -154,7 +154,7 @@ class ProcessPoolShutdownTest(ProcessPoo
processes = self.executor._processes
self.executor.shutdown()
- for p in processes:
+ for p in processes.values():
p.join()
def test_context_manager_shutdown(self):
@@ -163,7 +163,7 @@ class ProcessPoolShutdownTest(ProcessPoo
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
- for p in processes:
+ for p in processes.values():
p.join()
def test_del_shutdown(self):
@@ -174,7 +174,7 @@ class ProcessPoolShutdownTest(ProcessPoo
del executor
queue_management_thread.join()
- for p in processes:
+ for p in processes.values():
p.join()
class WaitTests(unittest.TestCase):