Implement a new --failing-and-slow-first command line argument to test runner. #24624

Open · wants to merge 13 commits into base: main
3 changes: 3 additions & 0 deletions .gitignore
@@ -35,3 +35,6 @@ coverage.xml
# ...except the templates.
!/tools/run_python.ps1
!/tools/run_python_compiler.ps1

# Test runner previous run results for sorting the next run
__previous_test_run_results.json
71 changes: 65 additions & 6 deletions test/parallel_testsuite.py
@@ -3,6 +3,7 @@
# University of Illinois/NCSA Open Source License. Both these licenses can be
# found in the LICENSE file.

import json
import multiprocessing
import os
import sys
@@ -19,7 +20,12 @@
seen_class = set()


def run_test(test):
def run_test(test, failfast_event):
# If failfast mode is in effect and any of the tests have failed,
# then we should abort executing further tests immediately.
if failfast_event is not None and failfast_event.is_set():
return None

olddir = os.getcwd()
result = BufferedParallelTestResult()
temp_dir = tempfile.mkdtemp(prefix='emtest_')
@@ -29,10 +35,16 @@ def run_test(test):
seen_class.add(test.__class__)
test.__class__.setUpClass()
test(result)

# Alert all other multiprocessing pool workers that they need to stop executing further tests.
if failfast_event is not None and result.test_result not in ['success', 'skipped']:
failfast_event.set()
except unittest.SkipTest as e:
result.addSkip(test, e)
except Exception as e:
result.addError(test, e)
if failfast_event is not None:
failfast_event.set()
# Before attempting to delete the tmp dir make sure the current
# working directory is not within it.
os.chdir(olddir)
@@ -46,9 +58,11 @@ class ParallelTestSuite(unittest.BaseTestSuite):
Creates worker threads, manages the task queue, and combines the results.
"""

def __init__(self, max_cores):
def __init__(self, max_cores, options):
super().__init__()
self.max_cores = max_cores
self.failfast = options.failfast
self.failing_and_slow_first = options.failing_and_slow_first

def addTest(self, test):
super().addTest(test)
@@ -61,12 +75,48 @@ def run(self, result):
# inherited by the child process, but can lead to hard-to-debug windows-only
# issues.
# multiprocessing.set_start_method('spawn')
tests = list(self.reversed_tests())

# If we are running with --failing-and-slow-first, then the test list has been
# pre-sorted based on previous test run results. Otherwise run the tests in
# reverse alphabetical order.
tests = list(self if self.failing_and_slow_first else self.reversed_tests())
use_cores = cap_max_workers_in_pool(min(self.max_cores, len(tests), num_cores()))
print('Using %s parallel test processes' % use_cores)
pool = multiprocessing.Pool(use_cores)
results = [pool.apply_async(run_test, (t,)) for t in tests]
results = [r.get() for r in results]
with multiprocessing.Manager() as manager:
pool = multiprocessing.Pool(use_cores)
failfast_event = manager.Event() if self.failfast else None
results = [pool.apply_async(run_test, (t, failfast_event)) for t in tests]
results = [r.get() for r in results]
results = [r for r in results if r is not None]

try:
previous_test_run_results = json.load(open('__previous_test_run_results.json'))
except FileNotFoundError:
previous_test_run_results = {}

Collaborator:
Do we need to duplicate this code for handling __previous_test_run_results between here and runner.py?

Perhaps a shared sorting function that they can both call?

Collaborator (Author):
This code is not creating a sorter, but I see the shared block amounts to a

def load_previous_test_run_results():
    try:
      return json.load(open('out/__previous_test_run_results.json'))
    except FileNotFoundError:
      return {}

I could refactor that to e.g. common.py, though that is not a large win necessarily.
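
For reference, a minimal sketch of that refactor, assuming the helper lives in common.py (the module location, constant name, and call sites below are illustrative, not from this PR):

# common.py (hypothetical home for the shared helper)
import json

PREVIOUS_RESULTS_FILE = '__previous_test_run_results.json'


def load_previous_test_run_results():
  # Return the recorded results of the previous test run, or an empty dict
  # if no previous run has been recorded yet.
  try:
    with open(PREVIOUS_RESULTS_FILE) as f:
      return json.load(f)
  except FileNotFoundError:
    return {}

# parallel_testsuite.py and runner.py would then both call:
#   previous_test_run_results = common.load_previous_test_run_results()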

Collaborator:
I guess I don't quite understand what is going on here then... I would have thought the new --failing-and-slow-first flag would apply equally to the non-parallel and parallel test runners, and so it would only need to be handled in a single place (where we decide on the test ordering)... I need to take a deeper look at what is really going on here.

Collaborator (Author):
Merged the code.

Collaborator (Author):
The new flag only applies to the parallel test runner. If we wanted to extend this to the non-parallel runner, that's fine, though that could be worked on as a later PR as well.
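
If that later PR happens, one possible shape for the serial path (purely a sketch, not part of this change; the class name and wiring are hypothetical) would be a TestResult subclass that records the same per-test outcome and duration data that ParallelTestSuite writes to __previous_test_run_results.json:

import time
import unittest


class RecordingTextTestResult(unittest.TextTestResult):
  # Hypothetical serial-runner counterpart to BufferedParallelTestResult:
  # collects per-test outcome and duration in the same shape that
  # ParallelTestSuite dumps to __previous_test_run_results.json.
  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.records = {}

  def startTest(self, test):
    super().startTest(test)
    self._start_time = time.perf_counter()

  def _record(self, test, outcome):
    self.records[str(test)] = {
      'result': outcome,
      'duration': time.perf_counter() - self._start_time,
    }

  def addSuccess(self, test):
    super().addSuccess(test)
    self._record(test, 'success')

  def addFailure(self, test, err):
    super().addFailure(test, err)
    self._record(test, 'failed')

  def addError(self, test, err):
    super().addError(test, err)
    self._record(test, 'errored')

  def addSkip(self, test, reason):
    super().addSkip(test, reason)
    self._record(test, 'skipped')

The serial runner could then pass resultclass=RecordingTextTestResult to unittest.TextTestRunner and merge result.records into the JSON file the same way ParallelTestSuite.run() does.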


if self.failing_and_slow_first:
for r in results:
# Save a test result record with the specific suite name (e.g. "core0.test_foo")
test_failed = r.test_result not in ['success', 'skipped']
fail_frequency = previous_test_run_results[r.test_name]['fail_frequency'] if r.test_name in previous_test_run_results else int(test_failed)
fail_frequency = (fail_frequency + int(test_failed)) / 2
previous_test_run_results[r.test_name] = {
'result': r.test_result,
'duration': r.test_duration,
'fail_frequency': fail_frequency,
}
# Also save a test result record without suite name (e.g. just "test_foo"). This enables different suite runs to order tests
# for quick --failfast termination, in case a test fails in multiple suites
test_in_any_suite = r.test_name.split(' ')[0]
fail_frequency = previous_test_run_results[test_in_any_suite]['fail_frequency'] if test_in_any_suite in previous_test_run_results else int(test_failed)
fail_frequency = (fail_frequency + int(test_failed)) / 2
previous_test_run_results[test_in_any_suite] = {
'result': r.test_result,
'duration': r.test_duration,
'fail_frequency': fail_frequency,
}

json.dump(previous_test_run_results, open('__previous_test_run_results.json', 'w'), indent=2)
pool.close()
pool.join()
return self.combine_results(result, results)
@@ -104,6 +154,8 @@ class BufferedParallelTestResult:
def __init__(self):
self.buffered_result = None
self.test_duration = 0
self.test_result = 'errored'
self.test_name = ''

@property
def test(self):
@@ -122,6 +174,7 @@ def updateResult(self, result):
result.core_time += self.test_duration

def startTest(self, test):
self.test_name = str(test)
self.start_time = time.perf_counter()

def stopTest(self, test):
@@ -134,28 +187,34 @@ def addSuccess(self, test):
if hasattr(time, 'perf_counter'):
print(test, '... ok (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
self.buffered_result = BufferedTestSuccess(test)
self.test_result = 'success'

def addExpectedFailure(self, test, err):
if hasattr(time, 'perf_counter'):
print(test, '... expected failure (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
self.buffered_result = BufferedTestExpectedFailure(test, err)
self.test_result = 'expected failure'

def addUnexpectedSuccess(self, test):
if hasattr(time, 'perf_counter'):
print(test, '... unexpected success (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
self.buffered_result = BufferedTestUnexpectedSuccess(test)
self.test_result = 'unexpected success'

def addSkip(self, test, reason):
print(test, "... skipped '%s'" % reason, file=sys.stderr)
self.buffered_result = BufferedTestSkip(test, reason)
self.test_result = 'skipped'

def addFailure(self, test, err):
print(test, '... FAIL', file=sys.stderr)
self.buffered_result = BufferedTestFailure(test, err)
self.test_result = 'failed'

def addError(self, test, err):
print(test, '... ERROR', file=sys.stderr)
self.buffered_result = BufferedTestError(test, err)
self.test_result = 'errored'

Collaborator:
Is this needed? Isn't the existing buffered_result object enough?

Collaborator (Author):
Python doesn't have an API to ask a test object for its result, and converting the result to a string would have required some kind of awkward isinstance() jungle, so I opted for writing simple-looking code instead.
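
For illustration, the kind of isinstance() dispatch being avoided would look roughly like the sketch below (not code from this PR; it only reuses the Buffered* class names defined in this file, and the order of checks would matter if any of them inherit from each other):

def result_kind(buffered_result):
  # Map a buffered result object back to a human-readable outcome string.
  if isinstance(buffered_result, BufferedTestSkip):
    return 'skipped'
  if isinstance(buffered_result, BufferedTestExpectedFailure):
    return 'expected failure'
  if isinstance(buffered_result, BufferedTestUnexpectedSuccess):
    return 'unexpected success'
  if isinstance(buffered_result, BufferedTestFailure):
    return 'failed'
  if isinstance(buffered_result, BufferedTestError):
    return 'errored'
  if isinstance(buffered_result, BufferedTestSuccess):
    return 'success'
  return 'errored'

Recording the outcome string at the point where it is already known keeps that mapping out of the code entirely.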



class BufferedTestBase:
90 changes: 81 additions & 9 deletions test/runner.py
@@ -21,6 +21,7 @@
import atexit
import fnmatch
import glob
import json
import logging
import math
import operator
@@ -30,6 +31,7 @@
import sys
import unittest
import time
from functools import cmp_to_key

# Setup

@@ -270,8 +272,75 @@ def error_on_legacy_suite_names(args):
utils.exit_with_error('`%s` test suite has been replaced with `%s`', a, new)


def load_test_suites(args, modules, start_at, repeat):
found_start = not start_at
def create_test_run_sorter(failfast):
try:
previous_test_run_results = json.load(open('__previous_test_run_results.json'))
except FileNotFoundError:
previous_test_run_results = {}

def read_approx_fail_freq(test_name):
if test_name in previous_test_run_results and 'fail_frequency' in previous_test_run_results[test_name]:
# Quantize the float value to relatively fine-grained buckets for sorting
return round(previous_test_run_results[test_name]['fail_frequency'] * 20) / 20
return 0

def sort_tests_failing_and_slowest_first_comparator(x, y):
x = str(x)
y = str(y)

# Look at the number of times this test has failed, and order by failure count first.
# Only do this with --failfast, when we are looking to fail early (otherwise sorting by last test run duration is more productive).
if failfast:
x_fail_freq = read_approx_fail_freq(x)
y_fail_freq = read_approx_fail_freq(y)
if x_fail_freq != y_fail_freq:
return y_fail_freq - x_fail_freq

# Look at the number of times this test has failed overall in any other suite, and order by failure count first.
x_fail_freq = read_approx_fail_freq(x.split(' ')[0])
y_fail_freq = read_approx_fail_freq(y.split(' ')[0])
if x_fail_freq != y_fail_freq:
return y_fail_freq - x_fail_freq

if x in previous_test_run_results:
X = previous_test_run_results[x]

# if test Y has not been run even once, run Y before X
if y not in previous_test_run_results:
return 1
Y = previous_test_run_results[y]

# If both X and Y have been run before, order the tests based on what the previous result was (failures first, skips very last)
# N.b. it is important to sandwich all skipped tests between failures and successes. This maximizes the chances that, when
# a failing test is detected, the other cores will fail fast as well. (Successful tests are run slowest-first to help
# scheduling.)
order_by_result = {'errored': 0, 'failed': 1, 'expected failure': 2, 'unexpected success': 3, 'skipped': 4, 'success': 5}
x_result = order_by_result[X['result']]
y_result = order_by_result[Y['result']]
if x_result != y_result:
return x_result - y_result

# Finally, order by test duration from last run
if X['duration'] != Y['duration']:
if X['result'] == 'success':
# If both tests were successful tests, run the slower test first to improve parallelism
return Y['duration'] - X['duration']
else:
# If both tests were failing tests, run the quicker test first to improve --failfast detection time
return X['duration'] - Y['duration']

# if test X has not been run even once, but Y has, run X before Y
if y in previous_test_run_results:
return -1

# Neither test has been run before, so run them in alphabetical order
return (x > y) - (x < y)

return sort_tests_failing_and_slowest_first_comparator


def load_test_suites(args, modules, options):
found_start = not options.start_at

loader = unittest.TestLoader()
error_on_legacy_suite_names(args)
@@ -291,20 +360,22 @@ def load_test_suites(args, modules, start_at, repeat):
if names_in_module:
loaded_tests = loader.loadTestsFromNames(sorted(names_in_module), m)
tests = flattened_tests(loaded_tests)
suite = suite_for_module(m, tests)
suite = suite_for_module(m, tests, options)
if options.failing_and_slow_first:
tests = sorted(tests, key=cmp_to_key(create_test_run_sorter(options.failfast)))
for test in tests:
if not found_start:
# Skip over tests until we find the start
if test.id().endswith(start_at):
if test.id().endswith(options.start_at):
found_start = True
else:
continue
for _x in range(repeat):
for _x in range(options.repeat):
total_tests += 1
suite.addTest(test)
suites.append((m.__name__, suite))
if not found_start:
utils.exit_with_error(f'unable to find --start-at test: {start_at}')
utils.exit_with_error(f'unable to find --start-at test: {options.start_at}')
if total_tests == 1 or parallel_testsuite.num_cores() == 1:
# TODO: perhaps leave it at 2 if it was 2 before?
common.EMTEST_SAVE_DIR = 1
@@ -318,13 +389,13 @@ def flattened_tests(loaded_tests):
return tests


def suite_for_module(module, tests):
def suite_for_module(module, tests, options):
suite_supported = module.__name__ in ('test_core', 'test_other', 'test_posixtest')
if not common.EMTEST_SAVE_DIR and not shared.DEBUG:
has_multiple_tests = len(tests) > 1
has_multiple_cores = parallel_testsuite.num_cores() > 1
if suite_supported and has_multiple_tests and has_multiple_cores:
return parallel_testsuite.ParallelTestSuite(len(tests))
return parallel_testsuite.ParallelTestSuite(len(tests), options)
return unittest.TestSuite()


@@ -394,6 +465,7 @@ def parse_args():
help='Command to launch web browser in which to run browser tests.')
parser.add_argument('tests', nargs='*')
parser.add_argument('--failfast', action='store_true')
parser.add_argument('--failing-and-slow-first', action='store_true', help='Run previously failing tests first, then the rest sorted slowest first. Combine with --failfast for fast fail-early CI runs.')
parser.add_argument('--start-at', metavar='NAME', help='Skip all tests up until <NAME>')
parser.add_argument('--continue', dest='_continue', action='store_true',
help='Resume from the last run test.'
@@ -488,7 +560,7 @@ def prepend_default(arg):
if os.path.exists(common.LAST_TEST):
options.start_at = utils.read_file(common.LAST_TEST).strip()

suites, unmatched_tests = load_test_suites(tests, modules, options.start_at, options.repeat)
suites, unmatched_tests = load_test_suites(tests, modules, options)
if unmatched_tests:
print('ERROR: could not find the following tests: ' + ' '.join(unmatched_tests))
return 1