# HG changeset patch
# Parent e6c9c4c613c5362b818c6cc25cefc98d7b96f24f
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/mp_common.py
rename from Lib/test/test_multiprocessing.py
rename to Lib/test/mp_common.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/mp_common.py
@@ -1,7 +1,14 @@
-#!/usr/bin/env python3
-
#
-# Unit tests for the multiprocessing package
+# Unit tests for the multiprocessing package which are
+# run using different configurations
+#
+# The base classes defined here are used by
+#
+# test_multiprocessing_fork
+# test_multiprocessing_nofork
+# test_multiprocessing_manager_fork
+# test_multiprocessing_manager_nofork
+# test_multiprocessing_threads
#
import unittest
@@ -2150,23 +2157,6 @@
# logger.warn('foo')
# assert self.__handled
-#
-# Test to verify handle verification, see issue 3321
-#
-
-class TestInvalidHandle(unittest.TestCase):
-
- @unittest.skipIf(WIN32, "skipped on Windows")
- def test_invalid_handles(self):
- conn = multiprocessing.connection.Connection(44977608)
- try:
- self.assertRaises((ValueError, IOError), conn.poll)
- finally:
- # Hack private attribute _handle to avoid printing an error
- # in conn.__del__
- conn._handle = None
- self.assertRaises((ValueError, IOError),
- multiprocessing.connection.Connection, -1)
#
# Functions used to create test cases from the base ones in this module
@@ -2181,16 +2171,15 @@
d[name] = obj
return d
-def create_test_cases(Mixin, type):
+def create_test_cases(Mixin):
result = {}
glob = globals()
- Type = type.capitalize()
for name in list(glob.keys()):
if name.startswith('_Test'):
base = glob[name]
- if type in base.ALLOWED_TYPES:
- newname = 'With' + Type + name[1:]
+ if Mixin.TYPE in base.ALLOWED_TYPES:
+ newname = name[1:]
class Temp(base, unittest.TestCase, Mixin):
pass
result[newname] = Temp
@@ -2199,184 +2188,10 @@
return result
#
-# Create test cases
#
-
-class ProcessesMixin(object):
- TYPE = 'processes'
- Process = multiprocessing.Process
- locals().update(get_attributes(multiprocessing, (
- 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
- 'Condition', 'Event', 'Value', 'Array', 'RawValue',
- 'RawArray', 'current_process', 'active_children', 'Pipe',
- 'connection', 'JoinableQueue'
- )))
-
-testcases_processes = create_test_cases(ProcessesMixin, type='processes')
-globals().update(testcases_processes)
-
-
-class ManagerMixin(object):
- TYPE = 'manager'
- Process = multiprocessing.Process
- manager = object.__new__(multiprocessing.managers.SyncManager)
- locals().update(get_attributes(manager, (
- 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
- 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
- 'Namespace', 'JoinableQueue'
- )))
-
-testcases_manager = create_test_cases(ManagerMixin, type='manager')
-globals().update(testcases_manager)
-
-
-class ThreadsMixin(object):
- TYPE = 'threads'
- Process = multiprocessing.dummy.Process
- locals().update(get_attributes(multiprocessing.dummy, (
- 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
- 'Condition', 'Event', 'Value', 'Array', 'current_process',
- 'active_children', 'Pipe', 'connection', 'dict', 'list',
- 'Namespace', 'JoinableQueue'
- )))
-
-testcases_threads = create_test_cases(ThreadsMixin, type='threads')
-globals().update(testcases_threads)
-
-class OtherTest(unittest.TestCase):
- # TODO: add more tests for deliver/answer challenge.
- def test_deliver_challenge_auth_failure(self):
- class _FakeConnection(object):
- def recv_bytes(self, size):
- return b'something bogus'
- def send_bytes(self, data):
- pass
- self.assertRaises(multiprocessing.AuthenticationError,
- multiprocessing.connection.deliver_challenge,
- _FakeConnection(), b'abc')
-
- def test_answer_challenge_auth_failure(self):
- class _FakeConnection(object):
- def __init__(self):
- self.count = 0
- def recv_bytes(self, size):
- self.count += 1
- if self.count == 1:
- return multiprocessing.connection.CHALLENGE
- elif self.count == 2:
- return b'something bogus'
- return b''
- def send_bytes(self, data):
- pass
- self.assertRaises(multiprocessing.AuthenticationError,
- multiprocessing.connection.answer_challenge,
- _FakeConnection(), b'abc')
-
#
-# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
-#
-
-def initializer(ns):
- ns.test += 1
-
-class TestInitializers(unittest.TestCase):
- def setUp(self):
- self.mgr = multiprocessing.Manager()
- self.ns = self.mgr.Namespace()
- self.ns.test = 0
-
- def tearDown(self):
- self.mgr.shutdown()
-
- def test_manager_initializer(self):
- m = multiprocessing.managers.SyncManager()
- self.assertRaises(TypeError, m.start, 1)
- m.start(initializer, (self.ns,))
- self.assertEqual(self.ns.test, 1)
- m.shutdown()
-
- def test_pool_initializer(self):
- self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
- p = multiprocessing.Pool(1, initializer, (self.ns,))
- p.close()
- p.join()
- self.assertEqual(self.ns.test, 1)
-
-#
-# Issue 5155, 5313, 5331: Test process in processes
-# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
-#
-
-def _ThisSubProcess(q):
- try:
- item = q.get(block=False)
- except pyqueue.Empty:
- pass
-
-def _TestProcess(q):
- queue = multiprocessing.Queue()
- subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
- subProc.daemon = True
- subProc.start()
- subProc.join()
-
-def _afunc(x):
- return x*x
-
-def pool_in_process():
- pool = multiprocessing.Pool(processes=4)
- x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
-
-class _file_like(object):
- def __init__(self, delegate):
- self._delegate = delegate
- self._pid = None
-
- @property
- def cache(self):
- pid = os.getpid()
- # There are no race conditions since fork keeps only the running thread
- if pid != self._pid:
- self._pid = pid
- self._cache = []
- return self._cache
-
- def write(self, data):
- self.cache.append(data)
-
- def flush(self):
- self._delegate.write(''.join(self.cache))
- self._cache = []
-
-class TestStdinBadfiledescriptor(unittest.TestCase):
-
- def test_queue_in_process(self):
- queue = multiprocessing.Queue()
- proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
- proc.start()
- proc.join()
-
- def test_pool_in_process(self):
- p = multiprocessing.Process(target=pool_in_process)
- p.start()
- p.join()
-
- def test_flushing(self):
- sio = io.StringIO()
- flike = _file_like(sio)
- flike.write('foo')
- proc = multiprocessing.Process(target=lambda: flike.flush())
- flike.flush()
- assert sio.getvalue() == 'foo'
-
-testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
- TestStdinBadfiledescriptor]
-
-#
-#
-#
-
-def test_main(run=None):
+
+def _main(prepare, cleanup, name):
if sys.platform.startswith("linux"):
try:
lock = multiprocessing.RLock()
@@ -2384,40 +2199,10 @@
raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
check_enough_semaphores()
-
- if run is None:
- from test.support import run_unittest as run
-
util.get_temp_dir() # creates temp directory for use by all processes
-
multiprocessing.get_logger().setLevel(LOG_LEVEL)
-
- ProcessesMixin.pool = multiprocessing.Pool(4)
- ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
- ManagerMixin.manager.__init__()
- ManagerMixin.manager.start()
- ManagerMixin.pool = ManagerMixin.manager.Pool(4)
-
- testcases = (
- sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
- sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
- sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
- testcases_other
- )
-
- loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
- suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
- run(suite)
-
- ThreadsMixin.pool.terminate()
- ProcessesMixin.pool.terminate()
- ManagerMixin.pool.terminate()
- ManagerMixin.manager.shutdown()
-
- del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
-
-def main():
- test_main(unittest.TextTestRunner(verbosity=2).run)
-
-if __name__ == '__main__':
- main()
+ old_state = prepare()
+ try:
+ test.support.run_unittest(name)
+ finally:
+ cleanup(old_state)
diff --git a/Lib/test/test_multiprocessing_fork.py b/Lib/test/test_multiprocessing_fork.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_fork.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python3
+
+# Test of multiprocessing using forked processes
+
+import sys
+import multiprocessing
+from test import mp_common
+
+class Mixin(object):
+ TYPE = 'processes'
+ Process = multiprocessing.Process
+ locals().update(mp_common.get_attributes(multiprocessing, (
+ 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
+ 'Condition', 'Event', 'Value', 'Array', 'RawValue',
+ 'RawArray', 'current_process', 'active_children', 'Pipe',
+ 'connection', 'JoinableQueue'
+ )))
+
+testcases_dict = mp_common.create_test_cases(Mixin)
+globals().update(testcases_dict)
+
+def prepare():
+ old_is_forking = multiprocessing.forking_is_enabled()
+ multiprocessing.forking_enable(True)
+ Mixin.pool = multiprocessing.Pool(4)
+ return old_is_forking
+
+def cleanup(old_state):
+ Mixin.pool.terminate()
+ del Mixin.pool
+ multiprocessing.forking_enable(old_state)
+
+def test_main():
+ if sys.platform == "win32":
+ raise unittest.SkipTest("Windows does not have fork")
+ mp_common._main(prepare, cleanup, __name__)
+
+if __name__ == '__main__':
+ test_main()
diff --git a/Lib/test/test_multiprocessing_manager_fork.py b/Lib/test/test_multiprocessing_manager_fork.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_manager_fork.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python3
+
+# Test of multiprocessing using forked processes and a manager process
+
+import sys
+import multiprocessing
+from test import mp_common
+
+class Mixin(object):
+ TYPE = 'manager'
+ Process = multiprocessing.Process
+ manager = object.__new__(multiprocessing.managers.SyncManager)
+ locals().update(mp_common.get_attributes(manager, (
+ 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
+ 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
+ 'Namespace', 'JoinableQueue'
+ )))
+
+testcases_dict = mp_common.create_test_cases(Mixin)
+globals().update(testcases_dict)
+
+def prepare():
+ old_is_forking = multiprocessing.forking_is_enabled()
+ multiprocessing.forking_enable(True)
+ Mixin.manager.__init__()
+ Mixin.manager.start()
+ Mixin.pool = Mixin.manager.Pool(4)
+ return old_is_forking
+
+def cleanup(old_state):
+ Mixin.pool.terminate()
+ Mixin.manager.shutdown()
+ del Mixin.pool
+ multiprocessing.forking_enable(old_state)
+
+def test_main():
+ if sys.platform == "win32":
+ raise unittest.SkipTest("Windows does not have fork")
+ mp_common._main(prepare, cleanup, __name__)
+
+if __name__ == '__main__':
+ test_main()
diff --git a/Lib/test/test_multiprocessing_manager_nofork.py b/Lib/test/test_multiprocessing_manager_nofork.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_manager_nofork.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python3
+
+# Test of multiprocessing using non-forked processes and a manager process
+
+import multiprocessing
+from test import mp_common
+
+class Mixin(object):
+ TYPE = 'manager'
+ Process = multiprocessing.Process
+ manager = object.__new__(multiprocessing.managers.SyncManager)
+ locals().update(mp_common.get_attributes(manager, (
+ 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
+ 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
+ 'Namespace', 'JoinableQueue'
+ )))
+
+testcases_dict = mp_common.create_test_cases(Mixin)
+globals().update(testcases_dict)
+
+def prepare():
+ old_is_forking = multiprocessing.forking_is_enabled()
+ multiprocessing.forking_enable(False)
+ Mixin.manager.__init__()
+ Mixin.manager.start()
+ Mixin.pool = Mixin.manager.Pool(4)
+ return old_is_forking
+
+def cleanup(old_state):
+ Mixin.pool.terminate()
+ Mixin.manager.shutdown()
+ del Mixin.pool
+ multiprocessing.forking_enable(old_state)
+
+def test_main():
+ mp_common._main(prepare, cleanup, __name__)
+
+if __name__ == '__main__':
+ test_main()
diff --git a/Lib/test/test_multiprocessing_misc.py b/Lib/test/test_multiprocessing_misc.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_misc.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Tests for multiprocessing which do not need to be run
+# with multiple configurations
+
+import test.support
+import unittest
+
+# Skip tests if _multiprocessing wasn't built.
+_multiprocessing = test.support.import_module('_multiprocessing')
+# Skip tests if sem_open implementation is broken.
+test.support.import_module('multiprocessing.synchronize')
+
+import sys
+import io
+import os
+import queue as pyqueue
+import multiprocessing
+
+WIN32 = sys.platform == 'win32'
+
+#
+# Test to verify handle verification, see issue 3321
+#
+
+class TestInvalidHandle(unittest.TestCase):
+
+ @unittest.skipIf(WIN32, "skipped on Windows")
+ def test_invalid_handles(self):
+ conn = multiprocessing.connection.Connection(44977608)
+ try:
+ self.assertRaises((ValueError, IOError), conn.poll)
+ finally:
+ # Hack private attribute _handle to avoid printing an error
+ # in conn.__del__
+ conn._handle = None
+ self.assertRaises((ValueError, IOError),
+ multiprocessing.connection.Connection, -1)
+
+class OtherTest(unittest.TestCase):
+ # TODO: add more tests for deliver/answer challenge.
+ def test_deliver_challenge_auth_failure(self):
+ class _FakeConnection(object):
+ def recv_bytes(self, size):
+ return b'something bogus'
+ def send_bytes(self, data):
+ pass
+ self.assertRaises(multiprocessing.AuthenticationError,
+ multiprocessing.connection.deliver_challenge,
+ _FakeConnection(), b'abc')
+
+ def test_answer_challenge_auth_failure(self):
+ class _FakeConnection(object):
+ def __init__(self):
+ self.count = 0
+ def recv_bytes(self, size):
+ self.count += 1
+ if self.count == 1:
+ return multiprocessing.connection.CHALLENGE
+ elif self.count == 2:
+ return b'something bogus'
+ return b''
+ def send_bytes(self, data):
+ pass
+ self.assertRaises(multiprocessing.AuthenticationError,
+ multiprocessing.connection.answer_challenge,
+ _FakeConnection(), b'abc')
+
+#
+# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
+#
+
+def initializer(ns):
+ ns.test += 1
+
+class TestInitializers(unittest.TestCase):
+ def setUp(self):
+ self.mgr = multiprocessing.Manager()
+ self.ns = self.mgr.Namespace()
+ self.ns.test = 0
+
+ def tearDown(self):
+ self.mgr.shutdown()
+
+ def test_manager_initializer(self):
+ m = multiprocessing.managers.SyncManager()
+ self.assertRaises(TypeError, m.start, 1)
+ m.start(initializer, (self.ns,))
+ self.assertEqual(self.ns.test, 1)
+ m.shutdown()
+
+ def test_pool_initializer(self):
+ self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
+ p = multiprocessing.Pool(1, initializer, (self.ns,))
+ p.close()
+ p.join()
+ self.assertEqual(self.ns.test, 1)
+
+#
+# Issue 5155, 5313, 5331: Test process in processes
+# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
+#
+
+def _ThisSubProcess(q):
+ try:
+ item = q.get(block=False)
+ except pyqueue.Empty:
+ pass
+
+def _TestProcess(q):
+ queue = multiprocessing.Queue()
+ subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
+ subProc.daemon = True
+ subProc.start()
+ subProc.join()
+
+def _afunc(x):
+ return x*x
+
+def pool_in_process():
+ pool = multiprocessing.Pool(processes=4)
+ x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
+
+class _file_like(object):
+ def __init__(self, delegate):
+ self._delegate = delegate
+ self._pid = None
+
+ @property
+ def cache(self):
+ pid = os.getpid()
+ # There are no race conditions since fork keeps only the running thread
+ if pid != self._pid:
+ self._pid = pid
+ self._cache = []
+ return self._cache
+
+ def write(self, data):
+ self.cache.append(data)
+
+ def flush(self):
+ self._delegate.write(''.join(self.cache))
+ self._cache = []
+
+class TestStdinBadfiledescriptor(unittest.TestCase):
+
+ def test_queue_in_process(self):
+ queue = multiprocessing.Queue()
+ proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
+ proc.start()
+ proc.join()
+
+ def test_pool_in_process(self):
+ p = multiprocessing.Process(target=pool_in_process)
+ p.start()
+ p.join()
+
+ def test_flushing(self):
+ sio = io.StringIO()
+ flike = _file_like(sio)
+ flike.write('foo')
+ proc = multiprocessing.Process(target=lambda: flike.flush())
+ flike.flush()
+ assert sio.getvalue() == 'foo'
+
+def test_main():
+ test.support.run_unittest(__name__)
+
+if __name__ == '__main__':
+ test_main()
diff --git a/Lib/test/test_multiprocessing_nofork.py b/Lib/test/test_multiprocessing_nofork.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_nofork.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python3
+
+# Test of multiprocessing using non-forked processes
+
+import multiprocessing
+from test import mp_common
+
+class Mixin(object):
+ TYPE = 'processes'
+ Process = multiprocessing.Process
+ locals().update(mp_common.get_attributes(multiprocessing, (
+ 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
+ 'Condition', 'Event', 'Value', 'Array', 'RawValue',
+ 'RawArray', 'current_process', 'active_children', 'Pipe',
+ 'connection', 'JoinableQueue'
+ )))
+
+testcases_dict = mp_common.create_test_cases(Mixin)
+globals().update(testcases_dict)
+
+def prepare():
+ old_is_forking = multiprocessing.forking_is_enabled()
+ multiprocessing.forking_enable(False)
+ Mixin.pool = multiprocessing.Pool(4)
+ return old_is_forking
+
+def cleanup(old_state):
+ Mixin.pool.terminate()
+ del Mixin.pool
+ multiprocessing.forking_enable(old_state)
+
+def test_main():
+ mp_common._main(prepare, cleanup, __name__)
+
+if __name__ == '__main__':
+ test_main()
diff --git a/Lib/test/test_multiprocessing_threads.py b/Lib/test/test_multiprocessing_threads.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_threads.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+
+# Test of multiprocessing using threads instead of processes
+
+import multiprocessing.dummy
+from test import mp_common
+
+class Mixin(object):
+ TYPE = 'threads'
+ Process = multiprocessing.dummy.Process
+ locals().update(mp_common.get_attributes(multiprocessing.dummy, (
+ 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
+ 'Condition', 'Event', 'Value', 'Array', 'current_process',
+ 'active_children', 'Pipe', 'connection', 'dict', 'list',
+ 'Namespace', 'JoinableQueue'
+ )))
+
+testcases_dict = mp_common.create_test_cases(Mixin)
+globals().update(testcases_dict)
+
+def prepare():
+ Mixin.pool = multiprocessing.dummy.Pool(4)
+ return None
+
+def cleanup(old_state):
+ Mixin.pool.terminate()
+ del Mixin.pool
+
+def test_main():
+ mp_common._main(prepare, cleanup, __name__)
+
+if __name__ == '__main__':
+ test_main()