#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import re
import signal
import sys
import threading
import time

from vts.proto import VtsReportMessage_pb2 as ReportMsg
from vts.runners.host import asserts
from vts.runners.host import const
from vts.runners.host import errors
from vts.runners.host import keys
from vts.runners.host import logger
from vts.runners.host import records
from vts.runners.host import signals
from vts.runners.host import utils
from vts.utils.python.controllers import adb
from vts.utils.python.controllers import android_device
from vts.utils.python.controllers.adb import AdbError
from vts.utils.python.common import cmd_utils
from vts.utils.python.common import filter_utils
from vts.utils.python.common import list_utils
from vts.utils.python.common import timeout_utils
from vts.utils.python.coverage import coverage_utils
from vts.utils.python.coverage import sancov_utils
from vts.utils.python.instrumentation import test_framework_instrumentation as tfi
from vts.utils.python.io import file_util
from vts.utils.python.precondition import precondition_utils
from vts.utils.python.profiling import profiling_utils
from vts.utils.python.reporting import log_uploading_utils
from vts.utils.python.systrace import systrace_utils
from vts.utils.python.web import feature_utils
from vts.utils.python.web import web_utils


# Macro strings for test result reporting
TEST_CASE_TEMPLATE = "[Test Case] %s %s"
RESULT_LINE_TEMPLATE = TEST_CASE_TEMPLATE + " %s"
STR_TEST = "test"
STR_GENERATE = "generate"
TIMEOUT_SECS_LOG_UPLOADING = 60
TIMEOUT_SECS_TEARDOWN_CLASS = 120
_REPORT_MESSAGE_FILE_NAME = "report_proto.msg"
_BUG_REPORT_FILE_PREFIX = "bugreport_"
_BUG_REPORT_FILE_EXTENSION = ".zip"
_DEFAULT_TEST_TIMEOUT_SECS = 60 * 3
_LOGCAT_FILE_PREFIX = "logcat_"
_LOGCAT_FILE_EXTENSION = ".txt"
_ANDROID_DEVICES = '_android_devices'
_REASON_TO_SKIP_ALL_TESTS = '_reason_to_skip_all_tests'
_SETUP_RETRY_NUMBER = 5
# The name of a system property that tells whether to stop properly configured
# native servers, where "properly configured" means that a server's init.rc is
# set up to stop the server when the property's value is 1.
SYSPROP_VTS_NATIVE_SERVER = "vts.native_server.on"

LOGCAT_BUFFERS = [
    'radio',
    'events',
    'main',
    'system',
    'crash'
]


class BaseTestClass(object):
    """Base class for all test classes to inherit from.

    This class gets all the controller objects from test_runner and executes
    the test cases requested within itself.

    Most attributes of this class are set at runtime based on the configuration
    provided.

    Attributes:
        android_devices: A list of AndroidDevice objects, representing android
                         devices.
        test_module_name: A string representing the test module name.
        tests: A list of strings, each representing a test case name.
        log: A logger object used for logging.
        results: A records.TestResult object for aggregating test results from
                 the execution of test cases.
        _current_record: A records.TestResultRecord object for the test case
                         currently being executed. If no test is running, this
                         should be None.
        _interrupted: Whether the test execution has been interrupted.
        _interrupt_lock: The threading.Lock object that protects _interrupted.
        _timer: The threading.Timer object that interrupts the main thread upon
                timeout.
        timeout: A float, the timeout, in seconds, configured for this object.
        include_filter: A list of strings, each representing a test case name
                        to include.
        exclude_filter: A list of strings, each representing a test case name
                        to exclude. Has no effect if include_filter is not
                        empty.
        abi_name: String, name of the ABI in use.
        abi_bitness: String, bitness of the ABI in use.
        web: WebFeature, object storing web feature util for test run.
        coverage: CoverageFeature, object storing coverage feature util for
                  test run.
        sancov: SancovFeature, object storing sancov feature util for test run.
        start_time_sec: int, time value in seconds when the module execution
                        starts.
        start_vts_agents: Whether to start VTS agents when registering new
                          android devices.
        profiling: ProfilingFeature, object storing profiling feature util for
                   test run.
        _bug_report_on_failure: bool, whether to capture a bug report at the
                                end of failed test cases. Default is False.
        _logcat_on_failure: bool, whether to dump logcat at the end of failed
                            test cases. Default is True.
        _is_final_run: bool, whether the current test run is the final run
                       during retry.
        test_filter: Filter object to filter test names.
        _test_filter_retry: Filter object for retry filtering.
        max_retry_count: int, max number of retries.
    """

    _current_record = None
    start_vts_agents = True

    def __init__(self, configs):
        self.start_time_sec = time.time()
        self.tests = []
        # Set all the controller objects and params.
        for name, value in configs.items():
            setattr(self, name, value)
        self.results = records.TestResult()
        self.log = logger.LoggerProxy()

        # Timeout
        self._interrupted = False
        self._interrupt_lock = threading.Lock()
        self._timer = None

        timeout_milli = self.getUserParam(
            keys.ConfigKeys.KEY_TEST_TIMEOUT, default_value=0)
        self.timeout = timeout_milli / 1000 if timeout_milli > 0 else _DEFAULT_TEST_TIMEOUT_SECS

        # Setup test filters
        # TODO(yuexima): remove include_filter and exclude_filter from class
        # attributes after confirming all modules no longer reference them.
        self.include_filter = self.getUserParam(
            [
                keys.ConfigKeys.KEY_TEST_SUITE,
                keys.ConfigKeys.KEY_INCLUDE_FILTER
            ],
            default_value=[])
        self.exclude_filter = self.getUserParam(
            [
                keys.ConfigKeys.KEY_TEST_SUITE,
                keys.ConfigKeys.KEY_EXCLUDE_FILTER
            ],
            default_value=[])

        self.test_module_name = self.getUserParam(
            keys.ConfigKeys.KEY_TESTBED_NAME,
            warn_if_not_found=True,
            default_value=self.__class__.__name__)

        self.updateTestFilter()

        logging.debug('Test filter: %s' % self.test_filter)

        # TODO: get abi information differently for multi-device support.
        # Set other optional parameters
        self.abi_name = self.getUserParam(
            keys.ConfigKeys.IKEY_ABI_NAME, default_value=None)
        self.abi_bitness = self.getUserParam(
            keys.ConfigKeys.IKEY_ABI_BITNESS, default_value=None)
        self.skip_on_32bit_abi = self.getUserParam(
            keys.ConfigKeys.IKEY_SKIP_ON_32BIT_ABI, default_value=False)
        self.skip_on_64bit_abi = self.getUserParam(
            keys.ConfigKeys.IKEY_SKIP_ON_64BIT_ABI, default_value=False)
        self.run_32bit_on_64bit_abi = self.getUserParam(
            keys.ConfigKeys.IKEY_RUN_32BIT_ON_64BIT_ABI, default_value=False)
        self.max_retry_count = self.getUserParam(
            keys.ConfigKeys.IKEY_MAX_RETRY_COUNT, default_value=0)

        self.web = web_utils.WebFeature(self.user_params)
        self.coverage = coverage_utils.CoverageFeature(
            self.user_params, web=self.web)
        self.sancov = sancov_utils.SancovFeature(
            self.user_params, web=self.web)
        self.profiling = profiling_utils.ProfilingFeature(
            self.user_params, web=self.web)
        self.systrace = systrace_utils.SystraceFeature(
            self.user_params, web=self.web)
        self.log_uploading = log_uploading_utils.LogUploadingFeature(
            self.user_params, web=self.web)
        self.collect_tests_only = self.getUserParam(
            keys.ConfigKeys.IKEY_COLLECT_TESTS_ONLY, default_value=False)
        self.run_as_vts_self_test = self.getUserParam(
            keys.ConfigKeys.RUN_AS_VTS_SELFTEST, default_value=False)
        self.run_as_compliance_test = self.getUserParam(
            keys.ConfigKeys.RUN_AS_COMPLIANCE_TEST, default_value=False)
        self._bug_report_on_failure = self.getUserParam(
            keys.ConfigKeys.IKEY_BUG_REPORT_ON_FAILURE, default_value=False)
        self._logcat_on_failure = self.getUserParam(
            keys.ConfigKeys.IKEY_LOGCAT_ON_FAILURE, default_value=True)
        self._test_filter_retry = None

    @property
    def android_devices(self):
        """Returns a list of AndroidDevice objects."""
        if not hasattr(self, _ANDROID_DEVICES):
            event = tfi.Begin('base_test registering android_device. '
                              'Start agents: %s' % self.start_vts_agents,
                              tfi.categories.FRAMEWORK_SETUP)
            setattr(self, _ANDROID_DEVICES,
                    self.registerController(android_device,
                                            start_services=self.start_vts_agents))
            event.End()

            for device in getattr(self, _ANDROID_DEVICES):
                device.shell_default_nohup = self.getUserParam(
                    keys.ConfigKeys.SHELL_DEFAULT_NOHUP, default_value=True)
        return getattr(self, _ANDROID_DEVICES)

    @android_devices.setter
    def android_devices(self, devices):
        """Sets the list of AndroidDevice objects."""
        setattr(self, _ANDROID_DEVICES, devices)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self._exec_func(self.cleanUp)

    def updateTestFilter(self):
        """Updates test filter using include and exclude filters."""
        self.include_filter = list_utils.ExpandItemDelimiters(
            list_utils.ItemsToStr(self.include_filter), ',')
        self.exclude_filter = list_utils.ExpandItemDelimiters(
            list_utils.ItemsToStr(self.exclude_filter), ',')

        exclude_over_include = self.getUserParam(
            keys.ConfigKeys.KEY_EXCLUDE_OVER_INCLUDE, default_value=None)

        self.test_filter = filter_utils.Filter(
            self.include_filter,
            self.exclude_filter,
            enable_regex=True,
            exclude_over_include=exclude_over_include,
            enable_negative_pattern=True,
            enable_module_name_prefix_matching=True,
            module_name=self.test_module_name,
            expand_bitness=True)

    def unpack_userparams(self, req_param_names=[], opt_param_names=[], **kwargs):
        """Wrapper for test cases using the ACTS runner API."""
        return self.getUserParams(req_param_names, opt_param_names, **kwargs)

    def getUserParams(self, req_param_names=[], opt_param_names=[], **kwargs):
        """Unpacks user-defined parameters in the test config into individual
        variables.

        Instead of accessing a user param with self.user_params["xxx"], the
        variable can be directly accessed as self.xxx.

        A missing required param will raise an exception. If an optional param
        is missing, an INFO line will be logged.

        Args:
            req_param_names: A list of names of the required user params.
            opt_param_names: A list of names of the optional user params.
            **kwargs: Arguments that provide default values.
                      e.g. getUserParams(required_list, opt_list, arg_a="hello")
                      self.arg_a will be "hello" unless it is specified again in
                      required_list or opt_list.

        Raises:
            BaseTestError is raised if a required user param is missing from
            the test configuration.
        """
        for k, v in kwargs.items():
            setattr(self, k, v)
        for name in req_param_names:
            if name not in self.user_params:
                raise errors.BaseTestError(("Missing required user param '%s' "
                                            "in test configuration.") % name)
            setattr(self, name, self.user_params[name])
        for name in opt_param_names:
            if name not in self.user_params:
                logging.debug(("Missing optional user param '%s' in "
                               "configuration, continue."), name)
            else:
                setattr(self, name, self.user_params[name])

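    # Example (illustrative sketch, not part of the framework): a subclass
    # typically unpacks its parameters once in setUpClass. The parameter
    # names below are hypothetical.
    #
    #     def setUpClass(self):
    #         self.getUserParams(req_param_names=["binary_path"],
    #                            opt_param_names=["iterations"],
    #                            iterations=10)
    #         logging.info("binary: %s, iterations: %s",
    #                      self.binary_path, self.iterations)
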
    def getUserParam(self,
                     param_name,
                     error_if_not_found=False,
                     warn_if_not_found=False,
                     default_value=None,
                     to_str=False):
        """Gets the value of a single user parameter.

        This method returns the value of the specified user parameter.

        Note: unlike getUserParams(), this method will not automatically set
        an attribute using the parameter name and value.

        Args:
            param_name: string or list of strings, denoting user parameter
                        names. If a single string is provided,
                        self.user_params["<param_name>"] will be accessed.
                        If multiple strings are provided,
                        self.user_params["<param_name1>"]["<param_name2>"]["<param_name3>"]...
                        will be accessed.
            error_if_not_found: bool, whether to raise an error if the
                                parameter does not exist. Default: False
            warn_if_not_found: bool, whether to log a warning message if the
                               parameter value is not found. Default: False
            default_value: object, default value to return if not found.
                           If error_if_not_found is true, this parameter has no
                           effect. Default: None
            to_str: boolean, whether to convert the result object to string if
                    not None.
                    Note, strings passed in from the java json config are often
                    unicode.

        Returns:
            object, value of the specified parameter name chain if it exists;
            <default_value> otherwise.
        """

        def ToStr(return_value):
            """Checks the to_str option and converts to string if not None."""
            if to_str and return_value is not None:
                return str(return_value)
            return return_value

        if not param_name:
            if error_if_not_found:
                raise errors.BaseTestError("empty param_name provided")
            logging.error("empty param_name")
            return ToStr(default_value)

        if not isinstance(param_name, list):
            param_name = [param_name]

        curr_obj = self.user_params
        for param in param_name:
            if param not in curr_obj:
                msg = ("Missing user param '%s' in test configuration.\n"
                       "User params: %s") % (param_name, self.user_params)
                if error_if_not_found:
                    raise errors.BaseTestError(msg)
                elif warn_if_not_found:
                    logging.warn(msg)
                return ToStr(default_value)
            curr_obj = curr_obj[param]

        return ToStr(curr_obj)

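    # Example (hypothetical values): for a config containing
    # {"camera": {"fps": "30"}}, a nested lookup would be
    #
    #     fps = self.getUserParam(["camera", "fps"], default_value="60",
    #                             to_str=True)
    #
    # which returns "30"; if either key were absent it would return "60".
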
    def _getUserConfig(self,
                       config_type,
                       key,
                       default_value=None,
                       error_if_not_found=False,
                       warn_if_not_found=False,
                       to_str=False):
        """Gets the value of a user config given the key.

        This method returns the value of the specified user config type.

        Args:
            config_type: string, type of user config.
            key: string, key of the value string in the string config map.
            default_value: object, default value to return if not found.
                           If error_if_not_found is true, this parameter has no
                           effect. Default: None
            error_if_not_found: bool, whether to raise an error if the
                                parameter does not exist. Default: False
            warn_if_not_found: bool, whether to log a warning message if the
                               parameter value is not found. Default: False
            to_str: boolean, whether to apply the str() method to the result
                    value if the result is not None.
                    Note, strings passed in from the java json config are often
                    unicode.

        Returns:
            Value in config matching the given key and type if it exists;
            <default_value> otherwise.
        """
        dic = self.getUserParam(config_type,
                                error_if_not_found=False,
                                warn_if_not_found=False,
                                default_value=None,
                                to_str=False)

        if dic is None or key not in dic:
            msg = ("Config key %s not found in user config type %s.\n"
                   "User params: %s") % (key, config_type, self.user_params)
            if error_if_not_found:
                raise errors.BaseTestError(msg)
            elif warn_if_not_found:
                logging.warn(msg)

            return default_value

        return dic[key] if not to_str else str(dic[key])

    def getUserConfigStr(self, key, **kwargs):
        """Gets the value of a user config string given the key.

        See the _getUserConfig method for more details.
        """
        kwargs["to_str"] = True
        return self._getUserConfig(keys.ConfigKeys.IKEY_USER_CONFIG_STR,
                                   key,
                                   **kwargs)

    def getUserConfigInt(self, key, **kwargs):
        """Gets the value of a user config int given the key.

        See the _getUserConfig method for more details.
        """
        return self._getUserConfig(keys.ConfigKeys.IKEY_USER_CONFIG_INT,
                                   key,
                                   **kwargs)

    def getUserConfigBool(self, key, **kwargs):
        """Gets the value of a user config bool given the key.

        See the _getUserConfig method for more details.
        """
        return self._getUserConfig(keys.ConfigKeys.IKEY_USER_CONFIG_BOOL,
                                   key,
                                   **kwargs)

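    # Example (hypothetical key): with a user config int map containing
    # {"max_latency_ms": 500}, a test would read its budget as
    #
    #     budget = self.getUserConfigInt("max_latency_ms", default_value=1000)
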
    def _setUpClass(self):
        """Proxy function to guarantee the base implementation of setUpClass
        is called.
        """
        event = tfi.Begin('_setUpClass method for test class',
                          tfi.categories.TEST_CLASS_SETUP)
        timeout = self.timeout - time.time() + self.start_time_sec
        if timeout < 0:
            timeout = 1
        self.resetTimeout(timeout)
        if not precondition_utils.MeetFirstApiLevelPrecondition(self):
            self.skipAllTests("The device's first API level doesn't meet the "
                              "precondition.")
        for device in self.android_devices:
            if not precondition_utils.CheckFeaturePrecondition(self, device):
                self.skipAllTests("Precondition feature check fail.")

        if (self.getUserParam(
                keys.ConfigKeys.IKEY_DISABLE_FRAMEWORK, default_value=False) or
                # @Deprecated Legacy configuration option name.
                self.getUserParam(
                    keys.ConfigKeys.IKEY_BINARY_TEST_DISABLE_FRAMEWORK,
                    default_value=False)):
            stop_native_server = (
                self.getUserParam(
                    keys.ConfigKeys.IKEY_STOP_NATIVE_SERVERS,
                    default_value=False) or
                # @Deprecated Legacy configuration option name.
                self.getUserParam(
                    keys.ConfigKeys.IKEY_BINARY_TEST_STOP_NATIVE_SERVERS,
                    default_value=False))
            # Disable the framework if requested.
            for device in self.android_devices:
                device.stop(stop_native_server)
        else:
            # Enable the framework if requested.
            for device in self.android_devices:
                device.start()

        # Wait for the native service processes to stop.
        native_server_process_names = self.getUserParam(
            keys.ConfigKeys.IKEY_NATIVE_SERVER_PROCESS_NAME,
            default_value=[])
        for device in self.android_devices:
            device.waitForProcessStop(native_server_process_names)

        event_sub = tfi.Begin('setUpClass method from test script',
                              tfi.categories.TEST_CLASS_SETUP,
                              enable_logging=False)
        result = self.setUpClass()
        event_sub.End()
        event.End()
        return result

    def setUpClass(self):
        """Setup function that will be called before executing any test case in
        the test class.

        To signal setup failure, return False or raise an exception. If an
        exception is raised, the stack trace appears in the log, but the
        exception does not propagate to upper levels.

        Implementation is optional.
        """
        pass

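    # Example (illustrative sketch): a typical setUpClass override prepares
    # shared state and skips the whole module when a precondition is not met.
    #
    #     def setUpClass(self):
    #         self.dut = self.android_devices[0]
    #         self.skipAllTestsIf(self.abi_bitness == "32",
    #                             "Module requires a 64-bit ABI.")
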
    def _tearDownClass(self):
        """Proxy function to guarantee the base implementation of tearDownClass
        is called.
        """
        event = tfi.Begin('_tearDownClass method for test class',
                          tfi.categories.TEST_CLASS_TEARDOWN)
        self.cancelTimeout()

        event_sub = tfi.Begin('tearDownClass method from test script',
                              tfi.categories.TEST_CLASS_TEARDOWN,
                              enable_logging=False)

        @timeout_utils.timeout(TIMEOUT_SECS_TEARDOWN_CLASS,
                               message='tearDownClass method timed out.',
                               no_exception=True)
        def _executeTearDownClass(baseTest):
            baseTest._exec_func(baseTest.tearDownClass)
        _executeTearDownClass(self)

        event_sub.End()

        if self.web.enabled:
            if self.results.class_errors:
                # Create a result to make the module show up as a failure.
                self.web.AddTestReport("setup_class")
                self.web.SetTestResult(ReportMsg.TEST_CASE_RESULT_FAIL)

            # Attach log destination urls to the proto message so the urls will
            # be recorded and uploaded to the dashboard. The actual log upload
            # is postponed until after the report message is generated, to
            # prevent failures due to a log-upload timeout.
            self.log_uploading.UploadLogs(dryrun=True)

            message_b = self.web.GenerateReportMessage(self.results.requested,
                                                       self.results.executed)
        else:
            message_b = ''

        report_proto_path = os.path.join(logging.log_path,
                                         _REPORT_MESSAGE_FILE_NAME)

        if message_b:
            logging.debug('Result proto message path: %s', report_proto_path)

        with open(report_proto_path, "wb") as f:
            f.write(message_b)

        if self.log_uploading.enabled:
            @timeout_utils.timeout(TIMEOUT_SECS_LOG_UPLOADING,
                                   message='_tearDownClass method in base_test timed out.',
                                   no_exception=True)
            def _executeLogUpload(_log_uploading):
                _log_uploading.UploadLogs()

            event_upload = tfi.Begin('Log upload',
                                     tfi.categories.RESULT_PROCESSING)
            _executeLogUpload(self.log_uploading)
            event_upload.End()

        event.End()

    def tearDownClass(self):
        """Teardown function that will be called after all the selected test
        cases in the test class have been executed.

        Implementation is optional.
        """
        pass

    def interrupt(self):
        """Interrupts test execution and terminates the process."""
        with self._interrupt_lock:
            if self._interrupted:
                logging.warning("Cannot interrupt more than once.")
                return
            self._interrupted = True
        logging.info("Test timed out, interrupting.")
        utils.stop_current_process(TIMEOUT_SECS_TEARDOWN_CLASS)

    def cancelTimeout(self):
        """Cancels the main thread timer."""
        if hasattr(signal, "SIGALRM"):
            signal.alarm(0)
        else:
            with self._interrupt_lock:
                if self._interrupted:
                    logging.warning("Test execution has been interrupted. "
                                    "Cannot cancel or reset timeout.")
                    return

            if self._timer:
                self._timer.cancel()

    def resetTimeout(self, timeout):
        """Restarts the timer that will interrupt the main thread.

        This class starts the timer before setUpClass. As the timeout depends
        on the number of generated tests, a subclass can restart the timer.

        Args:
            timeout: A float, wait time in seconds before interrupt.
        """
        logging.debug("Start timer with timeout=%ssec.", timeout)
        if hasattr(signal, "SIGALRM"):
            signal.signal(signal.SIGALRM, utils._timeout_handler)
            signal.alarm(int(timeout))
        else:
            self.cancelTimeout()

            self._timer = threading.Timer(timeout, self.interrupt)
            self._timer.daemon = True
            self._timer.start()

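    # Example (illustrative sketch): a subclass that generates many test cases
    # can extend the timeout budget before triggering them. The per-case cost
    # of 10 seconds below is a hypothetical figure, as is runDeviceTest.
    #
    #     def generateDeviceTests(self):
    #         settings = ["settingA", "settingB"]
    #         self.resetTimeout(self.timeout + 10 * len(settings))
    #         self.runGeneratedTests(self.runDeviceTest, settings,
    #                                name_func=lambda s: "test_%s" % s)
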
    def _testEntry(self, test_record):
        """Internal function to be called upon entry of a test case.

        Args:
            test_record: The TestResultRecord object for the test case about to
                         be executed.
        """
        self._current_record = test_record
        if self.web.enabled:
            self.web.AddTestReport(test_record.test_name)

    def _setUp(self, test_name):
        """Proxy function to guarantee the base implementation of setUp is
        called.
        """
        event = tfi.Begin('_setUp method for test case',
                          tfi.categories.TEST_CASE_SETUP)
        if not self.Heal(passive=True):
            msg = ('Framework self-diagnosis did not pass for %s. '
                   'Marking test as fail.' % test_name)
            logging.error(msg)
            event.Remove(msg)
            asserts.fail(msg)

        if self.systrace.enabled:
            self.systrace.StartSystrace()

        event_sub = tfi.Begin('setUp method from test script',
                              tfi.categories.TEST_CASE_SETUP,
                              enable_logging=False)
        result = self.setUp()
        event_sub.End()
        event.End()
        # Propagate the return value so execOneTest can detect a setUp that
        # signals failure by returning False.
        return result

    def setUp(self):
        """Setup function that will be called every time before executing each
        test case in the test class.

        To signal setup failure, return False or raise an exception. If an
        exception is raised, the stack trace appears in the log, but the
        exception does not propagate to upper levels.

        Implementation is optional.
        """

    def _testExit(self):
        """Internal function to be called upon exit of a test."""
        self._current_record = None

    def _tearDown(self, test_name):
        """Proxy function to guarantee the base implementation of tearDown
        is called.
        """
        event = tfi.Begin('_tearDown method from base_test',
                          tfi.categories.TEST_CASE_TEARDOWN)
        if self.systrace.enabled:
            self._exec_func(self.systrace.ProcessAndUploadSystrace, test_name)
        event_sub = tfi.Begin('tearDown method from test script',
                              tfi.categories.TEST_CASE_TEARDOWN,
                              enable_logging=False)
        self._exec_func(self.tearDown)
        event_sub.End()
        event.End()

    def tearDown(self):
        """Teardown function that will be called every time a test case has
        been executed.

        Implementation is optional.
        """

    def _onFail(self):
        """Proxy function to guarantee the base implementation of onFail is
        called.
        """
        record = self._current_record
        logging.error(record.details)
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        logging.error(RESULT_LINE_TEMPLATE, self.results.progressStr,
                      record.test_name, record.result)
        if self.web.enabled:
            self.web.SetTestResult(ReportMsg.TEST_CASE_RESULT_FAIL)

        event = tfi.Begin('_onFail method from BaseTest',
                          tfi.categories.FAILED_TEST_CASE_PROCESSING,
                          enable_logging=False)
        self.onFail(record.test_name, begin_time)
        if self._bug_report_on_failure:
            self.DumpBugReport(record.test_name)
        if self._logcat_on_failure:
            self.DumpLogcat(record.test_name)
        event.End()

    def onFail(self, test_name, begin_time):
        """A function that is executed upon a test case failure.

        User implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

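    # Example (illustrative sketch): an onFail override can capture extra
    # artifacts beyond the automatic ones, e.g. a bug report named after the
    # failing case.
    #
    #     def onFail(self, test_name, begin_time):
    #         self.DumpBugReport('%s_%s' % (self.test_module_name, test_name))
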
    def _onPass(self):
        """Proxy function to guarantee the base implementation of onPass is
        called.
        """
        record = self._current_record
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        msg = record.details
        if msg:
            logging.debug(msg)
        logging.info(RESULT_LINE_TEMPLATE, self.results.progressStr,
                     record.test_name, record.result)
        if self.web.enabled:
            self.web.SetTestResult(ReportMsg.TEST_CASE_RESULT_PASS)
        self.onPass(record.test_name, begin_time)

    def onPass(self, test_name, begin_time):
        """A function that is executed upon a test case passing.

        Implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

    def _onSkip(self):
        """Proxy function to guarantee the base implementation of onSkip is
        called.
        """
        record = self._current_record
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        logging.info(RESULT_LINE_TEMPLATE, self.results.progressStr,
                     record.test_name, record.result)
        logging.debug("Reason to skip: %s", record.details)
        if self.web.enabled:
            self.web.SetTestResult(ReportMsg.TEST_CASE_RESULT_SKIP)
        self.onSkip(record.test_name, begin_time)

    def onSkip(self, test_name, begin_time):
        """A function that is executed upon a test case being skipped.

        Implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

    def _onSilent(self):
        """Proxy function to guarantee the base implementation of onSilent is
        called.
        """
        record = self._current_record
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        if self.web.enabled:
            self.web.SetTestResult(None)
        self.onSilent(record.test_name, begin_time)

    def onSilent(self, test_name, begin_time):
        """A function that is executed upon a test case being marked as silent.

        Implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

    def _onException(self):
        """Proxy function to guarantee the base implementation of onException
        is called.
        """
        record = self._current_record
        logging.exception(record.details)
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        if self.web.enabled:
            self.web.SetTestResult(ReportMsg.TEST_CASE_RESULT_EXCEPTION)

        event = tfi.Begin('_onException method from BaseTest',
                          tfi.categories.FAILED_TEST_CASE_PROCESSING,
                          enable_logging=False)
        self.onException(record.test_name, begin_time)
        if self._bug_report_on_failure:
            self.DumpBugReport(record.test_name)
        if self._logcat_on_failure:
            self.DumpLogcat(record.test_name)
        event.End()

    def onException(self, test_name, begin_time):
        """A function that is executed upon an unhandled exception from a test
        case.

        Implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

    def _exec_procedure_func(self, func):
        """Executes a procedure function like onPass, onFail etc.

        This function will alter the 'Result' of the test's record if
        exceptions happen when executing the procedure function.

        This will let signals.TestAbortAll through so abortAll works in all
        procedure functions.

        Args:
            func: The procedure function to be executed.
        """
        record = self._current_record

        if (func not in (self._onPass, self._onSilent, self._onSkip)
                and not self._is_final_run):
            logging.debug('Skipping test failure procedure function during retry')
            logging.info(RESULT_LINE_TEMPLATE, self.results.progressStr,
                         record.test_name, 'RETRY')
            return

        if record is None:
            logging.error("Cannot execute %s. No record for current test.",
                          func.__name__)
            return
        try:
            func()
        except signals.TestAbortAll as e:
            raise signals.TestAbortAll, e, sys.exc_info()[2]
        except Exception as e:
            logging.exception("Exception happened when executing %s for %s.",
                              func.__name__, record.test_name)
            record.addError(func.__name__, e)

    def addTableToResult(self, name, rows):
        """Adds a table to the current test record.

        A subclass can call this method to add a table to _current_record when
        running test cases.

        Args:
            name: String, the table name.
            rows: A 2-dimensional list which contains the data.
        """
        self._current_record.addTable(name, rows)

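    # Example (hypothetical data): attach a latency table to the current
    # record from inside a test case.
    #
    #     self.addTableToResult(
    #         "latency_ms",
    #         [["percentile", "value"],
    #          ["p50", 12],
    #          ["p99", 48]])
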
    def filterOneTest(self, test_name, test_filter=None):
        """Checks test filters for a test name.

        The first layer of filtering is the user-defined test filter:
        if the include filter is not empty, only tests in the include filter
        will be executed, regardless of whether they are also in the exclude
        filter. If the include filter is empty, only tests not in the exclude
        filter will be executed.

        The second layer of filtering checks whether the skipAllTests method
        has been called. If the flag is set, this method raises
        signals.TestSkip.

        The third layer of filtering checks ABI bitness: if a test has a
        suffix indicating the intended architecture bitness, and the current
        ABI bitness information is available, non-matching tests will be
        skipped. By convention, this function looks for a bitness suffix
        formatted as "32bit", "32Bit", "32BIT", or the 64-bit equivalents.

        This method assumes const.SUFFIX_32BIT and const.SUFFIX_64BIT are in
        lower case.

        Args:
            test_name: string, name of a test
            test_filter: Filter object, test filter

        Raises:
            signals.TestSilent if a test should not be executed
            signals.TestSkip if a test should be logged but not be executed
        """
        if self._test_filter_retry and not self._test_filter_retry.Filter(test_name):
            # TODO: TestSilent removes the test case from the record, while
            # TestSkip records the skip with a reason. A skip during retry is
            # neither: the test should be skipped without being recorded and
            # without being deleted. Create a new signal type if callers
            # outside this class want to distinguish between these skip types.
            raise signals.TestSkip('Skipping completed tests in retry run attempt.')

        self._filterOneTestThroughTestFilter(test_name, test_filter)
        self._filterOneTestThroughAbiBitness(test_name)

    def _filterOneTestThroughTestFilter(self, test_name, test_filter=None):
        """Checks the test filter for the given test name.

        Args:
            test_name: string, name of a test
            test_filter: Filter object, test filter

        Raises:
            signals.TestSilent if a test should not be executed
            signals.TestSkip if a test should be logged but not be executed
        """
        if not test_filter:
            test_filter = self.test_filter

        if not test_filter.Filter(test_name):
            raise signals.TestSilent(
                "Test case '%s' did not pass filters." % test_name)

        if self.isSkipAllTests():
            raise signals.TestSkip(self.getSkipAllTestsReason())

    def _filterOneTestThroughAbiBitness(self, test_name):
        """Checks the ABI bitness filter for the given test name.

        Args:
            test_name: string, name of a test

        Raises:
            signals.TestSkip if a test should be skipped due to non-matching
            ABI bitness
        """
        asserts.skipIf(
            self.abi_bitness and
            ((self.skip_on_32bit_abi is True) and self.abi_bitness == "32") or
            ((self.skip_on_64bit_abi is True) and self.abi_bitness == "64") or
            (test_name.lower().endswith(const.SUFFIX_32BIT) and
             self.abi_bitness != "32") or
            (test_name.lower().endswith(const.SUFFIX_64BIT) and
             self.abi_bitness != "64" and not self.run_32bit_on_64bit_abi),
            "Test case '{}' excluded as ABI bitness is {}.".format(
                test_name, self.abi_bitness))

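    # Example (illustrative): with abi_bitness == "64", a case named
    # "testCpuFreq_32bit" is skipped unless run_32bit_on_64bit_abi is set,
    # while "testCpuFreq_64bit" and suffix-less cases run as usual.
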
    def execOneTest(self, test_name, test_func, args, **kwargs):
        """Executes one test case and updates test results.

        Executes one test case, creates a records.TestResultRecord object with
        the execution information, and adds the record to the test class's test
        results.

        Args:
            test_name: Name of the test.
            test_func: The test function.
            args: A tuple of params.
            kwargs: Extra kwargs.
        """
        if self._test_filter_retry and not self._test_filter_retry.Filter(test_name):
            return

        is_silenced = False
        tr_record = records.TestResultRecord(test_name, self.test_module_name)
        tr_record.testBegin()
        logging.info(TEST_CASE_TEMPLATE, self.results.progressStr, test_name)
        verdict = None
        finished = False
        try:
            ret = self._testEntry(tr_record)
            asserts.assertTrue(ret is not False,
                               "Setup test entry for %s failed." % test_name)
            self.filterOneTest(test_name)
            if self.collect_tests_only:
                asserts.explicitPass("Collect tests only.")

            try:
                ret = self._setUp(test_name)
                asserts.assertTrue(ret is not False,
                                   "Setup for %s failed." % test_name)

                event_test = tfi.Begin("test function",
                                       tfi.categories.TEST_CASE_EXECUTION)
                if args or kwargs:
                    verdict = test_func(*args, **kwargs)
                else:
                    verdict = test_func()
                event_test.End()
                finished = True
            finally:
                self._tearDown(test_name)
        except (signals.TestFailure, AssertionError) as e:
            tr_record.testFail(e)
            self._exec_procedure_func(self._onFail)
            finished = True
        except signals.TestSkip as e:
            # Test skipped.
            tr_record.testSkip(e)
            self._exec_procedure_func(self._onSkip)
            finished = True
        except signals.TestAbortClass as e:
            # Abort signals, pass along.
            tr_record.testFail(e)
            self._is_final_run = True
            finished = True
            raise signals.TestAbortClass, e, sys.exc_info()[2]
        except signals.TestAbortAll as e:
            # Abort signals, pass along.
            tr_record.testFail(e)
            self._is_final_run = True
            finished = True
            raise signals.TestAbortAll, e, sys.exc_info()[2]
        except utils.TimeoutError as e:
            logging.exception(e)
            # Mark the current test case as a failure and abort remaining tests.
            tr_record.testFail(e)
            self._is_final_run = True
            finished = True
            raise signals.TestAbortAll, e, sys.exc_info()[2]
        except KeyboardInterrupt as e:
            logging.error("Received KeyboardInterrupt signal")
            # Abort signals, pass along.
            tr_record.testFail(e)
            self._is_final_run = True
            finished = True
            raise
        except AdbError as e:
            logging.error(e)
            if not self.Heal():
                # Non-recoverable adb failure. Mark the test as a failure and
                # abort.
                tr_record.testFail(e)
                self._is_final_run = True
                finished = True
                raise signals.TestAbortAll, e, sys.exc_info()[2]
            # Error specific to the test case; mark the test as a failure and
            # continue with the remaining tests.
            tr_record.testFail(e)
            self._exec_procedure_func(self._onFail)
            finished = True
        except signals.TestPass as e:
            # Explicit test pass.
            tr_record.testPass(e)
            self._exec_procedure_func(self._onPass)
            finished = True
        except signals.TestSilent as e:
            # Suppress test reporting.
            is_silenced = True
            self._exec_procedure_func(self._onSilent)
            self.results.removeRecord(tr_record)
            finished = True
        except Exception as e:
            # Exception happened during test.
            logging.exception(e)
            tr_record.testError(e)
            self._exec_procedure_func(self._onException)
            self._exec_procedure_func(self._onFail)
            finished = True
        else:
            # Keep supporting return False for now.
            # TODO(angli): Deprecate return False support.
            if verdict or (verdict is None):
                # Test passed.
                tr_record.testPass()
                self._exec_procedure_func(self._onPass)
                return
            # Test failed because it didn't return True.
            # This should be removed eventually.
            tr_record.testFail()
            self._exec_procedure_func(self._onFail)
            finished = True
        finally:
            if not finished:
                for device in self.android_devices:
                    # Disable the shell only if it has been set up.
                    if device.shell is not None:
                        device.shell.DisableShell()

                logging.error('Test timed out.')
                tr_record.testError()
                self._exec_procedure_func(self._onException)
                self._exec_procedure_func(self._onFail)

            if not is_silenced:
                self.results.addRecord(tr_record)
            self._testExit()

    def runGeneratedTests(self,
                          test_func,
                          settings,
                          args=None,
                          kwargs=None,
                          tag="",
                          name_func=None):
        """Runs generated test cases.

        Generated test cases are not written down as functions, but as a list
        of parameter sets. This way we reduce code repetition and improve
        test case scalability.

        Args:
            test_func: The common logic shared by all these generated test
                       cases. This function should take at least one argument,
                       which is a parameter set.
            settings: A list of strings representing parameter sets. These are
                      usually json strings that get loaded in the test_func.
            args: Iterable of additional position args to be passed to
                  test_func.
            kwargs: Dict of additional keyword args to be passed to test_func.
            tag: Name of this group of generated test cases. Ignored if
                 name_func is provided and operates properly.
            name_func: A function that takes a test setting and generates a
                       proper test name. The test name should be shorter than
                       utils.MAX_FILENAME_LEN. Names over the limit will be
                       truncated.

        Returns:
            A list of settings that did not pass.
        """
        args = args or ()
        kwargs = kwargs or {}
        failed_settings = []

        def GenerateTestName(setting):
            test_name = "{} {}".format(tag, setting)
            if name_func:
                try:
                    test_name = name_func(setting, *args, **kwargs)
                except:
                    logging.exception(("Failed to get test name from "
                                       "name_func. Fall back to default %s"),
                                      test_name)

            if len(test_name) > utils.MAX_FILENAME_LEN:
                test_name = test_name[:utils.MAX_FILENAME_LEN]

            return test_name

        for setting in settings:
            test_name = GenerateTestName(setting)

            tr_record = records.TestResultRecord(test_name, self.test_module_name)
            self.results.requested.append(tr_record)

        for setting in settings:
            test_name = GenerateTestName(setting)
            previous_success_cnt = len(self.results.passed)

            event_exec = tfi.Begin('BaseTest execOneTest method for generated tests',
                                   enable_logging=False)
            self.execOneTest(test_name, test_func, (setting, ) + args, **kwargs)
            event_exec.End()
            if len(self.results.passed) - previous_success_cnt != 1:
                failed_settings.append(setting)

        return failed_settings

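    # Example (illustrative sketch): generating one test case per codec name.
    # The trigger method must start with "generate"; _checkCodec is a
    # hypothetical helper taking one setting.
    #
    #     def generateCodecTests(self):
    #         codecs = ["aac", "opus", "vorbis"]
    #         self.runGeneratedTests(
    #             self._checkCodec,
    #             codecs,
    #             name_func=lambda codec: "test_codec_%s" % codec)
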
    def _exec_func(self, func, *args):
        """Executes a function with exception safeguard.

        This will let signals.TestAbortAll through so abortAll works in all
        procedure functions.

        Args:
            func: Function to be executed.
            args: Arguments to be passed to the function.

        Returns:
            Whatever the function returns, or False if an uncaught exception
            occurred.
        """
        try:
            return func(*args)
        except signals.TestAbortAll as e:
            raise signals.TestAbortAll, e, sys.exc_info()[2]
        except:
            logging.exception("Exception happened when executing %s in %s.",
                              func.__name__, self.test_module_name)
            return False

    def _get_all_test_names(self):
        """Finds all the function names that match the test case naming
        convention in this class.

        Returns:
            A list of strings, each is a test case name.
        """
        test_names = []
        for name in dir(self):
            if name.startswith(STR_TEST) or name.startswith(STR_GENERATE):
                attr_func = getattr(self, name)
                if hasattr(attr_func, "__call__"):
                    test_names.append(name)
        return test_names

    def _get_test_funcs(self, test_names):
        """Obtains the actual functions of test cases based on test names.

        Args:
            test_names: A list of strings, each string is a test case name.

        Returns:
            A list of tuples of (string, function). String is the test case
            name, function is the actual test case function.

        Raises:
            errors.USERError is raised if a test name does not follow the
            naming convention "test_*". This can only be caused by user input
            here.
        """
        test_funcs = []
        for test_name in test_names:
            if not hasattr(self, test_name):
                logging.warning("%s does not have test case %s.",
                                self.test_module_name, test_name)
            elif (test_name.startswith(STR_TEST) or
                  test_name.startswith(STR_GENERATE)):
                test_funcs.append((test_name, getattr(self, test_name)))
            else:
                msg = ("Test case name %s does not follow naming convention "
                       "test*, abort.") % test_name
                raise errors.USERError(msg)

        return test_funcs

    def getTests(self, test_names=None):
        """Gets the test cases within a test class.

        Args:
            test_names: A list of strings that are test case names requested in
                        the cmd line.

        Returns:
            A list of tuples of (string, function). String is the test case
            name, function is the actual test case function.
        """
        if not test_names:
            if self.tests:
                # Specified by run list in class.
                test_names = list(self.tests)
            else:
                # No test case specified by user; execute all in the test class.
                test_names = self._get_all_test_names()

        tests = self._get_test_funcs(test_names)
        return tests

    def _DiagnoseHost(self):
        """Runs diagnosis commands on the host and logs the results."""
        commands = ['ps aux | grep adb',
                    'adb --version',
                    'adb devices']
        for cmd in commands:
            results = cmd_utils.ExecuteShellCommand(cmd)
            logging.debug('host diagnosis command %s', cmd)
            logging.debug('    output: %s', results[cmd_utils.STDOUT][0])

    def _DiagnoseDevice(self, device):
        """Runs diagnosis commands on the device and logs the results."""
        commands = ['ps aux | grep vts',
                    'cat /proc/meminfo']
        for cmd in commands:
            results = device.adb.shell(cmd, no_except=True,
                                       timeout=adb.DEFAULT_ADB_SHORT_TIMEOUT)
            logging.debug('device diagnosis command %s', cmd)
            logging.debug('    output: %s', results[const.STDOUT])

    def Heal(self, passive=False, timeout=900):
        """Performs self-healing.

        Includes a self diagnosis that looks for any framework or device state
        errors, and a self recovery that attempts to correct discovered errors.

        Args:
            passive: bool, whether to perform a passive-only self check. A
                     passive check only examines known status stored in memory;
                     no command is issued to the host or device, which makes
                     the check fast.
            timeout: int, timeout in seconds.

        Returns:
            bool, True if everything is ok. False if some error is not
            recoverable.
        """
        start = time.time()

        if not passive:
            available_devices = android_device.list_adb_devices()
            self._DiagnoseHost()

            for device in self.android_devices:
                if device.serial not in available_devices:
                    device.log.warn("Device became unavailable after tests. "
                                    "Waiting for it to come back.")
                    _timeout = timeout - time.time() + start
                    if _timeout < 0 or not device.waitForBootCompletion(
                            timeout=_timeout):
                        device.log.error('Failed to restore the device.')
                        return False
                    device.rootAdb()
                    device.stopServices()
                    device.startServices()
                    self._DiagnoseHost()
                else:
                    self._DiagnoseDevice(device)

        return all(map(lambda device: device.Heal(), self.android_devices))

    def runTestsWithRetry(self, tests):
        """Runs tests with retry and collects test results.

        Args:
            tests: A list of tests to be run.
        """
        for count in range(self.max_retry_count + 1):
            if count:
                if not self.Heal():
                    logging.error('Self heal failed. '
                                  'Some error is not recoverable within time constraint.')
                    return

                include_filter = map(lambda item: item.test_name,
                                     self.results.getNonPassingRecords(skipped=False))
                self._test_filter_retry = filter_utils.Filter(include_filter=include_filter)
                logging.info('Automatically retrying %s test cases. Run attempt %s of %s',
                             len(include_filter),
                             count + 1,
                             self.max_retry_count + 1)
                msg = 'Retrying the following test cases: %s' % include_filter
                logging.debug(msg)

                path_retry_log = os.path.join(logging.log_path, 'retry_log.txt')
                with open(path_retry_log, 'a+') as f:
                    f.write(msg + '\n')

            self._is_final_run = count == self.max_retry_count

            try:
                self._runTests(tests)
            except Exception as e:
                if self._is_final_run:
                    raise e

            if self._is_final_run:
                break

    def _runTests(self, tests):
        """Runs tests and collects test results.

        Args:
            tests: A list of tests to be run.
        """
        # Setup for the class with retry.
        for i in xrange(_SETUP_RETRY_NUMBER):
            setup_done = False
            caught_exception = None
            try:
                if self._setUpClass() is False:
                    raise signals.TestFailure(
                        "Failed to setup %s." % self.test_module_name)
                else:
                    setup_done = True
            except Exception as e:
                caught_exception = e
                logging.exception("Failed to setup %s.", self.test_module_name)
            finally:
                if setup_done:
                    break
                elif not caught_exception or i + 1 == _SETUP_RETRY_NUMBER:
                    self.results.failClass(self.test_module_name,
                                           caught_exception)
                    self._exec_func(self._tearDownClass)
                    self._is_final_run = True
                    return
                else:
                    # Restart services before retrying setup.
                    for device in self.android_devices:
                        logging.info("restarting service on device %s",
                                     device.serial)
                        device.stopServices()
                        device.startServices()

        class_error = None
        # Run tests in order.
        try:
            # Check if the module is running in self-test mode.
            if self.run_as_vts_self_test:
                logging.debug('setUpClass function was executed successfully.')
                self.results.passClass(self.test_module_name)
                return

            for test_name, test_func in tests:
                if test_name.startswith(STR_GENERATE):
                    logging.debug(
                        "Executing generated test trigger function '%s'",
                        test_name)
                    test_func()
                    logging.debug("Finished '%s'", test_name)
                else:
                    event_exec = tfi.Begin('BaseTest execOneTest method for individual tests',
                                           enable_logging=False)
                    self.execOneTest(test_name, test_func, None)
                    event_exec.End()
            if self.isSkipAllTests() and not self.results.executed:
                self.results.skipClass(
                    self.test_module_name,
                    "All test cases skipped; unable to find any test case.")
        except signals.TestAbortClass as e:
            logging.error("Received TestAbortClass signal")
            class_error = e
            self._is_final_run = True
        except signals.TestAbortAll as e:
            logging.info("Received TestAbortAll signal")
            class_error = e
            self._is_final_run = True
            # Piggy-back test results on this exception object so we don't lose
            # results from this test class.
            setattr(e, "results", self.results)
            raise signals.TestAbortAll, e, sys.exc_info()[2]
        except KeyboardInterrupt as e:
            class_error = e
            self._is_final_run = True
            # Piggy-back test results on this exception object so we don't lose
            # results from this test class.
            setattr(e, "results", self.results)
            raise
        except Exception as e:
            # Exception happened during test.
            logging.exception(e)
            class_error = e
            raise e
        finally:
            if not self.results.getNonPassingRecords(skipped=False):
                self._is_final_run = True

            if class_error and self._is_final_run:
                self.results.failClass(self.test_module_name, class_error)

            self._exec_func(self._tearDownClass)

            if self._is_final_run:
                if self.web.enabled:
                    name, timestamp = self.web.GetTestModuleKeys()
                    self.results.setTestModuleKeys(name, timestamp)

                logging.info("Summary for test class %s: %s",
                             self.test_module_name, self.results.summary())

    def run(self, test_names=None):
        """Runs test cases within a test class by the order they appear in the
        execution list.

        One of these test case lists will be executed, shown here in priority
        order:
        1. The test_names list, which is passed from the cmd line. Invalid
           names are guarded by cmd line arg parsing.
        2. The self.tests list defined in the test class. Invalid names are
           ignored.
        3. All functions that match the test case naming convention in the
           test class.

        Args:
            test_names: A list of strings that are test case names requested in
                        the cmd line.

        Returns:
            The test results object of this class.
        """
        logging.info("==========> %s <==========", self.test_module_name)
        # Devise the actual test cases to run in the test class.
        tests = self.getTests(test_names)

        if not self.run_as_vts_self_test:
            self.results.requested = [
                records.TestResultRecord(test_name, self.test_module_name)
                for test_name, _ in tests if test_name.startswith(STR_TEST)
            ]

        self.runTestsWithRetry(tests)
        return self.results

    def cleanUp(self):
        """A function that is executed upon completion of all test cases
        selected in the test class.

        This function should clean up objects initialized in the constructor
        by the user.
        """

    def DumpBugReport(self, prefix=''):
        """Gets device bugreport through adb command.

        Args:
            prefix: string, file name prefix. Usually in format of
                    <test_module>-<test_case>
        """
        event = tfi.Begin('dump Bugreport',
                          tfi.categories.FAILED_TEST_CASE_PROCESSING)
        if prefix:
            prefix = re.sub('[^\w\-_\. ]', '_', prefix) + '_'

        parent_dir = os.path.join(logging.log_path, 'bugreport')

        if not file_util.Mkdir(parent_dir):
            logging.error('Failed to create bugreport output directory %s',
                          parent_dir)
            return

        for device in self.android_devices:
            if device.fatal_error:
                continue
            file_name = (_BUG_REPORT_FILE_PREFIX
                         + prefix
                         + '_%s' % device.serial
                         + _BUG_REPORT_FILE_EXTENSION)

            file_path = os.path.join(parent_dir, file_name)

            logging.info('Dumping bugreport %s...' % file_path)
            device.adb.bugreport(file_path)
        event.End()

    def skipAllTestsIf(self, condition, msg):
        """Skips all test cases if the given condition is true.

        This method is usually called in setup functions when a precondition
        of the test module is not met.

        Args:
            condition: object that can be evaluated by bool(), a condition that
                       will trigger skipAllTests if evaluated to be True.
            msg: string, reason why tests are skipped. If set to None or an
                 empty string, a default message will be used (not recommended).
        """
        if condition:
            self.skipAllTests(msg)

    def skipAllTests(self, msg):
        """Skips all test cases.

        This method is usually called in setup functions when a precondition
        of the test module is not met.

        Args:
            msg: string, reason why tests are skipped. If set to None or an
                 empty string, a default message will be used (not recommended).
        """
        if not msg:
            msg = "No reason provided."

        setattr(self, _REASON_TO_SKIP_ALL_TESTS, msg)

    def isSkipAllTests(self):
        """Returns whether all tests are set to be skipped.

        Note: If all tests are being skipped for a reason other than
        skipAllTests being called, or there are no tests defined, this method
        still returns False (since skipAllTests has not been called).

        Returns:
            bool, True if skipAllTests has been called; False otherwise.
        """
        return self.getSkipAllTestsReason() is not None

    def getSkipAllTestsReason(self):
        """Returns the reason why all tests are skipped.

        Note: If all tests are being skipped for a reason other than
        skipAllTests being called, or there are no tests defined, this method
        still returns None (since skipAllTests has not been called).

        Returns:
            String, reason why tests are skipped. None if skipAllTests
            has not been called.
        """
        return getattr(self, _REASON_TO_SKIP_ALL_TESTS, None)

    def DumpLogcat(self, prefix=''):
        """Dumps device logcat output to the log directory.

        Args:
            prefix: string, file name prefix. Usually in format of
                    <test_module>-<test_case>
        """
        event = tfi.Begin('dump logcat',
                          tfi.categories.FAILED_TEST_CASE_PROCESSING)
        if prefix:
            prefix = re.sub('[^\w\-_\. ]', '_', prefix) + '_'

        parent_dir = os.path.join(logging.log_path, 'logcat')

        if not file_util.Mkdir(parent_dir):
            logging.error('Failed to create logcat output directory %s',
                          parent_dir)
            return

        for device in self.android_devices:
            if (not device.isAdbLogcatOn) or device.fatal_error:
                continue
            for buffer in LOGCAT_BUFFERS:
                file_name = (_LOGCAT_FILE_PREFIX
                             + prefix
                             + '_%s_' % buffer
                             + device.serial
                             + _LOGCAT_FILE_EXTENSION)

                file_path = os.path.join(parent_dir, file_name)

                logging.info('Dumping logcat %s...' % file_path)
                device.adb.logcat('-b', buffer, '-d', '>', file_path)
        event.End()