You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

148 lines
5.0 KiB

#
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tools for running host side simulation of an OTA update."""
from __future__ import print_function
import argparse
import filecmp
import os
import shutil
import subprocess
import sys
import tempfile
import zipfile
import update_payload
def extract_file(zip_file_path, entry_name, target_file_path):
    """Extract a file from zip archive into |target_file_path|.

    Despite its name, |zip_file_path| may be any of:
      * an already opened zipfile.ZipFile,
      * a path to a directory laid out like the unpacked archive,
      * a path to a zip file on disk.

    The source entry is opened before |target_file_path| is created, so a
    missing entry or an unsupported source does not leave an empty target file
    behind.

    Args:
      zip_file_path: the archive (or directory) to read from, see above.
      entry_name: archive-relative name of the entry to copy.
      target_file_path: path of the file that receives the entry's bytes; it is
        overwritten if it already exists.

    Raises:
      ValueError: |zip_file_path| is none of the supported kinds of source.
      KeyError: the zip archive has no entry named |entry_name|.
      OSError: the directory entry or the target file cannot be opened.
    """
    if isinstance(zip_file_path, zipfile.ZipFile):
        with zip_file_path.open(entry_name) as fp:
            with open(target_file_path, 'wb') as out_fp:
                shutil.copyfileobj(fp, out_fp)
    elif os.path.isdir(zip_file_path):
        with open(os.path.join(zip_file_path, entry_name), "rb") as fp:
            with open(target_file_path, 'wb') as out_fp:
                shutil.copyfileobj(fp, out_fp)
    elif zipfile.is_zipfile(zip_file_path):
        # A plain path to a zip file: open it just for this one extraction and
        # reuse the ZipFile branch above.
        with zipfile.ZipFile(zip_file_path) as archive:
            extract_file(archive, entry_name, target_file_path)
    else:
        # Failing loudly beats silently producing an empty image that only
        # shows up later as a confusing mismatch.
        raise ValueError(
            "{!r} is neither a ZipFile, a directory, nor a path to a zip file"
            .format(zip_file_path))
def is_sparse_image(filepath):
    """Return True if the file at |filepath| starts with the sparse image magic.

    Only the first four bytes of the file are read; a file shorter than that
    can never match and yields False.
    """
    # Magic for android sparse image format
    # https://source.android.com/devices/bootloader/images
    sparse_magic = b'\x3A\xFF\x26\xED'
    with open(filepath, 'rb') as image:
        header = image.read(4)
    return header == sparse_magic
def extract_img(zip_archive, img_name, output_path):
    """Extract IMAGES/|img_name|.img from |zip_archive| into |output_path|.

    If the extracted file starts with the sparse image magic, the external
    `simg2img` tool is run on it and its output replaces |output_path|.

    Args:
      zip_archive: source handed to extract_file() unchanged.
      img_name: image name without the "IMAGES/" prefix and ".img" suffix.
      output_path: where the extracted image ends up.

    Raises:
      subprocess.CalledProcessError: `simg2img` exited with a non-zero status.
    """
    extract_file(
        zip_archive, "".join(("IMAGES/", img_name, ".img")), output_path)
    if not is_sparse_image(output_path):
        return

    # Convert next to the original, then move the result over the sparse file
    # so that callers keep using |output_path|.
    unsparsed_path = "".join((output_path, ".raw"))
    subprocess.check_output(["simg2img", output_path, unsparsed_path])
    os.rename(unsparsed_path, output_path)
def run_ota(source, target, payload_path, tempdir):
    """Run an OTA on host side and compare the result with the target build.

    For every partition listed in the payload manifest, the source and target
    images are extracted into |tempdir| and an empty scratch image of the
    manifest's new partition size is created. `delta_generator` is then invoked
    with the payload as --in_file, the source images as --old_partitions and
    the scratch images as --new_partitions. Finally each scratch image is
    compared against the extracted target image.

    Args:
      source: path to the source build's target files (zip file or directory).
      target: path to the target build's target files (zip file or directory).
      payload_path: path to the payload file handed to update_payload.Payload
        and to `delta_generator`.
      tempdir: existing directory that receives all extracted and generated
        images.

    Returns:
      True if every generated partition image matches the expected target
      image, False otherwise.

    Raises:
      subprocess.CalledProcessError: `delta_generator` (or `simg2img`, via
        extract_img()) exited with a non-zero status.
    """
    payload = update_payload.Payload(payload_path)
    payload.Init()

    partition_names = []
    old_partitions = []
    new_partitions = []
    expected_new_partitions = []

    # Archives opened here are only needed while images are being extracted;
    # remember them so they get closed even if extraction fails.
    opened_archives = []
    try:
        if zipfile.is_zipfile(source):
            source = zipfile.ZipFile(source)
            opened_archives.append(source)
        if zipfile.is_zipfile(target):
            target = zipfile.ZipFile(target)
            opened_archives.append(target)

        for part in payload.manifest.partitions:
            name = part.partition_name
            partition_names.append(name)
            old_image = os.path.join(tempdir, "source_" + name + ".img")
            new_image = os.path.join(tempdir, "target_" + name + ".img")
            print("Extracting source image for", name)
            extract_img(source, name, old_image)
            print("Extracting target image for", name)
            extract_img(target, name, new_image)

            old_partitions.append(old_image)
            # The scratch image is what delta_generator writes into; pre-size
            # it to the new partition size recorded in the manifest.
            scratch_image_name = new_image + ".actual"
            new_partitions.append(scratch_image_name)
            with open(scratch_image_name, "wb") as fp:
                fp.truncate(part.new_partition_info.size)
            expected_new_partitions.append(new_image)
    finally:
        for archive in opened_archives:
            archive.close()

    # The three lists are index-aligned; delta_generator takes each of them as
    # one colon-separated flag value.
    delta_generator_args = ["delta_generator", "--in_file=" + payload_path]
    delta_generator_args.append(
        "--partition_names=" + ":".join(partition_names))
    delta_generator_args.append("--old_partitions=" + ":".join(old_partitions))
    delta_generator_args.append("--new_partitions=" + ":".join(new_partitions))
    subprocess.check_output(delta_generator_args)

    valid = True
    for (expected_part, actual_part, part_name) in \
            zip(expected_new_partitions, new_partitions, partition_names):
        if filecmp.cmp(expected_part, actual_part):
            print("Partition `{}` is valid".format(part_name))
        else:
            valid = False
            print(
                "Partition `{}` is INVALID expected image: {} actual image: {}"
                .format(part_name, expected_part, actual_part))

    # When run interactively, keep the images around until the user has had a
    # chance to inspect the mismatching ones.
    if not valid and sys.stdout.isatty():
        input("Paused to investigate invalid partitions, press any key to exit.")
    return valid
def main():
    """Parse the command line and run the host side OTA simulation.

    Expects `--source` and `--target` (target files zip or directory of the
    source and target builds) plus a positional payload, which is either a
    payload file or a zip of the OTA package containing `payload.bin`. When a
    zip is given, `payload.bin` is extracted into a temporary directory first.
    All intermediate images live in that temporary directory and are removed
    when the simulation finishes.

    Invalid paths are reported through the argument parser, i.e. a usage
    message on stderr followed by exit status 2.
    """
    parser = argparse.ArgumentParser(
        description="Run host side simulation of OTA package")
    parser.add_argument(
        "--source",
        help="Target file zip for the source build",
        required=True, nargs=1)
    parser.add_argument(
        "--target",
        help="Target file zip for the target build",
        required=True, nargs=1)
    parser.add_argument(
        "payload",
        help="payload.bin for the OTA package, or a zip of OTA package itself",
        nargs=1)
    args = parser.parse_args()
    print(args)

    # nargs=1 makes argparse store each value as a one-element list.
    source_path = args.source[0]
    target_path = args.target[0]
    payload_path = args.payload[0]

    # Validate with parser.error() rather than assert: assertions are stripped
    # under `python -O`, and a usage error is friendlier than a traceback.
    if not os.path.exists(source_path):
        parser.error(
            "source target file must point to a valid zipfile or directory")
    if not os.path.exists(target_path):
        parser.error("target path must point to a valid zipfile or directory")
    if not os.path.exists(payload_path):
        parser.error("payload must point to an existing payload.bin or OTA zip")

    # pylint: disable=no-member
    with tempfile.TemporaryDirectory() as tempdir:
        if zipfile.is_zipfile(payload_path):
            with zipfile.ZipFile(payload_path, "r") as zfp:
                payload_entry_name = 'payload.bin'
                if payload_entry_name not in zfp.namelist():
                    parser.error(
                        "OTA package zip does not contain " + payload_entry_name)
                zfp.extract(payload_entry_name, tempdir)
                payload_path = os.path.join(tempdir, payload_entry_name)
        run_ota(source_path, target_path, payload_path, tempdir)
# Only run the simulation when executed as a script, so that importing this
# module has no side effects.
if __name__ == '__main__':
    main()