1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/test/runner/lib/thread.py
Matt Clay 5babe2daea Increase python version coverage for tests. (#24762)
* Improve ansible-test inventory handling.
* Fix python 3 re-raise of exception from thread.
* Fix python 3 encoding for windows-integration.
* Run network tests on multiple python versions.
* Run windows tests on multiple python versions.
* Support Shippable delegation using --tox.
* Skip vyos_command on python 3 tests until fixed.
* Add python 3 filtering to local and tox.
* Fix tests to support back to back runs.
* Temporarily test networking with python 2.7 only.

Running the tests back to back causes intermittent test failures
which need to be addressed before we can test multiple versions
in a single test run.
2017-05-19 01:37:53 +08:00

49 lines
1.5 KiB
Python

"""Python threading tools."""
from __future__ import absolute_import, print_function
import threading
import sys
try:
# noinspection PyPep8Naming
import Queue as queue
except ImportError:
# noinspection PyUnresolvedReferences
import queue # pylint: disable=locally-disabled, import-error
class WrappedThread(threading.Thread):
    """
    Thread which runs a single callable and hands its outcome back to the caller.

    The return value of the callable, or the exception it raised, is stored in an
    internal queue by run() and retrieved with wait_for_result().
    """
    def __init__(self, action):
        """
        :type action: () -> any
        """
        # noinspection PyOldStyleClasses
        super(WrappedThread, self).__init__()
        # Holds one (result, exc_info) tuple once the action has finished.
        self._result = queue.Queue()
        self.action = action

    def run(self):
        """
        Run action and capture results or exception.
        Do not override. Do not call directly. Executed by the start() method.
        """
        # The bare except is deliberate: whatever the action raises is recorded as
        # exc_info, so an outcome is queued on failure as well as on success.
        # noinspection PyBroadException
        try:
            self._result.put((self.action(), None))
        except:  # pylint: disable=locally-disabled, bare-except
            self._result.put((None, sys.exc_info()))

    def wait_for_result(self):
        """
        Wait for the action to finish and return its result or raise its exception.
        Blocks until run() has stored an outcome. May be called more than once;
        every call returns the same result or re-raises the same exception.
        :rtype: any
        """
        result, exception = self._result.get()

        # Put the outcome back so that a repeated (or concurrent) call sees it too
        # instead of blocking forever on an empty queue.
        self._result.put((result, exception))

        if exception:
            # exception is the (type, value, traceback) tuple from sys.exc_info().
            if sys.version_info[0] > 2:
                raise exception[1].with_traceback(exception[2])
            # The three-argument raise is a syntax error on python 3, so it is hidden
            # inside a string and only executed on python 2.
            # noinspection PyRedundantParentheses
            exec('raise exception[0], exception[1], exception[2]')  # pylint: disable=locally-disabled, exec-used

        return result