You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
904 lines
33 KiB
904 lines
33 KiB
# Lint as: python2, python3
|
|
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
|
|
from __future__ import print_function
|
|
from __future__ import division
|
|
from __future__ import absolute_import
|
|
|
|
import dbus, gobject, logging, os, stat
|
|
from dbus.mainloop.glib import DBusGMainLoop
|
|
import six
|
|
from six.moves import zip
|
|
|
|
import common
|
|
|
|
from autotest_lib.client.bin import utils
|
|
from autotest_lib.client.common_lib import autotemp, error
|
|
from autotest_lib.client.cros import dbus_util
|
|
from autotest_lib.client.cros.mainloop import ExceptionForward
|
|
from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
|
|
|
|
|
|
"""This module contains several helper classes for writing tests to verify the
|
|
CrosDisks DBus interface. In particular, the CrosDisksTester class can be used
|
|
to derive functional tests that interact with the CrosDisks server over DBus.
|
|
"""
|
|
|
|
|
|
class ExceptionSuppressor(object):
|
|
"""A context manager class for suppressing certain types of exception.
|
|
|
|
An instance of this class is expected to be used with the with statement
|
|
and takes a set of exception classes at instantiation, which are types of
|
|
exception to be suppressed (and logged) in the code block under the with
|
|
statement.
|
|
|
|
Example:
|
|
|
|
with ExceptionSuppressor(OSError, IOError):
|
|
# An exception, which is a sub-class of OSError or IOError, is
|
|
# suppressed in the block code under the with statement.
|
|
"""
|
|
def __init__(self, *args):
|
|
self.__suppressed_exc_types = (args)
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
if exc_type and issubclass(exc_type, self.__suppressed_exc_types):
|
|
try:
|
|
logging.exception('Suppressed exception: %s(%s)',
|
|
exc_type, exc_value)
|
|
except Exception:
|
|
pass
|
|
return True
|
|
return False
|
|
|
|
|
|
class DBusClient(object):
|
|
""" A base class of a DBus proxy client to test a DBus server.
|
|
|
|
This class is expected to be used along with a GLib main loop and provides
|
|
some convenient functions for testing the DBus API exposed by a DBus server.
|
|
"""
|
|
|
|
def __init__(self, main_loop, bus, bus_name, object_path, timeout=None):
|
|
"""Initializes the instance.
|
|
|
|
Args:
|
|
main_loop: The GLib main loop.
|
|
bus: The bus where the DBus server is connected to.
|
|
bus_name: The bus name owned by the DBus server.
|
|
object_path: The object path of the DBus server.
|
|
timeout: Maximum time in seconds to wait for the DBus connection.
|
|
"""
|
|
self.__signal_content = {}
|
|
self.main_loop = main_loop
|
|
self.signal_timeout_in_seconds = 10
|
|
logging.debug('Getting D-Bus proxy object on bus "%s" and path "%s"',
|
|
bus_name, object_path)
|
|
self.proxy_object = dbus_util.get_dbus_object(bus, bus_name,
|
|
object_path, timeout)
|
|
|
|
def clear_signal_content(self, signal_name):
|
|
"""Clears the content of the signal.
|
|
|
|
Args:
|
|
signal_name: The name of the signal.
|
|
"""
|
|
if signal_name in self.__signal_content:
|
|
self.__signal_content[signal_name] = None
|
|
|
|
def get_signal_content(self, signal_name):
|
|
"""Gets the content of a signal.
|
|
|
|
Args:
|
|
signal_name: The name of the signal.
|
|
|
|
Returns:
|
|
The content of a signal or None if the signal is not being handled.
|
|
"""
|
|
return self.__signal_content.get(signal_name)
|
|
|
|
def handle_signal(self, interface, signal_name, argument_names=()):
|
|
"""Registers a signal handler to handle a given signal.
|
|
|
|
Args:
|
|
interface: The DBus interface of the signal.
|
|
signal_name: The name of the signal.
|
|
argument_names: A list of argument names that the signal contains.
|
|
"""
|
|
if signal_name in self.__signal_content:
|
|
return
|
|
|
|
self.__signal_content[signal_name] = None
|
|
|
|
def signal_handler(*args):
|
|
self.__signal_content[signal_name] = dict(zip(argument_names, args))
|
|
|
|
logging.debug('Handling D-Bus signal "%s(%s)" on interface "%s"',
|
|
signal_name, ', '.join(argument_names), interface)
|
|
self.proxy_object.connect_to_signal(signal_name, signal_handler,
|
|
interface)
|
|
|
|
def wait_for_signal(self, signal_name):
|
|
"""Waits for the reception of a signal.
|
|
|
|
Args:
|
|
signal_name: The name of the signal to wait for.
|
|
|
|
Returns:
|
|
The content of the signal.
|
|
"""
|
|
if signal_name not in self.__signal_content:
|
|
return None
|
|
|
|
def check_signal_content():
|
|
context = self.main_loop.get_context()
|
|
while context.iteration(False):
|
|
pass
|
|
return self.__signal_content[signal_name] is not None
|
|
|
|
logging.debug('Waiting for D-Bus signal "%s"', signal_name)
|
|
utils.poll_for_condition(condition=check_signal_content,
|
|
desc='%s signal' % signal_name,
|
|
timeout=self.signal_timeout_in_seconds)
|
|
content = self.__signal_content[signal_name]
|
|
logging.debug('Received D-Bus signal "%s(%s)"', signal_name, content)
|
|
self.__signal_content[signal_name] = None
|
|
return content
|
|
|
|
def expect_signal(self, signal_name, expected_content):
|
|
"""Waits the the reception of a signal and verifies its content.
|
|
|
|
Args:
|
|
signal_name: The name of the signal to wait for.
|
|
expected_content: The expected content of the signal, which can be
|
|
partially specified. Only specified fields are
|
|
compared between the actual and expected content.
|
|
|
|
Returns:
|
|
The actual content of the signal.
|
|
|
|
Raises:
|
|
error.TestFail: A test failure when there is a mismatch between the
|
|
actual and expected content of the signal.
|
|
"""
|
|
actual_content = self.wait_for_signal(signal_name)
|
|
logging.debug("%s signal: expected=%s actual=%s",
|
|
signal_name, expected_content, actual_content)
|
|
for argument, expected_value in six.iteritems(expected_content):
|
|
if argument not in actual_content:
|
|
raise error.TestFail(
|
|
('%s signal missing "%s": expected=%s, actual=%s') %
|
|
(signal_name, argument, expected_content, actual_content))
|
|
|
|
if actual_content[argument] != expected_value:
|
|
raise error.TestFail(
|
|
('%s signal not matched on "%s": expected=%s, actual=%s') %
|
|
(signal_name, argument, expected_content, actual_content))
|
|
return actual_content
|
|
|
|
|
|
class CrosDisksClient(DBusClient):
|
|
"""A DBus proxy client for testing the CrosDisks DBus server.
|
|
"""
|
|
|
|
CROS_DISKS_BUS_NAME = 'org.chromium.CrosDisks'
|
|
CROS_DISKS_INTERFACE = 'org.chromium.CrosDisks'
|
|
CROS_DISKS_OBJECT_PATH = '/org/chromium/CrosDisks'
|
|
DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
|
|
FORMAT_COMPLETED_SIGNAL = 'FormatCompleted'
|
|
FORMAT_COMPLETED_SIGNAL_ARGUMENTS = (
|
|
'status', 'path'
|
|
)
|
|
MOUNT_COMPLETED_SIGNAL = 'MountCompleted'
|
|
MOUNT_COMPLETED_SIGNAL_ARGUMENTS = (
|
|
'status', 'source_path', 'source_type', 'mount_path'
|
|
)
|
|
RENAME_COMPLETED_SIGNAL = 'RenameCompleted'
|
|
RENAME_COMPLETED_SIGNAL_ARGUMENTS = (
|
|
'status', 'path'
|
|
)
|
|
|
|
def __init__(self, main_loop, bus, timeout_seconds=None):
|
|
"""Initializes the instance.
|
|
|
|
Args:
|
|
main_loop: The GLib main loop.
|
|
bus: The bus where the DBus server is connected to.
|
|
timeout_seconds: Maximum time in seconds to wait for the DBus
|
|
connection.
|
|
"""
|
|
super(CrosDisksClient, self).__init__(main_loop, bus,
|
|
self.CROS_DISKS_BUS_NAME,
|
|
self.CROS_DISKS_OBJECT_PATH,
|
|
timeout_seconds)
|
|
self.interface = dbus.Interface(self.proxy_object,
|
|
self.CROS_DISKS_INTERFACE)
|
|
self.properties = dbus.Interface(self.proxy_object,
|
|
self.DBUS_PROPERTIES_INTERFACE)
|
|
self.handle_signal(self.CROS_DISKS_INTERFACE,
|
|
self.FORMAT_COMPLETED_SIGNAL,
|
|
self.FORMAT_COMPLETED_SIGNAL_ARGUMENTS)
|
|
self.handle_signal(self.CROS_DISKS_INTERFACE,
|
|
self.MOUNT_COMPLETED_SIGNAL,
|
|
self.MOUNT_COMPLETED_SIGNAL_ARGUMENTS)
|
|
self.handle_signal(self.CROS_DISKS_INTERFACE,
|
|
self.RENAME_COMPLETED_SIGNAL,
|
|
self.RENAME_COMPLETED_SIGNAL_ARGUMENTS)
|
|
|
|
def enumerate_devices(self):
|
|
"""Invokes the CrosDisks EnumerateMountableDevices method.
|
|
|
|
Returns:
|
|
A list of sysfs paths of devices that are recognized by
|
|
CrosDisks.
|
|
"""
|
|
return self.interface.EnumerateDevices()
|
|
|
|
def get_device_properties(self, path):
|
|
"""Invokes the CrosDisks GetDeviceProperties method.
|
|
|
|
Args:
|
|
path: The device path.
|
|
|
|
Returns:
|
|
The properties of the device in a dictionary.
|
|
"""
|
|
return self.interface.GetDeviceProperties(path)
|
|
|
|
def format(self, path, filesystem_type=None, options=None):
|
|
"""Invokes the CrosDisks Format method.
|
|
|
|
Args:
|
|
path: The device path to format.
|
|
filesystem_type: The filesystem type used for formatting the device.
|
|
options: A list of options used for formatting the device.
|
|
"""
|
|
if filesystem_type is None:
|
|
filesystem_type = ''
|
|
if options is None:
|
|
options = []
|
|
self.clear_signal_content(self.FORMAT_COMPLETED_SIGNAL)
|
|
self.interface.Format(path, filesystem_type,
|
|
dbus.Array(options, signature='s'))
|
|
|
|
def wait_for_format_completion(self):
|
|
"""Waits for the CrosDisks FormatCompleted signal.
|
|
|
|
Returns:
|
|
The content of the FormatCompleted signal.
|
|
"""
|
|
return self.wait_for_signal(self.FORMAT_COMPLETED_SIGNAL)
|
|
|
|
def expect_format_completion(self, expected_content):
|
|
"""Waits and verifies for the CrosDisks FormatCompleted signal.
|
|
|
|
Args:
|
|
expected_content: The expected content of the FormatCompleted
|
|
signal, which can be partially specified.
|
|
Only specified fields are compared between the
|
|
actual and expected content.
|
|
|
|
Returns:
|
|
The actual content of the FormatCompleted signal.
|
|
|
|
Raises:
|
|
error.TestFail: A test failure when there is a mismatch between the
|
|
actual and expected content of the FormatCompleted
|
|
signal.
|
|
"""
|
|
return self.expect_signal(self.FORMAT_COMPLETED_SIGNAL,
|
|
expected_content)
|
|
|
|
def rename(self, path, volume_name=None):
|
|
"""Invokes the CrosDisks Rename method.
|
|
|
|
Args:
|
|
path: The device path to rename.
|
|
volume_name: The new name used for renaming.
|
|
"""
|
|
if volume_name is None:
|
|
volume_name = ''
|
|
self.clear_signal_content(self.RENAME_COMPLETED_SIGNAL)
|
|
self.interface.Rename(path, volume_name)
|
|
|
|
def wait_for_rename_completion(self):
|
|
"""Waits for the CrosDisks RenameCompleted signal.
|
|
|
|
Returns:
|
|
The content of the RenameCompleted signal.
|
|
"""
|
|
return self.wait_for_signal(self.RENAME_COMPLETED_SIGNAL)
|
|
|
|
def expect_rename_completion(self, expected_content):
|
|
"""Waits and verifies for the CrosDisks RenameCompleted signal.
|
|
|
|
Args:
|
|
expected_content: The expected content of the RenameCompleted
|
|
signal, which can be partially specified.
|
|
Only specified fields are compared between the
|
|
actual and expected content.
|
|
|
|
Returns:
|
|
The actual content of the RenameCompleted signal.
|
|
|
|
Raises:
|
|
error.TestFail: A test failure when there is a mismatch between the
|
|
actual and expected content of the RenameCompleted
|
|
signal.
|
|
"""
|
|
return self.expect_signal(self.RENAME_COMPLETED_SIGNAL,
|
|
expected_content)
|
|
|
|
def mount(self, path, filesystem_type=None, options=None):
|
|
"""Invokes the CrosDisks Mount method.
|
|
|
|
Args:
|
|
path: The device path to mount.
|
|
filesystem_type: The filesystem type used for mounting the device.
|
|
options: A list of options used for mounting the device.
|
|
"""
|
|
if filesystem_type is None:
|
|
filesystem_type = ''
|
|
if options is None:
|
|
options = []
|
|
self.clear_signal_content(self.MOUNT_COMPLETED_SIGNAL)
|
|
self.interface.Mount(path, filesystem_type,
|
|
dbus.Array(options, signature='s'))
|
|
|
|
def unmount(self, path, options=None):
|
|
"""Invokes the CrosDisks Unmount method.
|
|
|
|
Args:
|
|
path: The device or mount path to unmount.
|
|
options: A list of options used for unmounting the path.
|
|
|
|
Returns:
|
|
The mount error code.
|
|
"""
|
|
if options is None:
|
|
options = []
|
|
return self.interface.Unmount(path, dbus.Array(options, signature='s'))
|
|
|
|
def wait_for_mount_completion(self):
|
|
"""Waits for the CrosDisks MountCompleted signal.
|
|
|
|
Returns:
|
|
The content of the MountCompleted signal.
|
|
"""
|
|
return self.wait_for_signal(self.MOUNT_COMPLETED_SIGNAL)
|
|
|
|
def expect_mount_completion(self, expected_content):
|
|
"""Waits and verifies for the CrosDisks MountCompleted signal.
|
|
|
|
Args:
|
|
expected_content: The expected content of the MountCompleted
|
|
signal, which can be partially specified.
|
|
Only specified fields are compared between the
|
|
actual and expected content.
|
|
|
|
Returns:
|
|
The actual content of the MountCompleted signal.
|
|
|
|
Raises:
|
|
error.TestFail: A test failure when there is a mismatch between the
|
|
actual and expected content of the MountCompleted
|
|
signal.
|
|
"""
|
|
return self.expect_signal(self.MOUNT_COMPLETED_SIGNAL,
|
|
expected_content)
|
|
|
|
def add_loopback_to_allowlist(self, path):
|
|
"""Adds a device by its path to the allowlist for testing.
|
|
|
|
Args:
|
|
path: path to the /dev/loopX device.
|
|
"""
|
|
sys_path = '/sys/devices/virtual/block/' + os.path.basename(path)
|
|
self.interface.AddDeviceToAllowlist(sys_path)
|
|
|
|
def remove_loopback_from_allowlist(self, path):
|
|
"""Removes a device by its sys path from the allowlist for testing.
|
|
|
|
Args:
|
|
path: path to the /dev/loopX device.
|
|
"""
|
|
sys_path = '/sys/devices/virtual/block/' + os.path.basename(path)
|
|
self.interface.RemoveDeviceFromAllowlist(sys_path)
|
|
|
|
|
|
class CrosDisksTester(GenericTesterMainLoop):
|
|
"""A base tester class for testing the CrosDisks server.
|
|
|
|
A derived class should override the get_tests method to return a list of
|
|
test methods. The perform_one_test method invokes each test method in the
|
|
list to verify some functionalities of CrosDisks server.
|
|
"""
|
|
def __init__(self, test):
|
|
bus_loop = DBusGMainLoop(set_as_default=True)
|
|
self.bus = dbus.SystemBus(mainloop=bus_loop)
|
|
self.main_loop = gobject.MainLoop()
|
|
super(CrosDisksTester, self).__init__(test, self.main_loop)
|
|
self.cros_disks = CrosDisksClient(self.main_loop, self.bus)
|
|
|
|
def get_tests(self):
|
|
"""Returns a list of test methods to be invoked by perform_one_test.
|
|
|
|
A derived class should override this method.
|
|
|
|
Returns:
|
|
A list of test methods.
|
|
"""
|
|
return []
|
|
|
|
@ExceptionForward
|
|
def perform_one_test(self):
|
|
"""Exercises each test method in the list returned by get_tests.
|
|
"""
|
|
tests = self.get_tests()
|
|
self.remaining_requirements = set([test.__name__ for test in tests])
|
|
for test in tests:
|
|
test()
|
|
self.requirement_completed(test.__name__)
|
|
|
|
def reconnect_client(self, timeout_seconds=None):
|
|
""""Reconnect the CrosDisks DBus client.
|
|
|
|
Args:
|
|
timeout_seconds: Maximum time in seconds to wait for the DBus
|
|
connection.
|
|
"""
|
|
self.cros_disks = CrosDisksClient(self.main_loop, self.bus,
|
|
timeout_seconds)
|
|
|
|
|
|
class FilesystemTestObject(object):
|
|
"""A base class to represent a filesystem test object.
|
|
|
|
A filesystem test object can be a file, directory or symbolic link.
|
|
A derived class should override the _create and _verify method to implement
|
|
how the test object should be created and verified, respectively, on a
|
|
filesystem.
|
|
"""
|
|
def __init__(self, path, content, mode):
|
|
"""Initializes the instance.
|
|
|
|
Args:
|
|
path: The relative path of the test object.
|
|
content: The content of the test object.
|
|
mode: The file permissions given to the test object.
|
|
"""
|
|
self._path = path
|
|
self._content = content
|
|
self._mode = mode
|
|
|
|
def create(self, base_dir):
|
|
"""Creates the test object in a base directory.
|
|
|
|
Args:
|
|
base_dir: The base directory where the test object is created.
|
|
|
|
Returns:
|
|
True if the test object is created successfully or False otherwise.
|
|
"""
|
|
if not self._create(base_dir):
|
|
logging.debug('Failed to create filesystem test object at "%s"',
|
|
os.path.join(base_dir, self._path))
|
|
return False
|
|
return True
|
|
|
|
def verify(self, base_dir):
|
|
"""Verifies the test object in a base directory.
|
|
|
|
Args:
|
|
base_dir: The base directory where the test object is expected to be
|
|
found.
|
|
|
|
Returns:
|
|
True if the test object is found in the base directory and matches
|
|
the expected content, or False otherwise.
|
|
"""
|
|
if not self._verify(base_dir):
|
|
logging.error('Mismatched filesystem object at "%s"',
|
|
os.path.join(base_dir, self._path))
|
|
return False
|
|
return True
|
|
|
|
def _create(self, base_dir):
|
|
return False
|
|
|
|
def _verify(self, base_dir):
|
|
return False
|
|
|
|
|
|
class FilesystemTestDirectory(FilesystemTestObject):
|
|
"""A filesystem test object that represents a directory."""
|
|
|
|
def __init__(self, path, content, mode=stat.S_IRWXU|stat.S_IRGRP| \
|
|
stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH, strict=False):
|
|
"""Initializes the directory.
|
|
|
|
Args:
|
|
path: The name of this directory.
|
|
content: The list of items in this directory.
|
|
mode: The file permissions given to this directory.
|
|
strict: Whether verify() strictly compares directory contents for
|
|
equality. This flag only applies to this directory, and not
|
|
to any child directories.
|
|
"""
|
|
super(FilesystemTestDirectory, self).__init__(path, content, mode)
|
|
self._strict = strict
|
|
|
|
def _create(self, base_dir):
|
|
path = os.path.join(base_dir, self._path) if self._path else base_dir
|
|
|
|
if self._path:
|
|
with ExceptionSuppressor(OSError):
|
|
os.makedirs(path)
|
|
os.chmod(path, self._mode)
|
|
|
|
if not os.path.isdir(path):
|
|
return False
|
|
|
|
for content in self._content:
|
|
if not content.create(path):
|
|
return False
|
|
|
|
return True
|
|
|
|
def _verify(self, base_dir):
|
|
path = os.path.join(base_dir, self._path) if self._path else base_dir
|
|
if not os.path.isdir(path):
|
|
return False
|
|
|
|
result = True
|
|
seen = set()
|
|
|
|
for content in self._content:
|
|
if not content.verify(path):
|
|
result = False
|
|
seen.add(content._path)
|
|
|
|
if self._strict:
|
|
for child in os.listdir(path):
|
|
if child not in seen:
|
|
logging.error('Unexpected filesystem entry "%s"',
|
|
os.path.join(path, child))
|
|
result = False
|
|
|
|
return result
|
|
|
|
|
|
class FilesystemTestFile(FilesystemTestObject):
|
|
"""A filesystem test object that represents a file."""
|
|
|
|
def __init__(self,
|
|
path,
|
|
content,
|
|
mode=stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP \
|
|
| stat.S_IROTH,
|
|
mtime=None):
|
|
"""Initializes the file.
|
|
|
|
Args:
|
|
path: The name of this file.
|
|
content: A byte string with the expected file contents.
|
|
mode: The file permissions given to this file.
|
|
mtime: If set, the expected file modification timestamp.
|
|
"""
|
|
super(FilesystemTestFile, self).__init__(path, content, mode)
|
|
self._mtime = mtime
|
|
|
|
def _create(self, base_dir):
|
|
path = os.path.join(base_dir, self._path)
|
|
with ExceptionSuppressor(IOError):
|
|
with open(path, 'wb+') as f:
|
|
f.write(self._content)
|
|
with ExceptionSuppressor(OSError):
|
|
os.chmod(path, self._mode)
|
|
return True
|
|
return False
|
|
|
|
def _verify(self, base_dir):
|
|
path = os.path.join(base_dir, self._path)
|
|
with ExceptionSuppressor(IOError):
|
|
result = True
|
|
|
|
if self._content is not None:
|
|
with open(path, 'rb') as f:
|
|
if f.read() != self._content:
|
|
logging.error('Mismatched file contents for "%s"',
|
|
path)
|
|
result = False
|
|
|
|
if self._mtime is not None:
|
|
st = os.stat(path)
|
|
if st.st_mtime != self._mtime:
|
|
logging.error(
|
|
'Mismatched file modification time for "%s": ' +
|
|
'want %d, got %d', path, self._mtime, st.st_mtime)
|
|
result = False
|
|
|
|
return result
|
|
|
|
return False
|
|
|
|
|
|
class DefaultFilesystemTestContent(FilesystemTestDirectory):
|
|
def __init__(self):
|
|
super(DefaultFilesystemTestContent, self).__init__('', [
|
|
FilesystemTestFile('file1', '0123456789'),
|
|
FilesystemTestDirectory('dir1', [
|
|
FilesystemTestFile('file1', ''),
|
|
FilesystemTestFile('file2', 'abcdefg'),
|
|
FilesystemTestDirectory('dir2', [
|
|
FilesystemTestFile('file3', 'abcdefg'),
|
|
FilesystemTestFile('file4', 'a' * 65536),
|
|
]),
|
|
]),
|
|
], stat.S_IRWXU|stat.S_IRGRP|stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH)
|
|
|
|
|
|
class VirtualFilesystemImage(object):
|
|
def __init__(self, block_size, block_count, filesystem_type,
|
|
*args, **kwargs):
|
|
"""Initializes the instance.
|
|
|
|
Args:
|
|
block_size: The number of bytes of each block in the image.
|
|
block_count: The number of blocks in the image.
|
|
filesystem_type: The filesystem type to be given to the mkfs
|
|
program for formatting the image.
|
|
|
|
Keyword Args:
|
|
mount_filesystem_type: The filesystem type to be given to the
|
|
mount program for mounting the image.
|
|
mkfs_options: A list of options to be given to the mkfs program.
|
|
"""
|
|
self._block_size = block_size
|
|
self._block_count = block_count
|
|
self._filesystem_type = filesystem_type
|
|
self._mount_filesystem_type = kwargs.get('mount_filesystem_type')
|
|
if self._mount_filesystem_type is None:
|
|
self._mount_filesystem_type = filesystem_type
|
|
self._mkfs_options = kwargs.get('mkfs_options')
|
|
if self._mkfs_options is None:
|
|
self._mkfs_options = []
|
|
self._image_file = None
|
|
self._loop_device = None
|
|
self._loop_device_stat = None
|
|
self._mount_dir = None
|
|
|
|
def __del__(self):
|
|
with ExceptionSuppressor(Exception):
|
|
self.clean()
|
|
|
|
def __enter__(self):
|
|
self.create()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
self.clean()
|
|
return False
|
|
|
|
def _remove_temp_path(self, temp_path):
|
|
"""Removes a temporary file or directory created using autotemp."""
|
|
if temp_path:
|
|
with ExceptionSuppressor(Exception):
|
|
path = temp_path.name
|
|
temp_path.clean()
|
|
logging.debug('Removed "%s"', path)
|
|
|
|
def _remove_image_file(self):
|
|
"""Removes the image file if one has been created."""
|
|
self._remove_temp_path(self._image_file)
|
|
self._image_file = None
|
|
|
|
def _remove_mount_dir(self):
|
|
"""Removes the mount directory if one has been created."""
|
|
self._remove_temp_path(self._mount_dir)
|
|
self._mount_dir = None
|
|
|
|
@property
|
|
def image_file(self):
|
|
"""Gets the path of the image file.
|
|
|
|
Returns:
|
|
The path of the image file or None if no image file has been
|
|
created.
|
|
"""
|
|
return self._image_file.name if self._image_file else None
|
|
|
|
@property
|
|
def loop_device(self):
|
|
"""Gets the loop device where the image file is attached to.
|
|
|
|
Returns:
|
|
The path of the loop device where the image file is attached to or
|
|
None if no loop device is attaching the image file.
|
|
"""
|
|
return self._loop_device
|
|
|
|
@property
|
|
def mount_dir(self):
|
|
"""Gets the directory where the image file is mounted to.
|
|
|
|
Returns:
|
|
The directory where the image file is mounted to or None if no
|
|
mount directory has been created.
|
|
"""
|
|
return self._mount_dir.name if self._mount_dir else None
|
|
|
|
def create(self):
|
|
"""Creates a zero-filled image file with the specified size.
|
|
|
|
The created image file is temporary and removed when clean()
|
|
is called.
|
|
"""
|
|
self.clean()
|
|
self._image_file = autotemp.tempfile(unique_id='fsImage')
|
|
try:
|
|
logging.debug('Creating zero-filled image file at "%s"',
|
|
self._image_file.name)
|
|
utils.run('dd if=/dev/zero of=%s bs=%s count=%s' %
|
|
(self._image_file.name, self._block_size,
|
|
self._block_count))
|
|
except error.CmdError as exc:
|
|
self._remove_image_file()
|
|
message = 'Failed to create filesystem image: %s' % exc
|
|
raise RuntimeError(message)
|
|
|
|
def clean(self):
|
|
"""Removes the image file if one has been created.
|
|
|
|
Before removal, the image file is detached from the loop device that
|
|
it is attached to.
|
|
"""
|
|
self.detach_from_loop_device()
|
|
self._remove_image_file()
|
|
|
|
def attach_to_loop_device(self):
|
|
"""Attaches the created image file to a loop device.
|
|
|
|
Creates the image file, if one has not been created, by calling
|
|
create().
|
|
|
|
Returns:
|
|
The path of the loop device where the image file is attached to.
|
|
"""
|
|
if self._loop_device:
|
|
return self._loop_device
|
|
|
|
if not self._image_file:
|
|
self.create()
|
|
|
|
logging.debug('Attaching image file "%s" to loop device',
|
|
self._image_file.name)
|
|
utils.run('losetup -f %s' % self._image_file.name)
|
|
output = utils.system_output('losetup -j %s' % self._image_file.name)
|
|
# output should look like: "/dev/loop0: [000d]:6329 (/tmp/test.img)"
|
|
self._loop_device = output.split(':')[0]
|
|
logging.debug('Attached image file "%s" to loop device "%s"',
|
|
self._image_file.name, self._loop_device)
|
|
|
|
self._loop_device_stat = os.stat(self._loop_device)
|
|
logging.debug('Loop device "%s" (uid=%d, gid=%d, permissions=%04o)',
|
|
self._loop_device,
|
|
self._loop_device_stat.st_uid,
|
|
self._loop_device_stat.st_gid,
|
|
stat.S_IMODE(self._loop_device_stat.st_mode))
|
|
return self._loop_device
|
|
|
|
def detach_from_loop_device(self):
|
|
"""Detaches the image file from the loop device."""
|
|
if not self._loop_device:
|
|
return
|
|
|
|
self.unmount()
|
|
|
|
logging.debug('Cleaning up remaining mount points of loop device "%s"',
|
|
self._loop_device)
|
|
utils.run('umount -f %s' % self._loop_device, ignore_status=True)
|
|
|
|
logging.debug('Restore ownership/permissions of loop device "%s"',
|
|
self._loop_device)
|
|
os.chmod(self._loop_device,
|
|
stat.S_IMODE(self._loop_device_stat.st_mode))
|
|
os.chown(self._loop_device,
|
|
self._loop_device_stat.st_uid, self._loop_device_stat.st_gid)
|
|
|
|
logging.debug('Detaching image file "%s" from loop device "%s"',
|
|
self._image_file.name, self._loop_device)
|
|
utils.run('losetup -d %s' % self._loop_device)
|
|
self._loop_device = None
|
|
|
|
def format(self):
|
|
"""Formats the image file as the specified filesystem."""
|
|
self.attach_to_loop_device()
|
|
try:
|
|
logging.debug('Formatting image file at "%s" as "%s" filesystem',
|
|
self._image_file.name, self._filesystem_type)
|
|
utils.run('yes | mkfs -t %s %s %s' %
|
|
(self._filesystem_type, ' '.join(self._mkfs_options),
|
|
self._loop_device))
|
|
logging.debug('blkid: %s', utils.system_output(
|
|
'blkid -c /dev/null %s' % self._loop_device,
|
|
ignore_status=True))
|
|
except error.CmdError as exc:
|
|
message = 'Failed to format filesystem image: %s' % exc
|
|
raise RuntimeError(message)
|
|
|
|
def mount(self, options=None):
|
|
"""Mounts the image file to a directory.
|
|
|
|
Args:
|
|
options: An optional list of mount options.
|
|
"""
|
|
if self._mount_dir:
|
|
return self._mount_dir.name
|
|
|
|
if options is None:
|
|
options = []
|
|
|
|
options_arg = ','.join(options)
|
|
if options_arg:
|
|
options_arg = '-o ' + options_arg
|
|
|
|
self.attach_to_loop_device()
|
|
self._mount_dir = autotemp.tempdir(unique_id='fsImage')
|
|
try:
|
|
logging.debug('Mounting image file "%s" (%s) to directory "%s"',
|
|
self._image_file.name, self._loop_device,
|
|
self._mount_dir.name)
|
|
utils.run('mount -t %s %s %s %s' %
|
|
(self._mount_filesystem_type, options_arg,
|
|
self._loop_device, self._mount_dir.name))
|
|
except error.CmdError as exc:
|
|
self._remove_mount_dir()
|
|
message = ('Failed to mount virtual filesystem image "%s": %s' %
|
|
(self._image_file.name, exc))
|
|
raise RuntimeError(message)
|
|
return self._mount_dir.name
|
|
|
|
def unmount(self):
|
|
"""Unmounts the image file from the mounted directory."""
|
|
if not self._mount_dir:
|
|
return
|
|
|
|
try:
|
|
logging.debug('Unmounting image file "%s" (%s) from directory "%s"',
|
|
self._image_file.name, self._loop_device,
|
|
self._mount_dir.name)
|
|
utils.run('umount %s' % self._mount_dir.name)
|
|
except error.CmdError as exc:
|
|
message = ('Failed to unmount virtual filesystem image "%s": %s' %
|
|
(self._image_file.name, exc))
|
|
raise RuntimeError(message)
|
|
finally:
|
|
self._remove_mount_dir()
|
|
|
|
def get_volume_label(self):
|
|
"""Gets volume name information of |self._loop_device|
|
|
|
|
@return a string with volume name if it exists.
|
|
"""
|
|
# This script is run as root in a normal autotest run,
|
|
# so this works: It doesn't have access to the necessary info
|
|
# when run as a non-privileged user
|
|
cmd = "blkid -c /dev/null -o udev %s" % self._loop_device
|
|
output = utils.system_output(cmd, ignore_status=True)
|
|
|
|
for line in output.splitlines():
|
|
udev_key, udev_val = line.split('=')
|
|
|
|
if udev_key == 'ID_FS_LABEL':
|
|
return udev_val
|
|
|
|
return None
|