[OE-core] [PATCH 1/4] oeqa/runner: write testresult to json files

Yeoh Ee Peng ee.peng.yeoh at intel.com
Tue Oct 2 09:22:04 UTC 2018


As part of the solution to replace Testopia for storing test results,
OEQA needs to output test results into JSON files. These JSON test
result files will be stored in a git repository by the future
test-case-management tools.

Both the test result (e.g. PASSED, FAILED, ERROR) and the test log
(e.g. the message from a unit test assertion) will be written out for
storage.

Also, the library class in this patch will be reused by the future
test-case-management tools to write JSON test results for manually
executed test cases.

Signed-off-by: Yeoh Ee Peng <ee.peng.yeoh at intel.com>
---
 meta/lib/oeqa/core/runner.py | 137 +++++++++++++++++++++++++++++++++++++++----
 1 file changed, 126 insertions(+), 11 deletions(-)

diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py
index eeb625b..54efdd0 100644
--- a/meta/lib/oeqa/core/runner.py
+++ b/meta/lib/oeqa/core/runner.py
@@ -6,6 +6,8 @@ import time
 import unittest
 import logging
 import re
+import json
+import pathlib
 
 from unittest import TextTestResult as _TestResult
 from unittest import TextTestRunner as _TestRunner
@@ -44,6 +46,9 @@ class OETestResult(_TestResult):
 
         self.tc = tc
 
+        self.result_types = ['failures', 'errors', 'skipped', 'expectedFailures', 'successes']
+        self.result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL', 'PASSED']
+
     def startTest(self, test):
         # May have been set by concurrencytest
         if test.id() not in self.starttime:
@@ -80,7 +85,7 @@ class OETestResult(_TestResult):
             msg += " (skipped=%d)" % skipped
         self.tc.logger.info(msg)
 
-    def _getDetailsNotPassed(self, case, type, desc):
+    def _isTestResultContainTestCaseWithResultTypeProvided(self, case, type):
         found = False
 
         for (scase, msg) in getattr(self, type):
@@ -121,16 +126,12 @@ class OETestResult(_TestResult):
         for case_name in self.tc._registry['cases']:
             case = self.tc._registry['cases'][case_name]
 
-            result_types = ['failures', 'errors', 'skipped', 'expectedFailures', 'successes']
-            result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL', 'PASSED']
-
-            fail = False
+            found = False
             desc = None
-            for idx, name in enumerate(result_types):
-                (fail, msg) = self._getDetailsNotPassed(case, result_types[idx],
-                        result_desc[idx])
-                if fail:
-                    desc = result_desc[idx]
+            for idx, name in enumerate(self.result_types):
+                (found, msg) = self._isTestResultContainTestCaseWithResultTypeProvided(case, self.result_types[idx])
+                if found:
+                    desc = self.result_desc[idx]
                     break
 
             oeid = -1
@@ -143,13 +144,43 @@ class OETestResult(_TestResult):
             if case.id() in self.starttime and case.id() in self.endtime:
                 t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)"
 
-            if fail:
+            if found:
                 self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
                     oeid, desc, t))
             else:
                 self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
                     oeid, 'UNKNOWN', t))
 
+    def _get_testcase_result_and_testmessage_dict(self):
+        testcase_result_dict = {}
+        testcase_testmessage_dict = {}
+        for case_name in self.tc._registry['cases']:
+            case = self.tc._registry['cases'][case_name]
+
+            found = False
+            desc = None
+            test_msg = ''
+            for idx, name in enumerate(self.result_types):
+                (found, msg) = self._isTestResultContainTestCaseWithResultTypeProvided(case, self.result_types[idx])
+                if found:
+                    desc = self.result_desc[idx]
+                    test_msg = msg
+                    break
+
+            if found:
+                testcase_result_dict[case.id()] = desc
+                testcase_testmessage_dict[case.id()] = test_msg
+            else:
+                testcase_result_dict[case.id()] = "UNKNOWN"
+        return testcase_result_dict, testcase_testmessage_dict
+
+    def logDetailsInJson(self, file_dir):
+        (testcase_result_dict, testcase_testmessage_dict) = self._get_testcase_result_and_testmessage_dict()
+        if len(testcase_result_dict) > 0 and len(testcase_testmessage_dict) > 0:
+            jsontresulthelper = OEJSONTestResultHelper(testcase_result_dict, testcase_testmessage_dict)
+            jsontresulthelper.write_json_testresult_files(file_dir)
+            jsontresulthelper.write_testcase_log_files(os.path.join(file_dir, 'logs'))
+
 class OEListTestsResult(object):
     def wasSuccessful(self):
         return True
@@ -261,3 +292,87 @@ class OETestRunner(_TestRunner):
             self._list_tests_module(suite)
 
         return OEListTestsResult()
+
+class OEJSONTestResultHelper(object):
+    def __init__(self, testcase_result_dict, testcase_log_dict):
+        self.testcase_result_dict = testcase_result_dict
+        self.testcase_log_dict = testcase_log_dict
+
+    def get_testcase_list(self):
+        return self.testcase_result_dict.keys()
+
+    def get_testsuite_from_testcase(self, testcase):
+        testsuite = testcase[0:testcase.rfind(".")]
+        return testsuite
+
+    def get_testmodule_from_testsuite(self, testsuite):
+        testmodule = testsuite[0:testsuite.find(".")]
+        return testmodule
+
+    def get_testsuite_testcase_dictionary(self):
+        testsuite_testcase_dict = {}
+        for testcase in self.get_testcase_list():
+            testsuite = self.get_testsuite_from_testcase(testcase)
+            if testsuite in testsuite_testcase_dict:
+                testsuite_testcase_dict[testsuite].append(testcase)
+            else:
+                testsuite_testcase_dict[testsuite] = [testcase]
+        return testsuite_testcase_dict
+
+    def get_testmodule_testsuite_dictionary(self, testsuite_testcase_dict):
+        testsuite_list = testsuite_testcase_dict.keys()
+        testmodule_testsuite_dict = {}
+        for testsuite in testsuite_list:
+            testmodule = self.get_testmodule_from_testsuite(testsuite)
+            if testmodule in testmodule_testsuite_dict:
+                testmodule_testsuite_dict[testmodule].append(testsuite)
+            else:
+                testmodule_testsuite_dict[testmodule] = [testsuite]
+        return testmodule_testsuite_dict
+
+    def _get_testcase_result(self, testcase, testcase_status_dict):
+        if testcase in testcase_status_dict:
+            return testcase_status_dict[testcase]
+        return ""
+
+    def _create_testcase_testresult_object(self, testcase_list, testcase_result_dict):
+        testcase_dict = {}
+        for testcase in sorted(testcase_list):
+            result = self._get_testcase_result(testcase, testcase_result_dict)
+            testcase_dict[testcase] = {"testresult": result}
+        return testcase_dict
+
+    def _create_json_testsuite_string(self, testsuite_list, testsuite_testcase_dict, testcase_result_dict):
+        testsuite_object = {'testsuite': {}}
+        testsuite_dict = testsuite_object['testsuite']
+        for testsuite in sorted(testsuite_list):
+            testsuite_dict[testsuite] = {'testcase': {}}
+            testsuite_dict[testsuite]['testcase'] = self._create_testcase_testresult_object(
+                testsuite_testcase_dict[testsuite],
+                testcase_result_dict)
+        return json.dumps(testsuite_object, sort_keys=True, indent=4)
+
+    def write_json_testresult_files(self, write_dir):
+        if not os.path.exists(write_dir):
+            pathlib.Path(write_dir).mkdir(parents=True, exist_ok=True)
+        testsuite_testcase_dict = self.get_testsuite_testcase_dictionary()
+        testmodule_testsuite_dict = self.get_testmodule_testsuite_dictionary(testsuite_testcase_dict)
+        for testmodule in testmodule_testsuite_dict.keys():
+            testsuite_list = testmodule_testsuite_dict[testmodule]
+            json_testsuite = self._create_json_testsuite_string(testsuite_list, testsuite_testcase_dict,
+                                                                self.testcase_result_dict)
+            file_name = '%s.json' % testmodule
+            file_path = os.path.join(write_dir, file_name)
+            with open(file_path, 'w') as the_file:
+                the_file.write(json_testsuite)
+
+    def write_testcase_log_files(self, write_dir):
+        if not os.path.exists(write_dir):
+            pathlib.Path(write_dir).mkdir(parents=True, exist_ok=True)
+        for testcase in self.testcase_log_dict.keys():
+            test_log = self.testcase_log_dict[testcase]
+            if test_log is not None:
+                file_name = '%s.log' % testcase
+                file_path = os.path.join(write_dir, file_name)
+                with open(file_path, 'w') as the_file:
+                    the_file.write(test_log)
-- 
2.7.4



More information about the Openembedded-core mailing list