You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
206 lines
8.1 KiB
206 lines
8.1 KiB
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
"""A module containing kernel handler class used by SAFT."""
|
|
|
|
import hashlib
|
|
import os
|
|
import re
|
|
|
|
# Name of the file (inside the OS interface's state directory) that is used
# as scratch space for a dumped kernel image.
TMP_FILE_NAME = 'kernel_header_dump'

# Types of kernel modifications.
# Add a delta to the first byte of the kernel blob (corrupt / restore).
KERNEL_BODY_MOD = 1
# Repack the kernel blob with a new version number and re-sign it.
KERNEL_VERSION_MOD = 2
# Repack the kernel blob signed with the keys from a given key directory.
KERNEL_RESIGN_MOD = 3
|
|
|
|
|
|
class KernelHandlerError(Exception):
    """KernelHandler-specific exception."""
|
|
|
|
|
|
class KernelHandler(object):
    """An object to provide ChromeOS kernel related actions.

    Mostly it allows to corrupt and restore a particular kernel partition
    (designated by the partition name, A or B).

    @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface
    """

    # This value is used to alter contents of a byte in the appropriate kernel
    # image. First added to corrupt the image, then subtracted to restore the
    # image.
    DELTA = 1

    # The maximum kernel size in MB.
    KERNEL_SIZE_MB = 16

    # Number of bytes read from the start of a kernel partition when looking
    # up versions. 16 K should be enough to include headers and keys.
    HEADER_READ_SIZE = 0x4000

    def __init__(self, os_if):
        """Store the OS interface; the real setup is done by init().

        @param os_if: The OS interface object used for all device access.
        """
        self.os_if = os_if
        self.dump_file_name = None
        self.partition_map = {}
        self.root_dev = None
        # Set by init(); declared here so the attribute always exists.
        self.dev_key_path = None
        self.initialized = False

    def _get_version(self, device):
        """Get version of the kernel hosted on the passed in partition."""
        data = self.os_if.read_partition(device, self.HEADER_READ_SIZE)
        return self.os_if.retrieve_body_version(data)

    def _get_datakey_version(self, device):
        """Get datakey version of kernel hosted on the passed in partition."""
        data = self.os_if.read_partition(device, self.HEADER_READ_SIZE)
        return self.os_if.retrieve_datakey_version(data)

    def _get_partition_map(self, internal_disk=True):
        """Scan `cgpt show <device>` output to find kernel devices.

        Every output line containing a KERN-A or KERN-B label adds (or
        overwrites) an entry in self.partition_map, keyed by 'A' or 'B', with
        the keys 'device', 'version' and 'datakey_version'.

        @param internal_disk: If True, scan the internal disk reported by the
                OS interface for the current root partition; otherwise scan
                self.root_dev.
        """
        if internal_disk:
            target_device = self.os_if.get_internal_disk(
                self.os_if.get_root_part())
        else:
            target_device = self.root_dev

        kernel_partitions = re.compile('KERN-([AB])')
        disk_map = self.os_if.run_shell_command_get_output(
            'cgpt show %s' % target_device)

        for line in disk_map:
            matched_line = kernel_partitions.search(line)
            if not matched_line:
                continue
            label = matched_line.group(1)
            # The third whitespace-separated column of the matching line is
            # combined with the disk name to form the partition device
            # (presumably the partition number column of `cgpt show`).
            device = self.os_if.join_part(target_device, line.split()[2])
            # Read the header once and extract both versions from it, rather
            # than reading the same bytes from the device twice.
            header = self.os_if.read_partition(device, self.HEADER_READ_SIZE)
            self.partition_map[label] = {
                'device': device,
                'version': self.os_if.retrieve_body_version(header),
                'datakey_version':
                    self.os_if.retrieve_datakey_version(header),
            }

    def dump_kernel(self, section, kernel_path):
        """Dump the specified kernel to a file.

        Copies a single block of KERNEL_SIZE_MB megabytes with dd.

        @param section: The kernel to dump. May be A or B.
        @param kernel_path: The path to the kernel image.
        @raise KeyError: if the section is not in the partition map.
        """
        dev = self.partition_map[section.upper()]['device']
        cmd = 'dd if=%s of=%s bs=%dM count=1' % (dev, kernel_path,
                                                 self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(cmd)

    def write_kernel(self, section, kernel_path):
        """Write a kernel image to the specified section.

        Copies a single block of KERNEL_SIZE_MB megabytes with dd.

        @param section: The kernel to write. May be A or B.
        @param kernel_path: The path to the kernel image to write.
        @raise KeyError: if the section is not in the partition map.
        """
        dev = self.partition_map[section.upper()]['device']
        dd_cmd = 'dd if=%s of=%s bs=%dM count=1' % (kernel_path, dev,
                                                    self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(dd_cmd, modifies_device=True)

    def _shift_first_byte(self, path, delta):
        """Add delta (modulo 256) to the first byte of the file at path.

        @param path: The file to modify, accessed through the OS interface.
        @param delta: The signed amount to add to the first byte.
        @raise KernelHandlerError: if the file is empty.
        """
        data = self.os_if.read_file(path)
        if not data:
            raise KernelHandlerError('Kernel dump %s is empty' % path)
        if isinstance(data, bytes):
            # Python 2 str and Python 3 bytes: edit through a bytearray so
            # the blob is not exploded into a list of single characters.
            blob = bytearray(data)
            blob[0] = (blob[0] + delta) % 0x100
            data = bytes(blob)
        else:
            # Text returned by the OS interface: replace the first character.
            data = ('%c' % ((ord(data[0]) + delta) % 0x100)) + data[1:]
        self.os_if.write_file(path, data)

    def _modify_kernel(self,
                       section,
                       delta,
                       modification_type=KERNEL_BODY_MOD,
                       key_path=None):
        """Modify kernel image on a disk partition.

        The kernel is first dumped to self.dump_file_name, then modified and
        written back to the same section.

        This method supports three types of kernel modification.
        KERNEL_BODY_MOD just adds the value of delta to the first byte of the
        kernel blob. This might leave the kernel corrupted (as required by
        the test).

        The second type, KERNEL_VERSION_MOD - will use 'delta' as the new
        version number, it will put it in the kernel header, and then will
        resign the kernel blob.

        The third type, KERNEL_RESIGN_MOD - will resign the kernel with keys
        in argument key_path. If key_path is None (or is not a directory),
        choose dev_key_path as resign key directory.

        Any other modification type is ignored: nothing is written back.

        @param section: The kernel to modify. May be A or B.
        @param delta: Byte delta or new version number, depending on the
                modification type; unused for KERNEL_RESIGN_MOD.
        @param modification_type: One of the KERNEL_*_MOD constants.
        @param key_path: Optional key directory for KERNEL_RESIGN_MOD.
        @raise KernelHandlerError: if the dumped kernel is empty
                (KERNEL_BODY_MOD only).
        """
        self.dump_kernel(section, self.dump_file_name)
        if modification_type == KERNEL_BODY_MOD:
            self._shift_first_byte(self.dump_file_name, delta)
            kernel_to_write = self.dump_file_name
        elif modification_type == KERNEL_VERSION_MOD:
            new_version = delta
            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                'vbutil_kernel --repack %s --version %d '
                '--signprivate %s --oldblob %s' %
                (kernel_to_write, new_version,
                 os.path.join(self.dev_key_path,
                              'kernel_data_key.vbprivk'),
                 self.dump_file_name))
        elif modification_type == KERNEL_RESIGN_MOD:
            if key_path and self.os_if.is_dir(key_path):
                resign_key_path = key_path
            else:
                resign_key_path = self.dev_key_path

            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                'vbutil_kernel --repack %s '
                '--signprivate %s --oldblob %s --keyblock %s' %
                (kernel_to_write,
                 os.path.join(resign_key_path, 'kernel_data_key.vbprivk'),
                 self.dump_file_name,
                 os.path.join(resign_key_path, 'kernel.keyblock')))
        else:
            return  # Unsupported mode, ignore.
        self.write_kernel(section, kernel_to_write)

    def corrupt_kernel(self, section):
        """Corrupt a kernel section (add DELTA to the first byte)."""
        self._modify_kernel(section.upper(), self.DELTA)

    def restore_kernel(self, section):
        """Restore the previously corrupted kernel (subtract DELTA)."""
        self._modify_kernel(section.upper(), -self.DELTA)

    def get_version(self, section):
        """Return version read from this section blob's header.

        The value is the one cached in the partition map when it was scanned.
        """
        return self.partition_map[section.upper()]['version']

    def get_datakey_version(self, section):
        """Return datakey version read from this section blob's header.

        The value is the one cached in the partition map when it was scanned.
        """
        return self.partition_map[section.upper()]['datakey_version']

    def get_sha(self, section):
        """Return the SHA1 hash (hex digest) of the section blob."""
        s = hashlib.sha1()
        dev = self.partition_map[section.upper()]['device']
        s.update(self.os_if.read_file(dev))
        return s.hexdigest()

    def set_version(self, section, version):
        """Set version of this kernel blob and re-sign it.

        @param section: The kernel to modify. May be A or B.
        @param version: The new, non-negative version number.
        @raise KernelHandlerError: if version is negative.
        """
        if version < 0:
            raise KernelHandlerError('Bad version value %d' % version)
        self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD)

    def resign_kernel(self, section, key_path=None):
        """Resign kernel with original kernel version and keys in key_path."""
        self._modify_kernel(section.upper(), self.get_version(section),
                            KERNEL_RESIGN_MOD, key_path)

    def init(self, dev_key_path='.', internal_disk=True):
        """Initialize the kernel handler object.

        Records the key directory, looks up the root device and the dump
        file path through the OS interface, and scans the kernel partitions.

        @param dev_key_path: Directory holding the keys used when re-signing
                a kernel and no other key directory is supplied.
        @param internal_disk: Whether to scan the internal disk (True) or
                the root device (False) for kernel partitions.
        """
        self.dev_key_path = dev_key_path
        self.root_dev = self.os_if.get_root_dev()
        self.dump_file_name = self.os_if.state_dir_file(TMP_FILE_NAME)
        self._get_partition_map(internal_disk)
        self.initialized = True
|