You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
595 lines
21 KiB
595 lines
21 KiB
#!/usr/bin/python2
|
|
# pylint: disable=missing-docstring
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
|
|
from six.moves import range
|
|
import six
|
|
|
|
import common
|
|
from autotest_lib.client.bin import job, sysinfo, harness
|
|
from autotest_lib.client.bin import utils
|
|
from autotest_lib.client.common_lib import error
|
|
from autotest_lib.client.common_lib import logging_manager, logging_config
|
|
from autotest_lib.client.common_lib import base_job_unittest
|
|
from autotest_lib.client.common_lib.test_utils import mock
|
|
|
|
|
|
class job_test_case(unittest.TestCase):
|
|
"""Generic job TestCase class that defines a standard job setUp and
|
|
tearDown, with some standard stubs."""
|
|
|
|
job_class = job.base_client_job
|
|
|
|
def setUp(self):
|
|
self.god = mock.mock_god(ut=self)
|
|
self.god.stub_with(job.base_client_job, '_get_environ_autodir',
|
|
classmethod(lambda cls: '/adir'))
|
|
self.job = self.job_class.__new__(self.job_class)
|
|
self.job._job_directory = base_job_unittest.stub_job_directory
|
|
|
|
_, self.control_file = tempfile.mkstemp()
|
|
|
|
|
|
def tearDown(self):
|
|
self.god.unstub_all()
|
|
os.remove(self.control_file)
|
|
|
|
|
|
class test_find_base_directories(
|
|
base_job_unittest.test_find_base_directories.generic_tests,
|
|
job_test_case):
|
|
|
|
def test_autodir_equals_clientdir(self):
|
|
autodir, clientdir, _ = self.job._find_base_directories()
|
|
self.assertEqual(autodir, '/adir')
|
|
self.assertEqual(clientdir, '/adir')
|
|
|
|
|
|
def test_serverdir_is_none(self):
|
|
_, _, serverdir = self.job._find_base_directories()
|
|
self.assertEqual(serverdir, None)
|
|
|
|
|
|
class abstract_test_init(base_job_unittest.test_init.generic_tests):
|
|
"""Generic client job mixin used when defining variations on the
|
|
job.__init__ generic tests."""
|
|
OPTIONAL_ATTRIBUTES = (
|
|
base_job_unittest.test_init.generic_tests.OPTIONAL_ATTRIBUTES
|
|
- set(['control', 'harness']))
|
|
|
|
|
|
class test_init_minimal_options(abstract_test_init, job_test_case):
|
|
|
|
def call_init(self):
|
|
# TODO(jadmanski): refactor more of the __init__ code to not need to
|
|
# stub out countless random APIs
|
|
self.god.stub_function_to_return(job.os, 'mkdir', None)
|
|
self.god.stub_function_to_return(job.os.path, 'exists', True)
|
|
self.god.stub_function_to_return(self.job, '_load_state', None)
|
|
self.god.stub_function_to_return(self.job, 'record', None)
|
|
self.god.stub_function_to_return(job.shutil, 'copyfile', None)
|
|
self.god.stub_function_to_return(job.logging_manager,
|
|
'configure_logging', None)
|
|
class manager:
|
|
def start_logging(self):
|
|
return None
|
|
self.god.stub_function_to_return(job.logging_manager,
|
|
'get_logging_manager', manager())
|
|
class stub_sysinfo:
|
|
def log_per_reboot_data(self):
|
|
return None
|
|
self.god.stub_function_to_return(job.sysinfo, 'sysinfo',
|
|
stub_sysinfo())
|
|
class stub_harness:
|
|
run_start = lambda self: None
|
|
self.god.stub_function_to_return(job.harness, 'select', stub_harness())
|
|
class options:
|
|
tag = ''
|
|
verbose = False
|
|
cont = False
|
|
harness = 'stub'
|
|
harness_args = None
|
|
hostname = None
|
|
user = None
|
|
log = False
|
|
args = ''
|
|
output_dir = ''
|
|
self.god.stub_function_to_return(job.utils, 'drop_caches', None)
|
|
|
|
self.job._job_state = base_job_unittest.stub_job_state
|
|
self.job.__init__(self.control_file, options)
|
|
|
|
|
|
class placeholder(object):
|
|
"""A simple placeholder for attributes"""
|
|
pass
|
|
|
|
|
|
class first_line_comparator(mock.argument_comparator):
|
|
def __init__(self, first_line):
|
|
self.first_line = first_line
|
|
|
|
|
|
def is_satisfied_by(self, parameter):
|
|
return self.first_line == parameter.splitlines()[0]
|
|
|
|
|
|
class test_base_job(unittest.TestCase):
|
|
def setUp(self):
|
|
# make god
|
|
self.god = mock.mock_god(ut=self)
|
|
|
|
# need to set some environ variables
|
|
self.autodir = "autodir"
|
|
os.environ['AUTODIR'] = self.autodir
|
|
|
|
# set up some variables
|
|
_, self.control = tempfile.mkstemp()
|
|
self.jobtag = "jobtag"
|
|
|
|
# get rid of stdout and logging
|
|
sys.stdout = six.StringIO()
|
|
logging_manager.configure_logging(logging_config.TestingConfig())
|
|
logging.disable(logging.CRITICAL)
|
|
def placeholder_configure_logging(*args, **kwargs):
|
|
pass
|
|
self.god.stub_with(logging_manager, 'configure_logging',
|
|
placeholder_configure_logging)
|
|
real_get_logging_manager = logging_manager.get_logging_manager
|
|
def get_logging_manager_no_fds(manage_stdout_and_stderr=False,
|
|
redirect_fds=False):
|
|
return real_get_logging_manager(manage_stdout_and_stderr, False)
|
|
self.god.stub_with(logging_manager, 'get_logging_manager',
|
|
get_logging_manager_no_fds)
|
|
|
|
# stub out some stuff
|
|
self.god.stub_function(os.path, 'exists')
|
|
self.god.stub_function(os.path, 'isdir')
|
|
self.god.stub_function(os, 'makedirs')
|
|
self.god.stub_function(os, 'mkdir')
|
|
self.god.stub_function(os, 'remove')
|
|
self.god.stub_function(shutil, 'rmtree')
|
|
self.god.stub_function(shutil, 'copyfile')
|
|
self.god.stub_function(job, 'open')
|
|
self.god.stub_function(utils, 'system')
|
|
self.god.stub_function(utils, 'drop_caches')
|
|
self.god.stub_function(harness, 'select')
|
|
self.god.stub_function(sysinfo, 'log_per_reboot_data')
|
|
|
|
self.god.stub_class(job.local_host, 'LocalHost')
|
|
self.god.stub_class(sysinfo, 'sysinfo')
|
|
|
|
self.god.stub_class_method(job.base_client_job,
|
|
'_cleanup_debugdir_files')
|
|
self.god.stub_class_method(job.base_client_job, '_cleanup_results_dir')
|
|
|
|
self.god.stub_with(job.base_job.job_directory, '_ensure_valid',
|
|
lambda *_: None)
|
|
|
|
|
|
def tearDown(self):
|
|
sys.stdout = sys.__stdout__
|
|
self.god.unstub_all()
|
|
os.remove(self.control)
|
|
|
|
|
|
def _setup_pre_record_init(self, cont):
|
|
self.god.stub_function(self.job, '_load_state')
|
|
|
|
resultdir = os.path.join(self.autodir, 'results', self.jobtag)
|
|
tmpdir = os.path.join(self.autodir, 'tmp')
|
|
if not cont:
|
|
job.base_client_job._cleanup_debugdir_files.expect_call()
|
|
job.base_client_job._cleanup_results_dir.expect_call()
|
|
|
|
self.job._load_state.expect_call()
|
|
|
|
my_harness = self.god.create_mock_class(harness.harness,
|
|
'my_harness')
|
|
harness.select.expect_call(None,
|
|
self.job,
|
|
None).and_return(my_harness)
|
|
|
|
return resultdir, my_harness
|
|
|
|
|
|
def _setup_post_record_init(self, cont, resultdir, my_harness):
|
|
# now some specific stubs
|
|
self.god.stub_function(self.job, 'config_get')
|
|
self.god.stub_function(self.job, 'config_set')
|
|
self.god.stub_function(self.job, 'record')
|
|
|
|
# other setup
|
|
results = os.path.join(self.autodir, 'results')
|
|
download = os.path.join(self.autodir, 'tests', 'download')
|
|
pkgdir = os.path.join(self.autodir, 'packages')
|
|
|
|
utils.drop_caches.expect_call()
|
|
job_sysinfo = sysinfo.sysinfo.expect_new(resultdir)
|
|
if not cont:
|
|
os.path.exists.expect_call(download).and_return(False)
|
|
os.mkdir.expect_call(download)
|
|
shutil.copyfile.expect_call(mock.is_string_comparator(),
|
|
os.path.join(resultdir, 'control'))
|
|
|
|
job.local_host.LocalHost.expect_new(hostname='localhost')
|
|
job_sysinfo.log_per_reboot_data.expect_call()
|
|
if not cont:
|
|
self.job.record.expect_call('START', None, None)
|
|
|
|
my_harness.run_start.expect_call()
|
|
|
|
|
|
def construct_job(self, cont):
|
|
# will construct class instance using __new__
|
|
self.job = job.base_client_job.__new__(job.base_client_job)
|
|
|
|
# record
|
|
resultdir, my_harness = self._setup_pre_record_init(cont)
|
|
self._setup_post_record_init(cont, resultdir, my_harness)
|
|
|
|
# finish constructor
|
|
options = placeholder()
|
|
options.tag = self.jobtag
|
|
options.cont = cont
|
|
options.harness = None
|
|
options.harness_args = None
|
|
options.log = False
|
|
options.verbose = False
|
|
options.hostname = 'localhost'
|
|
options.user = 'my_user'
|
|
options.args = ''
|
|
options.output_dir = ''
|
|
self.job.__init__(self.control, options)
|
|
|
|
# check
|
|
self.god.check_playback()
|
|
|
|
|
|
def get_partition_mock(self, devname):
|
|
"""
|
|
Create a mock of a partition object and return it.
|
|
"""
|
|
class mock(object):
|
|
device = devname
|
|
get_mountpoint = self.god.create_mock_function('get_mountpoint')
|
|
return mock
|
|
|
|
|
|
def test_constructor_first_run(self):
|
|
self.construct_job(False)
|
|
|
|
|
|
def test_constructor_continuation(self):
|
|
self.construct_job(True)
|
|
|
|
|
|
def test_constructor_post_record_failure(self):
|
|
"""
|
|
Test post record initialization failure.
|
|
"""
|
|
self.job = job.base_client_job.__new__(job.base_client_job)
|
|
options = placeholder()
|
|
options.tag = self.jobtag
|
|
options.cont = False
|
|
options.harness = None
|
|
options.harness_args = None
|
|
options.log = False
|
|
options.verbose = False
|
|
options.hostname = 'localhost'
|
|
options.user = 'my_user'
|
|
options.args = ''
|
|
options.output_dir = ''
|
|
error = Exception('fail')
|
|
|
|
self.god.stub_function(self.job, '_post_record_init')
|
|
self.god.stub_function(self.job, 'record')
|
|
|
|
self._setup_pre_record_init(False)
|
|
self.job._post_record_init.expect_call(
|
|
self.control, options, True).and_raises(error)
|
|
self.job.record.expect_call(
|
|
'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
|
|
str(error))
|
|
|
|
self.assertRaises(
|
|
Exception, self.job.__init__, self.control, options,
|
|
drop_caches=True)
|
|
|
|
# check
|
|
self.god.check_playback()
|
|
|
|
|
|
def test_control_functions(self):
|
|
self.construct_job(True)
|
|
control_file = "blah"
|
|
self.job.control_set(control_file)
|
|
self.assertEquals(self.job.control_get(), os.path.abspath(control_file))
|
|
|
|
|
|
def test_harness_select(self):
|
|
self.construct_job(True)
|
|
|
|
# record
|
|
which = "which"
|
|
harness_args = ''
|
|
harness.select.expect_call(which, self.job,
|
|
harness_args).and_return(None)
|
|
|
|
# run and test
|
|
self.job.harness_select(which, harness_args)
|
|
self.god.check_playback()
|
|
|
|
|
|
def test_setup_dirs_raise(self):
|
|
self.construct_job(True)
|
|
|
|
# setup
|
|
results_dir = 'foo'
|
|
tmp_dir = 'bar'
|
|
|
|
# record
|
|
os.path.exists.expect_call(tmp_dir).and_return(True)
|
|
os.path.isdir.expect_call(tmp_dir).and_return(False)
|
|
|
|
# test
|
|
self.assertRaises(ValueError, self.job.setup_dirs, results_dir, tmp_dir)
|
|
self.god.check_playback()
|
|
|
|
|
|
def test_setup_dirs(self):
|
|
self.construct_job(True)
|
|
|
|
# setup
|
|
results_dir1 = os.path.join(self.job.resultdir, 'build')
|
|
results_dir2 = os.path.join(self.job.resultdir, 'build.2')
|
|
results_dir3 = os.path.join(self.job.resultdir, 'build.3')
|
|
tmp_dir = 'bar'
|
|
|
|
# record
|
|
os.path.exists.expect_call(tmp_dir).and_return(False)
|
|
os.mkdir.expect_call(tmp_dir)
|
|
os.path.isdir.expect_call(tmp_dir).and_return(True)
|
|
os.path.exists.expect_call(results_dir1).and_return(True)
|
|
os.path.exists.expect_call(results_dir2).and_return(True)
|
|
os.path.exists.expect_call(results_dir3).and_return(False)
|
|
os.path.exists.expect_call(results_dir3).and_return(False)
|
|
os.mkdir.expect_call(results_dir3)
|
|
|
|
# test
|
|
self.assertEqual(self.job.setup_dirs(None, tmp_dir),
|
|
(results_dir3, tmp_dir))
|
|
self.god.check_playback()
|
|
|
|
|
|
def test_run_test_logs_test_error_from_unhandled_error(self):
|
|
self.construct_job(True)
|
|
|
|
# set up stubs
|
|
self.god.stub_function(self.job.pkgmgr, 'get_package_name')
|
|
self.god.stub_function(self.job, "_runtest")
|
|
|
|
# create an unhandled error object
|
|
class MyError(error.TestError):
|
|
pass
|
|
real_error = MyError("this is the real error message")
|
|
unhandled_error = error.UnhandledTestError(real_error)
|
|
|
|
# set up the recording
|
|
testname = "error_test"
|
|
outputdir = os.path.join(self.job.resultdir, testname)
|
|
self.job.pkgmgr.get_package_name.expect_call(
|
|
testname, 'test').and_return(("", testname))
|
|
os.path.exists.expect_call(outputdir).and_return(False)
|
|
self.job.record.expect_call("START", testname, testname,
|
|
optional_fields=None)
|
|
self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
|
|
unhandled_error)
|
|
self.job.record.expect_call("ERROR", testname, testname,
|
|
first_line_comparator(str(real_error)))
|
|
self.job.record.expect_call("END ERROR", testname, testname)
|
|
self.job.harness.run_test_complete.expect_call()
|
|
utils.drop_caches.expect_call()
|
|
|
|
# run and check
|
|
self.job.run_test(testname)
|
|
self.god.check_playback()
|
|
|
|
|
|
def test_run_test_logs_non_test_error_from_unhandled_error(self):
|
|
self.construct_job(True)
|
|
|
|
# set up stubs
|
|
self.god.stub_function(self.job.pkgmgr, 'get_package_name')
|
|
self.god.stub_function(self.job, "_runtest")
|
|
|
|
# create an unhandled error object
|
|
class MyError(Exception):
|
|
pass
|
|
real_error = MyError("this is the real error message")
|
|
unhandled_error = error.UnhandledTestError(real_error)
|
|
reason = first_line_comparator("Unhandled MyError: %s" % real_error)
|
|
|
|
# set up the recording
|
|
testname = "error_test"
|
|
outputdir = os.path.join(self.job.resultdir, testname)
|
|
self.job.pkgmgr.get_package_name.expect_call(
|
|
testname, 'test').and_return(("", testname))
|
|
os.path.exists.expect_call(outputdir).and_return(False)
|
|
self.job.record.expect_call("START", testname, testname,
|
|
optional_fields=None)
|
|
self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
|
|
unhandled_error)
|
|
self.job.record.expect_call("ERROR", testname, testname, reason)
|
|
self.job.record.expect_call("END ERROR", testname, testname)
|
|
self.job.harness.run_test_complete.expect_call()
|
|
utils.drop_caches.expect_call()
|
|
|
|
# run and check
|
|
self.job.run_test(testname)
|
|
self.god.check_playback()
|
|
|
|
|
|
def test_report_reboot_failure(self):
|
|
self.construct_job(True)
|
|
|
|
# record
|
|
self.job.record.expect_call("ABORT", "sub", "reboot.verify",
|
|
"boot failure")
|
|
self.job.record.expect_call("END ABORT", "sub", "reboot",
|
|
optional_fields={"kernel": "2.6.15-smp"})
|
|
|
|
# playback
|
|
self.job._record_reboot_failure("sub", "reboot.verify", "boot failure",
|
|
running_id="2.6.15-smp")
|
|
self.god.check_playback()
|
|
|
|
|
|
def _setup_check_post_reboot(self, mount_info, cpu_count):
|
|
# setup
|
|
self.god.stub_function(job.partition_lib, "get_partition_list")
|
|
self.god.stub_function(utils, "count_cpus")
|
|
|
|
part_list = [self.get_partition_mock("/dev/hda1"),
|
|
self.get_partition_mock("/dev/hdb1")]
|
|
mount_list = ["/mnt/hda1", "/mnt/hdb1"]
|
|
|
|
# record
|
|
job.partition_lib.get_partition_list.expect_call(
|
|
self.job, exclude_swap=False).and_return(part_list)
|
|
for i in range(len(part_list)):
|
|
part_list[i].get_mountpoint.expect_call().and_return(mount_list[i])
|
|
if cpu_count is not None:
|
|
utils.count_cpus.expect_call().and_return(cpu_count)
|
|
self.job._state.set('client', 'mount_info', mount_info)
|
|
self.job._state.set('client', 'cpu_count', 8)
|
|
|
|
|
|
def test_check_post_reboot_success(self):
|
|
self.construct_job(True)
|
|
|
|
mount_info = set([("/dev/hda1", "/mnt/hda1"),
|
|
("/dev/hdb1", "/mnt/hdb1")])
|
|
self._setup_check_post_reboot(mount_info, 8)
|
|
|
|
# playback
|
|
self.job._check_post_reboot("sub")
|
|
self.god.check_playback()
|
|
|
|
|
|
def test_check_post_reboot_mounts_failure(self):
|
|
self.construct_job(True)
|
|
|
|
mount_info = set([("/dev/hda1", "/mnt/hda1")])
|
|
self._setup_check_post_reboot(mount_info, None)
|
|
|
|
self.god.stub_function(self.job, "_record_reboot_failure")
|
|
self.job._record_reboot_failure.expect_call("sub",
|
|
"reboot.verify_config", "mounted partitions are different after"
|
|
" reboot (old entries: set([]), new entries: set([('/dev/hdb1',"
|
|
" '/mnt/hdb1')]))", running_id=None)
|
|
|
|
# playback
|
|
self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
|
|
self.god.check_playback()
|
|
|
|
|
|
def test_check_post_reboot_cpu_failure(self):
|
|
self.construct_job(True)
|
|
|
|
mount_info = set([("/dev/hda1", "/mnt/hda1"),
|
|
("/dev/hdb1", "/mnt/hdb1")])
|
|
self._setup_check_post_reboot(mount_info, 4)
|
|
|
|
self.god.stub_function(self.job, "_record_reboot_failure")
|
|
self.job._record_reboot_failure.expect_call(
|
|
'sub', 'reboot.verify_config',
|
|
'Number of CPUs changed after reboot (old count: 8, new count: 4)',
|
|
running_id=None)
|
|
|
|
# playback
|
|
self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
|
|
self.god.check_playback()
|
|
|
|
|
|
def test_parse_args(self):
|
|
test_set = {"a='foo bar baz' b='moo apt'":
|
|
["a='foo bar baz'", "b='moo apt'"],
|
|
"a='foo bar baz' only=gah":
|
|
["a='foo bar baz'", "only=gah"],
|
|
"a='b c d' no=argh":
|
|
["a='b c d'", "no=argh"]}
|
|
for t in test_set:
|
|
parsed_args = job.base_client_job._parse_args(t)
|
|
expected_args = test_set[t]
|
|
self.assertEqual(parsed_args, expected_args)
|
|
|
|
|
|
def test_run_test_timeout_parameter_is_propagated(self):
|
|
self.construct_job(True)
|
|
|
|
# set up stubs
|
|
self.god.stub_function(self.job.pkgmgr, 'get_package_name')
|
|
self.god.stub_function(self.job, "_runtest")
|
|
|
|
# create an unhandled error object
|
|
#class MyError(error.TestError):
|
|
# pass
|
|
#real_error = MyError("this is the real error message")
|
|
#unhandled_error = error.UnhandledTestError(real_error)
|
|
|
|
# set up the recording
|
|
testname = "test"
|
|
outputdir = os.path.join(self.job.resultdir, testname)
|
|
self.job.pkgmgr.get_package_name.expect_call(
|
|
testname, 'test').and_return(("", testname))
|
|
os.path.exists.expect_call(outputdir).and_return(False)
|
|
timeout = 60
|
|
optional_fields = {}
|
|
optional_fields['timeout'] = timeout
|
|
self.job.record.expect_call("START", testname, testname,
|
|
optional_fields=optional_fields)
|
|
self.job._runtest.expect_call(testname, "", timeout, (), {})
|
|
self.job.record.expect_call("GOOD", testname, testname,
|
|
"completed successfully")
|
|
self.job.record.expect_call("END GOOD", testname, testname)
|
|
self.job.harness.run_test_complete.expect_call()
|
|
utils.drop_caches.expect_call()
|
|
|
|
# run and check
|
|
self.job.run_test(testname, timeout=timeout)
|
|
self.god.check_playback()
|
|
|
|
|
|
class test_name_pattern(unittest.TestCase):
|
|
"""Tests for _NAME_PATTERN."""
|
|
|
|
def _one_name_pattern_test(self, line, want):
|
|
"""Parametrized test."""
|
|
match = re.match(job._NAME_PATTERN, line)
|
|
self.assertIsNotNone(match)
|
|
self.assertEqual(match.group(1), want)
|
|
|
|
def test_name_pattern_nospace_single(self):
|
|
self._one_name_pattern_test("NAME='some_Test'", 'some_Test')
|
|
|
|
def test_name_pattern_nospace_double(self):
|
|
self._one_name_pattern_test('NAME="some_Test"', 'some_Test')
|
|
|
|
def test_name_pattern_space_single(self):
|
|
self._one_name_pattern_test("NAME = 'some_Test'", 'some_Test')
|
|
|
|
def test_name_pattern_space_double(self):
|
|
self._one_name_pattern_test('NAME = "some_Test"', 'some_Test')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|