| #!/usr/bin/env python3 |
| # |
| # Copyright 2019 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the 'License'); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an 'AS IS' BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| from acts import logger |
| from acts.controllers import cellular_lib |
| |
| |
| class AbstractCellularSimulator: |
| """ A generic cellular simulator controller class that can be derived to |
| implement equipment specific classes and allows the tests to be implemented |
| without depending on a singular instrument model. |
| |
| This class defines the interface that every cellular simulator controller |
| needs to implement and shouldn't be instantiated by itself. """ |
| |
| # The maximum number of carriers that this simulator can support for LTE |
| LTE_MAX_CARRIERS = None |
| |
| # The maximum power that the equipment is able to transmit |
| MAX_DL_POWER = None |
| |
| def __init__(self): |
| """ Initializes the cellular simulator. """ |
| self.log = logger.create_tagged_trace_logger('CellularSimulator') |
| self.num_carriers = None |
| |
| def destroy(self): |
| """ Sends finalization commands to the cellular equipment and closes |
| the connection. """ |
| raise NotImplementedError() |
| |
| def setup_lte_scenario(self): |
| """ Configures the equipment for an LTE simulation. """ |
| raise NotImplementedError() |
| |
| def set_band_combination(self, bands): |
| """ Prepares the test equipment for the indicated CA combination. |
| |
| Args: |
| bands: a list of bands represented as ints or strings |
| """ |
| raise NotImplementedError() |
| |
| def configure_bts(self, config, bts_index=0): |
| """ Commands the equipment to setup a base station with the required |
| configuration. This method applies configurations that are common to all |
| RATs. |
| |
| Args: |
| config: a BaseSimulation.BtsConfig object. |
| bts_index: the base station number. |
| """ |
| |
| if config.output_power: |
| self.set_output_power(bts_index, config.output_power) |
| |
| if config.input_power: |
| self.set_input_power(bts_index, config.input_power) |
| |
| if isinstance(config, cellular_lib.LteCellConfig.LteCellConfig): |
| self.configure_lte_bts(config, bts_index) |
| |
| def configure_lte_bts(self, config, bts_index=0): |
| """ Commands the equipment to setup an LTE base station with the |
| required configuration. |
| |
| Args: |
| config: an LteSimulation.BtsConfig object. |
| bts_index: the base station number. |
| """ |
| if config.band: |
| self.set_band(bts_index, config.band) |
| |
| if config.dlul_config: |
| self.set_tdd_config(bts_index, config.dlul_config) |
| |
| if config.ssf_config: |
| self.set_ssf_config(bts_index, config.ssf_config) |
| |
| if config.bandwidth: |
| self.set_bandwidth(bts_index, config.bandwidth) |
| |
| if config.dl_channel: |
| self.set_downlink_channel_number(bts_index, config.dl_channel) |
| |
| if config.mimo_mode: |
| self.set_mimo_mode(bts_index, config.mimo_mode) |
| |
| if config.transmission_mode: |
| self.set_transmission_mode(bts_index, config.transmission_mode) |
| |
| # Modulation order should be set before set_scheduling_mode being |
| # called. |
| if config.dl_256_qam_enabled is not None: |
| self.set_dl_256_qam_enabled(bts_index, config.dl_256_qam_enabled) |
| |
| if config.ul_64_qam_enabled is not None: |
| self.set_ul_64_qam_enabled(bts_index, config.ul_64_qam_enabled) |
| |
| if config.scheduling_mode: |
| |
| if (config.scheduling_mode == |
| cellular_lib.LteSimulation.SchedulingMode.STATIC |
| and not (config.dl_rbs and config.ul_rbs and config.dl_mcs |
| and config.ul_mcs)): |
| raise ValueError('When the scheduling mode is set to manual, ' |
| 'the RB and MCS parameters are required.') |
| |
| # If scheduling mode is set to Dynamic, the RB and MCS parameters |
| # will be ignored by set_scheduling_mode. |
| self.set_scheduling_mode(bts_index, config.scheduling_mode, |
| config.dl_mcs, config.ul_mcs, |
| config.dl_rbs, config.ul_rbs) |
| |
| # This variable stores a boolean value so the following is needed to |
| # differentiate False from None |
| if config.mac_padding is not None: |
| self.set_mac_padding(bts_index, config.mac_padding) |
| |
| if config.cfi: |
| self.set_cfi(bts_index, config.cfi) |
| |
| if config.paging_cycle: |
| self.set_paging_cycle(bts_index, config.paging_cycle) |
| |
| if config.phich: |
| self.set_phich_resource(bts_index, config.phich) |
| |
| if config.drx_connected_mode: |
| self.set_drx_connected_mode(bts_index, config.drx_connected_mode) |
| |
| if config.drx_on_duration_timer: |
| self.set_drx_on_duration_timer(bts_index, |
| config.drx_on_duration_timer) |
| |
| if config.drx_inactivity_timer: |
| self.set_drx_inactivity_timer(bts_index, |
| config.drx_inactivity_timer) |
| |
| if config.drx_retransmission_timer: |
| self.set_drx_retransmission_timer( |
| bts_index, config.drx_retransmission_timer) |
| |
| if config.drx_long_cycle: |
| self.set_drx_long_cycle(bts_index, config.drx_long_cycle) |
| |
| if config.drx_long_cycle_offset is not None: |
| self.set_drx_long_cycle_offset(bts_index, |
| config.drx_long_cycle_offset) |
| |
| def set_lte_rrc_state_change_timer(self, enabled, time=10): |
| """ Configures the LTE RRC state change timer. |
| |
| Args: |
| enabled: a boolean indicating if the timer should be on or off. |
| time: time in seconds for the timer to expire |
| """ |
| raise NotImplementedError() |
| |
| def set_band(self, bts_index, band): |
| """ Sets the band for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| band: the new band |
| """ |
| raise NotImplementedError() |
| |
| def set_input_power(self, bts_index, input_power): |
| """ Sets the input power for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| input_power: the new input power |
| """ |
| raise NotImplementedError() |
| |
| def set_output_power(self, bts_index, output_power): |
| """ Sets the output power for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| output_power: the new output power |
| """ |
| raise NotImplementedError() |
| |
| def set_tdd_config(self, bts_index, tdd_config): |
| """ Sets the tdd configuration number for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| tdd_config: the new tdd configuration number |
| """ |
| raise NotImplementedError() |
| |
| def set_ssf_config(self, bts_index, ssf_config): |
| """ Sets the Special Sub-Frame config number for the indicated |
| base station. |
| |
| Args: |
| bts_index: the base station number |
| ssf_config: the new ssf config number |
| """ |
| raise NotImplementedError() |
| |
| def set_bandwidth(self, bts_index, bandwidth): |
| """ Sets the bandwidth for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| bandwidth: the new bandwidth |
| """ |
| raise NotImplementedError() |
| |
| def set_downlink_channel_number(self, bts_index, channel_number): |
| """ Sets the downlink channel number for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| channel_number: the new channel number |
| """ |
| raise NotImplementedError() |
| |
| def set_mimo_mode(self, bts_index, mimo_mode): |
| """ Sets the mimo mode for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| mimo_mode: the new mimo mode |
| """ |
| raise NotImplementedError() |
| |
| def set_transmission_mode(self, bts_index, transmission_mode): |
| """ Sets the transmission mode for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| transmission_mode: the new transmission mode |
| """ |
| raise NotImplementedError() |
| |
| def set_scheduling_mode(self, bts_index, scheduling_mode, mcs_dl, mcs_ul, |
| nrb_dl, nrb_ul): |
| """ Sets the scheduling mode for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| scheduling_mode: the new scheduling mode |
| mcs_dl: Downlink MCS (only for STATIC scheduling) |
| mcs_ul: Uplink MCS (only for STATIC scheduling) |
| nrb_dl: Number of RBs for downlink (only for STATIC scheduling) |
| nrb_ul: Number of RBs for uplink (only for STATIC scheduling) |
| """ |
| raise NotImplementedError() |
| |
| def set_dl_256_qam_enabled(self, bts_index, enabled): |
| """ Determines what MCS table should be used for the downlink. |
| |
| Args: |
| bts_index: the base station number |
| enabled: whether 256 QAM should be used |
| """ |
| raise NotImplementedError() |
| |
| def set_ul_64_qam_enabled(self, bts_index, enabled): |
| """ Determines what MCS table should be used for the uplink. |
| |
| Args: |
| bts_index: the base station number |
| enabled: whether 64 QAM should be used |
| """ |
| raise NotImplementedError() |
| |
| def set_mac_padding(self, bts_index, mac_padding): |
| """ Enables or disables MAC padding in the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| mac_padding: the new MAC padding setting |
| """ |
| raise NotImplementedError() |
| |
| def set_cfi(self, bts_index, cfi): |
| """ Sets the Channel Format Indicator for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| cfi: the new CFI setting |
| """ |
| raise NotImplementedError() |
| |
| def set_paging_cycle(self, bts_index, cycle_duration): |
| """ Sets the paging cycle duration for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| cycle_duration: the new paging cycle duration in milliseconds |
| """ |
| raise NotImplementedError() |
| |
| def set_phich_resource(self, bts_index, phich): |
| """ Sets the PHICH Resource setting for the indicated base station. |
| |
| Args: |
| bts_index: the base station number |
| phich: the new PHICH resource setting |
| """ |
| raise NotImplementedError() |
| |
| def set_drx_connected_mode(self, bts_index, active): |
| """ Sets the time interval to wait before entering DRX mode |
| |
| Args: |
| bts_index: the base station number |
| active: Boolean indicating whether cDRX mode |
| is active |
| """ |
| raise NotImplementedError() |
| |
| def set_drx_on_duration_timer(self, bts_index, timer): |
| """ Sets the amount of PDCCH subframes to wait for data after |
| waking up from a DRX cycle |
| |
| Args: |
| bts_index: the base station number |
| timer: Number of PDCCH subframes to wait and check for user data |
| after waking from the DRX cycle |
| """ |
| raise NotImplementedError() |
| |
| def set_drx_inactivity_timer(self, bts_index, timer): |
| """ Sets the number of PDCCH subframes to wait before entering DRX mode |
| |
| Args: |
| bts_index: the base station number |
| timer: The amount of time to wait before entering DRX mode |
| """ |
| raise NotImplementedError() |
| |
| def set_drx_retransmission_timer(self, bts_index, timer): |
| """ Sets the number of consecutive PDCCH subframes to wait |
| for retransmission |
| |
| Args: |
| bts_index: the base station number |
| timer: Number of PDCCH subframes to remain active |
| |
| """ |
| raise NotImplementedError() |
| |
| def set_drx_long_cycle(self, bts_index, cycle): |
| """ Sets the amount of subframes representing a DRX long cycle. |
| |
| Args: |
| bts_index: the base station number |
| cycle: The amount of subframes representing one long DRX cycle. |
| One cycle consists of DRX sleep + DRX on duration |
| """ |
| raise NotImplementedError() |
| |
| def set_drx_long_cycle_offset(self, bts_index, offset): |
| """ Sets the offset used to determine the subframe number |
| to begin the long drx cycle |
| |
| Args: |
| bts_index: the base station number |
| offset: Number in range 0 to (long cycle - 1) |
| """ |
| raise NotImplementedError() |
| |
| def lte_attach_secondary_carriers(self, ue_capability_enquiry): |
| """ Activates the secondary carriers for CA. Requires the DUT to be |
| attached to the primary carrier first. |
| |
| Args: |
| ue_capability_enquiry: UE capability enquiry message to be sent to |
| the UE before starting carrier aggregation. |
| """ |
| raise NotImplementedError() |
| |
| def wait_until_attached(self, timeout=120): |
| """ Waits until the DUT is attached to the primary carrier. |
| |
| Args: |
| timeout: after this amount of time the method will raise a |
| CellularSimulatorError exception. Default is 120 seconds. |
| """ |
| raise NotImplementedError() |
| |
| def wait_until_communication_state(self, timeout=120): |
| """ Waits until the DUT is in Communication state. |
| |
| Args: |
| timeout: after this amount of time the method will raise a |
| CellularSimulatorError exception. Default is 120 seconds. |
| """ |
| raise NotImplementedError() |
| |
| def wait_until_idle_state(self, timeout=120): |
| """ Waits until the DUT is in Idle state. |
| |
| Args: |
| timeout: after this amount of time the method will raise a |
| CellularSimulatorError exception. Default is 120 seconds. |
| """ |
| raise NotImplementedError() |
| |
| def detach(self): |
| """ Turns off all the base stations so the DUT loose connection.""" |
| raise NotImplementedError() |
| |
| def stop(self): |
| """ Stops current simulation. After calling this method, the simulator |
| will need to be set up again. """ |
| raise NotImplementedError() |
| |
| def start_data_traffic(self): |
| """ Starts transmitting data from the instrument to the DUT. """ |
| raise NotImplementedError() |
| |
| def stop_data_traffic(self): |
| """ Stops transmitting data from the instrument to the DUT. """ |
| raise NotImplementedError() |
| |
| def get_measured_pusch_power(self): |
| """ Queries PUSCH power measured at the callbox. |
| |
| Returns: |
| The PUSCH power in the primary input port. |
| """ |
| raise NotImplementedError() |
| |
| |
| class CellularSimulatorError(Exception): |
| """ Exceptions thrown when the cellular equipment is unreachable or it |
| returns an error after receiving a command. """ |
| pass |