You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
272 lines
11 KiB
272 lines
11 KiB
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import dbus
|
|
import logging
|
|
import time
|
|
|
|
from autotest_lib.client.bin import utils
|
|
from autotest_lib.client.cros.networking import shill_proxy
|
|
|
|
|
|
class CellularProxy(shill_proxy.ShillProxy):
|
|
"""Wrapper around shill dbus interface used by cellular tests."""
|
|
|
|
# Properties exposed by shill.
|
|
DEVICE_PROPERTY_DBUS_OBJECT = 'DBus.Object'
|
|
DEVICE_PROPERTY_MODEL_ID = 'Cellular.ModelID'
|
|
DEVICE_PROPERTY_MANUFACTURER = 'Cellular.Manufacturer'
|
|
DEVICE_PROPERTY_OUT_OF_CREDITS = 'Cellular.OutOfCredits'
|
|
DEVICE_PROPERTY_SIM_LOCK_STATUS = 'Cellular.SIMLockStatus'
|
|
DEVICE_PROPERTY_SIM_PRESENT = 'Cellular.SIMPresent'
|
|
DEVICE_PROPERTY_TECHNOLOGY_FAMILY = 'Cellular.Family'
|
|
DEVICE_PROPERTY_TECHNOLOGY_FAMILY_CDMA = 'CDMA'
|
|
DEVICE_PROPERTY_TECHNOLOGY_FAMILY_GSM = 'GSM'
|
|
SERVICE_PROPERTY_LAST_GOOD_APN = 'Cellular.LastGoodAPN'
|
|
|
|
# APN info property names.
|
|
APN_INFO_PROPERTY_APN = 'apn'
|
|
|
|
# Keys into the dictionaries exposed as properties.
|
|
PROPERTY_KEY_SIM_LOCK_TYPE = 'LockType'
|
|
PROPERTY_KEY_SIM_LOCK_ENABLED = 'LockEnabled'
|
|
PROPERTY_KEY_SIM_LOCK_RETRIES_LEFT = 'RetriesLeft'
|
|
|
|
# Valid values taken by properties exposed by shill.
|
|
VALUE_SIM_LOCK_TYPE_PIN = 'sim-pin'
|
|
VALUE_SIM_LOCK_TYPE_PUK = 'sim-puk'
|
|
|
|
# Various timeouts in seconds.
|
|
SERVICE_CONNECT_TIMEOUT = 60
|
|
SERVICE_DISCONNECT_TIMEOUT = 60
|
|
SERVICE_REGISTRATION_TIMEOUT = 60
|
|
SLEEP_INTERVAL = 0.1
|
|
|
|
def set_logging_for_cellular_test(self):
|
|
"""Set the logging in shill for a test of cellular technology.
|
|
|
|
Set the log level to |ShillProxy.LOG_LEVEL_FOR_TEST| and the log scopes
|
|
to the ones defined in |ShillProxy.LOG_SCOPES_FOR_TEST| for
|
|
|ShillProxy.TECHNOLOGY_CELLULAR|.
|
|
|
|
"""
|
|
self.set_logging_for_test(self.TECHNOLOGY_CELLULAR)
|
|
|
|
|
|
def find_cellular_service_object(self):
|
|
"""Returns the first dbus object found that is a cellular service.
|
|
|
|
@return DBus object for the first cellular service found. None if no
|
|
service found.
|
|
|
|
"""
|
|
return self.find_object('Service', {'Type': self.TECHNOLOGY_CELLULAR})
|
|
|
|
|
|
def wait_for_cellular_service_object(
|
|
self, timeout_seconds=SERVICE_REGISTRATION_TIMEOUT):
|
|
"""Waits for the cellular service object to show up.
|
|
|
|
@param timeout_seconds: Amount of time to wait for cellular service.
|
|
@return DBus object for the first cellular service found.
|
|
@raises ShillProxyError if no cellular service is found within the
|
|
registration timeout period.
|
|
|
|
"""
|
|
return utils.poll_for_condition(
|
|
lambda: self.find_cellular_service_object(),
|
|
exception=shill_proxy.ShillProxyTimeoutError(
|
|
'Failed to find cellular service object'),
|
|
timeout=timeout_seconds)
|
|
|
|
|
|
def find_cellular_device_object(self):
|
|
"""Returns the first dbus object found that is a cellular device.
|
|
|
|
@return DBus object for the first cellular device found. None if no
|
|
device found.
|
|
|
|
"""
|
|
return self.find_object('Device', {'Type': self.TECHNOLOGY_CELLULAR})
|
|
|
|
|
|
def reset_modem(self, modem, expect_device=True, expect_powered=True,
|
|
expect_service=True):
|
|
"""Reset |modem|.
|
|
|
|
Do, in sequence,
|
|
(1) Ensure that the current device object disappears.
|
|
(2) If |expect_device|, ensure that the device reappears.
|
|
(3) If |expect_powered|, ensure that the device is powered.
|
|
(4) If |expect_service|, ensure that the service reappears.
|
|
|
|
This function does not check the service state for the device after
|
|
reset.
|
|
|
|
@param modem: DBus object for the modem to reset.
|
|
@param expect_device: If True, ensure that a DBus object reappears for
|
|
the same modem after the reset.
|
|
@param expect_powered: If True, ensure that the modem is powered on
|
|
after the reset.
|
|
@param expect_service: If True, ensure that a service managing the
|
|
reappeared modem also reappears.
|
|
|
|
@return (device, service)
|
|
device: DBus object for the reappeared Device after the reset.
|
|
service: DBus object for the reappeared Service after the reset.
|
|
Either of these may be None, if the object is not expected to
|
|
reappear.
|
|
|
|
@raises ShillProxyError if any of the conditions (1)-(4) fail.
|
|
|
|
"""
|
|
logging.info('Resetting modem')
|
|
# Obtain identifying information about the modem.
|
|
properties = modem.GetProperties(utf8_strings=True)
|
|
# NOTE: Using the Model ID means that this will break if we have two
|
|
# identical cellular modems in a DUT. Fortunately, we only support one
|
|
# modem at a time.
|
|
model_id = properties.get(self.DEVICE_PROPERTY_MODEL_ID)
|
|
if not model_id:
|
|
raise shill_proxy.ShillProxyError(
|
|
'Failed to get identifying information for the modem.')
|
|
old_modem_path = modem.object_path
|
|
old_modem_mm_object = properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT)
|
|
if not old_modem_mm_object:
|
|
raise shill_proxy.ShillProxyError(
|
|
'Failed to get the mm object path for the modem.')
|
|
|
|
manufacturer = properties.get(self.DEVICE_PROPERTY_MANUFACTURER)
|
|
if not manufacturer:
|
|
raise shill_proxy.ShillProxyError(
|
|
'Failed to get the manufacturer for the modem.')
|
|
if "QUALCOMM" in manufacturer:
|
|
logging.info(
|
|
'Qualcomm modem found. Bypassing modem reset (b/168113309)'
|
|
)
|
|
new_modem = modem
|
|
else:
|
|
modem.Reset()
|
|
|
|
# (1) Wait for the old modem to disappear
|
|
utils.poll_for_condition(lambda: self._is_old_modem_gone(
|
|
old_modem_path, old_modem_mm_object),
|
|
exception=shill_proxy.
|
|
ShillProxyTimeoutError(
|
|
'Old modem disappeared'),
|
|
timeout=60)
|
|
|
|
# (2) Wait for the device to reappear
|
|
if not expect_device:
|
|
return None, None
|
|
# The timeout here should be sufficient for our slowest modem to
|
|
# reappear.
|
|
new_modem = utils.poll_for_condition(
|
|
lambda: self._get_reappeared_modem(model_id,
|
|
old_modem_mm_object),
|
|
exception=shill_proxy.ShillProxyTimeoutError(
|
|
'The modem reappeared after reset.'),
|
|
timeout=60)
|
|
|
|
# (3) Check powered state of the device
|
|
if not expect_powered:
|
|
return new_modem, None
|
|
success, _, _ = self.wait_for_property_in(
|
|
new_modem,
|
|
self.DEVICE_PROPERTY_POWERED, [self.VALUE_POWERED_ON],
|
|
timeout_seconds=10)
|
|
if not success:
|
|
raise shill_proxy.ShillProxyError(
|
|
'After modem reset, new modem failed to enter powered '
|
|
'state.')
|
|
|
|
# (4) Check that service reappears
|
|
if not expect_service:
|
|
return new_modem, None
|
|
new_service = self.get_service_for_device(new_modem)
|
|
if not new_service:
|
|
raise shill_proxy.ShillProxyError(
|
|
'Failed to find a shill service managing the reappeared '
|
|
'device.')
|
|
return new_modem, new_service
|
|
|
|
|
|
def disable_modem_for_test_setup(self, timeout_seconds=10):
|
|
"""
|
|
Disables all cellular modems.
|
|
|
|
Use this method only for setting up tests. Do not use this method to
|
|
test disable functionality because this method repeatedly attempts to
|
|
disable the cellular technology until it succeeds (ignoring all DBus
|
|
errors) since the DisableTechnology() call may fail for various reasons
|
|
(eg. an enable is in progress).
|
|
|
|
@param timeout_seconds: Amount of time to wait until the modem is
|
|
disabled.
|
|
@raises ShillProxyError if the modems fail to disable within
|
|
|timeout_seconds|.
|
|
|
|
"""
|
|
def _disable_cellular_technology(self):
|
|
try:
|
|
self._manager.DisableTechnology(self.TECHNOLOGY_CELLULAR)
|
|
return True
|
|
except dbus.DBusException as e:
|
|
return False
|
|
|
|
utils.poll_for_condition(
|
|
lambda: _disable_cellular_technology(self),
|
|
exception=shill_proxy.ShillProxyTimeoutError(
|
|
'Failed to disable cellular technology.'),
|
|
timeout=timeout_seconds)
|
|
modem = self.find_cellular_device_object()
|
|
self.wait_for_property_in(modem, self.DEVICE_PROPERTY_POWERED,
|
|
[self.VALUE_POWERED_OFF],
|
|
timeout_seconds=timeout_seconds)
|
|
|
|
|
|
def _is_old_modem_gone(self, modem_path, modem_mm_object):
|
|
"""Tests if the DBus object for modem disappears after Reset.
|
|
|
|
@param modem_path: The DBus path for the modem object that must vanish.
|
|
@param modem_mm_object: The modemmanager object path reported by the
|
|
old modem. This is unique everytime a new modem is (re)exposed.
|
|
|
|
@return True if the object disappeared, false otherwise.
|
|
|
|
"""
|
|
device = self.get_dbus_object(self.DBUS_TYPE_DEVICE, modem_path)
|
|
try:
|
|
properties = device.GetProperties()
|
|
# DBus object exists, perhaps a reappeared device?
|
|
return (properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT) !=
|
|
modem_mm_object)
|
|
except dbus.DBusException as e:
|
|
if e.get_dbus_name() == self.DBUS_ERROR_UNKNOWN_OBJECT:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _get_reappeared_modem(self, model_id, old_modem_mm_object):
|
|
"""Check that a vanished modem reappers.
|
|
|
|
@param model_id: The model ID reported by the vanished modem.
|
|
@param old_modem_mm_object: The previously reported modemmanager object
|
|
path for this modem.
|
|
|
|
@return The reappeared DBus object, if any. None otherwise.
|
|
|
|
"""
|
|
# TODO(pprabhu) This will break if we have multiple cellular devices
|
|
# in the system at the same time.
|
|
device = self.find_cellular_device_object()
|
|
if not device:
|
|
return None
|
|
properties = device.GetProperties(utf8_strings=True)
|
|
if (model_id == properties.get(self.DEVICE_PROPERTY_MODEL_ID) and
|
|
(old_modem_mm_object !=
|
|
properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT))):
|
|
return device
|
|
return None
|