| # Copyright 2022 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the 'License'); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an 'AS IS' BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import re |
| import logging |
| import os |
| import paramiko |
| import socket |
| import time |
| |
| from acts.controllers.cellular_simulator import AbstractCellularSimulator |
| |
| class SocketWrapper(): |
| """A wrapper for socket communicate with test equipment. |
| |
| Attributes: |
| _socket: a socket object. |
| _ip: a string value for ip address |
| which we want to connect. |
| _port: an integer for port |
| which we want to connect. |
| _connecting_timeout: an integer for socket connecting timeout. |
| _encode_format: a string specify encoding format. |
| _cmd_terminator: a character indicates the end of command/data |
| which need to be sent. |
| """ |
| |
| def __init__(self, ip, port, |
| connecting_timeout=120, |
| cmd_terminator='\n', |
| encode_format='utf-8', |
| buff_size=1024): |
| self._socket = None |
| self._ip = ip |
| self._port = port |
| self._connecting_timeout = connecting_timeout |
| self._cmd_terminator = cmd_terminator |
| self._encode_format = encode_format |
| self._buff_size = buff_size |
| self._logger = logging.getLogger(__name__) |
| |
| def _connect(self): |
| self._socket = socket.create_connection( |
| (self._ip, self._port), timeout=self._connecting_timeout |
| ) |
| |
| def send_command(self, cmd: str): |
| if not self._socket: |
| self._connect() |
| if cmd and cmd[-1] != self._cmd_terminator: |
| cmd = cmd + self._cmd_terminator |
| self._socket.sendall(cmd.encode(self._encode_format)) |
| |
| def send_command_recv(self, cmd: str) -> str: |
| """Send data and wait for response |
| |
| Args: |
| cmd: a string command to be sent. |
| |
| Returns: |
| a string response. |
| """ |
| self.send_command(cmd) |
| response = '' |
| try: |
| response = self._socket.recv(self._buff_size).decode( |
| self._encode_format |
| ) |
| except socket.timeout as e: |
| self._logger.info('Socket timeout while receiving response.') |
| self.close() |
| raise |
| |
| return response |
| |
| def close(self): |
| self._socket.close() |
| self._socket = None |
| |
| class UXMCellularSimulator(AbstractCellularSimulator): |
| """A cellular simulator for UXM callbox.""" |
| |
| # Keys to obtain data from cell_info dictionary. |
| KEY_CELL_NUMBER = "cell_number" |
| KEY_CELL_TYPE = "cell_type" |
| |
| # UXM socket port |
| UXM_SOCKET_PORT = 5125 |
| |
| # UXM SCPI COMMAND |
| SCPI_IMPORT_STATUS_QUERY_CMD = 'SYSTem:SCPI:IMPort:STATus?' |
| SCPI_SYSTEM_ERROR_CHECK_CMD = 'SYST:ERR?\n' |
| SCPI_CHECK_CONNECTION_CMD = '*IDN?\n' |
| SCPI_DEREGISTER_UE_IMS = 'SYSTem:IMS:SERVer:UE:DERegister' |
| # require: path to SCPI file |
| SCPI_IMPORT_SCPI_FILE_CMD = 'SYSTem:SCPI:IMPort "{}"\n' |
| # require: 1. cell type (E.g. NR5G), 2. cell number (E.g CELL1) |
| SCPI_CELL_ON_CMD = 'BSE:CONFig:{}:{}:ACTive 1' |
| SCPI_CELL_OFF_CMD = 'BSE:CONFig:{}:{}:ACTive 0' |
| SCPI_GET_CELL_STATUS = 'BSE:STATus:{}:{}?' |
| SCPI_RRC_RELEASE_LTE_CMD = 'BSE:FUNCtion:{}:{}:RELease:SEND' |
| SCPI_RRC_RELEASE_NR_CMD = 'BSE:CONFig:{}:{}:RCONtrol:RRC:STARt RRELease' |
| # require cell number |
| SCPI_CREATE_DEDICATED_BEARER = 'BSE:FUNCtion:LTE:{}:NAS:EBID10:DEDicated:CREate' |
| SCPI_CHANGE_SIM_NR_CMD = 'BSE:CONFig:NR5G:CELL1:SECurity:AUTHenticate:KEY:TYPE {}' |
| SCPI_CHANGE_SIM_LTE_CMD = 'BSE:CONFig:LTE:SECurity:AUTHenticate:KEY {}' |
| SCPI_SETTINGS_PRESET_CMD = 'SYSTem:PRESet:FULL' |
| |
| # UXM's Test Application recovery |
| TA_BOOT_TIME = 100 |
| |
| # shh command |
| SSH_START_GUI_APP_CMD_FORMAT = 'psexec -s -d -i 1 "{exe_path}"' |
| SSH_CHECK_APP_RUNNING_CMD_FORMAT = 'tasklist | findstr /R {regex_app_name}' |
| SSH_KILL_PROCESS_BY_NAME = 'taskkill /IM {process_name} /F' |
| UXM_TEST_APP_NAME = 'TestApp.exe' |
| |
| # start process success regex |
| PSEXEC_PROC_STARTED_REGEX_FORMAT = 'started on * with process ID {proc_id}' |
| |
| # HCCU default value |
| HCCU_SOCKET_PORT = 4882 |
| # number of digit of the length of setup name |
| HCCU_SCPI_CHANGE_SETUP_CMD = ':SYSTem:SETup:CONFig #{number_of_digit}{setup_name_len}{setup_name}' |
| HCCU_SCPI_CHANGE_SCENARIO_CMD = ':SETup:SCENe "((NE_1, {scenario_name}))"' |
| HCCU_STATUS_CHECK_CMD = ':SETup:INSTrument:STATus? 0\n' |
| HCCU_FR2_SETUP_NAME = '{Name:"TSPC_1UXM5G_HF_2RRH_M1740A"}' |
| HCCU_FR1_SETUP_NAME = '{Name:"TSPC_1UXM5G_LF"}' |
| HCCU_GET_INSTRUMENT_COUNT_CMD = ':SETup:INSTrument:COUNt?' |
| HCCU_FR2_INSTRUMENT_COUNT = 5 |
| HCCU_FR1_INSTRUMENT_COUNT = 2 |
| HCCU_FR2_SCENARIO = 'NR_4DL2x2_2UL2x2_LTE_4CC' |
| HCCU_FR1_SCENARIO = 'NR_1DL4x4_1UL2x2_LTE_4CC' |
| |
| |
| def __init__(self, ip_address, custom_files,uxm_user, |
| ssh_private_key_to_uxm, ta_exe_path, ta_exe_name): |
| """Initializes the cellular simulator. |
| |
| Args: |
| ip_address: the ip address of host where Keysight Test Application (TA) |
| is installed. |
| custom_files: a list of file path for custom files. |
| uxm_user: username of host where Keysight TA resides. |
| ssh_private_key_to_uxm: private key for key based ssh to |
| host where Keysight TA resides. |
| ta_exe_path: path to TA exe. |
| ta_exe_name: name of TA exe. |
| """ |
| super().__init__() |
| self.custom_files = custom_files |
| self.rockbottom_script = None |
| self.cells = [] |
| self.uxm_ip = ip_address |
| self.uxm_user = uxm_user |
| self.ssh_private_key_to_uxm = os.path.expanduser( |
| ssh_private_key_to_uxm) |
| self.ta_exe_path = ta_exe_path |
| self.ta_exe_name = ta_exe_name |
| self.ssh_client = self.create_ssh_client() |
| |
| # get roclbottom file |
| for file in self.custom_files: |
| if 'rockbottom_' in file: |
| self.rockbottom_script = file |
| |
| # connect to Keysight Test Application via socket |
| self.recovery_ta() |
| self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT) |
| self.check_socket_connection() |
| self.timeout = 120 |
| |
| # hccu socket |
| self.hccu_socket_port = self.HCCU_SOCKET_PORT |
| self.hccu_socket = SocketWrapper(self.uxm_ip, self.hccu_socket_port) |
| |
| def socket_connect(self): |
| self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT) |
| |
| def switch_HCCU_scenario(self, scenario_name: str): |
| cmd = self.HCCU_SCPI_CHANGE_SCENARIO_CMD.format( |
| scenario_name=scenario_name) |
| self.hccu_socket.send_command(cmd) |
| self.log.debug(f'Sent command: {cmd}') |
| # this is require for the command to take effect |
| # because hccu's port need to be free. |
| self.hccu_socket.close() |
| |
| def switch_HCCU_setup(self, setup_name: str): |
| """Change HHCU system setup. |
| |
| Args: |
| setup_name: a string name |
| of the system setup will be changed to. |
| """ |
| setup_name_len = str(len(setup_name)) |
| number_of_digit = str(len(setup_name_len)) |
| cmd = self.HCCU_SCPI_CHANGE_SETUP_CMD.format( |
| number_of_digit=number_of_digit, |
| setup_name_len=setup_name_len, |
| setup_name=setup_name |
| ) |
| self.hccu_socket.send_command(cmd) |
| self.log.debug(f'Sent command: {cmd}') |
| # this is require for the command to take effect |
| # because hccu's port need to be free. |
| self.hccu_socket.close() |
| |
| def wait_until_hccu_operational(self, timeout=1200): |
| """ Wait for hccu is ready to operate for a specified timeout. |
| |
| Args: |
| timeout: time we are waiting for |
| hccu in opertional status. |
| |
| Returns: |
| True if HCCU status is operational within timeout. |
| False otherwise. |
| """ |
| # check status |
| self.log.info('Waiting for HCCU to ready to operate.') |
| cmd = self.HCCU_STATUS_CHECK_CMD |
| t = 0 |
| interval = 10 |
| while t < timeout: |
| response = self.hccu_socket.send_command_recv(cmd) |
| if response == 'OPER\n': |
| return True |
| time.sleep(interval) |
| t += interval |
| return False |
| |
| def switch_HCCU_settings(self, is_fr2: bool): |
| """Set HCCU setup configuration. |
| |
| HCCU stands for Hardware Configuration Control Utility, |
| an interface allows us to control Keysight Test Equipment. |
| |
| Args: |
| is_fr2: a bool value. |
| """ |
| # change HCCU configration |
| data = '' |
| scenario_name = '' |
| instrument_count_res = self.hccu_socket.send_command_recv( |
| self.HCCU_GET_INSTRUMENT_COUNT_CMD) |
| instrument_count = int(instrument_count_res) |
| # if hccu setup is correct, no need to change. |
| if is_fr2 and instrument_count == self.HCCU_FR2_INSTRUMENT_COUNT: |
| self.log.info('UXM has correct HCCU setup.') |
| return |
| if not is_fr2 and instrument_count == self.HCCU_FR1_INSTRUMENT_COUNT: |
| self.log.info('UXM has correct HCCU setup.') |
| return |
| |
| self.log.info('UXM has incorrect HCCU setup, start changing setup.') |
| # terminate TA and close socket |
| self.log.info('Terminate TA before switch HCCU settings.') |
| self.terminate_process(self.UXM_TEST_APP_NAME) |
| self.socket.close() |
| |
| # change hccu setup |
| if is_fr2: |
| data = self.HCCU_FR2_SETUP_NAME |
| scenario_name = self.HCCU_FR2_SCENARIO |
| else: |
| data = self.HCCU_FR1_SETUP_NAME |
| scenario_name = self.HCCU_FR1_SCENARIO |
| self.log.info('Switch HCCU setup.') |
| self.switch_HCCU_setup(data) |
| time.sleep(10) |
| if not self.wait_until_hccu_operational(): |
| raise RuntimeError('Fail to switch HCCU setup.') |
| |
| # change scenario |
| self.log.info('Ativate HCCU scenario.') |
| self.switch_HCCU_scenario(scenario_name) |
| time.sleep(40) |
| if not self.wait_until_hccu_operational(): |
| raise RuntimeError('Fail to switch HCCU scenario.') |
| |
| # start TA and reconnect socket. |
| self.recovery_ta() |
| self.socket = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT) |
| |
| def create_ssh_client(self): |
| """Create a ssh client to host.""" |
| ssh = paramiko.SSHClient() |
| ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
| mykey = paramiko.Ed25519Key.from_private_key_file( |
| self.ssh_private_key_to_uxm) |
| ssh.connect(hostname=self.uxm_ip, username=self.uxm_user, pkey=mykey) |
| self.log.info('SSH client to %s is connected' % self.uxm_ip) |
| return ssh |
| |
| def terminate_process(self, process_name): |
| cmd = self.SSH_KILL_PROCESS_BY_NAME.format( |
| process_name=process_name |
| ) |
| stdin, stdout, stderr = self.ssh_client.exec_command(cmd) |
| stdin.close() |
| err = ''.join(stderr.readlines()) |
| out = ''.join(stdout.readlines()) |
| final_output = str(out) + str(err) |
| self.log.info(final_output) |
| return out |
| |
| def is_ta_running(self): |
| is_running_cmd = self.SSH_CHECK_APP_RUNNING_CMD_FORMAT.format( |
| regex_app_name=self.ta_exe_name) |
| stdin, stdout, stderr = self.ssh_client.exec_command(is_running_cmd) |
| stdin.close() |
| err = ''.join(stderr.readlines()) |
| out = ''.join(stdout.readlines()) |
| final_output = str(out) + str(err) |
| self.log.info(final_output) |
| return (out != '' and err == '') |
| |
| def _start_test_app(self): |
| """Start Test Application on Windows.""" |
| # start GUI exe via ssh |
| start_app_cmd = self.SSH_START_GUI_APP_CMD_FORMAT.format( |
| exe_path=self.ta_exe_path) |
| stdin, stdout, stderr = self.ssh_client.exec_command(start_app_cmd) |
| self.log.info(f'Command sent to {self.uxm_ip}: {start_app_cmd}') |
| stdin.close() |
| err = ''.join(stderr.readlines()) |
| out = ''.join(stdout.readlines()) |
| # psexec return process ID as part of the exit code |
| exit_status = stderr.channel.recv_exit_status() |
| is_started = re.search( |
| self.PSEXEC_PROC_STARTED_REGEX_FORMAT.format(proc_id=exit_status), |
| err[-1]) |
| if is_started: |
| raise RuntimeError('Fail to start TA: ' + out + err) |
| # wait for ta completely boot up |
| self.log.info('TA is starting') |
| time.sleep(self.TA_BOOT_TIME) |
| |
| def recovery_ta(self): |
| """Start TA if it is not running.""" |
| if not self.is_ta_running(): |
| self._start_test_app() |
| # checking if ta booting process complete |
| # by checking socket connection |
| s = None |
| retries = 12 |
| for _ in range(retries): |
| try: |
| s = self._socket_connect(self.uxm_ip, self.UXM_SOCKET_PORT) |
| s.close() |
| return |
| except ConnectionRefusedError as cre: |
| self.log.info( |
| 'Connection refused, wait 10s for TA to boot') |
| time.sleep(10) |
| raise RuntimeError('TA does not start on time') |
| |
| def set_rockbottom_script_path(self, path): |
| """Set path to rockbottom script. |
| |
| Args: |
| path: path to rockbottom script. |
| """ |
| self.rockbottom_script = path |
| |
| def set_cell_info(self, cell_info): |
| """Set type and number for multiple cells. |
| |
| Args: |
| cell_info: list of dictionaries, |
| each dictionary contain cell type |
| and cell number for each cell |
| that the simulator need to control. |
| """ |
| if not cell_info: |
| raise ValueError('Missing cell info from configurations file') |
| self.cells = cell_info |
| |
| def deregister_ue_ims(self): |
| """Remove UE IMS profile from UXM.""" |
| self._socket_send_SCPI_command( |
| self.SCPI_DEREGISTER_UE_IMS) |
| |
| def create_dedicated_bearer(self): |
| """Create a dedicated bearer setup for ims call. |
| |
| After UE connected and register on UXM IMS tab. |
| It is required to create a dedicated bearer setup |
| with EPS bearer ID 10. |
| """ |
| cell_number = self.cells[0][self.KEY_CELL_NUMBER] |
| self._socket_send_SCPI_command( |
| self.SCPI_CREATE_DEDICATED_BEARER.format(cell_number)) |
| |
| def turn_cell_on(self, cell_type, cell_number): |
| """Turn UXM's cell on. |
| |
| Args: |
| cell_type: type of cell (e.g NR5G, LTE). |
| cell_number: ordinal number of a cell. |
| """ |
| if cell_type and cell_number: |
| self._socket_send_SCPI_command( |
| self.SCPI_CELL_ON_CMD.format(cell_type, cell_number)) |
| else: |
| raise ValueError('Invalid cell info\n' + |
| f' cell type: {cell_type}\n' + |
| f' cell number: {cell_number}\n') |
| |
| def turn_cell_off(self, cell_type, cell_number): |
| """Turn UXM's cell off. |
| |
| Args: |
| cell_type: type of cell (e.g NR5G, LTE). |
| cell_number: ordinal number of a cell. |
| """ |
| if cell_type and cell_number: |
| self._socket_send_SCPI_command( |
| self.SCPI_CELL_OFF_CMD.format(cell_type, cell_number)) |
| else: |
| raise ValueError('Invalid cell info\n' + |
| f' cell type: {cell_type}\n' + |
| f' cell number: {cell_number}\n') |
| |
| def get_cell_status(self, cell_type, cell_number): |
| """Get status of cell. |
| |
| Args: |
| cell_type: type of cell (e.g NR5G, LTE). |
| cell_number: ordinal number of a cell. |
| """ |
| if not cell_type or not cell_number: |
| raise ValueError('Invalid cell with\n' + |
| f' cell type: {cell_type}\n' + |
| f' cell number: {cell_number}\n') |
| |
| return self._socket_send_SCPI_for_result_command( |
| self.SCPI_GET_CELL_STATUS.format(cell_type, cell_number)) |
| |
| def check_socket_connection(self): |
| """Check if the socket connection is established. |
| |
| Query the identification of the Keysight Test Application |
| we are trying to connect to. Empty response indicates |
| connection fail, and vice versa. |
| """ |
| self.socket.sendall(self.SCPI_CHECK_CONNECTION_CMD.encode()) |
| response = self.socket.recv(1024).decode() |
| if response: |
| self.log.info(f'Connected to: {response}') |
| else: |
| self.log.error('Fail to connect to callbox') |
| |
| def _socket_connect(self, host, port): |
| """Create socket connection. |
| |
| Args: |
| host: IP address of desktop where Keysight Test Application resides. |
| port: port that Keysight Test Application is listening for socket |
| communication. |
| Returns: |
| s: socket object. |
| """ |
| self.log.info('Establishing connection to callbox via socket') |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| s.connect((host, port)) |
| return s |
| |
| def _socket_send_SCPI_command(self, command): |
| """Send SCPI command without expecting response. |
| |
| Args: |
| command: a string SCPI command. |
| """ |
| # make sure there is a line break for the socket to send command |
| command = command + '\n' |
| # send command |
| self.socket.sendall(command.encode()) |
| self.log.info(f'Sent {command}') |
| |
| def _socket_receive_SCPI_result(self): |
| """Receive response from socket. """ |
| i = 1 |
| response = '' |
| while i < self.timeout and not response: |
| response = self.socket.recv(1024).decode() |
| i += 1 |
| return response |
| |
| def _socket_send_SCPI_for_result_command(self, command): |
| """Send SCPI command and expecting response. |
| |
| Args: |
| command: a string SCPI command. |
| """ |
| self._socket_send_SCPI_command(command) |
| response = self._socket_receive_SCPI_result() |
| return response |
| |
| def check_system_error(self): |
| """Query system error from Keysight Test Application. |
| |
| Returns: |
| status: a message indicate the number of errors |
| and detail of errors if any. |
| a string `0,"No error"` indicates no error. |
| """ |
| status = self._socket_send_SCPI_for_result_command( |
| self.SCPI_SYSTEM_ERROR_CHECK_CMD) |
| self.log.info(f'System error status: {status}') |
| return status |
| |
| def import_configuration(self, path): |
| """Import SCPI config file. |
| |
| Args: |
| path: path to SCPI file. |
| """ |
| self._socket_send_SCPI_command( |
| self.SCPI_SETTINGS_PRESET_CMD) |
| time.sleep(10) |
| self._socket_send_SCPI_command( |
| self.SCPI_IMPORT_SCPI_FILE_CMD.format(path)) |
| time.sleep(45) |
| |
| def destroy(self): |
| """Close socket connection with UXM. """ |
| self.socket.close() |
| |
| def setup_lte_scenario(self, path): |
| """Configures the equipment for an LTE simulation. |
| |
| Args: |
| path: path to SCPI config file. |
| """ |
| self.import_configuration(path) |
| |
| def dut_rockbottom(self, dut): |
| """Set the dut to rockbottom state. |
| |
| Args: |
| dut: a CellularAndroid controller. |
| """ |
| # The rockbottom script might include a device reboot, so it is |
| # necessary to stop SL4A during its execution. |
| dut.ad.stop_services() |
| self.log.info('Executing rockbottom script for ' + dut.ad.model) |
| os.chmod(self.rockbottom_script, 0o777) |
| os.system('{} {}'.format(self.rockbottom_script, dut.ad.serial)) |
| # Make sure the DUT is in root mode after coming back |
| dut.ad.root_adb() |
| # Restart SL4A |
| dut.ad.start_services() |
| |
| def set_sim_type(self, is_3gpp_sim): |
| sim_type = 'KEYSight' |
| if is_3gpp_sim: |
| sim_type = 'TEST3GPP' |
| self._socket_send_SCPI_command( |
| self.SCPI_CHANGE_SIM_NR_CMD.format(sim_type)) |
| time.sleep(2) |
| self._socket_send_SCPI_command( |
| self.SCPI_CHANGE_SIM_LTE_CMD.format(sim_type)) |
| time.sleep(2) |
| |
| def wait_until_attached_one_cell(self, |
| cell_type, |
| cell_number, |
| dut, |
| wait_for_camp_interval, |
| attach_retries): |
| """Wait until connect to given UXM cell. |
| |
| After turn off airplane mode, sleep for |
| wait_for_camp_interval seconds for device to camp. |
| If not device is not connected after the wait, |
| either toggle airplane mode on/off or reboot device. |
| Args: |
| cell_type: type of cell |
| which we are trying to connect to. |
| cell_number: ordinal number of a cell |
| which we are trying to connect to. |
| dut: a AndroidCellular controller. |
| wait_for_camp_interval: sleep interval, |
| wait for device to camp. |
| attach_retries: number of retry |
| to wait for device |
| to connect to 1 basestation. |
| Raise: |
| RuntimeError: device unable to connect to cell. |
| """ |
| # airplane mode on |
| dut.toggle_airplane_mode(True) |
| time.sleep(5) |
| |
| # turn cell on |
| self.turn_cell_on(cell_type, cell_number) |
| time.sleep(5) |
| |
| interval = 10 |
| # waits for device to camp |
| for index in range(1, attach_retries): |
| count = 0 |
| # airplane mode off |
| dut.toggle_airplane_mode(False) |
| time.sleep(5) |
| # check connection in small interval |
| while count < wait_for_camp_interval: |
| time.sleep(interval) |
| cell_state = self.get_cell_status(cell_type, cell_number) |
| self.log.info(f'cell state: {cell_state}') |
| if cell_state == 'CONN\n': |
| # wait for connection stable |
| time.sleep(15) |
| # check connection status again |
| cell_state = self.get_cell_status(cell_type, cell_number) |
| self.log.info(f'cell state: {cell_state}') |
| if cell_state == 'CONN\n': |
| return True |
| if cell_state == 'OFF\n': |
| self.turn_cell_on(cell_type, cell_number) |
| time.sleep(5) |
| count += interval |
| |
| # reboot device |
| if (index % 2) == 0: |
| dut.ad.reboot() |
| if self.rockbottom_script: |
| self.dut_rockbottom(dut) |
| else: |
| self.log.warning( |
| f'Rockbottom script was not executed after reboot.' |
| ) |
| # toggle APM and cell on/off |
| elif (index % 1) == 0: |
| # Toggle APM on |
| dut.toggle_airplane_mode(True) |
| time.sleep(5) |
| |
| # Toggle simulator cell |
| self.turn_cell_off(cell_type, cell_number) |
| time.sleep(5) |
| self.turn_cell_on(cell_type, cell_number) |
| time.sleep(5) |
| |
| # Toggle APM off |
| dut.toggle_airplane_mode(False) |
| time.sleep(5) |
| # increase length of small waiting interval |
| interval += 5 |
| |
| # Phone cannot connected to basestation of callbox |
| raise RuntimeError( |
| f'Phone was unable to connect to cell: {cell_type}-{cell_number}') |
| |
| def wait_until_attached(self, dut, timeout, attach_retries): |
| """Waits until the DUT is attached to all required cells. |
| |
| Args: |
| dut: a CellularAndroid controller. |
| timeout: sleep interval, |
| wait for device to camp in 1 try. |
| attach_retries: number of retry |
| to wait for device |
| to connect to 1 basestation. |
| """ |
| # get cell info |
| first_cell_type = self.cells[0][self.KEY_CELL_TYPE] |
| first_cell_number = self.cells[0][self.KEY_CELL_NUMBER] |
| if len(self.cells) == 2: |
| second_cell_type = self.cells[1][self.KEY_CELL_TYPE] |
| second_cell_number = self.cells[1][self.KEY_CELL_NUMBER] |
| |
| # connect to 1st cell |
| self.wait_until_attached_one_cell(first_cell_type, |
| first_cell_number, dut, timeout, |
| attach_retries) |
| |
| # aggregation to NR |
| if len(self.cells) == 2: |
| self.turn_cell_on( |
| second_cell_type, |
| second_cell_number, |
| ) |
| |
| for _ in range(1, attach_retries): |
| self.log.info('Try to aggregate to NR.') |
| self._socket_send_SCPI_command( |
| 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL None') |
| self._socket_send_SCPI_command( |
| 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:UL None') |
| self._socket_send_SCPI_command( |
| 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1') |
| self._socket_send_SCPI_command( |
| 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1') |
| time.sleep(1) |
| self._socket_send_SCPI_command( |
| "BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly") |
| # wait for status stable |
| time.sleep(10) |
| cell_state = self.get_cell_status(second_cell_type, second_cell_number) |
| self.log.info(f'cell state: {cell_state}') |
| if cell_state == 'CONN\n': |
| return |
| else: |
| self.turn_cell_off(second_cell_type, second_cell_number) |
| # wait for LTE cell to connect again |
| self.wait_until_attached_one_cell(first_cell_type, |
| first_cell_number, dut, 120, |
| 2) |
| |
| raise RuntimeError(f'Fail to aggregate to NR from LTE.') |
| |
| def set_lte_rrc_state_change_timer(self, enabled, time=10): |
| """Configures the LTE RRC state change timer. |
| |
| Args: |
| enabled: a boolean indicating if the timer should be on or off. |
| time: time in seconds for the timer to expire. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_band(self, bts_index, band): |
| """Sets the band for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| band: the new band. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def get_duplex_mode(self, band): |
| """Determines if the band uses FDD or TDD duplex mode |
| |
| Args: |
| band: a band number. |
| |
| Returns: |
| an variable of class DuplexMode indicating if band is FDD or TDD. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_input_power(self, bts_index, input_power): |
| """Sets the input power for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| input_power: the new input power. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_output_power(self, bts_index, output_power): |
| """Sets the output power for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| output_power: the new output power. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_tdd_config(self, bts_index, tdd_config): |
| """Sets the tdd configuration number for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| tdd_config: the new tdd configuration number. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_ssf_config(self, bts_index, ssf_config): |
| """Sets the Special Sub-Frame config number for the indicated. |
| |
| base station. |
| |
| Args: |
| bts_index: the base station number. |
| ssf_config: the new ssf config number. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_bandwidth(self, bts_index, bandwidth): |
| """Sets the bandwidth for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| bandwidth: the new bandwidth |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_downlink_channel_number(self, bts_index, channel_number): |
| """Sets the downlink channel number for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| channel_number: the new channel number. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_mimo_mode(self, bts_index, mimo_mode): |
| """Sets the mimo mode for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| mimo_mode: the new mimo mode |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_transmission_mode(self, bts_index, tmode): |
| """Sets the transmission mode for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| tmode: the new transmission mode. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_scheduling_mode(self, |
| bts_index, |
| scheduling, |
| mcs_dl=None, |
| mcs_ul=None, |
| nrb_dl=None, |
| nrb_ul=None): |
| """Sets the scheduling mode for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| scheduling: the new scheduling mode. |
| mcs_dl: Downlink MCS. |
| mcs_ul: Uplink MCS. |
| nrb_dl: Number of RBs for downlink. |
| nrb_ul: Number of RBs for uplink. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_dl_256_qam_enabled(self, bts_index, enabled): |
| """Determines what MCS table should be used for the downlink. |
| |
| This only saves the setting that will be used when configuring MCS. |
| |
| Args: |
| bts_index: the base station number. |
| enabled: whether 256 QAM should be used. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_ul_64_qam_enabled(self, bts_index, enabled): |
| """Determines what MCS table should be used for the uplink. |
| |
| This only saves the setting that will be used when configuring MCS. |
| |
| Args: |
| bts_index: the base station number. |
| enabled: whether 64 QAM should be used. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_mac_padding(self, bts_index, mac_padding): |
| """Enables or disables MAC padding in the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| mac_padding: the new MAC padding setting. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_cfi(self, bts_index, cfi): |
| """Sets the Channel Format Indicator for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| cfi: the new CFI setting. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_paging_cycle(self, bts_index, cycle_duration): |
| """Sets the paging cycle duration for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| cycle_duration: the new paging cycle duration in milliseconds. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def set_phich_resource(self, bts_index, phich): |
| """Sets the PHICH Resource setting for the indicated base station. |
| |
| Args: |
| bts_index: the base station number. |
| phich: the new PHICH resource setting. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def lte_attach_secondary_carriers(self, ue_capability_enquiry): |
| """Activates the secondary carriers for CA. |
| |
| Requires the DUT to be attached to the primary carrier first. |
| |
| Args: |
| ue_capability_enquiry: UE capability enquiry message to be sent to |
| the UE before starting carrier aggregation. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def wait_until_communication_state(self, timeout=120): |
| """Waits until the DUT is in Communication state. |
| |
| Args: |
| timeout: after this amount of time the method will raise |
| a CellularSimulatorError exception. Default is 120 seconds. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def wait_until_idle_state(self, timeout=120): |
| """Waits until the DUT is in Idle state. |
| |
| Args: |
| timeout: after this amount of time the method will raise a |
| CellularSimulatorError exception. Default is 120 seconds. |
| """ |
| # turn on RRC release |
| cell_type = self.cells[0][self.KEY_CELL_TYPE] |
| cell_number = self.cells[0][self.KEY_CELL_NUMBER] |
| |
| # choose cmd base on cell type |
| cmd = None |
| if cell_type == 'LTE': |
| cmd = self.SCPI_RRC_RELEASE_LTE_CMD |
| else: |
| cmd = self.SCPI_RRC_RELEASE_NR_CMD |
| |
| if not cmd: |
| raise RuntimeError(f'Cell type [{cell_type}] is not supporting IDLE.') |
| |
| # checking status |
| self.log.info('Wait for IDLE state.') |
| for _ in range(5): |
| cell_state = self.get_cell_status(cell_type, cell_number) |
| self.log.info(f'cell state: {cell_state}') |
| if cell_state == 'CONN\n': |
| # RRC release |
| self._socket_send_SCPI_command(cmd.format(cell_type, cell_number)) |
| # wait for status stable |
| time.sleep(60) |
| elif cell_state == 'IDLE\n': |
| return |
| |
| raise RuntimeError('RRC release fail.') |
| |
| def detach(self): |
| """ Turns off all the base stations so the DUT loose connection.""" |
| for cell in self.cells: |
| cell_type = cell[self.KEY_CELL_TYPE] |
| cell_number = cell[self.KEY_CELL_NUMBER] |
| self.turn_cell_off(cell_type, cell_number) |
| time.sleep(5) |
| |
| def stop(self): |
| """Stops current simulation. |
| |
| After calling this method, the simulator will need to be set up again. |
| """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def start_data_traffic(self): |
| """Starts transmitting data from the instrument to the DUT. """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |
| |
| def stop_data_traffic(self): |
| """Stops transmitting data from the instrument to the DUT. """ |
| raise NotImplementedError( |
| 'This UXM callbox simulator does not support this feature.') |