# pylint: disable=missing-docstring

"""\
Functions to expose over the RPC interface.

For all modify* and delete* functions that ask for an 'id' parameter to
identify the object to operate on, the id may be either
 * the database row ID
 * the name of the object (label name, hostname, user login, etc.)
 * a dictionary containing uniquely identifying fields (this option should seldom
   be used)

When specifying foreign key fields (i.e. adding hosts to a label, or adding
users to an ACL group), the given value may be either the database row ID or the
name of the object.

All get* functions return lists of dictionaries. Each dictionary represents one
object and maps field names to values.

Some examples:
modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
modify_test('sleeptest', test_type='Client', params=', seconds=60')
delete_acl_group(1) # delete by ID
delete_acl_group('Everyone') # delete by name
acl_group_add_users('Everyone', ['mbligh', 'showard'])
get_jobs(owner='showard', status='Queued')

See doctests/001_rpc_test.txt for (lots) more examples.
"""

__author__ = 'showard@google.com (Steve Howard)'

import ast
import collections
import contextlib
import datetime
import logging
import os
import sys
import warnings

from django.db import connection as db_connection
from django.db import transaction
from django.db.models import Count
from django.db.utils import DatabaseError

import common
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.frontend.afe import control_file as control_file_lib
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import model_logic
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.afe import rpc_utils
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
from autotest_lib.server import frontend
from autotest_lib.server import utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import control_file_getter
from autotest_lib.server.cros.dynamic_suite import suite_common
from autotest_lib.server.cros.dynamic_suite import tools
from autotest_lib.server.cros.dynamic_suite.suite import Suite
from autotest_lib.server.lib import status_history
from autotest_lib.site_utils import job_history
from autotest_lib.site_utils import server_manager_utils
from autotest_lib.site_utils import stable_version_utils


_CONFIG = global_config.global_config

# Definition of LabHealthIndicator
LabHealthIndicator = collections.namedtuple(
        'LabHealthIndicator',
        [
                'if_lab_close',
                'available_duts',
                'devserver_health',
                'upcoming_builds',
        ]
)

RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)

RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_attributes', type=bool, default=False)

# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.

# labels

def modify_label(id, **data):
    """Modify a label.

    @param id: id or name of a label. More often a label name.
    @param data: New data for a label.
    """
    label_model = models.Label.smart_get(id)
    if label_model.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to modify label "%s" because it is a static label. '
                'Use go/chromeos-skylab-inventory-tools to modify this '
                'label.' % label_model.name)

    label_model.update_object(data)

    # Main forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
                             id=id, **data)


def delete_label(id):
    """Delete a label.

    @param id: id or name of a label. More often a label name.
    """
    label_model = models.Label.smart_get(id)
    if label_model.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to delete label "%s" because it is a static label. '
                'Use go/chromeos-skylab-inventory-tools to modify this '
                'label.' % label_model.name)

    # Hosts that have the label to be deleted. Save this info before
    # the label is deleted to use it later.
    hosts = []
    for h in label_model.host_set.all():
        hosts.append(models.Host.smart_get(h.id))
    label_model.delete()

    # Main forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)


def add_label(name, ignore_exception_if_exists=False, **kwargs):
    """Adds a new label of a given name.

    @param name: label name.
    @param ignore_exception_if_exists: If True and the exception was
        thrown due to the duplicated label name when adding a label,
        then suppress the exception. Default is False.
    @param kwargs: keyword args that store more info about a label
        other than the name.
    @return: int/long id of a new label.
    """
    # models.Label.add_object() throws model_logic.ValidationError
    # when it is given a label name that already exists.
    # However, ValidationError can be thrown with different errors,
    # and those errors should be thrown up to the call chain.
    try:
        label = models.Label.add_object(name=name, **kwargs)
    except:
        exc_info = sys.exc_info()
        if ignore_exception_if_exists:
            label = rpc_utils.get_label(name)
            # If the exception was raised not because of a duplicated
            # "name", then raise the original exception.
            if label is None:
                raise exc_info[0], exc_info[1], exc_info[2]
        else:
            raise exc_info[0], exc_info[1], exc_info[2]
    return label.id


def add_label_to_hosts(id, hosts):
    """Adds a label of the given id to the given hosts only in local DB.

    @param id: id or name of a label. More often a label name.
    @param hosts: The hostnames of hosts that need the label.

    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    """
    label = models.Label.smart_get(id)
    if label.is_replaced_by_static():
        label = models.StaticLabel.smart_get(label.name)

    host_objs = models.Host.smart_get_bulk(hosts)
    if label.platform:
        models.Host.check_no_platform(host_objs)
    # Ensure a host has no more than one board label with it.
    if label.name.startswith('board:'):
        models.Host.check_board_labels_allowed(host_objs, [label.name])
    label.host_set.add(*host_objs)


def _create_label_everywhere(id, hosts):
    """
    Yet another method to create labels.

    ALERT! This method should be run only on main not shards!
    DO NOT RUN THIS ON A SHARD!!! Deputies will hate you if you do!!!

    This method exists primarily to serve label_add_hosts() and
    host_add_labels(). Basically it pulls out the label check/add logic
    from label_add_hosts() into this nice method that not only creates
    the label but also tells the shards that service the hosts to also
    create the label.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    try:
        label = models.Label.smart_get(id)
    except models.Label.DoesNotExist:
        # This matches the type checks in smart_get, which is a hack
        # in and of itself. The aim here is to create any non-existent
        # label, which we cannot do if the 'id' specified isn't a label name.
        if isinstance(id, basestring):
            label = models.Label.smart_get(add_label(id))
        else:
            raise ValueError('Label id (%s) does not exist. Please specify '
                             'the argument, id, as a string (label name).'
                             % id)

    # Make sure the label exists on the shard with the same id
    # as it is on the main.
    # It is possible that the label is already in a shard because
    # we are adding a new label only to shards of hosts that the label
    # is going to be attached to.
    # For example, we add a label L1 to a host in shard S1.
    # Main and S1 will have L1 but other shards won't.
    # Later, when we add the same label L1 to hosts in shards S1 and S2,
    # S1 already has the label but S2 doesn't.
    # S2 should have the new label without any problem.
    # We ignore the exception in such a case.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(
            host_objs, 'add_label', include_hostnames=False,
            name=label.name, ignore_exception_if_exists=True,
            id=label.id, platform=label.platform)


@rpc_utils.route_rpc_to_main
def label_add_hosts(id, hosts):
    """Adds a label with the given id to the given hosts.

    This method should be run only on main not shards.
    The given label will be created if it doesn't exist, provided the `id`
    supplied is a label name not an int/long id.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the id specified is an int/long (label id)
        while the label does not exist.
    """
    # Create the label.
    _create_label_everywhere(id, hosts)

    # Add it to the main.
    add_label_to_hosts(id, hosts)

    # Add it to the shards.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)


def remove_label_from_hosts(id, hosts):
    """Removes a label of the given id from the given hosts only in local DB.

    @param id: id or name of a label.
    @param hosts: The hostnames of the hosts to remove the label from.
    """
    host_objs = models.Host.smart_get_bulk(hosts)
    label = models.Label.smart_get(id)
    if label.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to remove label "%s" for hosts "%r" because it is a '
                'static label. Use go/chromeos-skylab-inventory-tools to '
                'modify this label.' % (label.name, hosts))

    label.host_set.remove(*host_objs)


@rpc_utils.route_rpc_to_main
def label_remove_hosts(id, hosts):
    """Removes a label of the given id from the given hosts.

    This method should be run only on main not shards.

    @param id: id or name of a label.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    host_objs = models.Host.smart_get_bulk(hosts)
    remove_label_from_hosts(id, hosts)

    rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)


def get_labels(exclude_filters=(), **filter_data):
    """\
    @param exclude_filters: A sequence of dictionaries of filters.

    @returns A sequence of nested dictionaries of label information.
    """
    labels = models.Label.query_objects(filter_data)
    for exclude_filter in exclude_filters:
        labels = labels.exclude(**exclude_filter)

    if not RESPECT_STATIC_LABELS:
        return rpc_utils.prepare_rows_as_nested_dicts(labels, ())

    static_labels = models.StaticLabel.query_objects(filter_data)
    for exclude_filter in exclude_filters:
        static_labels = static_labels.exclude(**exclude_filter)

    non_static_lists = rpc_utils.prepare_rows_as_nested_dicts(labels, ())
    static_lists = rpc_utils.prepare_rows_as_nested_dicts(static_labels, ())

    label_ids = [label.id for label in labels]
    replaced = models.ReplacedLabel.objects.filter(label__id__in=label_ids)
    replaced_ids = {r.label_id for r in replaced}
    replaced_label_names = {l.name for l in labels if l.id in replaced_ids}

    return_lists = []
    for non_static_label in non_static_lists:
        if non_static_label.get('id') not in replaced_ids:
            return_lists.append(non_static_label)

    for static_label in static_lists:
        if static_label.get('name') in replaced_label_names:
            return_lists.append(static_label)

    return return_lists


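# Example usage (a sketch; the exclude-filter value is illustrative): list all
# non-platform labels except cros-version labels.
#
#     get_labels(exclude_filters=({'name__startswith': 'cros-version'},),
#                platform=False)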
# hosts
|
|
|
|
def add_host(hostname, status=None, locked=None, lock_reason='', protection=None):
|
|
if locked and not lock_reason:
|
|
raise model_logic.ValidationError(
|
|
{'locked': 'Please provide a reason for locking when adding host.'})
|
|
|
|
return models.Host.add_object(hostname=hostname, status=status,
|
|
locked=locked, lock_reason=lock_reason,
|
|
protection=protection).id
|
|
|
|
|
|
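# Example usage (hostname is illustrative): add a locked host; a lock reason
# is mandatory whenever locked=True.
#
#     add_host('chromeos1-row1-rack1-host1', locked=True,
#              lock_reason='initial deployment')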
@rpc_utils.route_rpc_to_main
|
|
def modify_host(id, **kwargs):
|
|
"""Modify local attributes of a host.
|
|
|
|
If this is called on the main, but the host is assigned to a shard, this
|
|
will call `modify_host_local` RPC to the responsible shard. This means if
|
|
a host is being locked using this function, this change will also propagate
|
|
to shards.
|
|
When this is called on a shard, the shard just routes the RPC to the main
|
|
and does nothing.
|
|
|
|
@param id: id of the host to modify.
|
|
@param kwargs: key=value pairs of values to set on the host.
|
|
"""
|
|
rpc_utils.check_modify_host(kwargs)
|
|
host = models.Host.smart_get(id)
|
|
try:
|
|
rpc_utils.check_modify_host_locking(host, kwargs)
|
|
except model_logic.ValidationError as e:
|
|
if not kwargs.get('force_modify_locking', False):
|
|
raise
|
|
logging.exception('The following exception will be ignored and lock '
|
|
'modification will be enforced. %s', e)
|
|
|
|
# This is required to make `lock_time` for a host be exactly the same
|
|
# between the main and a shard.
|
|
if kwargs.get('locked', None) and 'lock_time' not in kwargs:
|
|
kwargs['lock_time'] = datetime.datetime.now()
|
|
|
|
# force_modify_locking is not a field in the database; remove it.
|
|
shard_kwargs = dict(kwargs)
|
|
shard_kwargs.pop('force_modify_locking', None)
|
|
rpc_utils.fanout_rpc([host], 'modify_host_local',
|
|
include_hostnames=False, id=id, **shard_kwargs)
|
|
|
|
# Update the local DB **after** RPC fanout is complete.
|
|
# This guarantees that the main state is only updated if the shards were
|
|
# correctly updated.
|
|
# In case the shard update fails mid-flight and the main and shard desync, we
|
|
# always consider the main state to be the source-of-truth, and any
|
|
# (automated) corrective actions will revert the (partial) shard updates.
|
|
host.update_object(kwargs)
|
|
|
|
|
|
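# Example usage (hostname and reason are illustrative): lock a host; the call
# is routed to the main and then fanned out to the shard that owns the host.
#
#     modify_host('chromeos1-row1-rack1-host1', locked=True,
#                 lock_reason='debugging a flaky DUT')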
def modify_host_local(id, **kwargs):
|
|
"""Modify host attributes in local DB.
|
|
|
|
@param id: Host id.
|
|
@param kwargs: key=value pairs of values to set on the host.
|
|
"""
|
|
models.Host.smart_get(id).update_object(kwargs)
|
|
|
|
|
|
@rpc_utils.route_rpc_to_main
|
|
def modify_hosts(host_filter_data, update_data):
|
|
"""Modify local attributes of multiple hosts.
|
|
|
|
If this is called on the main, but one of the hosts that match the
|
|
filters is assigned to a shard, this will call `modify_hosts_local` RPC
|
|
to the responsible shard.
|
|
When this is called on a shard, the shard just routes the RPC to the main
|
|
and does nothing.
|
|
|
|
The filters are always applied on the main, not on the shards. This means
|
|
if the states of a host differ on the main and a shard, the state on the
|
|
main will be used. I.e. this means:
|
|
A host was synced to Shard 1. On Shard 1 the status of the host was set to
|
|
'Repair Failed'.
|
|
- A call to modify_hosts with host_filter_data={'status': 'Ready'} will
|
|
update the host (both on the shard and on the main), because the state
|
|
of the host as the main knows it is still 'Ready'.
|
|
- A call to modify_hosts with host_filter_data={'status': 'Repair Failed'}
|
|
will not update the host, because the filter doesn't apply on the main.
|
|
|
|
@param host_filter_data: Filters out which hosts to modify.
|
|
@param update_data: A dictionary with the changes to make to the hosts.
|
|
"""
|
|
update_data = update_data.copy()
|
|
rpc_utils.check_modify_host(update_data)
|
|
hosts = models.Host.query_objects(host_filter_data)
|
|
|
|
affected_shard_hostnames = set()
|
|
affected_host_ids = []
|
|
|
|
# Check all hosts before changing data for exception safety.
|
|
for host in hosts:
|
|
try:
|
|
rpc_utils.check_modify_host_locking(host, update_data)
|
|
except model_logic.ValidationError as e:
|
|
if not update_data.get('force_modify_locking', False):
|
|
raise
|
|
logging.exception('The following exception will be ignored and '
|
|
'lock modification will be enforced. %s', e)
|
|
|
|
if host.shard:
|
|
affected_shard_hostnames.add(host.shard.hostname)
|
|
affected_host_ids.append(host.id)
|
|
|
|
# This is required to make `lock_time` for a host be exactly the same
|
|
# between the main and a shard.
|
|
if update_data.get('locked', None) and 'lock_time' not in update_data:
|
|
update_data['lock_time'] = datetime.datetime.now()
|
|
for host in hosts:
|
|
host.update_object(update_data)
|
|
|
|
update_data.pop('force_modify_locking', None)
|
|
# Caution: Changing the filter from the original here. See docstring.
|
|
rpc_utils.run_rpc_on_multiple_hostnames(
|
|
'modify_hosts_local', affected_shard_hostnames,
|
|
host_filter_data={'id__in': affected_host_ids},
|
|
update_data=update_data)
|
|
|
|
|
|
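# Example usage (filter and reason are illustrative): lock every host the main
# currently believes is in 'Repair Failed'.
#
#     modify_hosts(host_filter_data={'status': 'Repair Failed'},
#                  update_data={'locked': True, 'lock_reason': 'triage'})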
def modify_hosts_local(host_filter_data, update_data):
|
|
"""Modify attributes of hosts in local DB.
|
|
|
|
@param host_filter_data: Filters out which hosts to modify.
|
|
@param update_data: A dictionary with the changes to make to the hosts.
|
|
"""
|
|
for host in models.Host.query_objects(host_filter_data):
|
|
host.update_object(update_data)
|
|
|
|
|
|
def add_labels_to_host(id, labels):
|
|
"""Adds labels to a given host only in local DB.
|
|
|
|
@param id: id or hostname for a host.
|
|
@param labels: ids or names for labels.
|
|
"""
|
|
label_objs = models.Label.smart_get_bulk(labels)
|
|
if not RESPECT_STATIC_LABELS:
|
|
models.Host.smart_get(id).labels.add(*label_objs)
|
|
else:
|
|
static_labels, non_static_labels = models.Host.classify_label_objects(
|
|
label_objs)
|
|
host = models.Host.smart_get(id)
|
|
host.static_labels.add(*static_labels)
|
|
host.labels.add(*non_static_labels)
|
|
|
|
|
|
@rpc_utils.route_rpc_to_main
|
|
def host_add_labels(id, labels):
|
|
"""Adds labels to a given host.
|
|
|
|
@param id: id or hostname for a host.
|
|
@param labels: ids or names for labels.
|
|
|
|
@raises ValidationError: If adding more than one platform/board label.
|
|
"""
|
|
# Create the labels on the main/shards.
|
|
for label in labels:
|
|
_create_label_everywhere(label, [id])
|
|
|
|
label_objs = models.Label.smart_get_bulk(labels)
|
|
|
|
platforms = [label.name for label in label_objs if label.platform]
|
|
if len(platforms) > 1:
|
|
raise model_logic.ValidationError(
|
|
{'labels': ('Adding more than one platform: %s' %
|
|
', '.join(platforms))})
|
|
|
|
host_obj = models.Host.smart_get(id)
|
|
if platforms:
|
|
models.Host.check_no_platform([host_obj])
|
|
if any(label_name.startswith('board:') for label_name in labels):
|
|
models.Host.check_board_labels_allowed([host_obj], labels)
|
|
add_labels_to_host(id, labels)
|
|
|
|
rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
|
|
id=id, labels=labels)
|
|
|
|
|
|
def remove_labels_from_host(id, labels):
|
|
"""Removes labels from a given host only in local DB.
|
|
|
|
@param id: id or hostname for a host.
|
|
@param labels: ids or names for labels.
|
|
"""
|
|
label_objs = models.Label.smart_get_bulk(labels)
|
|
if not RESPECT_STATIC_LABELS:
|
|
models.Host.smart_get(id).labels.remove(*label_objs)
|
|
else:
|
|
static_labels, non_static_labels = models.Host.classify_label_objects(
|
|
label_objs)
|
|
host = models.Host.smart_get(id)
|
|
host.labels.remove(*non_static_labels)
|
|
if static_labels:
|
|
logging.info('Cannot remove labels "%r" for host "%r" because they '
|
|
'are static labels. Use '
|
|
'go/chromeos-skylab-inventory-tools to modify these '
|
|
'labels.', static_labels, id)
|
|
|
|
|
|
@rpc_utils.route_rpc_to_main
|
|
def host_remove_labels(id, labels):
|
|
"""Removes labels from a given host.
|
|
|
|
@param id: id or hostname for a host.
|
|
@param labels: ids or names for labels.
|
|
"""
|
|
remove_labels_from_host(id, labels)
|
|
|
|
host_obj = models.Host.smart_get(id)
|
|
rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
|
|
id=id, labels=labels)
|
|
|
|
|
|
def get_host_attribute(attribute, **host_filter_data):
|
|
"""
|
|
@param attribute: string name of attribute
|
|
@param host_filter_data: filter data to apply to Hosts to choose hosts to
|
|
act upon
|
|
"""
|
|
hosts = rpc_utils.get_host_query((), False, True, host_filter_data)
|
|
hosts = list(hosts)
|
|
models.Host.objects.populate_relationships(hosts, models.HostAttribute,
|
|
'attribute_list')
|
|
host_attr_dicts = []
|
|
host_objs = []
|
|
for host_obj in hosts:
|
|
for attr_obj in host_obj.attribute_list:
|
|
if attr_obj.attribute == attribute:
|
|
host_attr_dicts.append(attr_obj.get_object_dict())
|
|
host_objs.append(host_obj)
|
|
|
|
if RESPECT_STATIC_ATTRIBUTES:
|
|
for host_attr, host_obj in zip(host_attr_dicts, host_objs):
|
|
static_attrs = models.StaticHostAttribute.query_objects(
|
|
{'host_id': host_obj.id, 'attribute': attribute})
|
|
if len(static_attrs) > 0:
|
|
host_attr['value'] = static_attrs[0].value
|
|
|
|
return rpc_utils.prepare_for_serialization(host_attr_dicts)
|
|
|
|
|
|
@rpc_utils.route_rpc_to_main
|
|
def set_host_attribute(attribute, value, **host_filter_data):
|
|
"""Set an attribute on hosts.
|
|
|
|
This RPC is a shim that forwards calls to main to be handled there.
|
|
|
|
@param attribute: string name of attribute
|
|
@param value: string, or None to delete an attribute
|
|
@param host_filter_data: filter data to apply to Hosts to choose hosts to
|
|
act upon
|
|
"""
|
|
assert not utils.is_shard()
|
|
set_host_attribute_impl(attribute, value, **host_filter_data)
|
|
|
|
|
|
def set_host_attribute_impl(attribute, value, **host_filter_data):
|
|
"""Set an attribute on hosts.
|
|
|
|
*** DO NOT CALL THIS RPC from client code ***
|
|
This RPC exists for main-shard communication only.
|
|
Call set_host_attribute instead.
|
|
|
|
@param attribute: string name of attribute
|
|
@param value: string, or None to delete an attribute
|
|
@param host_filter_data: filter data to apply to Hosts to choose hosts to
|
|
act upon
|
|
"""
|
|
assert host_filter_data # disallow accidental actions on all hosts
|
|
hosts = models.Host.query_objects(host_filter_data)
|
|
models.AclGroup.check_for_acl_violation_hosts(hosts)
|
|
for host in hosts:
|
|
host.set_or_delete_attribute(attribute, value)
|
|
|
|
# Main forwards this RPC to shards.
|
|
if not utils.is_shard():
|
|
rpc_utils.fanout_rpc(hosts, 'set_host_attribute_impl', False,
|
|
attribute=attribute, value=value, **host_filter_data)
|
|
|
|
|
|
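# Example usage (attribute name and hostname are illustrative): set an
# attribute on one host, then delete it again by passing value=None.
#
#     set_host_attribute('outlet', 'rack1-outlet3', hostname='host1')
#     set_host_attribute('outlet', None, hostname='host1')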
@rpc_utils.forward_single_host_rpc_to_shard
|
|
def delete_host(id):
|
|
models.Host.smart_get(id).delete()
|
|
|
|
|
|
def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
|
|
valid_only=True, include_current_job=False, **filter_data):
|
|
"""Get a list of dictionaries which contains the information of hosts.
|
|
|
|
@param multiple_labels: match hosts in all of the labels given. Should
|
|
be a list of label names.
|
|
@param exclude_only_if_needed_labels: Deprecated. Raise error if it's True.
|
|
@param include_current_job: Set to True to include ids of currently running
|
|
job and special task.
|
|
"""
|
|
if exclude_only_if_needed_labels:
|
|
raise error.RPCException('exclude_only_if_needed_labels is deprecated')
|
|
|
|
hosts = rpc_utils.get_host_query(multiple_labels,
|
|
exclude_only_if_needed_labels,
|
|
valid_only, filter_data)
|
|
hosts = list(hosts)
|
|
models.Host.objects.populate_relationships(hosts, models.Label,
|
|
'label_list')
|
|
models.Host.objects.populate_relationships(hosts, models.AclGroup,
|
|
'acl_list')
|
|
models.Host.objects.populate_relationships(hosts, models.HostAttribute,
|
|
'attribute_list')
|
|
models.Host.objects.populate_relationships(hosts,
|
|
models.StaticHostAttribute,
|
|
'staticattribute_list')
|
|
host_dicts = []
|
|
for host_obj in hosts:
|
|
host_dict = host_obj.get_object_dict()
|
|
host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
|
|
host_dict['attributes'] = dict((attribute.attribute, attribute.value)
|
|
for attribute in host_obj.attribute_list)
|
|
if RESPECT_STATIC_LABELS:
|
|
label_list = []
|
|
# Only keep static labels which have corresponding entries in
|
|
# afe_labels.
|
|
for label in host_obj.label_list:
|
|
if label.is_replaced_by_static():
|
|
static_label = models.StaticLabel.smart_get(label.name)
|
|
label_list.append(static_label)
|
|
else:
|
|
label_list.append(label)
|
|
|
|
host_dict['labels'] = [label.name for label in label_list]
|
|
host_dict['platform'] = rpc_utils.find_platform(
|
|
host_obj.hostname, label_list)
|
|
else:
|
|
host_dict['labels'] = [label.name for label in host_obj.label_list]
|
|
host_dict['platform'] = rpc_utils.find_platform(
|
|
host_obj.hostname, host_obj.label_list)
|
|
|
|
if RESPECT_STATIC_ATTRIBUTES:
|
|
# Overwrite attribute with values in afe_static_host_attributes.
|
|
for attr in host_obj.staticattribute_list:
|
|
if attr.attribute in host_dict['attributes']:
|
|
host_dict['attributes'][attr.attribute] = attr.value
|
|
|
|
if include_current_job:
|
|
host_dict['current_job'] = None
|
|
host_dict['current_special_task'] = None
|
|
entries = models.HostQueueEntry.objects.filter(
|
|
host_id=host_dict['id'], active=True, complete=False)
|
|
if entries:
|
|
host_dict['current_job'] = (
|
|
entries[0].get_object_dict()['job'])
|
|
tasks = models.SpecialTask.objects.filter(
|
|
host_id=host_dict['id'], is_active=True, is_complete=False)
|
|
if tasks:
|
|
host_dict['current_special_task'] = (
|
|
'%d-%s' % (tasks[0].get_object_dict()['id'],
|
|
tasks[0].get_object_dict()['task'].lower()))
|
|
host_dicts.append(host_dict)
|
|
|
|
return rpc_utils.prepare_for_serialization(host_dicts)
|
|
|
|
|
|
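# Example usage (label names are illustrative): list Ready hosts that carry
# both a board and a pool label, including their currently running job ids.
#
#     get_hosts(multiple_labels=['board:eve', 'pool:bvt'], status='Ready',
#               include_current_job=True)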
def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
|
|
valid_only=True, **filter_data):
|
|
"""
|
|
Same parameters as get_hosts().
|
|
|
|
@returns The number of matching hosts.
|
|
"""
|
|
if exclude_only_if_needed_labels:
|
|
raise error.RPCException('exclude_only_if_needed_labels is deprecated')
|
|
|
|
hosts = rpc_utils.get_host_query(multiple_labels,
|
|
exclude_only_if_needed_labels,
|
|
valid_only, filter_data)
|
|
return len(hosts)
|
|
|
|
|
|
# tests
|
|
|
|
def get_tests(**filter_data):
|
|
return rpc_utils.prepare_for_serialization(
|
|
models.Test.list_objects(filter_data))
|
|
|
|
|
|
def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
|
|
"""Gets the counts of all passed and failed tests from the matching jobs.
|
|
|
|
@param job_name_prefix: Name prefix of the jobs to get the summary
|
|
from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix
|
|
matching is case insensitive.
|
|
@param label_name: Label that must be set in the jobs, e.g.,
|
|
'cros-version:butterfly-release/R40-6457.21.0'.
|
|
|
|
@returns A summary of the counts of all the passed and failed tests.
|
|
"""
|
|
job_ids = list(models.Job.objects.filter(
|
|
name__istartswith=job_name_prefix,
|
|
dependency_labels__name=label_name).values_list(
|
|
'pk', flat=True))
|
|
summary = {'passed': 0, 'failed': 0}
|
|
if not job_ids:
|
|
return summary
|
|
|
|
counts = (tko_models.TestView.objects.filter(
|
|
afe_job_id__in=job_ids).exclude(
|
|
test_name='SERVER_JOB').exclude(
|
|
test_name__startswith='CLIENT_JOB').values(
|
|
'status').annotate(
|
|
count=Count('status')))
|
|
for status in counts:
|
|
if status['status'] == 'GOOD':
|
|
summary['passed'] += status['count']
|
|
else:
|
|
summary['failed'] += status['count']
|
|
return summary
|
|
|
|
|
|
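# Example usage (build strings follow the docstring above): summarize pass and
# fail counts for one build's bvt-cq jobs.
#
#     get_tests_status_counts_by_job_name_label(
#             'butterfly-release/r40-6457.21.0/bvt-cq/',
#             'cros-version:butterfly-release/R40-6457.21.0')
#     # -> {'passed': <count>, 'failed': <count>}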
# profilers
|
|
|
|
def add_profiler(name, description=None):
|
|
return models.Profiler.add_object(name=name, description=description).id
|
|
|
|
|
|
def modify_profiler(id, **data):
|
|
models.Profiler.smart_get(id).update_object(data)
|
|
|
|
|
|
def delete_profiler(id):
|
|
models.Profiler.smart_get(id).delete()
|
|
|
|
|
|
def get_profilers(**filter_data):
|
|
return rpc_utils.prepare_for_serialization(
|
|
models.Profiler.list_objects(filter_data))
|
|
|
|
|
|
# users
|
|
|
|
def get_users(**filter_data):
|
|
return rpc_utils.prepare_for_serialization(
|
|
models.User.list_objects(filter_data))
|
|
|
|
|
|
# acl groups
|
|
|
|
def add_acl_group(name, description=None):
|
|
group = models.AclGroup.add_object(name=name, description=description)
|
|
group.users.add(models.User.current_user())
|
|
return group.id
|
|
|
|
|
|
def modify_acl_group(id, **data):
|
|
group = models.AclGroup.smart_get(id)
|
|
group.check_for_acl_violation_acl_group()
|
|
group.update_object(data)
|
|
group.add_current_user_if_empty()
|
|
|
|
|
|
def acl_group_add_users(id, users):
|
|
group = models.AclGroup.smart_get(id)
|
|
group.check_for_acl_violation_acl_group()
|
|
users = models.User.smart_get_bulk(users)
|
|
group.users.add(*users)
|
|
|
|
|
|
def acl_group_remove_users(id, users):
|
|
group = models.AclGroup.smart_get(id)
|
|
group.check_for_acl_violation_acl_group()
|
|
users = models.User.smart_get_bulk(users)
|
|
group.users.remove(*users)
|
|
group.add_current_user_if_empty()
|
|
|
|
|
|
def acl_group_add_hosts(id, hosts):
|
|
group = models.AclGroup.smart_get(id)
|
|
group.check_for_acl_violation_acl_group()
|
|
hosts = models.Host.smart_get_bulk(hosts)
|
|
group.hosts.add(*hosts)
|
|
group.on_host_membership_change()
|
|
|
|
|
|
def acl_group_remove_hosts(id, hosts):
|
|
group = models.AclGroup.smart_get(id)
|
|
group.check_for_acl_violation_acl_group()
|
|
hosts = models.Host.smart_get_bulk(hosts)
|
|
group.hosts.remove(*hosts)
|
|
group.on_host_membership_change()
|
|
|
|
|
|
def delete_acl_group(id):
|
|
models.AclGroup.smart_get(id).delete()
|
|
|
|
|
|
def get_acl_groups(**filter_data):
|
|
acl_groups = models.AclGroup.list_objects(filter_data)
|
|
for acl_group in acl_groups:
|
|
acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
|
|
acl_group['users'] = [user.login
|
|
for user in acl_group_obj.users.all()]
|
|
acl_group['hosts'] = [host.hostname
|
|
for host in acl_group_obj.hosts.all()]
|
|
return rpc_utils.prepare_for_serialization(acl_groups)
|
|
|
|
|
|
# jobs
|
|
|
|
def generate_control_file(tests=(), profilers=(),
|
|
client_control_file='', use_container=False,
|
|
profile_only=None, db_tests=True,
|
|
test_source_build=None):
|
|
"""
|
|
Generates a client-side control file to run tests.
|
|
|
|
@param tests List of tests to run. See db_tests for more information.
|
|
@param profilers List of profilers to activate during the job.
|
|
@param client_control_file The contents of a client-side control file to
|
|
run at the end of all tests. If this is supplied, all tests must be
|
|
client side.
|
|
TODO: in the future we should support server control files directly
|
|
to wrap with a kernel. That'll require changing the parameter
|
|
name and adding a boolean to indicate if it is a client or server
|
|
control file.
|
|
@param use_container unused argument today. TODO: Enable containers
|
|
on the host during a client side test.
|
|
@param profile_only A boolean that indicates what default profile_only
|
|
mode to use in the control file. Passing None will generate a
|
|
control file that does not explicitly set the default mode at all.
|
|
@param db_tests: if True, the test object can be found in the database
|
|
backing the test model. In this case, tests is a tuple
|
|
of test IDs which are used to retrieve the test objects
|
|
from the database. If False, tests is a tuple of test
|
|
dictionaries stored client-side in the AFE.
|
|
@param test_source_build: Build to be used to retrieve test code. Default
|
|
to None.
|
|
|
|
@returns a dict with the following keys:
|
|
control_file: str, The control file text.
|
|
is_server: bool, is the control file a server-side control file?
|
|
synch_count: How many machines the job uses per autoserv execution.
|
|
synch_count == 1 means the job is asynchronous.
|
|
dependencies: A list of the names of labels on which the job depends.
|
|
"""
|
|
if not tests and not client_control_file:
|
|
return dict(control_file='', is_server=False, synch_count=1,
|
|
dependencies=[])
|
|
|
|
cf_info, test_objects, profiler_objects = (
|
|
rpc_utils.prepare_generate_control_file(tests, profilers,
|
|
db_tests))
|
|
cf_info['control_file'] = control_file_lib.generate_control(
|
|
tests=test_objects, profilers=profiler_objects,
|
|
is_server=cf_info['is_server'],
|
|
client_control_file=client_control_file, profile_only=profile_only,
|
|
test_source_build=test_source_build)
|
|
return cf_info
|
|
|
|
|
|
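# Example usage (test name is illustrative): generate a client-side control
# file for a single test and inspect the returned metadata.
#
#     cf_info = generate_control_file(tests=['dummy_Pass'])
#     # cf_info['control_file'], cf_info['is_server'],
#     # cf_info['synch_count'], cf_info['dependencies']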
def create_job_page_handler(name, priority, control_file, control_type,
|
|
image=None, hostless=False, firmware_rw_build=None,
|
|
firmware_ro_build=None, test_source_build=None,
|
|
is_cloning=False, cheets_build=None, **kwargs):
|
|
"""\
|
|
Create and enqueue a job.
|
|
|
|
@param name name of this job
|
|
@param priority Integer priority of this job. Higher is more important.
|
|
@param control_file String contents of the control file.
|
|
@param control_type Type of control file, Client or Server.
|
|
@param image: ChromeOS build to be installed in the dut. Default to None.
|
|
@param firmware_rw_build: Firmware build to update RW firmware. Default to
|
|
None, i.e., RW firmware will not be updated.
|
|
@param firmware_ro_build: Firmware build to update RO firmware. Default to
|
|
None, i.e., RO firmware will not be updated.
|
|
@param test_source_build: Build to be used to retrieve test code. Default
|
|
to None.
|
|
@param is_cloning: True if creating a cloning job.
|
|
@param cheets_build: ChromeOS Android build to be installed in the dut.
|
|
Default to None. Cheets build will not be updated.
|
|
@param kwargs extra args that will be required by create_suite_job or
|
|
create_job.
|
|
|
|
@returns The created Job id number.
|
|
"""
|
|
test_args = {}
|
|
if kwargs.get('args'):
|
|
# args' format is: ['disable_sysinfo=False', 'fast=True', ...]
|
|
args = kwargs.get('args')
|
|
for arg in args:
|
|
k, v = arg.split('=')[0], arg.split('=')[1]
|
|
test_args[k] = v
|
|
|
|
if is_cloning:
|
|
logging.info('Start to clone a new job')
|
|
# When cloning a job, hosts and meta_hosts should not exist together,
|
|
# which would cause host-scheduler to schedule two hqe jobs to one host
|
|
# at the same time, and crash itself. Clear meta_hosts for this case.
|
|
if kwargs.get('hosts') and kwargs.get('meta_hosts'):
|
|
kwargs['meta_hosts'] = []
|
|
else:
|
|
logging.info('Start to create a new job')
|
|
control_file = rpc_utils.encode_ascii(control_file)
|
|
if not control_file:
|
|
raise model_logic.ValidationError({
|
|
'control_file' : "Control file cannot be empty"})
|
|
|
|
if image and hostless:
|
|
builds = {}
|
|
builds[provision.CROS_VERSION_PREFIX] = image
|
|
if cheets_build:
|
|
builds[provision.CROS_ANDROID_VERSION_PREFIX] = cheets_build
|
|
if firmware_rw_build:
|
|
builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
|
|
if firmware_ro_build:
|
|
builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
|
|
return create_suite_job(
|
|
name=name, control_file=control_file, priority=priority,
|
|
builds=builds, test_source_build=test_source_build,
|
|
is_cloning=is_cloning, test_args=test_args, **kwargs)
|
|
|
|
return create_job(name, priority, control_file, control_type, image=image,
|
|
hostless=hostless, test_args=test_args, **kwargs)
|
|
|
|
|
|
@rpc_utils.route_rpc_to_main
|
|
def create_job(
|
|
name,
|
|
priority,
|
|
control_file,
|
|
control_type,
|
|
hosts=(),
|
|
meta_hosts=(),
|
|
one_time_hosts=(),
|
|
synch_count=None,
|
|
is_template=False,
|
|
timeout=None,
|
|
timeout_mins=None,
|
|
max_runtime_mins=None,
|
|
run_verify=False,
|
|
email_list='',
|
|
dependencies=(),
|
|
reboot_before=None,
|
|
reboot_after=None,
|
|
parse_failed_repair=None,
|
|
hostless=False,
|
|
keyvals=None,
|
|
drone_set=None,
|
|
image=None,
|
|
parent_job_id=None,
|
|
test_retry=0,
|
|
run_reset=True,
|
|
require_ssp=None,
|
|
test_args=None,
|
|
**kwargs):
|
|
"""\
|
|
Create and enqueue a job.
|
|
|
|
@param name name of this job
|
|
@param priority Integer priority of this job. Higher is more important.
|
|
@param control_file String contents of the control file.
|
|
@param control_type Type of control file, Client or Server.
|
|
@param synch_count How many machines the job uses per autoserv execution.
|
|
synch_count == 1 means the job is asynchronous. If an atomic group is
|
|
given this value is treated as a minimum.
|
|
@param is_template If true then create a template job.
|
|
@param timeout Hours after this call returns until the job times out.
|
|
@param timeout_mins Minutes after this call returns until the job times
|
|
out.
|
|
@param max_runtime_mins Minutes from job starting time until job times out
|
|
@param run_verify Should the host be verified before running the test?
|
|
@param email_list String containing emails to mail when the job is done
|
|
@param dependencies List of label names on which this job depends
|
|
@param reboot_before Never, If dirty, or Always
|
|
@param reboot_after Never, If all tests passed, or Always
|
|
@param parse_failed_repair if true, results of failed repairs launched by
|
|
this job will be parsed as part of the job.
|
|
@param hostless if true, create a hostless job
|
|
@param keyvals dict of keyvals to associate with the job
|
|
@param hosts List of hosts to run job on.
|
|
@param meta_hosts List where each entry is a label name, and for each entry
|
|
one host will be chosen from that label to run the job on.
|
|
@param one_time_hosts List of hosts not in the database to run the job on.
|
|
@param drone_set The name of the drone set to run this test on.
|
|
@param image OS image to install before running job.
|
|
@param parent_job_id id of a job considered to be parent of created job.
|
|
@param test_retry DEPRECATED
|
|
@param run_reset Should the host be reset before running the test?
|
|
@param require_ssp Set to True to require server-side packaging to run the
|
|
test. If it's set to None, drone will still try to run
|
|
the server side with server-side packaging. If the
|
|
autotest-server package doesn't exist for the build or
|
|
image is not set, drone will run the test without server-
|
|
side packaging. Default is None.
|
|
@param test_args A dict of args passed to be injected into control file.
|
|
@param kwargs extra keyword args. NOT USED.
|
|
|
|
@returns The created Job id number.
|
|
"""
|
|
if test_args:
|
|
control_file = tools.inject_vars(test_args, control_file)
|
|
if image:
|
|
dependencies += (provision.image_version_to_label(image),)
|
|
return rpc_utils.create_job_common(
|
|
name=name,
|
|
priority=priority,
|
|
control_type=control_type,
|
|
control_file=control_file,
|
|
hosts=hosts,
|
|
meta_hosts=meta_hosts,
|
|
one_time_hosts=one_time_hosts,
|
|
synch_count=synch_count,
|
|
is_template=is_template,
|
|
timeout=timeout,
|
|
timeout_mins=timeout_mins,
|
|
max_runtime_mins=max_runtime_mins,
|
|
run_verify=run_verify,
|
|
email_list=email_list,
|
|
dependencies=dependencies,
|
|
reboot_before=reboot_before,
|
|
reboot_after=reboot_after,
|
|
parse_failed_repair=parse_failed_repair,
|
|
hostless=hostless,
|
|
keyvals=keyvals,
|
|
drone_set=drone_set,
|
|
parent_job_id=parent_job_id,
|
|
run_reset=run_reset,
|
|
require_ssp=require_ssp)
|
|
|
|
|
|
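# Example usage (hostname, build and control-file text are illustrative):
# queue a single-host client job against a specific ChromeOS build.
#
#     create_job(name='dummy_Pass', priority=50,  # integer; higher is more important
#                control_file=control_file_text, control_type='Client',
#                hosts=['host1'], image='eve-release/R80-12345.0.0')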
def abort_host_queue_entries(**filter_data):
|
|
"""\
|
|
Abort a set of host queue entries.
|
|
|
|
@return: A list of dictionaries, each containing information
|
|
about an aborted HQE.
|
|
"""
|
|
query = models.HostQueueEntry.query_objects(filter_data)
|
|
|
|
# Don't allow aborts on:
|
|
# 1. Jobs that have already completed (whether or not they were aborted)
|
|
# 2. Jobs that have already been aborted (but may not have completed)
|
|
query = query.filter(complete=False).filter(aborted=False)
|
|
models.AclGroup.check_abort_permissions(query)
|
|
host_queue_entries = list(query.select_related())
|
|
rpc_utils.check_abort_synchronous_jobs(host_queue_entries)
|
|
|
|
models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
|
|
hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
|
|
'Job name': hqe.job.name} for hqe in host_queue_entries]
|
|
return hqe_info
|
|
|
|
|
|
def abort_special_tasks(**filter_data):
|
|
"""\
|
|
Abort the special task, or tasks, specified in the filter.
|
|
"""
|
|
query = models.SpecialTask.query_objects(filter_data)
|
|
special_tasks = query.filter(is_active=True)
|
|
for task in special_tasks:
|
|
task.abort()
|
|
|
|
|
|
def _call_special_tasks_on_hosts(task, hosts):
|
|
"""\
|
|
Schedules a set of hosts for a special task.
|
|
|
|
@returns A list of hostnames that a special task was created for.
|
|
"""
|
|
models.AclGroup.check_for_acl_violation_hosts(hosts)
|
|
shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
|
|
if shard_host_map and not utils.is_shard():
|
|
raise ValueError('The following hosts are on shards, please '
|
|
'follow the link to the shards and create jobs '
|
|
'there instead. %s.' % shard_host_map)
|
|
for host in hosts:
|
|
models.SpecialTask.schedule_special_task(host, task)
|
|
return list(sorted(host.hostname for host in hosts))
|
|
|
|
|
|
def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
|
|
"""Forward special tasks to corresponding shards.
|
|
|
|
For main, when special tasks are fired on hosts that are sharded,
|
|
forward the RPC to corresponding shards.
|
|
|
|
For shard, create special task records in local DB.
|
|
|
|
@param task: Enum value of frontend.afe.models.SpecialTask.Task
|
|
@param rpc: RPC name to forward.
|
|
@param filter_data: Filter keywords to be used for DB query.
|
|
|
|
@return: A list of hostnames that a special task was created for.
|
|
"""
|
|
hosts = models.Host.query_objects(filter_data)
|
|
shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
|
|
|
|
# Filter out hosts on a shard from those on the main, forward
|
|
# rpcs to the shard with an additional hostname__in filter, and
|
|
# create a local SpecialTask for each remaining host.
|
|
if shard_host_map and not utils.is_shard():
|
|
hosts = [h for h in hosts if h.shard is None]
|
|
for shard, hostnames in shard_host_map.iteritems():
|
|
|
|
# The main client of this module is the frontend website, and
|
|
# it invokes it with an 'id' or an 'id__in' filter. Regardless,
|
|
# the 'hostname' filter should narrow down the list of hosts on
|
|
# each shard even though we supply all the ids in filter_data.
|
|
# This method uses hostname instead of id because it fits better
|
|
# with the overall architecture of redirection functions in
|
|
# rpc_utils.
|
|
shard_filter = filter_data.copy()
|
|
shard_filter['hostname__in'] = hostnames
|
|
rpc_utils.run_rpc_on_multiple_hostnames(
|
|
rpc, [shard], **shard_filter)
|
|
|
|
# There is a race condition here if someone assigns a shard to one of these
|
|
# hosts before we create the task. The host will stay on the main if:
|
|
# 1. The host is not Ready
|
|
# 2. The host is Ready but has a task
|
|
# But if the host is Ready and doesn't have a task yet, it will get sent
|
|
# to the shard as we're creating a task here.
|
|
|
|
# Given that we only rarely verify Ready hosts it isn't worth putting this
|
|
# entire method in a transaction. The worst case scenario is that we have
|
|
# a verify running on a Ready host while the shard is using it, if the
|
|
# verify fails no subsequent tasks will be created against the host on the
|
|
# main, and verifies are safe enough that this is OK.
|
|
return _call_special_tasks_on_hosts(task, hosts)
|
|
|
|
|
|
def reverify_hosts(**filter_data):
|
|
"""\
|
|
Schedules a set of hosts for verify.
|
|
|
|
@returns A list of hostnames that a verify task was created for.
|
|
"""
|
|
return _forward_special_tasks_on_hosts(
|
|
models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)
|
|
|
|
|
|
def repair_hosts(**filter_data):
|
|
"""\
|
|
Schedules a set of hosts for repair.
|
|
|
|
@returns A list of hostnames that a repair task was created for.
|
|
"""
|
|
return _forward_special_tasks_on_hosts(
|
|
models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)
|
|
|
|
|
|
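# Example usage (label name is illustrative): schedule a verify task for every
# host in a pool; sharded hosts have the RPC forwarded to their shard.
#
#     reverify_hosts(labels__name='pool:suites')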
def get_jobs(not_yet_run=False, running=False, finished=False,
|
|
suite=False, sub=False, standalone=False, **filter_data):
|
|
"""\
|
|
Extra status filter args for get_jobs:
|
|
-not_yet_run: Include only jobs that have not yet started running.
|
|
-running: Include only jobs that have started running but for which not
|
|
all hosts have completed.
|
|
-finished: Include only jobs for which all hosts have completed (or
|
|
aborted).
|
|
|
|
Extra type filter args for get_jobs:
|
|
-suite: Include only jobs with child jobs.
|
|
-sub: Include only jobs with a parent job.
|
|
-standalone: Include only jobs with no child or parent jobs.
|
|
At most one of these three fields should be specified.
|
|
"""
|
|
extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
|
|
running,
|
|
finished)
|
|
filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
|
|
suite,
|
|
sub,
|
|
standalone)
|
|
job_dicts = []
|
|
jobs = list(models.Job.query_objects(filter_data))
|
|
models.Job.objects.populate_relationships(jobs, models.Label,
|
|
'dependencies')
|
|
models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
|
|
for job in jobs:
|
|
job_dict = job.get_object_dict()
|
|
job_dict['dependencies'] = ','.join(label.name
|
|
for label in job.dependencies)
|
|
job_dict['keyvals'] = dict((keyval.key, keyval.value)
|
|
for keyval in job.keyvals)
|
|
job_dicts.append(job_dict)
|
|
return rpc_utils.prepare_for_serialization(job_dicts)
|
|
|
|
|
|
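# Example usage (owner is illustrative): list queued suite jobs owned by one
# user, using the status/type flags documented above.
#
#     get_jobs(not_yet_run=True, suite=True, owner='showard')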
def get_num_jobs(not_yet_run=False, running=False, finished=False,
|
|
suite=False, sub=False, standalone=False,
|
|
**filter_data):
|
|
"""\
|
|
See get_jobs() for documentation of extra filter parameters.
|
|
"""
|
|
extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
|
|
running,
|
|
finished)
|
|
filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
|
|
suite,
|
|
sub,
|
|
standalone)
|
|
return models.Job.query_count(filter_data)
|
|
|
|
|
|
def get_jobs_summary(**filter_data):
|
|
"""\
|
|
Like get_jobs(), but adds 'status_counts' and 'result_counts' field.
|
|
|
|
'status_counts' field is a dictionary mapping status strings to the number
|
|
of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}.
|
|
|
|
'result_counts' field is piped to tko's rpc_interface and has the return
|
|
format specified under get_group_counts.
|
|
"""
|
|
jobs = get_jobs(**filter_data)
|
|
ids = [job['id'] for job in jobs]
|
|
all_status_counts = models.Job.objects.get_status_counts(ids)
|
|
for job in jobs:
|
|
job['status_counts'] = all_status_counts[job['id']]
|
|
job['result_counts'] = tko_rpc_interface.get_status_counts(
|
|
['afe_job_id', 'afe_job_id'],
|
|
header_groups=[['afe_job_id'], ['afe_job_id']],
|
|
**{'afe_job_id': job['id']})
|
|
return rpc_utils.prepare_for_serialization(jobs)
|
|
|
|
|
|
def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
|
|
"""\
|
|
Retrieves all the information needed to clone a job.
|
|
"""
|
|
job = models.Job.objects.get(id=id)
|
|
job_info = rpc_utils.get_job_info(job,
|
|
preserve_metahosts,
|
|
queue_entry_filter_data)
|
|
|
|
host_dicts = []
|
|
for host in job_info['hosts']:
|
|
host_dict = get_hosts(id=host.id)[0]
|
|
other_labels = host_dict['labels']
|
|
if host_dict['platform']:
|
|
other_labels.remove(host_dict['platform'])
|
|
host_dict['other_labels'] = ', '.join(other_labels)
|
|
host_dicts.append(host_dict)
|
|
|
|
for host in job_info['one_time_hosts']:
|
|
host_dict = dict(hostname=host.hostname,
|
|
id=host.id,
|
|
platform='(one-time host)',
|
|
locked_text='')
|
|
host_dicts.append(host_dict)
|
|
|
|
# convert keys from Label objects to strings (names of labels)
|
|
meta_host_counts = dict((meta_host.name, count) for meta_host, count
|
|
in job_info['meta_host_counts'].iteritems())
|
|
|
|
info = dict(job=job.get_object_dict(),
|
|
meta_host_counts=meta_host_counts,
|
|
hosts=host_dicts)
|
|
info['job']['dependencies'] = job_info['dependencies']
|
|
info['hostless'] = job_info['hostless']
|
|
info['drone_set'] = job.drone_set and job.drone_set.name
|
|
|
|
image = _get_image_for_job(job, job_info['hostless'])
|
|
if image:
|
|
info['job']['image'] = image
|
|
|
|
return rpc_utils.prepare_for_serialization(info)
|
|
|
|
|
|
def _get_image_for_job(job, hostless):
|
|
"""Gets the image used for a job.
|
|
|
|
Gets the image used for an AFE job from the job's keyvals 'build' or
|
|
'builds'. If that fails, and the job is a hostless job, tries to
|
|
get the image from its control file attributes 'build' or 'builds'.
|
|
|
|
TODO(ntang): Needs to handle FAFT with two builds for ro/rw.
|
|
|
|
@param job An AFE job object.
|
|
@param hostless Boolean indicating whether the job is hostless.
|
|
|
|
@returns The image build used for the job.
|
|
"""
|
|
keyvals = job.keyval_dict()
|
|
image = keyvals.get('build')
|
|
if not image:
|
|
value = keyvals.get('builds')
|
|
builds = None
|
|
if isinstance(value, dict):
|
|
builds = value
|
|
elif isinstance(value, basestring):
|
|
builds = ast.literal_eval(value)
|
|
if builds:
|
|
image = builds.get('cros-version')
|
|
if not image and hostless and job.control_file:
|
|
try:
|
|
control_obj = control_data.parse_control_string(
|
|
job.control_file)
|
|
if hasattr(control_obj, 'build'):
|
|
image = getattr(control_obj, 'build')
|
|
if not image and hasattr(control_obj, 'builds'):
|
|
builds = getattr(control_obj, 'builds')
|
|
image = builds.get('cros-version')
|
|
except:
|
|
logging.warning('Failed to parse control file for job: %s',
|
|
job.name)
|
|
return image
|
|
|
|
|
|
def get_host_queue_entries_by_insert_time(
|
|
insert_time_after=None, insert_time_before=None, **filter_data):
|
|
"""Like get_host_queue_entries, but using the insert index table.
|
|
|
|
@param insert_time_after: A lower bound on insert_time
|
|
@param insert_time_before: An upper bound on insert_time
|
|
@returns A sequence of nested dictionaries of host and job information.
|
|
"""
|
|
assert insert_time_after is not None or insert_time_before is not None, \
|
|
('Caller to get_host_queue_entries_by_insert_time must provide either'
|
|
' insert_time_after or insert_time_before.')
|
|
# Get insert bounds on the index of the host queue entries.
|
|
if insert_time_after:
|
|
query = models.HostQueueEntryStartTimes.objects.filter(
|
|
# Note: '-insert_time' means descending. We want the largest
|
|
# insert time that is no larger than insert_time_after.
|
|
insert_time__lte=insert_time_after).order_by('-insert_time')
|
|
try:
|
|
constraint = query[0].highest_hqe_id
|
|
if 'id__gte' in filter_data:
|
|
constraint = max(constraint, filter_data['id__gte'])
|
|
filter_data['id__gte'] = constraint
|
|
except IndexError:
|
|
pass
|
|
|
|
# Get end bounds.
|
|
if insert_time_before:
|
|
query = models.HostQueueEntryStartTimes.objects.filter(
|
|
insert_time__gte=insert_time_before).order_by('insert_time')
|
|
try:
|
|
constraint = query[0].highest_hqe_id
|
|
if 'id__lte' in filter_data:
|
|
constraint = min(constraint, filter_data['id__lte'])
|
|
filter_data['id__lte'] = constraint
|
|
except IndexError:
|
|
pass
|
|
|
|
return rpc_utils.prepare_rows_as_nested_dicts(
|
|
models.HostQueueEntry.query_objects(filter_data),
|
|
('host', 'job'))
|
|
|
|
|
|
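# Example usage (timestamps are illustrative): page through entries inserted
# during one day via the insert-time index instead of a full table scan.
#
#     get_host_queue_entries_by_insert_time(
#             insert_time_after='2020-01-01 00:00:00',
#             insert_time_before='2020-01-02 00:00:00')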
def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
|
|
"""\
|
|
@returns A sequence of nested dictionaries of host and job information.
|
|
"""
|
|
filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
|
|
'started_on__lte',
|
|
start_time,
|
|
end_time,
|
|
**filter_data)
|
|
return rpc_utils.prepare_rows_as_nested_dicts(
|
|
models.HostQueueEntry.query_objects(filter_data),
|
|
('host', 'job'))
|
|
|
|
|
|
def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
|
|
"""\
|
|
Get the number of host queue entries associated with this job.
|
|
"""
|
|
filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
|
|
'started_on__lte',
|
|
start_time,
|
|
end_time,
|
|
**filter_data)
|
|
return models.HostQueueEntry.query_count(filter_data)
|
|
|
|
|
|
def get_hqe_percentage_complete(**filter_data):
|
|
"""
|
|
Computes the fraction of host queue entries matching the given filter data
|
|
that are complete.
|
|
"""
|
|
query = models.HostQueueEntry.query_objects(filter_data)
|
|
complete_count = query.filter(complete=True).count()
|
|
total_count = query.count()
|
|
if total_count == 0:
|
|
return 1
|
|
return float(complete_count) / total_count
|
|
|
|
|
|
# special tasks
|
|
|
|
def get_special_tasks(**filter_data):
|
|
"""Get special task entries from the local database.
|
|
|
|
Query the special tasks table for tasks matching the given
|
|
`filter_data`, and return a list of the results. No attempt is
|
|
made to forward the call to shards; the buck will stop here.
|
|
The caller is expected to know the target shard for such reasons
|
|
as:
|
|
* The caller is a service (such as gs_offloader) configured
|
|
to operate on behalf of one specific shard, and no other.
|
|
* The caller has a host as a parameter, and knows that this is
|
|
the shard assigned to that host.
|
|
|
|
@param filter_data Filter keywords to pass to the underlying
|
|
database query.
|
|
|
|
"""
|
|
return rpc_utils.prepare_rows_as_nested_dicts(
|
|
models.SpecialTask.query_objects(filter_data),
|
|
('host', 'queue_entry'))
|
|
|
|
|
|
def get_host_special_tasks(host_id, **filter_data):
|
|
"""Get special task entries for a given host.
|
|
|
|
Query the special tasks table for tasks that ran on the host
|
|
given by `host_id` and matching the given `filter_data`.
|
|
Return a list of the results. If the host is assigned to a
|
|
shard, forward this call to that shard.
|
|
|
|
@param host_id Id in the database of the target host.
|
|
@param filter_data Filter keywords to pass to the underlying
|
|
database query.
|
|
|
|
"""
|
|
# Retrieve host data even if the host is in an invalid state.
|
|
host = models.Host.smart_get(host_id, False)
|
|
if not host.shard:
|
|
return get_special_tasks(host_id=host_id, **filter_data)
|
|
else:
|
|
# The return values from AFE methods are post-processed
|
|
# objects that aren't JSON-serializable. So, we have to
|
|
# call AFE.run() to get the raw, serializable output from
|
|
# the shard.
|
|
shard_afe = frontend.AFE(server=host.shard.hostname)
|
|
return shard_afe.run('get_special_tasks',
|
|
host_id=host_id, **filter_data)
|
|
|
|
|
|
def get_num_special_tasks(**kwargs):
|
|
"""Get the number of special task entries from the local database.
|
|
|
|
Query the special tasks table for tasks matching the given 'kwargs',
|
|
and return the number of the results. No attempt is made to forward
|
|
the call to shards; the buck will stop here.
|
|
|
|
@param kwargs Filter keywords to pass to the underlying database query.
|
|
|
|
"""
|
|
return models.SpecialTask.query_count(kwargs)
|
|
|
|
|
|
def get_host_num_special_tasks(host, **kwargs):
|
|
"""Get special task entries for a given host.
|
|
|
|
Query the special tasks table for tasks that ran on the host
|
|
given by 'host' and matching the given 'kwargs'.
|
|
Return a list of the results. If the host is assigned to a
|
|
shard, forward this call to that shard.
|
|
|
|
@param host id or name of a host. More often a hostname.
|
|
@param kwargs Filter keywords to pass to the underlying database query.
|
|
|
|
"""
|
|
# Retrieve host data even if the host is in an invalid state.
|
|
host_model = models.Host.smart_get(host, False)
|
|
if not host_model.shard:
|
|
return get_num_special_tasks(host=host, **kwargs)
|
|
else:
|
|
shard_afe = frontend.AFE(server=host_model.shard.hostname)
|
|
return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
|
|
|
|
|
|
def get_status_task(host_id, end_time):
    """Get the "status task" for a host from the local shard.

    Returns a single special task representing the given host's
    "status task". The status task is a completed special task that
    identifies whether the corresponding host was working or broken
    when it completed. A successful task indicates a working host;
    a failed task indicates broken.

    This call will not be forwarded to a shard; the receiving server
    must be the shard that owns the host.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time. If no task is found, return
            `None`.

    """
    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
            status_history.get_status_task(host_id, end_time),
            ('host', 'queue_entry'))
    return tasklist[0] if tasklist else None


def get_host_status_task(host_id, end_time):
    """Get the "status task" for a host from its owning shard.

    Finds the given host's owning shard, and forwards to it a call
    to `get_status_task()` (see above).

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time. If no task is found, return
            `None`.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard:
        return get_status_task(host_id, end_time)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable. So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.run('get_status_task',
                             host_id=host_id, end_time=end_time)


def get_host_diagnosis_interval(host_id, end_time, success):
    """Find a "diagnosis interval" for a given host.

    A "diagnosis interval" identifies a start and end time where
    the host went from "working" to "broken", or vice versa. The
    interval's starting time is the starting time of the last status
    task with the old status; the end time is the finish time of the
    first status task with the new status.

    This routine finds the most recent diagnosis interval for the
    given host prior to `end_time`, with a starting status matching
    `success`. If `success` is true, the interval will start with a
    successful status task; if false the interval will start with a
    failed status task.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the diagnosis interval.
    @param success      Whether the diagnosis interval should start
                        with a successful or failed status task.

    @return A list of two strings. The first is the timestamp for
            the beginning of the interval; the second is the
            timestamp for the end. If the host has never changed
            state, the list is empty.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard or utils.is_shard():
        return status_history.get_diagnosis_interval(
                host_id, end_time, success)
    else:
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.get_host_diagnosis_interval(
                host_id, end_time, success)


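# Illustrative sketch for get_host_diagnosis_interval() above: the returned
# interval is a [start, end] pair of timestamp strings, so a monitoring
# script might consume it like the (hypothetical) snippet below.
#
#   interval = get_host_diagnosis_interval(host_id, end_time, success=False)
#   if interval:
#       start_ts, end_ts = interval

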
# support for host detail view


def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                             query_limit=None, start_time=None,
                                             end_time=None):
    """
    @returns an interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order. Each dict contains keys for type, host,
            job, status, started_on, execution_path, and id.
    """
    total_limit = None
    if query_limit is not None:
        total_limit = query_start + query_limit
    filter_data_common = {'host': host,
                          'query_limit': total_limit,
                          'sort_by': ['-id']}

    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common)

    queue_entries = get_host_queue_entries(
            start_time, end_time, **filter_data_common)
    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
            interleaved_entries, queue_entries)


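# Illustrative sketch for get_host_queue_entries_and_special_tasks() above:
# the host detail view pages through the interleaved history with
# query_start/query_limit, e.g. (hypothetical values):
#
#   page = get_host_queue_entries_and_special_tasks(
#           'chromeos1-row1-rack1-host1', query_start=0, query_limit=20)
#   for entry in page:
#       print entry['type'], entry['status'], entry['started_on']

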
def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                 end_time=None):
    filter_data_common = {'host': host}

    filter_data_queue_entries, filter_data_special_tasks = (
            rpc_utils.inject_times_to_hqe_special_tasks_filters(
                    filter_data_common, start_time, end_time))

    return (models.HostQueueEntry.query_count(filter_data_queue_entries)
            + get_host_num_special_tasks(**filter_data_special_tasks))


# other


def echo(data=""):
    """\
    Returns a passed in string. For doing a basic test to see if RPC calls
    can successfully be made.
    """
    return data


def get_motd():
    """\
    Returns the message of the day as a string.
    """
    return rpc_utils.get_motd()


def get_static_data():
    """\
    Returns a dictionary containing a bunch of data that shouldn't change
    often and is otherwise inaccessible. This includes:

    priorities: List of job priority choices.
    default_priority: Default priority value for new jobs.
    users: Sorted list of all users.
    labels: Sorted list of labels that do not start with 'cros-version' or
            'fw-version'.
    tests: Sorted list of all tests.
    profilers: Sorted list of all profilers.
    current_user: Logged-in username.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_mins_default: The default job timeout length in minutes.
    parse_failed_repair_default: Default value for the parse_failed_repair job
                                 option.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    status_dictionary: A mapping from one-word job status names to a more
                       informative description.
    """

    default_drone_set_name = models.DroneSet.default_drone_set_name()
    drone_sets = ([default_drone_set_name] +
                  sorted(drone_set.name for drone_set in
                         models.DroneSet.objects.exclude(
                                 name=default_drone_set_name)))

    result = {}
    result['priorities'] = priorities.Priority.choices()
    result['default_priority'] = 'Default'
    result['max_schedulable_priority'] = priorities.Priority.DEFAULT
    result['users'] = get_users(sort_by=['login'])

    label_exclude_filters = [{'name__startswith': 'cros-version'},
                             {'name__startswith': 'fw-version'},
                             {'name__startswith': 'fwrw-version'},
                             {'name__startswith': 'fwro-version'}]
    result['labels'] = get_labels(
            label_exclude_filters,
            sort_by=['-platform', 'name'])

    result['tests'] = get_tests(sort_by=['name'])
    result['profilers'] = get_profilers(sort_by=['name'])
    result['current_user'] = rpc_utils.prepare_for_serialization(
            models.User.current_user().get_object_dict())
    result['host_statuses'] = sorted(models.Host.Status.names)
    result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
    result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
    result['job_max_runtime_mins_default'] = (
            models.Job.DEFAULT_MAX_RUNTIME_MINS)
    result['parse_failed_repair_default'] = bool(
            models.Job.DEFAULT_PARSE_FAILED_REPAIR)
    result['reboot_before_options'] = model_attributes.RebootBefore.names
    result['reboot_after_options'] = model_attributes.RebootAfter.names
    result['motd'] = rpc_utils.get_motd()
    result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
    result['drone_sets'] = drone_sets

    result['status_dictionary'] = {"Aborted": "Aborted",
                                   "Verifying": "Verifying Host",
                                   "Provisioning": "Provisioning Host",
                                   "Pending": "Waiting on other hosts",
                                   "Running": "Running autoserv",
                                   "Completed": "Autoserv completed",
                                   "Failed": "Failed to complete",
                                   "Queued": "Queued",
                                   "Starting": "Next in host's queue",
                                   "Stopped": "Other host(s) failed verify",
                                   "Parsing": "Awaiting parse of final results",
                                   "Gathering": "Gathering log files",
                                   "Waiting": "Waiting for scheduler action",
                                   "Archiving": "Archiving results",
                                   "Resetting": "Resetting hosts"}

    result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
    result['stainless_url'] = rpc_utils.get_stainless_url()
    result['is_moblab'] = bool(utils.is_moblab())

    return result


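# Illustrative sketch for get_static_data() above: clients usually fetch this
# once at startup and read individual keys, e.g. (hypothetical usage):
#
#   static = get_static_data()
#   for status, description in static['status_dictionary'].iteritems():
#       print status, '-', description

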
def get_server_time():
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")


def ping_db():
    """Simple connection test to db"""
    try:
        db_connection.cursor()
    except DatabaseError:
        return [False]
    return [True]


def get_hosts_by_attribute(attribute, value):
    """
    Get the list of valid hosts that share the same host attribute value.

    @param attribute: String of the host attribute to check.
    @param value: String of the value that is shared between hosts.

    @returns List of hostnames that all have the same host attribute and
             value.
    """
    rows = models.HostAttribute.query_objects({'attribute': attribute,
                                               'value': value})
    if RESPECT_STATIC_ATTRIBUTES:
        returned_hosts = set()
        # Add hosts that:
        #     * Are valid (not marked invalid).
        #     * Exist in afe_host_attribute with given attribute.
        #     * Don't exist in afe_static_host_attribute OR exist in
        #       afe_static_host_attribute with the same given value.
        for row in rows:
            if row.host.invalid != 0:
                continue

            static_hosts = models.StaticHostAttribute.query_objects(
                    {'host_id': row.host.id, 'attribute': attribute})
            values = [static_host.value for static_host in static_hosts]
            if len(values) == 0 or values[0] == value:
                returned_hosts.add(row.host.hostname)

        # Add hosts that:
        #     * Are valid (not marked invalid).
        #     * Exist in afe_static_host_attribute with given attribute
        #       and value.
        #     * No need to check whether each static attribute has its
        #       corresponding entry in afe_host_attribute since it is ensured
        #       in inventory sync.
        static_rows = models.StaticHostAttribute.query_objects(
                {'attribute': attribute, 'value': value})
        for row in static_rows:
            if row.host.invalid != 0:
                continue

            returned_hosts.add(row.host.hostname)

        return list(returned_hosts)
    else:
        return [row.host.hostname for row in rows if row.host.invalid == 0]


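# Illustrative sketch for get_hosts_by_attribute() above: attribute and value
# are plain strings, so callers typically look up pool-like groupings, e.g.
# (hypothetical attribute name and value):
#
#   hostnames = get_hosts_by_attribute('serial_number_group', 'batch-42')
#
# Only valid hosts are returned, and static attribute entries take precedence
# over the afe_host_attribute rows when RESPECT_STATIC_ATTRIBUTES is enabled.

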
def _get_control_file_by_suite(suite_name):
    """Get control file contents by suite name.

    @param suite_name: Suite name as string.
    @returns: Control file contents as string.
    """
    getter = control_file_getter.FileSystemGetter(
            [_CONFIG.get_config_value('SCHEDULER',
                                      'drone_installation_directory')])
    return getter.get_control_file_contents_by_name(suite_name)


@rpc_utils.route_rpc_to_main
def create_suite_job(
        name='',
        board='',
        pool='',
        child_dependencies=(),
        control_file='',
        check_hosts=True,
        num=None,
        file_bugs=False,
        timeout=24,
        timeout_mins=None,
        priority=priorities.Priority.DEFAULT,
        suite_args=None,
        wait_for_results=True,
        job_retry=False,
        max_retries=None,
        max_runtime_mins=None,
        suite_min_duts=0,
        offload_failures_only=False,
        builds=None,
        test_source_build=None,
        run_prod_code=False,
        delay_minutes=0,
        is_cloning=False,
        job_keyvals=None,
        test_args=None,
        **kwargs):
    """
    Create a job to run a test suite on the given device with the given image.

    When the timeout specified in the control file is reached, the
    job is guaranteed to have completed and results will be available.

    @param name: The test name if control_file is supplied, otherwise the name
                 of the test suite to run, e.g. 'bvt'.
    @param board: the kind of device to run the tests on.
    @param builds: the builds to install e.g.
                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
                    'fwrw-version:': 'x86-alex-firmware/R36-5771.50.0',
                    'fwro-version:': 'x86-alex-firmware/R36-5771.49.0'}
                   If builds is given a value, it overrides the `build`
                   argument.
    @param test_source_build: Build that contains the server-side test code.
    @param pool: Specify the pool of machines to use for scheduling
                 purposes.
    @param child_dependencies: (optional) list of additional dependency labels
                               (strings) that will be added as dependency
                               labels to child jobs.
    @param control_file: the control file of the job.
    @param check_hosts: require appropriate live hosts to exist in the lab.
    @param num: Specify the number of machines to schedule across (integer).
                Leave unspecified or use None to use default sharding factor.
    @param file_bugs: File a bug on each test failure in this suite.
    @param timeout: The max lifetime of this suite, in hours.
    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
                         priority over timeout.
    @param priority: Integer denoting priority. Higher is more important.
    @param suite_args: Optional arguments which will be parsed by the suite
                       control file. Used by control.test_that_wrapper to
                       determine which tests to run.
    @param wait_for_results: Set to False to run the suite job without waiting
                             for test jobs to finish. Default is True.
    @param job_retry: Set to True to enable job-level retry. Default is False.
    @param max_retries: Integer, maximum job retries allowed at suite level.
                        None for no max.
    @param max_runtime_mins: Maximum amount of time a job can be running in
                             minutes.
    @param suite_min_duts: Integer. Scheduler will prioritize getting the
                           minimum number of machines for the suite when it is
                           competing with another suite that has a higher
                           priority but already got minimum machines it needs.
    @param offload_failures_only: Only enable gs_offloading for failed jobs.
    @param run_prod_code: If True, the suite will run the test code that
                          lives in prod aka the test code currently on the
                          lab servers. If False, the control files and test
                          code for this suite run will be retrieved from the
                          build artifacts.
    @param delay_minutes: Delay the creation of test jobs for a given number of
                          minutes.
    @param is_cloning: True if creating a cloning job.
    @param job_keyvals: A dict of job keyvals to be injected into the control
                        file.
    @param test_args: A dict of args passed all the way to each individual test
                      that will be actually run.
    @param kwargs: extra keyword args. NOT USED.

    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
    @raises NoControlFileList: if we can't list the control files at all.
    @raises StageControlFileFailure: If the dev server throws 500 while
                                     staging test_suites.
    @raises ControlFileEmpty: if the control file exists on the server, but
                              can't be read.

    @return: the job ID of the suite; -1 on error.
    """
    if num is not None:
        warnings.warn('num is deprecated for create_suite_job')
    del num

    if builds is None:
        builds = {}

    # Default test source build to CrOS build if it's not specified and
    # run_prod_code is set to False.
    if not run_prod_code:
        test_source_build = Suite.get_test_source_build(
                builds, test_source_build=test_source_build)

    sample_dut = rpc_utils.get_sample_dut(board, pool)

    suite_name = suite_common.canonicalize_suite_name(name)
    if run_prod_code:
        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
        keyvals = {}
    else:
        ds, keyvals = suite_common.stage_build_artifacts(
                test_source_build, hostname=sample_dut)
    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts

    # Do not change this naming convention without updating
    # site_utils.parse_job_name.
    if run_prod_code:
        # If run_prod_code is True, test_source_build is not set, use the
        # first build in the builds list for the suite job name.
        name = '%s-%s' % (builds.values()[0], suite_name)
    else:
        name = '%s-%s' % (test_source_build, suite_name)

    timeout_mins = timeout_mins or timeout * 60
    max_runtime_mins = max_runtime_mins or timeout * 60

    if not board:
        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]

    if run_prod_code:
        control_file = _get_control_file_by_suite(suite_name)

    if not control_file:
        # No control file was supplied so look it up from the build artifacts.
        control_file = suite_common.get_control_file_by_build(
                test_source_build, ds, suite_name)

    # Prepend builds and board to the control file.
    if is_cloning:
        control_file = tools.remove_injection(control_file)

    if suite_args is None:
        suite_args = dict()

    inject_dict = {
        'board': board,
        # `build` is needed for suites like AU to stage image inside suite
        # control file.
        'build': test_source_build,
        'builds': builds,
        'check_hosts': check_hosts,
        'pool': pool,
        'child_dependencies': child_dependencies,
        'file_bugs': file_bugs,
        'timeout': timeout,
        'timeout_mins': timeout_mins,
        'devserver_url': ds.url(),
        'priority': priority,
        'wait_for_results': wait_for_results,
        'job_retry': job_retry,
        'max_retries': max_retries,
        'max_runtime_mins': max_runtime_mins,
        'offload_failures_only': offload_failures_only,
        'test_source_build': test_source_build,
        'run_prod_code': run_prod_code,
        'delay_minutes': delay_minutes,
        'job_keyvals': job_keyvals,
        'test_args': test_args,
    }
    inject_dict.update(suite_args)
    control_file = tools.inject_vars(inject_dict, control_file)

    return rpc_utils.create_job_common(name,
                                       priority=priority,
                                       timeout_mins=timeout_mins,
                                       max_runtime_mins=max_runtime_mins,
                                       control_type='Server',
                                       control_file=control_file,
                                       hostless=True,
                                       keyvals=keyvals)


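# Illustrative sketch for create_suite_job() above: a minimal suite-job
# request, assuming a board, pool and build that exist in the lab (all
# values hypothetical).
#
#   create_suite_job(
#           name='bvt-inline',
#           board='eve',
#           pool='bvt',
#           builds={provision.CROS_VERSION_PREFIX:
#                   'eve-release/R70-11021.0.0'},
#           priority=priorities.Priority.DEFAULT,
#           timeout_mins=1440)

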
def get_job_history(**filter_data):
    """Get history of the job, including the special tasks executed for the job

    @param filter_data: filter for the call, should at least include
                        {'job_id': [job id]}
    @returns: JSON string of the job's history, including information such
              as the hosts that ran the job and the special tasks executed
              before and after the job.
    """
    job_id = filter_data['job_id']
    job_info = job_history.get_job_info(job_id)
    return rpc_utils.prepare_for_serialization(job_info.get_history())


def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
                    known_host_ids=(), known_host_statuses=()):
    """Receive updates for job statuses from shards and assign hosts and jobs.

    @param shard_hostname: Hostname of the calling shard
    @param jobs: Jobs in serialized form that should be updated with newer
                 status from a shard.
    @param hqes: Hostqueueentries in serialized form that should be updated
                 with newer status from a shard. Note that for every
                 hostqueueentry the corresponding job must be in jobs.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.
    @param known_host_statuses: List of statuses of hosts the shard already
                                has.

    @returns: Serialized representations of hosts, jobs, suite job keyvals
              and their dependencies to be inserted into a shard's database.
    """
    # The following alternatives to sending host and job ids in every
    # heartbeat have been considered:
    # 1. Sending the highest known job and host ids. This would work for jobs:
    #    Newer jobs always have larger ids. Also, if a job is not assigned to a
    #    particular shard during a heartbeat, it never will be assigned to this
    #    shard later.
    #    This is not true for hosts though: A host that is leased won't be sent
    #    to the shard now, but might be sent in a future heartbeat. This means
    #    sometimes hosts should be transferred that have a lower id than the
    #    maximum host id the shard knows.
    # 2. Send the number of jobs/hosts the shard knows to the main in each
    #    heartbeat. Compare these to the number of records that already have
    #    the shard_id set to this shard. In the normal case, they should match.
    #    In case they don't, resend all entities of that type.
    #    This would work well for hosts, because there aren't that many.
    #    Resending all jobs is quite a big overhead though.
    #    Also, this approach might run into edge cases when entities are
    #    ever deleted.
    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
    #    Using two different approaches isn't consistent and might cause
    #    confusion. Also the issues with the case of deletions might still
    #    occur.
    #
    # The overhead of sending all job and host ids in every heartbeat is low:
    # At peaks one board has about 1200 created but unfinished jobs.
    # See the numbers here: http://goo.gl/gQCGWH
    # Assuming that job ids have 6 digits and that json serialization takes a
    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
    # A NOT IN query with 5000 ids took about 30ms in tests made.
    # These numbers seem low enough to outweigh the disadvantages of the
    # solutions described above.
    shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
    rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
    assert len(known_host_ids) == len(known_host_statuses)
    for i in range(len(known_host_ids)):
        host_model = models.Host.objects.get(pk=known_host_ids[i])
        if host_model.status != known_host_statuses[i]:
            host_model.status = known_host_statuses[i]
            host_model.save()

    hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard(
            shard_obj, known_job_ids=known_job_ids,
            known_host_ids=known_host_ids)
    return {
        'hosts': [host.serialize() for host in hosts],
        'jobs': [job.serialize() for job in jobs],
        'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
        'incorrect_host_ids': [int(i) for i in inc_ids],
    }


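# Illustrative sketch for shard_heartbeat() above: a shard sends its known
# ids and statuses and receives the records it still needs; the shapes below
# mirror the return statement above (all values hypothetical).
#
#   payload = shard_heartbeat('shard1.example.com',
#                             known_job_ids=[101, 102],
#                             known_host_ids=[7, 9],
#                             known_host_statuses=['Ready', 'Running'])
#   new_hosts = payload['hosts']            # serialized Host rows
#   new_jobs = payload['jobs']              # serialized Job rows
#   bad_ids = payload['incorrect_host_ids']

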
def get_shards(**filter_data):
    """Return a list of all shards.

    @returns A sequence of nested dictionaries of shard information.
    """
    shards = models.Shard.query_objects(filter_data)
    serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
    for serialized, shard in zip(serialized_shards, shards):
        serialized['labels'] = [label.name for label in shard.labels.all()]

    return serialized_shards


def _assign_board_to_shard_precheck(labels):
    """Verify whether board labels are valid to be added to a given shard.

    First check whether each board label is in the correct format. Second,
    check whether the board label exists. Third, check whether the board has
    already been assigned to a shard.

    @param labels: Board labels separated by comma.

    @raises error.RPCException: If a label provided doesn't start with
            `board:` or the board has been added to a shard already.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: A list of label models that are ready to be added to the shard.
    """
    if not labels:
        # allow creation of label-less shards (labels='' would otherwise fail
        # the checks below)
        return []
    labels = labels.split(',')
    label_models = []
    for label in labels:
        # Check whether the board label is in correct format.
        if not label.startswith('board:'):
            raise error.RPCException('Sharding only supports `board:.*` label.')
        # Check whether the board label exists. If not, an exception will be
        # thrown by the smart_get function.
        label = models.Label.smart_get(label)
        # Check whether the board has been sharded already.
        try:
            shard = models.Shard.objects.get(labels=label)
            raise error.RPCException(
                    '%s is already on shard %s' % (label, shard.hostname))
        except models.Shard.DoesNotExist:
            # board is not on any shard, so it's valid.
            label_models.append(label)
    return label_models


def add_shard(hostname, labels):
    """Add a shard and start running jobs on it.

    @param hostname: The hostname of the shard to be added; needs to be unique.
    @param labels: Board labels separated by comma. Jobs of one of the labels
                   will be assigned to the shard.

    @raises error.RPCException: If a label provided doesn't start with `board:`
            or the board has been added to a shard already.
    @raises model_logic.ValidationError: If a shard with the given hostname
            already exists.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the added shard.
    """
    labels = _assign_board_to_shard_precheck(labels)
    shard = models.Shard.add_object(hostname=hostname)
    for label in labels:
        shard.labels.add(label)
    return shard.id


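# Illustrative sketch for add_shard() above (hypothetical hostname and
# boards):
#
#   shard_id = add_shard('shard1.example.com', 'board:eve,board:kevin')
#
# Each label must be of the form `board:<name>`, must already exist, and
# must not be assigned to another shard, or the precheck above raises
# error.RPCException.

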
def add_board_to_shard(hostname, labels):
    """Add boards to a given shard.

    @param hostname: The hostname of the shard to be changed.
    @param labels: Board labels separated by comma.

    @raises error.RPCException: If a label provided doesn't start with `board:`
            or the board has been added to a shard already.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the changed shard.
    """
    labels = _assign_board_to_shard_precheck(labels)
    shard = models.Shard.objects.get(hostname=hostname)
    for label in labels:
        shard.labels.add(label)
    return shard.id


# Remove board RPCs are rare, so we can afford to make them a bit more
# expensive (by performing in a transaction) in order to guarantee
# atomicity.
@transaction.commit_on_success
def remove_board_from_shard(hostname, label):
    """Remove a board from the given shard.

    @param hostname: The hostname of the shard to be changed.
    @param label: Board label.

    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the changed shard.
    """
    shard = models.Shard.objects.get(hostname=hostname)
    label = models.Label.smart_get(label)
    if label not in shard.labels.all():
        raise error.RPCException(
                'Cannot remove label from shard that does not belong to it.')

    shard.labels.remove(label)
    if label.is_replaced_by_static():
        static_label = models.StaticLabel.smart_get(label.name)
        models.Host.objects.filter(
                static_labels__in=[static_label]).update(shard=None)
    else:
        models.Host.objects.filter(labels__in=[label]).update(shard=None)


def delete_shard(hostname):
    """Delete a shard and reclaim all resources from it.

    This claims back all assigned hosts from the shard.
    """
    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)

    # Remove shard information.
    models.Host.objects.filter(shard=shard).update(shard=None)

    # Note: The original job-cleanup query was performed with the django call
    #   models.Job.objects.filter(shard=shard).update(shard=None)
    #
    # But that started becoming unreliable due to the large size of afe_jobs.
    #
    # We don't need atomicity here, so the new cleanup method is iterative, in
    # chunks of 100k jobs.
    QUERY = ('UPDATE afe_jobs SET shard_id = NULL WHERE shard_id = %s '
             'LIMIT 100000')
    try:
        with contextlib.closing(db_connection.cursor()) as cursor:
            clear_jobs = True
            assert shard.id is not None
            while clear_jobs:
                cursor.execute(QUERY % shard.id)
                # rowcount reports the number of rows affected by the UPDATE;
                # keep looping until a chunk comes back empty.
                clear_jobs = bool(cursor.rowcount)
    # Unit tests use sqlite backend instead of MySQL. sqlite does not support
    # UPDATE ... LIMIT, so fall back to the old behavior.
    except DatabaseError as e:
        if 'syntax error' in str(e):
            models.Job.objects.filter(shard=shard).update(shard=None)
        else:
            raise

    shard.labels.clear()
    shard.delete()


def get_servers(hostname=None, role=None, status=None):
    """Get a list of servers with matching role and status.

    @param hostname: FQDN of the server.
    @param role: Name of the server role, e.g., drone, scheduler. Default to
                 None to match any role.
    @param status: Status of the server, e.g., primary, backup,
                   repair_required. Default to None to match any server
                   status.

    @raises error.RPCException: If the server database is not used.
    @return: A list of server names for servers with matching role and status.
    """
    if not server_manager_utils.use_server_db():
        raise error.RPCException('Server database is not enabled. Please try '
                                 'retrieving servers from the global config.')
    servers = server_manager_utils.get_servers(hostname=hostname, role=role,
                                               status=status)
    return [s.get_details() for s in servers]


@rpc_utils.route_rpc_to_main
def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
    """Get stable version for the given board.

    @param board: Name of the board.
    @param android: Unused legacy parameter. This is maintained for the
                    sake of clients on old branches that still pass the
                    parameter. TODO(jrbarnette) Remove this completely once
                    R68 drops off stable.

    @return: Stable version of the given board. Returns the global config
             value of CROS.stable_cros_version if the stable_versions table
             does not have an entry for the board DEFAULT.
    """
    assert not android, 'get_stable_version no longer supports `android`.'
    return stable_version_utils.get(board=board)


@rpc_utils.route_rpc_to_main
def get_all_stable_versions():
    """Get stable versions for all boards.

    @return: A dictionary of board:version.
    """
    return stable_version_utils.get_all()


@rpc_utils.route_rpc_to_main
def set_stable_version(version, board=stable_version_utils.DEFAULT):
    """Modify stable version for the given board.

    Setting the stable version over RPC is no longer permitted; this call
    only logs a warning and returns None.

    @param version: The new value of stable version for given board.
    @param board: Name of the board, default to value `DEFAULT`.
    """
    logging.warning("rpc_interface::set_stable_version: attempted to set "
                    "stable version. setting the stable version is not "
                    "permitted")
    return None


@rpc_utils.route_rpc_to_main
def delete_stable_version(board):
    """Delete stable version for the given board.

    Delete a stable version entry in afe_stable_versions table for a given
    board, so the default stable version will be used.

    @param board: Name of the board.
    """
    stable_version_utils.delete(board=board)


def get_tests_by_build(build, ignore_invalid_tests=True):
    """Get the tests that are available for the specified build.

    @param build: unique name by which to refer to the image.
    @param ignore_invalid_tests: flag on if unparsable tests are ignored.

    @return: A sorted list of all tests that are in the build specified.
    """
    # Collect the control files specified in this build
    cfile_getter = control_file_lib._initialize_control_file_getter(build)
    if suite_common.ENABLE_CONTROLS_IN_BATCH:
        control_file_info_list = cfile_getter.get_suite_info()
        control_file_list = control_file_info_list.keys()
    else:
        control_file_list = cfile_getter.get_control_file_list()

    test_objects = []
    _id = 0
    for control_file_path in control_file_list:
        # Read and parse the control file
        if suite_common.ENABLE_CONTROLS_IN_BATCH:
            control_file = control_file_info_list[control_file_path]
        else:
            control_file = cfile_getter.get_control_file_contents(
                    control_file_path)
        try:
            control_obj = control_data.parse_control_string(control_file)
        except:
            logging.info('Failed to parse control file: %s', control_file_path)
            if not ignore_invalid_tests:
                raise
            # Skip unparsable control files instead of reusing a stale
            # control_obj from a previous iteration.
            continue

        # Extract the values needed for the AFE from the control_obj.
        # The keys list represents attributes in the control_obj that
        # are required by the AFE
        keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
                'test_category', 'test_class', 'dependencies', 'run_verify',
                'sync_count', 'job_retries', 'path']

        test_object = {}
        for key in keys:
            test_object[key] = getattr(control_obj, key) if hasattr(
                    control_obj, key) else ''

        # Unfortunately, the AFE expects different key-names for certain
        # values, these must be corrected to avoid the risk of tests
        # being omitted by the AFE.
        # The 'id' is an additional value used in the AFE.
        # The control_data parsing does not reference 'run_reset', but it
        # is also used in the AFE and defaults to True.
        test_object['id'] = _id
        test_object['run_reset'] = True
        test_object['description'] = test_object.get('doc', '')
        test_object['test_time'] = test_object.get('time', 0)

        # TODO(crbug.com/873716) DEPRECATED. Remove entirely.
        test_object['test_retry'] = 0

        # Fix the test name to be consistent with the current presentation
        # of test names in the AFE.
        testpath, subname = os.path.split(control_file_path)
        testname = os.path.basename(testpath)
        subname = subname.split('.')[1:]
        if subname:
            testname = '%s:%s' % (testname, ':'.join(subname))

        test_object['name'] = testname

        # Correct the test path as parse_control_string sets an empty string.
        test_object['path'] = control_file_path

        _id += 1
        test_objects.append(test_object)

    test_objects = sorted(test_objects, key=lambda x: x.get('name'))
    return rpc_utils.prepare_for_serialization(test_objects)


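# Illustrative sketch for get_tests_by_build() above: the AFE test name is
# derived from the control file path, e.g. a (hypothetical) path
# 'client/site_tests/dummy_Pass/control.bluetooth' yields the name
# 'dummy_Pass:bluetooth', while a plain '.../dummy_Pass/control' file keeps
# the bare directory name 'dummy_Pass'.

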
@rpc_utils.route_rpc_to_main
def get_lab_health_indicators(board=None):
    """Get the health indicators for the whole lab.

    The indicators now include:
    1. Whether the lab is closed.
    2. Available DUTs list for a given board.
    3. Devserver capacity.
    4. When the next major DUT utilization happens (e.g. CQ is coming in
       3 minutes).

    @param board: if board is specified, a list of available DUTs will be
                  returned for it. Otherwise, skip this indicator.

    @returns: A health indicator object including health info.
    """
    return LabHealthIndicator(None, None, None, None)