You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
462 lines
16 KiB
462 lines
16 KiB
# Lint as: python2, python3
|
|
# Copyright 2016 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""This class defines the CrosHost Label class."""
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import collections
|
|
import logging
|
|
import re
|
|
|
|
import common
|
|
|
|
from autotest_lib.client.bin import utils
|
|
from autotest_lib.client.common_lib import global_config
|
|
from autotest_lib.client.cros.audio import cras_utils
|
|
from autotest_lib.server.cros.dynamic_suite import constants as ds_constants
|
|
from autotest_lib.server.hosts import base_label
|
|
from autotest_lib.server.hosts import common_label
|
|
from autotest_lib.server.hosts import servo_constants
|
|
from autotest_lib.site_utils import hwid_lib
|
|
from six.moves import zip
|
|
|
|
# pylint: disable=missing-docstring
|
|
LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board'])
|
|
|
|
# fallback values if we can't contact the HWID server
|
|
HWID_LABELS_FALLBACK = ['sku', 'phase', 'touchscreen', 'touchpad', 'variant', 'stylus']
|
|
|
|
# Repair and Deploy taskName
|
|
REPAIR_TASK_NAME = 'repair'
|
|
DEPLOY_TASK_NAME = 'deploy'
|
|
|
|
|
|
def _parse_lsb_output(host):
|
|
"""Parses the LSB output and returns key data points for labeling.
|
|
|
|
@param host: Host that the command will be executed against
|
|
@returns: LsbOutput with the result of parsing the /etc/lsb-release output
|
|
"""
|
|
release_info = utils.parse_cmd_output('cat /etc/lsb-release',
|
|
run_method=host.run)
|
|
|
|
unibuild = release_info.get('CHROMEOS_RELEASE_UNIBUILD') == '1'
|
|
return LsbOutput(unibuild, release_info['CHROMEOS_RELEASE_BOARD'])
|
|
|
|
|
|
class DeviceSkuLabel(base_label.StringPrefixLabel):
|
|
"""Determine the correct device_sku label for the device."""
|
|
|
|
_NAME = ds_constants.DEVICE_SKU_LABEL
|
|
|
|
def generate_labels(self, host):
|
|
device_sku = host.host_info_store.get().device_sku
|
|
if device_sku:
|
|
return [device_sku]
|
|
|
|
mosys_cmd = 'mosys platform sku'
|
|
result = host.run(command=mosys_cmd, ignore_status=True)
|
|
if result.exit_status == 0:
|
|
return [result.stdout.strip()]
|
|
|
|
return []
|
|
|
|
def update_for_task(self, task_name):
|
|
# This label is stored in the lab config.
|
|
return task_name in (DEPLOY_TASK_NAME, REPAIR_TASK_NAME, '')
|
|
|
|
|
|
class BrandCodeLabel(base_label.StringPrefixLabel):
|
|
"""Determine the correct brand_code (aka RLZ-code) for the device."""
|
|
|
|
_NAME = ds_constants.BRAND_CODE_LABEL
|
|
|
|
def generate_labels(self, host):
|
|
brand_code = host.host_info_store.get().brand_code
|
|
if brand_code:
|
|
return [brand_code]
|
|
|
|
cros_config_cmd = 'cros_config / brand-code'
|
|
result = host.run(command=cros_config_cmd, ignore_status=True)
|
|
if result.exit_status == 0:
|
|
return [result.stdout.strip()]
|
|
|
|
return []
|
|
|
|
|
|
class BluetoothPeerLabel(base_label.StringPrefixLabel):
|
|
"""Return the Bluetooth peer labels.
|
|
|
|
working_bluetooth_btpeer label is applied if a Raspberry Pi Bluetooth peer
|
|
is detected.There can be up to 4 Bluetooth peers. Labels
|
|
working_bluetooth_btpeer:[1-4] will be assigned depending on the number of
|
|
peers present.
|
|
|
|
"""
|
|
|
|
_NAME = 'working_bluetooth_btpeer'
|
|
|
|
def exists(self, host):
|
|
return len(host._btpeer_host_list) > 0
|
|
|
|
def generate_labels(self, host):
|
|
labels_list = []
|
|
count = 1
|
|
|
|
for (btpeer, btpeer_host) in \
|
|
zip(host.btpeer_list, host._btpeer_host_list):
|
|
try:
|
|
# Initialize one device type to make sure the peer is working
|
|
bt_hid_device = btpeer.get_bluetooth_hid_mouse()
|
|
if bt_hid_device.CheckSerialConnection():
|
|
labels_list.append(str(count))
|
|
count += 1
|
|
except Exception as e:
|
|
logging.error('Error with initializing bt_hid_mouse on '
|
|
'btpeer %s %s', btpeer_host.hostname, e)
|
|
|
|
logging.info('Bluetooth Peer labels are %s', labels_list)
|
|
return labels_list
|
|
|
|
def update_for_task(self, task_name):
|
|
# This label is stored in the state config, so only repair tasks update
|
|
# it or when no task name is mentioned.
|
|
return task_name in (REPAIR_TASK_NAME, '')
|
|
|
|
|
|
class Cr50Label(base_label.StringPrefixLabel):
|
|
"""Label indicating the cr50 image type."""
|
|
|
|
_NAME = 'cr50'
|
|
|
|
def __init__(self):
|
|
self.ver = None
|
|
|
|
def exists(self, host):
|
|
# Make sure the gsctool version command runs ok
|
|
self.ver = host.run('gsctool -a -f', ignore_status=True)
|
|
return self.ver.exit_status == 0
|
|
|
|
def _get_version(self, region):
|
|
"""Get the version number of the given region"""
|
|
return re.search(region + ' (\d+\.\d+\.\d+)', self.ver.stdout).group(1)
|
|
|
|
def generate_labels(self, host):
|
|
# Check the major version to determine prePVT vs PVT
|
|
version = self._get_version('RW')
|
|
major_version = int(version.split('.')[1])
|
|
# PVT images have a odd major version prePVT have even
|
|
return ['pvt' if (major_version % 2) else 'prepvt']
|
|
|
|
def update_for_task(self, task_name):
|
|
# This label is stored in the state config, so only repair tasks update
|
|
# it or when no task name is mentioned.
|
|
return task_name in (REPAIR_TASK_NAME, '')
|
|
|
|
|
|
class Cr50RWKeyidLabel(Cr50Label):
|
|
"""Label indicating the cr50 RW version."""
|
|
_REGION = 'RW'
|
|
_NAME = 'cr50-rw-keyid'
|
|
|
|
def _get_keyid_info(self, region):
|
|
"""Get the keyid of the given region."""
|
|
match = re.search('keyids:.*%s (\S+)' % region, self.ver.stdout)
|
|
keyid = match.group(1).rstrip(',')
|
|
is_prod = int(keyid, 16) & (1 << 2)
|
|
return [keyid, 'prod' if is_prod else 'dev']
|
|
|
|
def generate_labels(self, host):
|
|
"""Get the key type."""
|
|
return self._get_keyid_info(self._REGION)
|
|
|
|
|
|
class Cr50ROKeyidLabel(Cr50RWKeyidLabel):
|
|
"""Label indicating the RO key type."""
|
|
_REGION = 'RO'
|
|
_NAME = 'cr50-ro-keyid'
|
|
|
|
|
|
class ChameleonLabel(base_label.BaseLabel):
|
|
"""Determine if a Chameleon is connected to this host."""
|
|
|
|
_NAME = 'chameleon'
|
|
|
|
def exists(self, host):
|
|
# See crbug.com/1004500#2 for details.
|
|
has_chameleon = host._chameleon_host is not None
|
|
# TODO(crbug.com/995900) -- debug why chameleon label is flipping
|
|
try:
|
|
logging.info("has_chameleon %s", has_chameleon)
|
|
logging.info("_chameleon_host %s",
|
|
getattr(host, "_chameleon_host", "NO_ATTRIBUTE"))
|
|
logging.info("chameleon %s",
|
|
getattr(host, "chameleon", "NO_ATTRIBUTE"))
|
|
except:
|
|
pass
|
|
return has_chameleon
|
|
|
|
def update_for_task(self, task_name):
|
|
# This label is stored in the state config, so only repair tasks update
|
|
# it or when no task name is mentioned.
|
|
return task_name in (REPAIR_TASK_NAME, '')
|
|
|
|
|
|
class ChameleonConnectionLabel(base_label.StringPrefixLabel):
|
|
"""Return the Chameleon connection label."""
|
|
|
|
_NAME = 'chameleon'
|
|
|
|
def exists(self, host):
|
|
return host._chameleon_host is not None
|
|
|
|
def generate_labels(self, host):
|
|
return [host.chameleon.get_label()]
|
|
|
|
def update_for_task(self, task_name):
|
|
# This label is stored in the lab config, so only deploy tasks update it
|
|
# or when no task name is mentioned.
|
|
return task_name in (DEPLOY_TASK_NAME, '')
|
|
|
|
|
|
class AudioLoopbackDongleLabel(base_label.BaseLabel):
|
|
"""Return the label if an audio loopback dongle is plugged in."""
|
|
|
|
_NAME = 'audio_loopback_dongle'
|
|
|
|
def exists(self, host):
|
|
# Based on crbug.com/991285, AudioLoopbackDongle sometimes flips.
|
|
# Ensure that AudioLoopbackDongle.exists returns True
|
|
# forever, after it returns True *once*.
|
|
if self._cached_exists(host):
|
|
# If the current state is True, return it, don't run the command on
|
|
# the DUT and potentially flip the state.
|
|
return True
|
|
# If the current state is not True, run the command on
|
|
# the DUT. The new state will be set to whatever the command
|
|
# produces.
|
|
return self._host_run_exists(host)
|
|
|
|
def _cached_exists(self, host):
|
|
"""Get the state of AudioLoopbackDongle in the data store"""
|
|
info = host.host_info_store.get()
|
|
for label in info.labels:
|
|
if label.startswith(self._NAME):
|
|
return True
|
|
return False
|
|
|
|
def _host_run_exists(self, host):
|
|
"""Detect presence of audio_loopback_dongle by physically
|
|
running a command on the DUT."""
|
|
nodes_info = host.run(command=cras_utils.get_cras_nodes_cmd(),
|
|
ignore_status=True).stdout
|
|
if (cras_utils.node_type_is_plugged('HEADPHONE', nodes_info) and
|
|
cras_utils.node_type_is_plugged('MIC', nodes_info)):
|
|
return True
|
|
return False
|
|
|
|
def update_for_task(self, task_name):
|
|
# This label is stored in the state config, so only repair tasks update
|
|
# it or when no task name is mentioned.
|
|
return task_name in (REPAIR_TASK_NAME, '')
|
|
|
|
|
|
class ServoTypeLabel(base_label.StringPrefixLabel):
|
|
_NAME = servo_constants.SERVO_TYPE_LABEL_PREFIX
|
|
|
|
def generate_labels(self, host):
|
|
info = host.host_info_store.get()
|
|
|
|
servo_type = self._get_from_labels(info)
|
|
if servo_type != '':
|
|
logging.info("Using servo_type: %s from cache!", servo_type)
|
|
return [servo_type]
|
|
|
|
if host.servo is not None:
|
|
try:
|
|
servo_type = host.servo.get_servo_version()
|
|
if servo_type != '':
|
|
return [servo_type]
|
|
logging.warning('Cannot collect servo_type from servo'
|
|
' by `dut-control servo_type`! Please file a bug'
|
|
' and inform infra team as we are not expected '
|
|
' to reach this point.')
|
|
except Exception as e:
|
|
# We don't want fail the label and break DUTs here just
|
|
# because of servo issue.
|
|
logging.error("Failed to update servo_type, %s", str(e))
|
|
return []
|
|
|
|
def _get_from_labels(self, info):
|
|
prefix = self._NAME + ':'
|
|
for label in info.labels:
|
|
if label.startswith(prefix):
|
|
suffix_length = len(prefix)
|
|
return label[suffix_length:]
|
|
return ''
|
|
|
|
def update_for_task(self, task_name):
|
|
# This label is stored in the lab config,
|
|
# only deploy and repair tasks update it
|
|
# or when no task name is mentioned.
|
|
return task_name in (DEPLOY_TASK_NAME, '')
|
|
|
|
|
|
def _parse_hwid_labels(hwid_info_list):
|
|
if len(hwid_info_list) == 0:
|
|
return hwid_info_list
|
|
|
|
res = []
|
|
# See crbug.com/997816#c7 for details of two potential formats of returns
|
|
# from HWID server.
|
|
if isinstance(hwid_info_list[0], dict):
|
|
# Format of hwid_info:
|
|
# [{u'name': u'sku', u'value': u'xxx'}, ..., ]
|
|
for hwid_info in hwid_info_list:
|
|
value = hwid_info.get('value', '')
|
|
name = hwid_info.get('name', '')
|
|
# There should always be a name but just in case there is not.
|
|
if name:
|
|
new_label = name if not value else '%s:%s' % (name, value)
|
|
res.append(new_label)
|
|
else:
|
|
# Format of hwid_info:
|
|
# [<DUTLabel name: 'sku' value: u'xxx'>, ..., ]
|
|
for hwid_info in hwid_info_list:
|
|
new_label = str(hwid_info)
|
|
logging.info('processing hwid label: %s', new_label)
|
|
res.append(new_label)
|
|
|
|
return res
|
|
|
|
|
|
class HWIDLabel(base_label.StringLabel):
|
|
"""Return all the labels generated from the hwid."""
|
|
|
|
# We leave out _NAME because hwid_lib will generate everything for us.
|
|
|
|
def __init__(self):
|
|
# Grab the key file needed to access the hwid service.
|
|
self.key_file = global_config.global_config.get_config_value(
|
|
'CROS', 'HWID_KEY', type=str)
|
|
|
|
|
|
@staticmethod
|
|
def _merge_hwid_label_lists(new, old):
|
|
"""merge a list of old and new values for hwid_labels.
|
|
preferring new values if available
|
|
|
|
@returns: list of labels"""
|
|
# TODO(gregorynisbet): what is the appropriate way to merge
|
|
# old and new information?
|
|
retained = set(x for x in old)
|
|
for label in new:
|
|
key, sep, value = label.partition(':')
|
|
# If we have a key-value key such as variant:aaa,
|
|
# then we remove all the old labels with the same key.
|
|
if sep:
|
|
retained = set(x for x in retained if (not x.startswith(key + ':')))
|
|
return list(sorted(retained.union(new)))
|
|
|
|
|
|
def _hwid_label_names(self):
|
|
"""get the labels that hwid_lib controls.
|
|
|
|
@returns: hwid_labels
|
|
"""
|
|
all_hwid_labels, _ = self.get_all_labels()
|
|
# If and only if get_all_labels was unsuccessful,
|
|
# it will return a falsey value.
|
|
out = all_hwid_labels or HWID_LABELS_FALLBACK
|
|
|
|
# TODO(gregorynisbet): remove this
|
|
# TODO(crbug.com/999785)
|
|
if "sku" not in out:
|
|
logging.info("sku-less label names %s", out)
|
|
|
|
return out
|
|
|
|
|
|
def _old_label_values(self, host):
|
|
"""get the hwid_lib labels on previous run
|
|
|
|
@returns: hwid_labels"""
|
|
out = []
|
|
info = host.host_info_store.get()
|
|
for hwid_label in self._hwid_label_names():
|
|
for label in info.labels:
|
|
# NOTE: we want *all* the labels starting
|
|
# with this prefix.
|
|
if label.startswith(hwid_label):
|
|
out.append(label)
|
|
return out
|
|
|
|
|
|
def generate_labels(self, host):
|
|
# use previous values as default
|
|
old_hwid_labels = self._old_label_values(host)
|
|
logging.info("old_hwid_labels: %r", old_hwid_labels)
|
|
hwid = host.run_output('crossystem hwid').strip()
|
|
hwid_info_list = []
|
|
try:
|
|
hwid_info_response = hwid_lib.get_hwid_info(
|
|
hwid=hwid,
|
|
info_type=hwid_lib.HWID_INFO_LABEL,
|
|
key_file=self.key_file,
|
|
)
|
|
logging.info("hwid_info_response: %r", hwid_info_response)
|
|
hwid_info_list = hwid_info_response.get('labels', [])
|
|
except hwid_lib.HwIdException as e:
|
|
logging.info("HwIdException: %s", e)
|
|
|
|
new_hwid_labels = _parse_hwid_labels(hwid_info_list)
|
|
logging.info("new HWID labels: %r", new_hwid_labels)
|
|
|
|
return HWIDLabel._merge_hwid_label_lists(
|
|
old=old_hwid_labels,
|
|
new=new_hwid_labels,
|
|
)
|
|
|
|
|
|
def get_all_labels(self):
|
|
"""We need to try all labels as a prefix and as standalone.
|
|
|
|
We don't know for sure which labels are prefix labels and which are
|
|
standalone so we try all of them as both.
|
|
"""
|
|
all_hwid_labels = []
|
|
try:
|
|
all_hwid_labels = hwid_lib.get_all_possible_dut_labels(
|
|
self.key_file)
|
|
except IOError:
|
|
logging.error('Can not open key file: %s', self.key_file)
|
|
except hwid_lib.HwIdException as e:
|
|
logging.error('hwid service: %s', e)
|
|
return all_hwid_labels, all_hwid_labels
|
|
|
|
|
|
CROS_LABELS = [
|
|
AudioLoopbackDongleLabel(), #STATECONFIG
|
|
BluetoothPeerLabel(), #STATECONFIG
|
|
ChameleonConnectionLabel(), #LABCONFIG
|
|
ChameleonLabel(), #STATECONFIG
|
|
common_label.OSLabel(),
|
|
DeviceSkuLabel(), #LABCONFIG
|
|
HWIDLabel(),
|
|
ServoTypeLabel(), #LABCONFIG
|
|
# Temporarily add back as there's no way to reference cr50 configs.
|
|
# See crbug.com/1057145 for the root cause.
|
|
# See crbug.com/1057719 for future tracking.
|
|
Cr50Label(),
|
|
Cr50ROKeyidLabel(),
|
|
]
|
|
|
|
LABSTATION_LABELS = [
|
|
common_label.OSLabel(),
|
|
]
|