diff -r fb965ee44d5e Lib/concurrent/futures/_base.py
--- a/Lib/concurrent/futures/_base.py Sat May 28 14:48:19 2016 +0300
+++ b/Lib/concurrent/futures/_base.py Sat May 28 15:19:14 2016 +0200
@@ -170,6 +170,18 @@
return waiter
+
+def _yield_future(fs, waiter, ref_collect=()):
+ while fs:
+
+ with fs[0]._condition:
+ fs[0]._waiters.remove(waiter)
+
+ for future_list in ref_collect:
+ future_list.remove(fs[0])
+ yield fs.pop(0)
+
+
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
@@ -191,6 +203,8 @@
if timeout is not None:
end_time = timeout + time.time()
+ total_futures = len(fs)
+
fs = set(fs)
with _AcquireFutures(fs):
finished = set(
@@ -198,9 +212,10 @@
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
-
+ finished = list(finished)
try:
- yield from finished
+ yield from _yield_future(finished, waiter,
+ ref_collect=(fs,))
while pending:
if timeout is None:
@@ -210,7 +225,7 @@
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
- len(pending), len(fs)))
+ len(pending), total_futures))
waiter.event.wait(wait_timeout)
@@ -219,9 +234,8 @@
waiter.finished_futures = []
waiter.event.clear()
- for future in finished:
- yield future
- pending.remove(future)
+ yield from _yield_future(finished, waiter,
+ ref_collect=(fs, pending))
finally:
for f in fs:
diff -r fb965ee44d5e Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py Sat May 28 14:48:19 2016 +0300
+++ b/Lib/test/test_concurrent_futures.py Sat May 28 15:19:14 2016 +0200
@@ -358,6 +358,34 @@
completed = [f for f in futures.as_completed([future1,future1])]
self.assertEqual(len(completed), 1)
+ def test_free_reference_yielded_future(self):
+ # Issue #14406: Generator should not keep reference
+ # for yielded futures.
+ futures_list = [Future() for _ in range(8)]
+ futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
+ futures_list.append(create_future(state=SUCCESSFUL_FUTURE))
+
+ with self.assertRaises(futures.TimeoutError):
+ for future in futures.as_completed(futures_list, timeout=0):
+ futures_list.remove(future)
+ self.assertEqual(sys.getrefcount(future), 2)
+
+ futures_list[0].set_result("test")
+ for future in futures.as_completed(futures_list):
+ futures_list.remove(future)
+ self.assertEqual(sys.getrefcount(future), 2)
+ if futures_list:
+ futures_list[0].set_result("test")
+
+ def test_correct_timeout_exception_msg(self):
+ futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
+ RUNNING_FUTURE, SUCCESSFUL_FUTURE]
+
+ with self.assertRaises(futures.TimeoutError) as cm:
+ list(futures.as_completed(futures_list, timeout=0))
+
+ self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
+
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
pass