Apply clang-format for more source files (#795)

Apply clang-format for C source files in folder core/app-mgr,
core/app-framework, and test-tools.
And rename folder component_test to component-test, update
zephyr build document.

Signed-off-by: Wenyong Huang <wenyong.huang@intel.com>
This commit is contained in:
Wenyong Huang
2021-10-21 13:58:34 +08:00
committed by GitHub
parent 225f5d0a64
commit 32242988ed
143 changed files with 5377 additions and 4627 deletions

View File

@ -0,0 +1,11 @@
#
# Copyright (C) 2019 Intel Corporation. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
__all__ = [
"net_manager", "wifi_daemon_utils"
]
__author__ = ""
__package__ = "model"
__version__ = "1.0"

View File

@ -0,0 +1,29 @@
#
# Copyright (C) 2019 Intel Corporation. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
import os
import json
from test.test_support import _run_suite
class CTestCaseBase(object):
def __init__(self, suite):
self.m_suite = suite
return
def on_get_case_description(self):
return "Undefined"
def on_setup_case(self):
return True, ''
def on_cleanup_case(self):
return True, ''
# called by the framework
def on_run_case(self):
return True, ''
def get_suite(self):
return self.m_suite

View File

@ -0,0 +1,38 @@
#
# Copyright (C) 2019 Intel Corporation. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
import datetime
import os
import pprint
import random
import re
import shlex
import subprocess
import signal
import sys
import time
from .test_utils import *
from .test_api import *
def read_cases_from_file(file_path):
if not os.path.exists(file_path):
return False, None
with open(file_path, 'r') as f:
content = f.readlines()
content = [x.strip() for x in content]
print content
if len(content) == 0:
return False, None
return True, content

View File

@ -0,0 +1,287 @@
#
# Copyright (C) 2019 Intel Corporation. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
import datetime
import os
import pprint
import random
import re
import shlex
import subprocess
import signal
import sys
import time
import shutil
from .test_api import *
import this
'''
The run evironment dir structure:
run/
run{date-time}/
suites/
{suite name}/
-- target/ (the target software being tested)
-- tools/ (the tools for testing the target software)
'''
framework=None
def get_framework():
global framework
return framework
def my_import(name):
mod = __import__(name)
components = name.split('.')
for comp in components[1:]:
mod = getattr(mod, comp)
return mod
# we maintain a root path apart from framework location
# so the suites can be located in anywhere
class CTestFramework(object):
def __init__(self, path):
self.running_case = ''
self.running_suite = ''
self.target_suites = {}
self.target_cases = {}
self.root_path = path
self.running_folder=''
self.report = None
self.sucess_cases = 0
self.failed_cases = 0
self.setup_fails = 0
self.load_fails = 0;
global framework
framework = self
api_set_root_path(path)
print "root_path is " + self.root_path
def gen_execution_stats(self):
return '\nTest Execution Summary: ' \
'\n\tSuccess: {}' \
'\n\tCases fails: {}' \
'\n\tSetup fails: {}' \
'\n\tCase load fails: {}'.format(
self.sucess_cases, self.failed_cases, self.setup_fails, self.load_fails)
def report_result(self, success, message, case_description):
if self.report is None:
return
case_pass = "pass"
if not success:
case_pass = "fail"
self.report.write(case_pass + ": [" + self.running_case + "]\n\treason: " + \
message + "\n\tcase: " + case_description + "\n")
return
def get_running_path(self):
return self.root_path + "/run/" + self.running_folder
def load_suites(self):
self.target_suites = os.listdir(self.root_path + "/suites")
return
def run_case(self, suite_instance, case):
# load the test case module
case_description = ''
suite = suite_instance.m_name
api_log("\n>>start run [" + case + "] >>")
module_name = 'suites.' + suite + ".cases." + case + ".case"
try:
module = my_import(module_name)
except Exception, e:
report_fail("load case fail: " + str(e))
api_log_error("load case fail: " + str(e))
self.load_fails = self.load_fails +1
print(traceback.format_exc())
return False
try:
case = module.CTestCase(suite_instance)
except Exception, e:
report_fail("initialize case fail: " + str(e))
api_log_error("initialize case fail: " + str(e))
self.load_fails = self.load_fails +1
return False
# call the case on setup callback
try:
case_description = case.on_get_case_description()
result, message = case.on_setup_case()
except Exception, e:
result = False
message = str(e);
if not result:
api_log_error(message)
report_fail (message, case_description)
self.failed_cases = self.failed_cases+1
return False
# call the case execution callaback
try:
result, message = case.on_run_case()
except Exception, e:
result = False
message = str(e);
if not result:
report_fail (message, case_description)
api_log_error(message)
self.failed_cases = self.failed_cases+1
else:
report_success(case_description)
self.sucess_cases = self.sucess_cases +1
# call the case cleanup callback
try:
clean_result, message = case.on_cleanup_case()
except Exception, e:
clean_result = False
message = str(e)
if not clean_result:
api_log(message)
return result
def run_suite(self, suite, cases):
# suite setup
message = ''
api_log("\n>>> Suite [" + suite + "] starting >>>")
running_folder = self.get_running_path()+ "/suites/" + suite;
module_name = 'suites.' + suite + ".suite_setup"
try:
module = my_import(module_name)
except Exception, e:
report_fail("load suite [" + suite +"] fail: " + str(e))
self.load_fails = self.load_fails +1
return False
try:
suite_instance = module.CTestSuite(suite, \
self.root_path + '/suites/' + suite, running_folder)
except Exception, e:
report_fail("initialize suite fail: " + str(e))
self.load_fails = self.load_fails +1
return False
result, message = suite_instance.load_settings()
if not result:
report_fail("load settings fail: " + str(e))
self.load_fails = self.load_fails +1
return False
try:
result, message = suite_instance.on_suite_setup()
except Exception, e:
result = False
message = str(e);
if not result:
api_log_error(message)
report_fail (message)
self.setup_fails = self.setup_fails + 1
return False
self.running_suite = suite
cases.sort()
# run cases
for case in cases:
if not os.path.isdir(self.root_path + '/suites/' + suite + '/cases/' + case):
continue
self.running_case = case
self.run_case(suite_instance, case)
self.running_case = ''
# suites cleanup
self.running_suite = ''
try:
result, message = suite_instance.on_suite_cleanup()
except Exception, e:
result = False
message = str(e);
if not result:
api_log_error(message)
report_fail (message)
self.setup_fails = self.setup_fails + 1
return
def start_run(self):
if self.target_suites is None:
print "\n\nstart run: no target suites, exit.."
return
cur_time = time.localtime()
time_prefix = "{:02}-{:02}-{:02}-{:02}".format(
cur_time.tm_mon, cur_time.tm_mday, cur_time.tm_hour, cur_time.tm_min)
debug = api_get_value('debug', False)
if debug:
self.running_folder = 'debug'
else:
self.running_folder = 'run-' + time_prefix
folder = self.root_path + "/run/" +self.running_folder;
if os.path.exists(folder):
shutil.rmtree(folder, ignore_errors=True)
if not os.path.exists(folder):
os.makedirs(folder )
os.makedirs(folder + "/suites")
api_init_log(folder + "/test.log")
self.report = open(folder + "/report.txt", 'a')
self.target_suites.sort()
for suite in self.target_suites:
if not os.path.isdir(self.root_path + '/suites/' + suite):
continue
self.report.write("suite " + suite + " cases:\n")
if self.target_cases is None:
cases = os.listdir(self.root_path + "/suites/" + suite + "/cases")
self.run_suite(suite, cases)
else:
self.run_suite(suite, self.target_cases)
self.report.write("\n")
self.report.write("\n\n")
summary = self.gen_execution_stats()
self.report.write(summary);
self.report.flush()
self.report.close()
print summary
def report_fail(message, case_description=''):
global framework
if framework is not None:
framework.report_result(False, message, case_description)
api_log_error(message)
return
def report_success(case_description=''):
global framework
if framework is not None:
framework.report_result(True , "OK", case_description)
return

View File

@ -0,0 +1,40 @@
#
# Copyright (C) 2019 Intel Corporation. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
import os
import json
class CTestSuiteBase(object):
def __init__(self, name, suite_path, run_path):
self.suite_path=suite_path
self.run_path=run_path
self.m_name = name
self.settings = {}
def get_settings_item(self, item):
if item in self.settings:
return self.settings[item]
else:
return None
def load_settings(self):
path = self.suite_path + "/settings.cfg"
if os.path.isfile(path):
try:
fp = open(path, 'r')
self.settings = json.load(fp)
fp.close()
except Exception, e:
return False, 'Load settings fail: ' + e.message
return True, 'OK'
else:
return True, 'No file'
def on_suite_setup(self):
return True, 'OK'
def on_suite_cleanup(self):
return True, 'OK'

View File

@ -0,0 +1,98 @@
#
# Copyright (C) 2019 Intel Corporation. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
import logging
import threading
from .test_utils import *
global logger
logger = None
def api_init_log(log_path):
global logger
print "api_init_log: " + log_path
logger = logging.getLogger(__name__)
logger.setLevel(level = logging.INFO)
handler = logging.FileHandler(log_path)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logger.addHandler(handler)
logger.addHandler(console)
return
def api_log(message):
global logger
if logger is None:
print message
else:
logger.info (message)
return
def api_log_error(message):
global logger
if logger is None:
print message
else:
logger.error (message)
return
def api_logv(message):
global logger
if logger is None:
print message
else:
logger.info(message)
return
#####################################3
global g_case_runner_event
def api_wait_case_event(timeout):
global g_case_runner_event
g_case_runner_event.clear()
g_case_runner_event.wait(timeout)
def api_notify_case_runner():
global g_case_runner_event
g_case_runner_event.set()
def api_create_case_event():
global g_case_runner_event
g_case_runner_event = threading.Event()
#######################################
def api_init_globals():
global _global_dict
_global_dict = {}
def api_set_value(name, value):
_global_dict[name] = value
def api_get_value(name, defValue=None):
try:
return _global_dict[name]
except KeyError:
return defValue
#########################################
global root_path
def api_set_root_path(root):
global root_path
root_path = root
def api_get_root_path():
global root_path
return root_path;

View File

@ -0,0 +1,70 @@
#
# Copyright (C) 2019 Intel Corporation. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
import datetime
import os
import random
import re
import shlex
import subprocess
import sys
import time
import shutil
from subprocess import check_output, CalledProcessError
def t_getPIDs(process):
try:
pidlist = map(int, check_output(["pidof", process]).split())
except CalledProcessError:
pidlist = []
#print process + ':list of PIDs = ' + ', '.join(str(e) for e in pidlist)
return pidlist
def t_kill_process_by_name(p_keywords):
pid_list = []
ps_info = subprocess.check_output(shlex.split("ps aux")).split("\n")
for p in ps_info:
if p_keywords in p:
tmp = p.split(" ")
tmp = [x for x in tmp if len(x) > 0]
pid_list.append(tmp[1])
for pid in pid_list:
cmd = "kill -9 {}".format(pid)
subprocess.call(shlex.split(cmd))
return pid_list
#proc -> name of the process
#kill = 1 -> search for pid for kill
#kill = 0 -> search for name (default)
def t_process_exists(proc, kill = 0):
ret = False
processes = t_getPIDs(proc)
for pid in processes:
if kill == 0:
return True
else:
print "kill [" + proc + "], pid=" + str(pid)
os.kill((pid), 9)
ret = True
return ret
def t_copy_files(source_dir, pattern, dest_dir):
files = os.listdir(source_dir)
for file in files:
if file is '/' or file is '.' or file is '..':
continue
if pattern == '*' or pattern is '' or files.endswith(pattern):
shutil.copy(source_dir+"/"+ file,dest_dir)