You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
416 lines
15 KiB
416 lines
15 KiB
# Copyright 2015 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""An interface to access the local USB facade."""
|
|
|
|
import glob
|
|
import logging
|
|
import os
|
|
import time
|
|
|
|
from autotest_lib.client.bin import utils
|
|
from autotest_lib.client.cros.audio import cras_dbus_utils
|
|
from autotest_lib.client.cros.audio import cras_utils
|
|
|
|
|
|
class USBFacadeNativeError(Exception):
|
|
"""Error in USBFacadeNative."""
|
|
pass
|
|
|
|
|
|
class USBFacadeNative(object):
|
|
"""Facade to access the USB-related functionality.
|
|
|
|
Property:
|
|
_drivers_manager: A USBDeviceDriversManager object used to manage the
|
|
status of drivers associated with the USB audio gadget
|
|
on the host side.
|
|
|
|
"""
|
|
_DEFAULT_DEVICE_PRODUCT_NAME = 'Linux USB Audio Gadget'
|
|
_TIMEOUT_FINDING_USB_DEVICE_SECS = 10
|
|
_TIMEOUT_CRAS_NODES_CHANGE_SECS = 30
|
|
|
|
def __init__(self):
|
|
"""Initializes the USB facade.
|
|
|
|
The _drivers_manager is set with a USBDeviceDriversManager, which is
|
|
used to control the visibility and availability of a USB device on a
|
|
host Cros device.
|
|
|
|
"""
|
|
self._drivers_manager = USBDeviceDriversManager()
|
|
|
|
|
|
def _reenumerate_usb_devices(self):
|
|
"""Resets host controller to re-enumerate usb devices."""
|
|
self._drivers_manager.reset_host_controller()
|
|
|
|
|
|
def plug(self):
|
|
"""Sets and plugs the USB device into the host.
|
|
|
|
The USB device is initially set to one with the default product name,
|
|
which is assumed to be the name of the USB audio gadget on Chameleon.
|
|
This method blocks until Cras enumerate USB nodes within a timeout
|
|
specified in _wait_for_nodes_changed.
|
|
|
|
"""
|
|
# Only supports controlling one USB device of default name.
|
|
device_name = self._DEFAULT_DEVICE_PRODUCT_NAME
|
|
|
|
def find_usb_device():
|
|
"""Find USB device with name device_name.
|
|
|
|
@returns: True if succeed to find the device, False otherwise.
|
|
|
|
"""
|
|
try:
|
|
self._drivers_manager.find_usb_device(device_name)
|
|
return True
|
|
except USBDeviceDriversManagerError:
|
|
logging.debug('Can not find %s yet' % device_name)
|
|
return False
|
|
|
|
if self._drivers_manager.has_found_device(device_name):
|
|
if self._drivers_manager.drivers_are_bound():
|
|
return
|
|
self._drivers_manager.bind_usb_drivers()
|
|
self._wait_for_nodes_changed()
|
|
else:
|
|
# If driver manager has not found device yet, re-enumerate USB
|
|
# devices. The correct USB driver will be binded automatically.
|
|
self._reenumerate_usb_devices()
|
|
self._wait_for_nodes_changed()
|
|
# Wait some time for paths and fields in sysfs to be created.
|
|
utils.poll_for_condition(
|
|
condition=find_usb_device,
|
|
desc='Find USB device',
|
|
timeout=self._TIMEOUT_FINDING_USB_DEVICE_SECS)
|
|
|
|
|
|
def unplug(self):
|
|
"""Unplugs the USB device from the host."""
|
|
self._drivers_manager.unbind_usb_drivers()
|
|
|
|
|
|
def _wait_for_nodes_changed(self):
|
|
"""Waits for Cras to enumerate USB nodes.
|
|
|
|
USB nodes will be plugged, but not necessarily selected.
|
|
|
|
"""
|
|
def find_usb_node():
|
|
"""Checks if USB input and output nodes are plugged.
|
|
|
|
@returns: True if USB input and output nodes are plugged. False
|
|
otherwise.
|
|
"""
|
|
out_nodes, in_nodes = cras_utils.get_plugged_node_types()
|
|
logging.info('Cras nodes: output: %s, input: %s',
|
|
out_nodes, in_nodes)
|
|
return 'USB' in out_nodes and 'USB' in in_nodes
|
|
|
|
utils.poll_for_condition(
|
|
condition=find_usb_node,
|
|
desc='Find USB node',
|
|
timeout=self._TIMEOUT_CRAS_NODES_CHANGE_SECS)
|
|
|
|
|
|
class USBDeviceDriversManagerError(Exception):
|
|
"""Error in USBDeviceDriversManager."""
|
|
pass
|
|
|
|
|
|
class HostControllerDriver(object):
|
|
"""Abstract a host controller driver.
|
|
|
|
This class stores id and path like:
|
|
path: /sys/bus/pci/drivers/echi_hcd
|
|
id: 0000:00:1a.0
|
|
Then, it can bind/unbind driver by writing
|
|
0000:00:1a.0 to /sys/bus/pci/drivers/echi_hcd/bind
|
|
and /sys/bus/pci/drivers/echi_hcd/unbind.
|
|
|
|
"""
|
|
def __init__(self, hcd_id, hcd_path):
|
|
"""Inits an HostControllerDriver object.
|
|
|
|
@param hcd_id: The HCD id, e.g. 0000:00:1a.0
|
|
@param hcd_path: The path to HCD, e.g. /sys/bus/pci/drivers/echi_hcd.
|
|
|
|
"""
|
|
logging.debug('hcd id: %s, hcd path: %s', hcd_id, hcd_path)
|
|
self._hcd_id = hcd_id
|
|
self._hcd_path = hcd_path
|
|
|
|
|
|
def reset(self):
|
|
"""Resets HCD by unbinding and binding driver."""
|
|
utils.open_write_close(
|
|
os.path.join(self._hcd_path, 'unbind'), self._hcd_id)
|
|
utils.open_write_close(
|
|
os.path.join(self._hcd_path, 'bind'), self._hcd_id)
|
|
|
|
|
|
class USBDeviceDriversManager(object):
|
|
"""The class to control the USB drivers associated with a USB device.
|
|
|
|
By binding/unbinding certain USB driver, we can emulate the plug/unplug
|
|
action on that bus. However, this method only applies when the USB driver
|
|
has already been binded once.
|
|
To solve above problem, we can unbind then bind USB host controller driver
|
|
(HCD), then, HCD will re-enumerate all the USB devices. This method has
|
|
a side effect that all the USB devices will be disconnected for several
|
|
seconds, so we should only do it if needed.
|
|
Note that there might be multiple HCDs, e.g. 0000:00:1a.0 for bus1 and
|
|
0000:00:1b.0 for bus2.
|
|
|
|
Properties:
|
|
_device_product_name: The product name given to the USB device.
|
|
_device_bus_id: The bus ID of the USB device in the host.
|
|
_hcd_ids: The host controller driver IDs.
|
|
_hcds: A list of HostControllerDrivers.
|
|
|
|
"""
|
|
# The file to write to bind USB drivers of specified device
|
|
_USB_BIND_FILE_PATH = '/sys/bus/usb/drivers/usb/bind'
|
|
# The file to write to unbind USB drivers of specified device
|
|
_USB_UNBIND_FILE_PATH = '/sys/bus/usb/drivers/usb/unbind'
|
|
# The file path that exists when drivers are bound for current device
|
|
_USB_BOUND_DRIVERS_FILE_PATH = '/sys/bus/usb/drivers/usb/%s/driver'
|
|
# The pattern to glob usb drivers
|
|
_USB_DRIVER_GLOB_PATTERN = '/sys/bus/usb/drivers/usb/usb?/'
|
|
# The path to search for HCD on PCI or platform bus.
|
|
# The HCD id should be filled in the end.
|
|
_HCD_GLOB_PATTERNS = [
|
|
'/sys/bus/pci/drivers/*/%s',
|
|
'/sys/bus/platform/drivers/*/%s']
|
|
|
|
|
|
def __init__(self):
|
|
"""Initializes the manager.
|
|
|
|
_device_product_name and _device_bus_id are initially set to None.
|
|
|
|
"""
|
|
self._device_product_name = None
|
|
self._device_bus_id = None
|
|
self._hcd_ids = None
|
|
self._hcds = None
|
|
self._find_hcd_ids()
|
|
self._create_hcds()
|
|
|
|
|
|
def _find_hcd_ids(self):
|
|
"""Finds host controller driver ids for USB.
|
|
|
|
We can find the HCD id for USB from driver's realpath.
|
|
E.g. On ARM device:
|
|
/sys/bus/usb/drivers/usb/usb1 links to
|
|
/sys/devices/soc0/70090000.usb/xhci-hcd.0.auto/usb1
|
|
=> HCD id is xhci-hcd.0.auto
|
|
|
|
E.g. On X86 device:
|
|
/sys/bus/usb/drivers/usb/usb1 links to
|
|
/sys/devices/pci0000:00/0000:00:14.0/usb1
|
|
=> HCD id is 0000:00:14.0
|
|
|
|
There might be multiple HCD ids like 0000:00:1a.0 for usb1,
|
|
and 0000:00:1d.0 for usb2.
|
|
|
|
@raises: USBDeviceDriversManagerError if HCD id can not be found.
|
|
|
|
"""
|
|
def _get_dir_name(path):
|
|
return os.path.basename(os.path.dirname(path))
|
|
|
|
hcd_ids = set()
|
|
|
|
for search_root_path in glob.glob(self._USB_DRIVER_GLOB_PATTERN):
|
|
hcd_id = _get_dir_name(os.path.realpath(search_root_path))
|
|
hcd_ids.add(hcd_id)
|
|
|
|
if not hcd_ids:
|
|
raise USBDeviceDriversManagerError('Can not find HCD id')
|
|
|
|
self._hcd_ids = hcd_ids
|
|
logging.debug('Found HCD ids: %s', self._hcd_ids)
|
|
|
|
|
|
def _create_hcds(self):
|
|
"""Finds HCD paths from HCD id and create HostControllerDrivers.
|
|
|
|
HCD is under /sys/bus/pci/drivers/ for x86 boards, and under
|
|
/sys/bus/platform/drivers/ for ARM boards.
|
|
|
|
For each HCD id, finds HCD by checking HCD id under it, e.g.
|
|
/sys/bus/pci/drivers/ehci_hcd has 0000:00:1a.0 under it.
|
|
Then, create a HostControllerDriver and store it in self._hcds.
|
|
|
|
@raises: USBDeviceDriversManagerError if there are multiple
|
|
HCD path found for a given HCD id.
|
|
|
|
@raises: USBDeviceDriversManagerError if no HostControllerDriver is found.
|
|
|
|
"""
|
|
self._hcds = []
|
|
|
|
for hcd_id in self._hcd_ids:
|
|
for glob_pattern in self._HCD_GLOB_PATTERNS:
|
|
glob_pattern = glob_pattern % hcd_id
|
|
hcd_id_paths = glob.glob(glob_pattern)
|
|
if not hcd_id_paths:
|
|
continue
|
|
if len(hcd_id_paths) > 1:
|
|
raise USBDeviceDriversManagerError(
|
|
'More than 1 HCD id path found: %s' % hcd_id_paths)
|
|
hcd_id_path = hcd_id_paths[0]
|
|
|
|
# Gets /sys/bus/pci/drivers/echi_hcd from
|
|
# /sys/bus/pci/drivers/echi_hcd/0000:00:1a.0
|
|
hcd_path = os.path.dirname(hcd_id_path)
|
|
self._hcds.append(
|
|
HostControllerDriver(hcd_id=hcd_id, hcd_path=hcd_path))
|
|
|
|
|
|
def reset_host_controller(self):
|
|
"""Resets host controller by unbinding then binding HCD.
|
|
|
|
@raises: USBDeviceDriversManagerError if there is no HCD to control.
|
|
|
|
"""
|
|
if not self._hcds:
|
|
raise USBDeviceDriversManagerError('HCD is not found yet')
|
|
for hcd in self._hcds:
|
|
hcd.reset()
|
|
|
|
|
|
def _find_usb_device_bus_id(self, product_name):
|
|
"""Finds the bus ID of the USB device with the given product name.
|
|
|
|
@param product_name: The product name of the USB device as it appears
|
|
to the host.
|
|
|
|
@returns: The bus ID of the USB device if it is detected by the host
|
|
successfully; or None if there is no such device with the
|
|
given product name.
|
|
|
|
"""
|
|
def product_matched(path):
|
|
"""Checks if the product field matches expected product name.
|
|
|
|
@returns: True if the product name matches, False otherwise.
|
|
|
|
"""
|
|
read_product_name = utils.read_one_line(path)
|
|
logging.debug('Read product at %s = %s', path, read_product_name)
|
|
return read_product_name == product_name
|
|
|
|
# Find product field at these possible paths:
|
|
# '/sys/bus/usb/drivers/usb/usbX/X-Y/product' => bus id is X-Y.
|
|
# '/sys/bus/usb/drivers/usb/usbX/X-Y/X-Y.Z/product' => bus id is X-Y.Z.
|
|
|
|
for search_root_path in glob.glob(self._USB_DRIVER_GLOB_PATTERN):
|
|
logging.debug('search_root_path: %s', search_root_path)
|
|
for root, dirs, _ in os.walk(search_root_path):
|
|
logging.debug('root: %s', root)
|
|
for bus_id in dirs:
|
|
logging.debug('bus_id: %s', bus_id)
|
|
product_path = os.path.join(root, bus_id, 'product')
|
|
logging.debug('product_path: %s', product_path)
|
|
if not os.path.exists(product_path):
|
|
continue
|
|
if not product_matched(product_path):
|
|
continue
|
|
logging.debug(
|
|
'Bus ID of %s found: %s', product_name, bus_id)
|
|
return bus_id
|
|
|
|
logging.error('Bus ID of %s not found', product_name)
|
|
return None
|
|
|
|
|
|
def has_found_device(self, product_name):
|
|
"""Checks if the device has been found.
|
|
|
|
@param product_name: The product name of the USB device as it appears
|
|
to the host.
|
|
|
|
@returns: True if device has been found, False otherwise.
|
|
|
|
"""
|
|
return self._device_product_name == product_name
|
|
|
|
|
|
def find_usb_device(self, product_name):
|
|
"""Sets _device_product_name and _device_bus_id if it can be found.
|
|
|
|
@param product_name: The product name of the USB device as it appears
|
|
to the host.
|
|
|
|
@raises: USBDeviceDriversManagerError if device bus ID cannot be found
|
|
for the device with the given product name.
|
|
|
|
"""
|
|
device_bus_id = self._find_usb_device_bus_id(product_name)
|
|
if device_bus_id is None:
|
|
error_message = 'Cannot find device with product name: %s'
|
|
raise USBDeviceDriversManagerError(error_message % product_name)
|
|
else:
|
|
self._device_product_name = product_name
|
|
self._device_bus_id = device_bus_id
|
|
|
|
|
|
def drivers_are_bound(self):
|
|
"""Checks whether the drivers with the of current device are bound.
|
|
|
|
If the drivers are already bound, calling bind_usb_drivers will be
|
|
redundant and also result in an error.
|
|
|
|
@return: True if the path to the drivers exist, meaning the drivers
|
|
are already bound. False otherwise.
|
|
|
|
"""
|
|
if self._device_bus_id is None:
|
|
raise USBDeviceDriversManagerError('USB Bus ID is not set yet.')
|
|
driver_path = self._USB_BOUND_DRIVERS_FILE_PATH % self._device_bus_id
|
|
return os.path.exists(driver_path)
|
|
|
|
|
|
def bind_usb_drivers(self):
|
|
"""Binds the USB driver(s) of the current device to the host.
|
|
|
|
This is applied to all the drivers associated with and listed under
|
|
the USB device with the current _device_product_name and _device_bus_id.
|
|
|
|
@raises: USBDeviceDriversManagerError if device bus ID for this instance
|
|
has not been set yet.
|
|
|
|
"""
|
|
if self._device_bus_id is None:
|
|
raise USBDeviceDriversManagerError('USB Bus ID is not set yet.')
|
|
if self.drivers_are_bound():
|
|
return
|
|
utils.open_write_close(self._USB_BIND_FILE_PATH,
|
|
self._device_bus_id)
|
|
|
|
|
|
def unbind_usb_drivers(self):
|
|
"""Unbinds the USB driver(s) of the current device from the host.
|
|
|
|
This is applied to all the drivers associated with and listed under
|
|
the USB device with the current _device_product_name and _device_bus_id.
|
|
|
|
@raises: USBDeviceDriversManagerError if device bus ID for this instance
|
|
has not been set yet.
|
|
|
|
"""
|
|
if self._device_bus_id is None:
|
|
raise USBDeviceDriversManagerError('USB Bus ID is not set yet.')
|
|
if not self.drivers_are_bound():
|
|
return
|
|
utils.open_write_close(self._USB_UNBIND_FILE_PATH,
|
|
self._device_bus_id)
|