[OE-core] [PATCH 1/4] oeqa/core/runner: write testresult to json files
Richard Purdie
richard.purdie at linuxfoundation.org
Fri Oct 12 15:00:21 UTC 2018
On Fri, 2018-10-12 at 14:33 +0800, Yeoh Ee Peng wrote:
> As part of the solution to replace Testopia to store testresult,
> OEQA need to output testresult into json files, where these json
> testresult files will be stored in git repository by the future
> test-case-management tools.
>
> Both the testresult (eg. PASSED, FAILED, ERROR) and the test log
> (eg. message from unit test assertion) will be created for storing.
>
> Also the library class inside this patch will be reused by the future
> test-case-management tools to write json testresult for manual test
> case executed.
This code is along the right lines, but we need to work on some of the
coding style, and I think some of this can be simplified.
> Signed-off-by: Yeoh Ee Peng <ee.peng.yeoh at intel.com>
> ---
> meta/lib/oeqa/core/runner.py | 132 +++++++++++++++++++++++++++++++++++++++----
> 1 file changed, 121 insertions(+), 11 deletions(-)
>
> diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py
> index eeb625b..cc33d9c 100644
> --- a/meta/lib/oeqa/core/runner.py
> +++ b/meta/lib/oeqa/core/runner.py
> @@ -6,6 +6,8 @@ import time
> import unittest
> import logging
> import re
> +import json
> +import pathlib
>
> from unittest import TextTestResult as _TestResult
> from unittest import TextTestRunner as _TestRunner
> @@ -44,6 +46,9 @@ class OETestResult(_TestResult):
>
> self.tc = tc
>
> + self.result_types = ['failures', 'errors', 'skipped', 'expectedFailures', 'successes']
> + self.result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL', 'PASSED']
> +
> def startTest(self, test):
> # May have been set by concurrencytest
> if test.id() not in self.starttime:
> @@ -80,7 +85,7 @@ class OETestResult(_TestResult):
> msg += " (skipped=%d)" % skipped
> self.tc.logger.info(msg)
>
> - def _getDetailsNotPassed(self, case, type, desc):
> + def _isTestResultContainTestCaseWithResultTypeProvided(self, case, type):
> found = False
>
> for (scase, msg) in getattr(self, type):
> @@ -121,16 +126,12 @@ class OETestResult(_TestResult):
> for case_name in self.tc._registry['cases']:
> case = self.tc._registry['cases'][case_name]
>
> - result_types = ['failures', 'errors', 'skipped', 'expectedFailures', 'successes']
> - result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL', 'PASSED']
> -
> - fail = False
> + found = False
> desc = None
> - for idx, name in enumerate(result_types):
> - (fail, msg) = self._getDetailsNotPassed(case, result_types[idx],
> - result_desc[idx])
> - if fail:
> - desc = result_desc[idx]
> + for idx, name in enumerate(self.result_types):
> + (found, msg) = self._isTestResultContainTestCaseWithResultTypeProvided(case, self.result_types[idx])
> + if found:
> + desc = self.result_desc[idx]
> break
>
> oeid = -1
> @@ -143,13 +144,43 @@ class OETestResult(_TestResult):
> if case.id() in self.starttime and case.id() in self.endtime:
> t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)"
>
> - if fail:
> + if found:
> self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
> oeid, desc, t))
> else:
> self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
> oeid, 'UNKNOWN', t))
I think the above needs to be split into a separate patch where you can
explain you're extracting the common functionality for use in other
functions and cleaning up the variable names.
The function name "_isTestResultContainTestCaseWithResultTypeProvided"
is not good though (and I agree the current one is also suboptimal).
findTestResultDetails() would perhaps be a better name?
> + def _get_testcase_result_and_log_dict(self):
> + testcase_result_dict = {}
> + testcase_log_dict = {}
> + for case_name in self.tc._registry['cases']:
> + case = self.tc._registry['cases'][case_name]
> +
> + found = False
> + desc = None
> + test_log = ''
> + for idx, name in enumerate(self.result_types):
> + (found, msg) = self._isTestResultContainTestCaseWithResultTypeProvided(case, self.result_types[idx])
> + if found:
> + desc = self.result_desc[idx]
> + test_log = msg
> + break
> +
> + if found:
> + testcase_result_dict[case.id()] = desc
> + testcase_log_dict[case.id()] = test_log
> + else:
> + testcase_result_dict[case.id()] = "UNKNOWN"
> + return testcase_result_dict, testcase_log_dict
> +
> + def logDetailsInJson(self, file_dir):
> + (testcase_result_dict, testcase_log_dict) = self._get_testcase_result_and_log_dict()
> + if len(testcase_result_dict) > 0 and len(testcase_log_dict) > 0:
> + tresultjsonhelper = OETestResultJSONHelper()
> + tresultjsonhelper.dump_testresult_files(testcase_result_dict, file_dir)
> + tresultjsonhelper.dump_log_files(testcase_log_dict, os.path.join(file_dir, 'logs'))
The above two functions may as well be combined. I suspect the code can
also be simplified, for example moving the second "if found" into the
first one. I'd also be tempted to combine testcase_result_dict and
testcase_log_dict into one variable called something like "results"; we
don't need to state that it's a dict. For example, you could do:
results[case.id()] = (desc, test_log)
Ultimately, it looks like you then process this data by sorting it by
testsuite below. I'm tempted to suggest you simply do that here to
start with, too.
> class OEListTestsResult(object):
> def wasSuccessful(self):
> return True
> @@ -261,3 +292,82 @@ class OETestRunner(_TestRunner):
> self._list_tests_module(suite)
>
> return OEListTestsResult()
> +
> +class OETestResultJSONHelper(object):
> +
> + def get_testsuite_from_testcase(self, testcase):
> + testsuite = testcase[0:testcase.rfind(".")]
> + return testsuite
def get_testsuite(self, testcase):
return testcase[:testcase.rfind(".")]
> +
> + def get_testmodule_from_testsuite(self, testsuite):
> + testmodule = testsuite[0:testsuite.find(".")]
> + return testmodule
> +
> + def get_testsuite_testcase_dictionary(self, testcase_result_dict):
> + testcase_list = testcase_result_dict.keys()
> + testsuite_testcase_dict = {}
> + for testcase in testcase_list:
> + testsuite = self.get_testsuite_from_testcase(testcase)
> + if testsuite in testsuite_testcase_dict:
> + testsuite_testcase_dict[testsuite].append(testcase)
> + else:
> + testsuite_testcase_dict[testsuite] = [testcase]
> + return testsuite_testcase_dict
> +
> + def get_testmodule_testsuite_dictionary(self, testsuite_testcase_dict):
> + testsuite_list = testsuite_testcase_dict.keys()
> + testmodule_testsuite_dict = {}
> + for testsuite in testsuite_list:
> + testmodule = self.get_testmodule_from_testsuite(testsuite)
> + if testmodule in testmodule_testsuite_dict:
> + testmodule_testsuite_dict[testmodule].append(testsuite)
> + else:
> + testmodule_testsuite_dict[testmodule] = [testsuite]
> + return testmodule_testsuite_dict
> +
> + def _get_testcase_result(self, testcase, testcase_status_dict):
> + if testcase in testcase_status_dict:
> + return testcase_status_dict[testcase]
> + return ""
> +
> + def _create_testcase_testresult_object(self, testcase_list, testcase_result_dict):
> + testcase_dict = {}
> + for testcase in sorted(testcase_list):
> + result = self._get_testcase_result(testcase, testcase_result_dict)
> + testcase_dict[testcase] = {"testresult": result}
> + return testcase_dict
> +
> + def _create_json_testsuite_string(self, testsuite_list, testsuite_testcase_dict, testcase_result_dict):
> + testsuite_object = {'testsuite': {}}
> + testsuite_dict = testsuite_object['testsuite']
> + for testsuite in sorted(testsuite_list):
> + testsuite_dict[testsuite] = {'testcase': {}}
> + testsuite_dict[testsuite]['testcase'] = self._create_testcase_testresult_object(
> + testsuite_testcase_dict[testsuite],
> + testcase_result_dict)
> + return json.dumps(testsuite_object, sort_keys=True, indent=4)
This all looks too complicated for what we really need. I think/hope we
can have one output json file and the original function above can
construct the data in a better form so we don't need much of the above.
> + def dump_testresult_files(self, testcase_result_dict, write_dir):
> + if not os.path.exists(write_dir):
> + pathlib.Path(write_dir).mkdir(parents=True, exist_ok=True)
> + testsuite_testcase_dict = self.get_testsuite_testcase_dictionary(testcase_result_dict)
> + testmodule_testsuite_dict = self.get_testmodule_testsuite_dictionary(testsuite_testcase_dict)
> + for testmodule in testmodule_testsuite_dict.keys():
> + testsuite_list = testmodule_testsuite_dict[testmodule]
> + json_testsuite = self._create_json_testsuite_string(testsuite_list, testsuite_testcase_dict,
> + testcase_result_dict)
> + file_name = '%s.json' % testmodule
> + file_path = os.path.join(write_dir, file_name)
> + with open(file_path, 'w') as the_file:
> + the_file.write(json_testsuite)
Do we need to write a file per module? Why wouldn't we simply write all
the results into a single json file?
> + def dump_log_files(self, testcase_log_dict, write_dir):
> + if not os.path.exists(write_dir):
> + pathlib.Path(write_dir).mkdir(parents=True, exist_ok=True)
> + for testcase in testcase_log_dict.keys():
> + test_log = testcase_log_dict[testcase]
> + if test_log is not None:
> + file_name = '%s.log' % testcase
> + file_path = os.path.join(write_dir, file_name)
> + with open(file_path, 'w') as the_file:
> + the_file.write(test_log)
Why wouldn't we put the test result logs alongside the test result data
in the json file?
Cheers,
Richard
More information about the Openembedded-core
mailing list