You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

547 lines
20 KiB

#!/usr/bin/env python
#
# Copyright 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper tool for performing an authenticated AVB unlock of an Android Things device.
This tool communicates with an Android Things device over fastboot to perform an
authenticated AVB unlock. The user provides unlock credentials valid for the
device they want to unlock, likely obtained from the Android Things Developer
Console. The tool handles the sequence of fastboot commands to complete the
challenge-response unlock protocol.
Unlock credentials can be provided to the tool in one of two ways:
1) by providing paths to the individual credential files using the
'--pik_cert', '--puk_cert', and '--puk' command line switches, or
2) by providing a path to a zip archive containing the three credential files,
named as follows:
- Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin'
- Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin'
- PUK private key: 'puk.*\.pem'
You can also provide one or more archives and/or one or more directories
containing such zip archives. In either scenario, the tool will search all
of the provided credential archives for a match against the product ID of
the device being unlocked and automatically use the first match.
This tool also clears the factory partition persistent digest unless the
--clear_factory_digest=false option is used. There is no harm in clearing this
digest even if changes to the factory partition are not planned.
Dependencies:
- Python 2.7.x, 3.2.x, or newer (for argparse)
- PyCrypto 2.5 or newer (for PKCS1_v1_5 and RSA PKCS#8 PEM key import)
- Android SDK Platform Tools (for fastboot), in PATH
- https://developer.android.com/studio/releases/platform-tools
"""
# Text blocks handed to argparse.ArgumentParser in main(). '%(prog)s' is an
# argparse placeholder that is expanded to the program name when help is shown.

# One-paragraph summary passed as the parser's `description`.
HELP_DESCRIPTION = """Performs an authenticated AVB unlock of an Android Things device over
fastboot, given valid unlock credentials for the device."""
# Hand-written usage synopsis passed as the parser's `usage`; it shows the two
# ways of invoking the tool (credential bundle(s) vs. individual files).
HELP_USAGE = """
%(prog)s [-h] [-v] [-s SERIAL] [--clear_factory_digest=true|false] unlock_creds.zip [unlock_creds_2.zip ...]
%(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem"""
# Example invocations passed as the parser's `epilog`.
HELP_EPILOG = """examples:
%(prog)s unlock_creds.zip
%(prog)s unlock_creds.zip unlock_creds_2.zip -s SERIAL
%(prog)s path_to_dir_with_multiple_unlock_creds/
%(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem"""
import sys

# Bail out early on interpreters that are too old for this script: anything
# before 2.7, and the 3.0 / 3.1 releases.
ver = sys.version_info
if tuple(ver[:2]) < (2, 7) or (3, 0) <= tuple(ver[:2]) < (3, 2):
    print('This script requires Python 2.7+ or 3.2+')
    sys.exit(1)
import argparse
import binascii
import os
import re
import shutil
import struct
import subprocess
import tempfile
import zipfile
# Requires PyCrypto 2.5 (or newer) for PKCS1_v1_5 and support for importing
# PEM-encoded RSA keys
try:
from Crypto.Hash import SHA512
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
except ImportError as e:
print('PyCrypto 2.5 or newer required, missing or too old: ' + str(e))
class UnlockCredentials(object):
    """Data container for the 3 credentials used in an AVB authenticated unlock.

    Holds the raw bytes of the Product Intermediate Key (PIK) certificate, the
    raw bytes of the Product Unlock Key (PUK) certificate, and the imported PUK
    RSA private key. All values are read and validated once, in the
    constructor, and are exposed through read-only properties.
    """

    def __init__(self,
                 intermediate_cert_file,
                 unlock_cert_file,
                 unlock_key_file,
                 source_file=None):
        """Load and sanity-check the three credential files.

        Arguments:
          intermediate_cert_file: Path to the PIK certificate file.
          unlock_cert_file: Path to the PUK certificate file.
          unlock_key_file: Path to the PUK private key file (passed to
            RSA.importKey).
          source_file: [optional] Name of the archive these credentials came
            from; only stored so it can be reported to the user later.

        Raises:
          ValueError: If either certificate has an unexpected length or the
            key file does not contain an RSA private key.
        """
        # The certificates are AvbAtxCertificate structs as defined in
        # libavb_atx, not X.509 certificates. Do a basic length sanity check
        # when reading them.
        EXPECTED_CERTIFICATE_SIZE = 1620

        with open(intermediate_cert_file, 'rb') as f:
            self._intermediate_cert = f.read()
        if len(self._intermediate_cert) != EXPECTED_CERTIFICATE_SIZE:
            raise ValueError('Invalid intermediate key certificate length.')

        with open(unlock_cert_file, 'rb') as f:
            self._unlock_cert = f.read()
        if len(self._unlock_cert) != EXPECTED_CERTIFICATE_SIZE:
            raise ValueError('Invalid product unlock key certificate length.')

        with open(unlock_key_file, 'rb') as f:
            self._unlock_key = RSA.importKey(f.read())
        if not self._unlock_key.has_private():
            raise ValueError('Unlock key was not an RSA private key.')

        self._source_file = source_file

    @property
    def intermediate_cert(self):
        """Raw bytes of the PIK certificate."""
        return self._intermediate_cert

    @property
    def unlock_cert(self):
        """Raw bytes of the PUK certificate."""
        return self._unlock_cert

    @property
    def unlock_key(self):
        """PUK private key object as returned by RSA.importKey."""
        return self._unlock_key

    @property
    def source_file(self):
        """Archive the credentials were loaded from, or None."""
        return self._source_file

    @classmethod
    def from_credential_archive(cls, archive):
        r"""Create UnlockCredentials from an unlock credential zip archive.

        The zip archive must contain the following three credential files,
        named as follows:
          - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin'
          - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin'
          - PUK private key: 'puk.*\.pem'

        The three files are unpacked into a temporary directory which is
        removed again before this method returns, whether or not loading
        succeeded.

        Arguments:
          archive: Filename of zip archive containing unlock credentials.

        Returns:
          A new UnlockCredentials whose source_file is `archive`.

        Raises:
          ValueError: If archive is either missing a required file or contains
            multiple files matching one of the filename formats, or if the
            contained files fail the constructor's validation.
        """

        def _find_one_match(contents, regex, desc):
            # Return the single archive member matching `regex`; zero or
            # several matches are both errors.
            r = re.compile(regex)
            matches = [name for name in contents if r.search(name)]
            if not matches:
                raise ValueError(
                    "Couldn't find {} file (matching regex '{}') in archive {}"
                    .format(desc, regex, archive))
            elif len(matches) > 1:
                raise ValueError(
                    "Found multiple files for {} (matching regex '{}') in archive {}"
                    .format(desc, regex, archive))
            return matches[0]

        tempdir = tempfile.mkdtemp()
        try:
            with zipfile.ZipFile(archive, mode='r') as archive_zip:
                contents = archive_zip.namelist()

                pik_cert_re = r'^pik_certificate.*\.bin$'
                pik_cert = _find_one_match(contents, pik_cert_re,
                                           'intermediate key (PIK) certificate')

                puk_cert_re = r'^puk_certificate.*\.bin$'
                puk_cert = _find_one_match(contents, puk_cert_re,
                                           'unlock key (PUK) certificate')

                puk_re = r'^puk.*\.pem$'
                puk = _find_one_match(contents, puk_re, 'unlock key (PUK)')

                # Use the paths reported by extract() rather than re-joining
                # the member names onto tempdir: zipfile normalizes member
                # names on extraction, so the reported path is the one that
                # actually exists on disk.
                pik_cert_path = archive_zip.extract(pik_cert, path=tempdir)
                puk_cert_path = archive_zip.extract(puk_cert, path=tempdir)
                puk_path = archive_zip.extract(puk, path=tempdir)

            return cls(
                intermediate_cert_file=pik_cert_path,
                unlock_cert_file=puk_cert_path,
                unlock_key_file=puk_path,
                source_file=archive)
        finally:
            shutil.rmtree(tempdir)
class UnlockChallenge(object):
    """Parsed form of the AvbAtxUnlockChallenge struct staged by
    'fastboot oem at-get-vboot-unlock-challenge'.

    The file given to the constructor must hold the complete 52-byte struct
    (version, product ID hash and challenge bytes), not only the challenge.
    """

    def __init__(self, challenge_file):
        # Little-endian layout: 4-byte unsigned version, 32-byte product ID
        # hash, 16-byte challenge -> 52 bytes in total.
        layout = struct.Struct('<I32s16s')

        with open(challenge_file, 'rb') as f:
            raw = f.read()

        if len(raw) != layout.size:
            raise ValueError('Invalid unlock challenge length.')

        fields = layout.unpack(raw)
        self._version = fields[0]
        self._product_id_hash = fields[1]
        self._challenge_data = fields[2]

    @property
    def version(self):
        """Struct format version as an integer."""
        return self._version

    @property
    def product_id_hash(self):
        """The 32-byte product ID hash field."""
        return self._product_id_hash

    @property
    def challenge_data(self):
        """The 16-byte challenge field."""
        return self._challenge_data
def GetAtxCertificateSubject(cert):
    """Return the 32-byte subject field sliced out of an AvbAtxCertificate.

    Arguments:
      cert: Raw certificate bytes.

    Returns:
      The bytes at the subject's position; shorter (possibly empty) if `cert`
      itself is too short, since plain slicing is used.
    """
    # The subject follows the 4-byte format version and the 1032-byte public
    # key.
    subject_start = 4 + 1032
    subject_end = subject_start + 32
    return cert[subject_start:subject_end]
def SelectMatchingUnlockCredential(all_creds, challenge):
    """Pick the first credential set whose product ID matches the challenge.

    The subject field of a Product Unlock Key (PUK) certificate holds the
    SHA256 hash of the product ID it can unlock, and the unlock challenge
    carries that same hash, so the two are compared directly.

    Arguments:
      all_creds: List of UnlockCredentials objects to be searched for a match
        against the given challenge.
      challenge: UnlockChallenge object created from challenge obtained via
        'fastboot oem at-get-vboot-unlock-challenge'.

    Returns:
      The first matching UnlockCredentials object, or None if none match.
    """
    matching = (
        candidate for candidate in all_creds
        if GetAtxCertificateSubject(candidate.unlock_cert) ==
        challenge.product_id_hash)
    return next(matching, None)
def MakeAtxUnlockCredential(creds, challenge, out_file):
    """Write an AvbAtxUnlockCredential struct for the given challenge.

    A small local stand-in for 'avbtool make_atx_unlock_credential', so this
    tool does not have to ship with the full avbtool. It signs with PyCrypto
    instead of openssl, which makes Windows support easier because there is no
    officially supported openssl binary distribution.

    The output consists of, in order: a little-endian 32-bit format version
    (1), the PIK certificate, the PUK certificate, and the PKCS#1 v1.5
    signature over the SHA512 hash of the challenge data, made with the PUK
    private key.

    Arguments:
      creds: UnlockCredentials object wrapping the PIK certificate, PUK
        certificate, and PUK private key.
      challenge: UnlockChallenge object created from challenge obtained via
        'fastboot oem at-get-vboot-unlock-challenge'.
      out_file: Output filename to write the AvbAtxUnlockCredential struct to.

    NOTE(review): no length check on the challenge is performed here; any
    validation is expected to have happened when `challenge` was built.
    """
    digest = SHA512.new(challenge.challenge_data)
    signature = PKCS1_v1_5.new(creds.unlock_key).sign(digest)

    with open(out_file, 'wb') as out:
        for chunk in (
                struct.pack('<I', 1),  # Format Version
                creds.intermediate_cert,
                creds.unlock_cert,
                signature):
            out.write(chunk)
def AuthenticatedUnlock(all_creds, serial=None, verbose=False):
    """Performs an authenticated AVB unlock of a device over fastboot.

    Sequence: ask the device for an unlock challenge, download it, pick the
    credentials matching the device's product ID hash, build and stage the
    signed unlock credential, issue the unlock command, and finally read back
    the lock state to confirm.

    Arguments:
      all_creds: List of UnlockCredentials objects wrapping the PIK
        certificate, PUK certificate, and PUK private key. The list will be
        searched to find matching credentials for the device being unlocked.
      serial: [optional] A device serial number or other valid value to be
        passed to fastboot's '-s' switch to select the device to unlock.
      verbose: [optional] Enable verbose output, which prints the fastboot
        commands and their output as the commands are run.

    Returns:
      True if the device reports itself AVB unlocked afterwards. False if no
      credentials match, a fastboot command fails or cannot be launched, or
      the device still reports itself locked.
    """
    tempdir = tempfile.mkdtemp()
    try:
        challenge_file = os.path.join(tempdir, 'challenge')
        credential_file = os.path.join(tempdir, 'credential')

        def fastboot_cmd(args):
            # Run one fastboot command and return its combined stdout/stderr.
            args = ['fastboot'] + (['-s', serial] if serial else []) + args
            if verbose:
                print('\n$ ' + ' '.join(args))

            out = subprocess.check_output(
                args, stderr=subprocess.STDOUT).decode('utf-8')

            if verbose:
                print(out)
            return out

        try:
            fastboot_cmd(['oem', 'at-get-vboot-unlock-challenge'])
            fastboot_cmd(['get_staged', challenge_file])

            challenge = UnlockChallenge(challenge_file)
            # hexlify() returns bytes; decode so Python 3 prints plain hex
            # instead of a b'...' repr.
            print('Product ID SHA256 hash = {}'.format(
                binascii.hexlify(challenge.product_id_hash).decode('ascii')))

            selected_cred = SelectMatchingUnlockCredential(all_creds, challenge)
            if not selected_cred:
                print(
                    'ERROR: None of the provided unlock credentials match this device.')
                return False
            if selected_cred.source_file:
                print('Found matching unlock credentials: {}'.format(
                    selected_cred.source_file))

            MakeAtxUnlockCredential(selected_cred, challenge, credential_file)

            fastboot_cmd(['stage', credential_file])
            fastboot_cmd(['oem', 'at-unlock-vboot'])

            res = fastboot_cmd(['getvar', 'at-vboot-state'])
            if re.search(r'avb-locked(:\s*|=)0', res) is not None:
                print('Device successfully AVB unlocked')
                return True
            else:
                print('ERROR: Commands succeeded but device still locked')
                return False
        except subprocess.CalledProcessError as e:
            print(e.output.decode('utf-8'))
            print("Command '{}' returned non-zero exit status {}".format(
                ' '.join(e.cmd), e.returncode))
            return False
        except OSError as e:
            # Typically: the fastboot binary is not installed / not in PATH,
            # or a staged file could not be read or written.
            print('ERROR: Unable to run fastboot or access staged files: {}'
                  .format(e))
            return False
    finally:
        shutil.rmtree(tempdir)
def FindUnlockCredentialsInDirectory(dir, verbose=False):
    """Load every valid unlock credential archive found directly in a directory.

    Only regular files immediately inside `dir` are tried (no recursion).
    Files that cannot be loaded as a credential archive are skipped.

    Arguments:
      dir: Directory to scan.
      verbose: [optional] Print a line for each file accepted or ignored.

    Returns:
      List of UnlockCredentials objects, one per archive that loaded.

    Raises:
      ValueError: If `dir` is not a directory.
    """
    if not os.path.isdir(dir):
        raise ValueError('Not a directory: ' + dir)

    found = []
    for entry in os.listdir(dir):
        candidate = os.path.join(dir, entry)
        if not os.path.isfile(candidate):
            continue
        try:
            found.append(UnlockCredentials.from_credential_archive(candidate))
            if verbose:
                print('Found valid unlock credential bundle: ' + candidate)
        except (IOError, ValueError, zipfile.BadZipfile):
            if verbose:
                print(
                    "Ignoring file which isn't a valid unlock credential zip bundle: "
                    + candidate)
    return found
def ClearFactoryPersistentDigest(serial=None, verbose=False):
    """Clears the factory partition persistent digest using fastboot.

    Most of the time this should be cleared when unlocking a device because
    otherwise any attempts to update the factory partition will be rejected
    once the device is again locked, causing confusion. There is no harm in
    clearing this digest even if factory partition updates are not planned.

    The staged payload is: little-endian 32-bit name length, the digest name,
    then a little-endian 32-bit digest length of zero.

    Arguments:
      serial: [optional] A device serial number or other valid value to be
        passed to fastboot's '-s' switch to select the device to unlock.
      verbose: [optional] Enable verbose output, which prints the fastboot
        commands and their output as the commands are run.

    Returns:
      True if both fastboot commands succeeded, False if one failed or
      fastboot could not be launched.
    """
    # Bytes, not text: this is written verbatim into a binary file, and
    # writing a str to a 'wb' file raises TypeError on Python 3.
    FACTORY_PERSISTENT_DIGEST_NAME = b'avb.persistent_digest.factory'

    tempdir = tempfile.mkdtemp()
    try:
        digest_data = os.path.join(tempdir, 'digest_data')

        with open(digest_data, 'wb') as out:
            out.write(struct.pack('<I', len(FACTORY_PERSISTENT_DIGEST_NAME)))
            out.write(FACTORY_PERSISTENT_DIGEST_NAME)
            # Sending a zero length digest will clear the existing digest.
            out.write(struct.pack('<I', 0))

        def fastboot_cmd(args):
            # Run one fastboot command and return its combined stdout/stderr.
            args = ['fastboot'] + (['-s', serial] if serial else []) + args
            if verbose:
                print('$ ' + ' '.join(args))

            out = subprocess.check_output(
                args, stderr=subprocess.STDOUT).decode('utf-8')

            if verbose:
                print(out)
            return out

        try:
            fastboot_cmd(['stage', digest_data])
            fastboot_cmd(['oem', 'at-write-persistent-digest'])
            print("Successfully cleared the factory partition persistent digest.")
            return True
        except subprocess.CalledProcessError as e:
            print(e.output.decode('utf-8'))
            print("Command '{}' returned non-zero exit status {}".format(
                ' '.join(e.cmd), e.returncode))
            print("Warning: Failed to clear factory partition persistent digest.")
            return False
        except OSError as e:
            # Typically: the fastboot binary is not installed / not in PATH.
            print('ERROR: Unable to run fastboot: {}'.format(e))
            print("Warning: Failed to clear factory partition persistent digest.")
            return False
    finally:
        shutil.rmtree(tempdir)
def parse_boolean(value):
    """Convert a command line string into a bool, for use as an argparse type.

    Matching ignores case and surrounding whitespace.

    Arguments:
      value: The string to interpret.

    Returns:
      True for 'true', 't', 'yes', 'y', 'on', '1'; False for 'false', 'f',
      'no', 'n', 'off', '0'.

    Raises:
      argparse.ArgumentTypeError: For any other value.
    """
    normalized = value.strip().lower()
    if normalized in ('true', 't', 'yes', 'y', 'on', '1'):
        return True
    if normalized in ('false', 'f', 'no', 'n', 'off', '0'):
        return False
    raise argparse.ArgumentTypeError('Unexpected boolean value: %s' % value)
def main(in_args):
    """Command line entry point: parse arguments, load credentials, unlock.

    Arguments:
      in_args: List of command line argument strings, without the program
        name. If empty, the help text is printed.

    Returns:
      0 if the unlock succeeded (and, when enabled, clearing the factory
      persistent digest also succeeded); 1 otherwise. Invalid arguments or
      unreadable credentials are reported through parser.error(), which exits
      the process instead of returning.
    """
    parser = argparse.ArgumentParser(
        description=HELP_DESCRIPTION,
        usage=HELP_USAGE,
        epilog=HELP_EPILOG,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    # General optional arguments.
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        help=
        'enable verbose output, e.g. prints fastboot commands and their output')
    parser.add_argument(
        '-s',
        '--serial',
        help=
        "specify device to unlock, either by serial or any other valid value for fastboot's -s arg"
    )
    # argparse only runs `type` on values taken from the command line (and on
    # string defaults), never on `const`, so real booleans are used for both
    # to keep the parsed value a bool in every case.
    parser.add_argument(
        '--clear_factory_digest',
        nargs='?',
        type=parse_boolean,
        default=True,
        const=True,
        help='Defaults to true. Set to false to prevent clearing the factory persistent digest')

    # User must provide either a unlock credential bundle, or the individual
    # files normally contained in such a bundle.
    # argparse doesn't support specifying this argument format - two groups of
    # mutually exclusive arguments, where one group requires all arguments in
    # that group to be specified - so we define them as optional arguments and
    # do the validation ourselves below.

    # Argument group #1 - Unlock credential zip archive(s) (or directory
    # containing multiple such archives)
    parser.add_argument(
        'bundle',
        metavar='unlock_creds.zip',
        nargs='*',
        help=
        'Unlock using a zip bundle/archive of credentials (e.g. from Developer '
        'Console). You can optionally provide multiple archives and/or a '
        'directory of such bundles and the tool will automatically select the '
        'correct one to use based on matching the product ID against the device '
        'being unlocked.')

    # Argument group #2 - Individual credential files
    parser.add_argument(
        '--pik_cert',
        metavar='pik_cert.bin',
        help='Path to product intermediate key (PIK) certificate file')
    parser.add_argument(
        '--puk_cert',
        metavar='puk_cert.bin',
        help='Path to product unlock key (PUK) certificate file')
    parser.add_argument(
        '--puk',
        metavar='puk.pem',
        help='Path to product unlock key in PEM format')

    # Print help if no args given
    args = parser.parse_args(in_args if in_args else ['-h'])

    # Do the custom validation described above.
    individual_files = (args.pik_cert, args.puk_cert, args.puk)
    if any(value is not None for value in individual_files):
        # Check mutual exclusion with bundle positional argument
        if args.bundle:
            parser.error(
                'bundle argument is mutually exclusive with --pik_cert, --puk_cert, and --puk'
            )

        # Check for 'mutual inclusion' of individual file options
        if args.pik_cert is None:
            parser.error('--pik_cert is required if --puk_cert or --puk is given')
        if args.puk_cert is None:
            parser.error('--puk_cert is required if --pik_cert or --puk is given')
        if args.puk is None:
            parser.error('--puk is required if --pik_cert or --puk_cert is given')
    elif not args.bundle:
        parser.error(
            'must provide either credentials bundle or individual credential files')

    # Parse arguments into UnlockCredentials objects
    if args.bundle:
        creds = []
        for path in args.bundle:
            if os.path.isfile(path):
                # An archive named explicitly must be usable; report problems
                # as a normal argument error rather than a traceback.
                try:
                    creds.append(UnlockCredentials.from_credential_archive(path))
                except (IOError, ValueError, zipfile.BadZipfile) as e:
                    parser.error(
                        "'{}' is not a valid unlock credential archive: {}".format(
                            path, e))
            elif os.path.isdir(path):
                creds.extend(
                    FindUnlockCredentialsInDirectory(path, verbose=args.verbose))
            else:
                parser.error("path argument '{}' does not exist".format(path))

        if not creds:
            parser.error('No unlock credentials were found in any of the given paths')
    else:
        try:
            creds = [UnlockCredentials(args.pik_cert, args.puk_cert, args.puk)]
        except (IOError, ValueError) as e:
            parser.error('invalid unlock credential files: {}'.format(e))

    ret = AuthenticatedUnlock(creds, serial=args.serial, verbose=args.verbose)
    # Only touch the persistent digest once the device is actually unlocked.
    if ret and args.clear_factory_digest:
        ret = ClearFactoryPersistentDigest(serial=args.serial, verbose=args.verbose)
    return 0 if ret else 1
# Script entry point: hand the command line (minus the program name) to main()
# and use its return value as the process exit status.
if __name__ == '__main__':
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)