| # Copyright 2024, The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Module that detects device attributes and USB speed using adb commands.""" |
| |
| import enum |
| import logging |
| import subprocess |
| from typing import NamedTuple |
| from atest import atest_utils |
| from atest import constants |
| |
| |
@enum.unique
class UsbAttributeName(enum.Enum):
  """File names of the UDC driver attributes that report USB speeds.

  Each value is the name of a file that is read from the device under
  `/sys/class/udc/<udc directory>/`.
  """

  # Attribute holding the speed the connection is currently running at.
  NEGOTIATED_SPEED = 'current_speed'
  # Attribute holding the highest speed the device supports.
  MAXIMUM_SPEED = 'maximum_speed'
| |
| |
class DeviceIds(NamedTuple):
  """Identifiers that let a user recognize the device a message refers to."""

  # Value of the `ro.product.manufacturer` device property.
  manufacturer: str
  # Value of the `ro.product.model` device property.
  model: str
  # Value of the `ro.product.name` device property.
  name: str
  # Value of the `ro.serialno` device property.
  serial: str
  # First column of the device's row in the `adb devices` output.
  address: str
| |
| |
def verify_and_print_usb_speed_warning(
    device_ids: DeviceIds, negotiated_speed: int, max_speed: int
) -> bool:
  """Warns the user when the device runs at USB-2 speed but can go faster.

  Args:
    device_ids: Identifiers allowing a user to recognize the device the usb
      speed warning is related to.
    negotiated_speed: The speed the device is currently operating at.
    max_speed: The highest speed the device is capable of.

  Returns:
    True if the warning was printed, False otherwise.
  """
  # A USB-3 capable device behind a USB-2 link is downgraded to 480 Mbps and
  # never to 12 Mbps, so 480 is the only negotiated speed worth checking.
  downgraded_to_usb2 = negotiated_speed == 480 and negotiated_speed < max_speed
  if not downgraded_to_usb2:
    return False

  _print_usb_speed_warning(device_ids, negotiated_speed, max_speed)
  return True
| |
| |
def _print_usb_speed_warning(
    device_ids: DeviceIds, negotiated_speed: int, max_speed: int
):
  """Prints a magenta warning that the device runs below its maximum speed.

  Args:
    device_ids: Identifiers allowing a user to recognize the device the usb
      speed warning is related to.
    negotiated_speed: The speed (in Mbits per second) the device is currently
      operating at.
    max_speed: The highest speed (in Mbits per second) the device is capable
      of.
  """
  current_label = _speed_to_string(negotiated_speed)
  capable_label = _speed_to_string(max_speed)
  message = (
      f'Warning: The {device_ids.manufacturer} {device_ids.model} device ('
      f'{device_ids.name}) with address {device_ids.address} and serial '
      f'{device_ids.serial} is using '
      f'{current_label} while '
      f'{capable_label} capable. Check the USB cables/hubs.'
  )
  atest_utils.colorful_print(message, constants.MAGENTA)
| |
| |
def _speed_to_string(speed: int) -> str:
  """Returns a human-readable label for a speed expressed in Mbps.

  Speeds that match a known USB generation are labelled with that generation;
  any other value is rendered as a plain number with thousands separators.
  """
  fallback_label = f'{speed:,} Mbps'
  generation_labels = {
      480: 'USB-2 (480 Mbps)',
      5000: 'USB-3.0 (5,000 Mbps)',
      10000: 'USB-3.1 (10,000 Mbps)',
      20000: 'USB-3.2 (20,000 Mbps)',
      40000: 'USB-4.0 (40,000 Mbps)',
  }
  return generation_labels.get(speed, fallback_label)
| |
| |
def _string_to_speed(speed_str: str) -> int:
  """Converts a speed name read from the UDC driver to a speed in Mbps.

  Args:
    speed_str: The speed name, e.g. 'high-speed'.

  Returns:
    The matching speed in Mbps, or 0 when the name is not recognized.
  """
  speed_by_name = {
      'UNKNOWN': 0,
      'high-speed': 480,
      'super-speed': 5000,
      'super-speed-plus': 10000,
  }
  try:
    return speed_by_name[speed_str]
  except KeyError:
    return 0
| |
| |
def get_udc_driver_usb_device_dir_name() -> str:
  """Reads the name of the directory holding the usb device attributes.

  The name is the content of the `/config/usb_gadget/g1/UDC` file on the
  device.

  Returns:
    A string corresponding to the directory name.
  """
  udc_name_file = '/config/usb_gadget/g1/UDC'
  return _adb_read_file(udc_name_file)
| |
| |
def get_udc_driver_usb_device_attribute_speed_value(
    speed_dir_name: str,
    attr_name: UsbAttributeName,
) -> int:
  """Reads the usb speed string from the device and returns the numeric speed.

  Args:
    speed_dir_name: name of the directory where the usb driver attributes are
      located.
    attr_name: The attribute to read from the device.

  Returns:
    An int corresponding to the numeric speed value converted from the udc
    driver attribute value. 0 is returned if adb is unable to read the value
    or if `speed_dir_name` is empty.
  """
  # An empty directory name would produce the malformed path
  # `/sys/class/udc//<attribute>`; skip the adb round trip since the read
  # cannot succeed.
  if not speed_dir_name:
    logging.debug('No UDC directory name given. USB speed will not be read.')
    return 0

  speed_reading = _adb_read_file(
      f'/sys/class/udc/{speed_dir_name}/{attr_name.value}'
  )
  return _string_to_speed(speed_reading)
| |
| |
def _adb_read_file(file_path: str) -> str:
  """Reads a file from the connected device as root through adb.

  Args:
    file_path: Path of the file on the device.

  Returns:
    The content of the file with surrounding whitespace stripped, or an empty
    string if the adb command fails or cannot be started.
  """
  cmd = [
      'adb',
      'shell',
      'su',
      '0',
      f'cat {file_path}',
  ]
  try:
    logging.debug('Running command: %s', cmd)
    result = subprocess.check_output(
        cmd,
        encoding='utf-8',
        # Keep stderr out of the returned value: diagnostics written there
        # (such as adb server start-up notices) are not part of the file.
        stderr=subprocess.DEVNULL,
    )
    return result.strip()
  except subprocess.CalledProcessError as cpe:
    logging.debug(
        'Cannot read directory; USB speed will not be read. Error: %s', cpe
    )
  except OSError as ose:
    logging.debug('Cannot read usb speed from the device. Error: %s', ose)
  return ''
| |
| |
def get_adb_device_identifiers() -> DeviceIds | None:
  """Fetches the user-facing identifiers of the connected device.

  Returns:
    The identifiers of the device, or None if adb is not available, the serial
    number cannot be read, or `adb devices` lists no entry in the `device`
    state.
  """
  if not atest_utils.has_command('adb'):
    return None

  device_serial = _adb_run_cmd(['adb', 'shell', 'getprop', 'ro.serialno'])
  if not device_serial:
    return None

  # Each device row of `adb devices` is `<address> <state>`. Comparing the
  # state column exactly skips the "List of devices attached" header (which
  # contains the substring "device") and entries in any other state. If
  # several rows match, the last one is used.
  device_address = None
  for line in _adb_run_cmd(['adb', 'devices']).splitlines():
    fields = line.split()
    if len(fields) >= 2 and fields[1] == 'device':
      device_address = fields[0]
  if device_address is None:
    logging.debug('No devices are connected. USB speed will not be read.')
    return None

  device_manufacturer = _adb_run_cmd(
      ['adb', 'shell', 'getprop', 'ro.product.manufacturer']
  )
  device_model = _adb_run_cmd(['adb', 'shell', 'getprop', 'ro.product.model'])
  device_name = _adb_run_cmd(['adb', 'shell', 'getprop', 'ro.product.name'])

  return DeviceIds(
      manufacturer=device_manufacturer,
      model=device_model,
      name=device_name,
      serial=device_serial,
      address=device_address,
  )
| |
| |
def _adb_run_cmd(cmd: list[str]) -> str:
  """Runs a command and returns what it wrote to its standard output.

  Args:
    cmd: The command and its arguments.

  Returns:
    The standard output of the command with surrounding whitespace stripped,
    or an empty string if the command fails or cannot be started.
  """
  try:
    logging.debug('Running command: %s.', cmd)
    result = subprocess.check_output(
        cmd,
        encoding='utf-8',
        # Keep stderr out of the returned value: diagnostics written there
        # (such as adb server start-up notices) would otherwise be mixed into
        # the property values and device list parsed by the callers.
        stderr=subprocess.DEVNULL,
    )
    return result.strip() if result else ''
  except subprocess.CalledProcessError:
    logging.debug(
        'Exception raised while running `%s`. USB speed will not be read.', cmd
    )
  except OSError:
    logging.debug('Could not find adb. USB speed will not be read.')
  return ''