| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the 'License'); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an 'AS IS' BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Provides cmw500 handover functionality.""" |
| |
| from enum import Enum |
| |
| from acts.controllers import handover_simulator as hs |
| from acts.controllers.rohdeschwarz_lib import cmw500 |
| |
| |
| class HandoverMode(Enum): |
| """Supported handover modes.""" |
| Redirection = "RED" |
| MTCSFallback = "MTCS" |
| Handover = "HAND" |
| |
| |
| class Cmw500HandoverSimulator(hs.AbstractHandoverSimulator): |
| """Provides methods for performing inter/intra-RAT handovers.""" |
| |
| def __init__(self, cmw): |
| """Init method to setup handover controller. |
| |
| Args: |
| cmw: the CMW500 instrument. |
| """ |
| self.cmw = cmw |
| self._lte = Cmw500LteHandoverManager(self.cmw) |
| self._wcdma = Cmw500WcdmaHandoverManager(self.cmw) |
| |
| def lte_handover(self, band, channel, bandwidth, source_technology): |
| """Performs a handover to LTE. |
| |
| Args: |
| band: the band of the handover destination. |
| channel: the downlink channel of the handover destination. |
| bandwidth: the downlink bandwidth of the handover destination. |
| source_technology: the source handover technology. |
| """ |
| mode = cmw500.DuplexMode.TDD if 33 <= int( |
| band) <= 46 else cmw500.DuplexMode.FDD |
| band = 'OB{}'.format(band) |
| bandwidth = cmw500.BandwidthFromFloat(bandwidth) |
| |
| source = self._get_handover_manager(source_technology) |
| self._prepare_handover(source, self._lte) |
| self._lte.configure_incoming_handover_lte(mode, band, channel, |
| bandwidth) |
| self._perform_handover(source, self._lte) |
| |
| def wcdma_handover(self, band, channel, source_technology): |
| """Performs a handover to WCDMA. |
| |
| Args: |
| band: the band of the handover destination. |
| channel: the downlink channel of the handover destination. |
| source_technology: the source handover technology. |
| """ |
| band = 'OB{}'.format(band) |
| |
| source = self._get_handover_manager(source_technology) |
| self._prepare_handover(source, self._wcdma) |
| self._wcdma.configure_incoming_handover_wcdma(band, channel) |
| self._perform_handover(source, self._wcdma) |
| |
| def _prepare_handover(self, source, destination): |
| """Initializes the source and destination signalling applications for a handover. |
| |
| Args: |
| source: the source handover manager. |
| destination: the destination handover manager. |
| """ |
| if not source.is_attached: |
| raise hs.HandoverSimulatorError( |
| "Unable to perform handover, source signalling application is not attached." |
| ) |
| |
| source.handover_destination = destination.application_name |
| if source.is_internal_handover: |
| destination.wait_for_signalling_state([ |
| cmw500.SignallingState.ReadyForHandover.value, |
| cmw500.SignallingState.ON.value |
| ]) |
| source.handover_mode = HandoverMode.Redirection |
| else: |
| destination.wait_for_signalling_state( |
| [cmw500.SignallingState.ReadyForHandover.value]) |
| source.handover_mode = HandoverMode.Handover |
| |
| def _perform_handover(self, source, destination): |
| """Performs the handover and wait for completion. |
| |
| Args: |
| source: the source handover manager. |
| destination: the destination handover manager. |
| """ |
| source.initiate_handover() |
| destination.wait_for_handover() |
| if not source.is_attached: |
| source.stop_signalling() |
| |
| self.cmw.wait_until_quiet() |
| |
| def _get_handover_manager(self, technology): |
| """Gets the handover manager for the specified technology. |
| |
| Args: |
| technology: the handover source/destination technology. |
| |
| Returns: |
| the manager for the specified technology. |
| """ |
| if technology == hs.CellularTechnology.LTE: |
| return self._lte |
| if technology == hs.CellularTechnology.WCDMA: |
| return self._wcdma |
| raise hs.HandoverSimulatorError( |
| 'Unsupported handover destination type {}.'.format(technology)) |
| |
| |
| class Cmw500HandoverManagerBase(): |
| """Provides common CMW500 functionality for conducting handovers.""" |
| |
| def __init__(self, cmw, technology): |
| """Initializes handover controller. |
| |
| Args: |
| cmw: the CMW500 instrument. |
| technology: the cellular technology to use. |
| """ |
| self.cmw = cmw |
| self.tech = technology.value |
| |
| @property |
| def application_name(self): |
| """Gets the name of the signalling application to be used in handovers.""" |
| return '{} Sig1'.format(self.tech) |
| |
| @property |
| def handover_destination(self): |
| """Gets the handover destination application.""" |
| cmd = 'PREPare:{}:SIGN:HANDover:DESTination?'.format(self.tech) |
| return self.cmw.send_and_recv(cmd).strip('"\'') |
| |
| @property |
| def is_internal_handover(self): |
| """Returns true if the handover is within the same signalling application.""" |
| return self.handover_destination == self.application_name |
| |
| @handover_destination.setter |
| def handover_destination(self, destination): |
| """Sets the handover destination application.""" |
| cmd = 'PREPare:{}:SIGN:HANDover:DESTination "{}"'.format( |
| self.tech, destination) |
| self.cmw.send_and_recv(cmd) |
| |
| @property |
| def handover_mode(self): |
| """Gets the handover mechanism to use.""" |
| cmd = 'PREPare:{}:SIGN:HANDover:MMODe?'.format(self.tech) |
| return self.cmw.send_and_recv(cmd) |
| |
| @handover_mode.setter |
| def handover_mode(self, mode): |
| """Sets the handover mechanism to use.""" |
| if not isinstance(mode, HandoverMode): |
| raise ValueError('mode should be the instance of HandoverMode.') |
| self.cmw.send_and_recv('PREPare:{}:SIGN:HANDover:MMODe {}'.format( |
| self.tech, mode.value)) |
| |
| @property |
| def is_attached(self): |
| """Returns True if the current technology pswitched state is attached.""" |
| cmd = 'FETCh:{}:SIGN:PSWitched:STATe?'.format(self.tech) |
| return self.cmw.send_and_recv(cmd) in self.get_attached_states() |
| |
| def stop_signalling(self): |
| """Stops the current signalling application.""" |
| cmd = 'SOURce:{}:SIGN:CELL:STATe {}'.format( |
| self.tech, cmw500.SignallingState.OFF.value) |
| self.cmw.send_and_recv(cmd) |
| self.wait_for_signalling_state([cmw500.SignallingState.OFF.value]) |
| |
| def wait_for_signalling_state(self, allowed, timeout=30): |
| """Polls the signalling state until it reaches an allowable state. |
| |
| Args: |
| allowed: a list of strings defining allowed signalling state responses. |
| timeout: timeout for valid state to be reached. |
| |
| Raises: |
| CmwError on time out. |
| """ |
| allowed = set(['{},ADJ'.format(state) for state in allowed]) |
| cmd = 'SOURce:{}:SIGN:CELL:STATe:ALL?'.format(self.tech) |
| self.cmw.wait_for_response(cmd, allowed, timeout=timeout) |
| |
| def wait_for_pswitched_state(self, allowed, timeout=120): |
| """Polls the pswitched state until it reaches an allowable state. |
| |
| Args: |
| allowed: a list of strings defining valid pswitched state responses. |
| timeout: timeout for valid state to be reached. |
| |
| Raises: |
| CmwError on time out. |
| """ |
| cmd = 'FETCh:{}:SIGN:PSWitched:STATe?'.format(self.tech) |
| self.cmw.wait_for_response(cmd, allowed, timeout=timeout) |
| |
| def get_attached_states(self): |
| """Gets a collection of valid responses when the application is attached. |
| |
| Returns: |
| states: the list of valid attached states. |
| """ |
| raise NotImplementedError() |
| |
| def initiate_handover(self): |
| """Initiates an outgoing handover.""" |
| raise NotImplementedError() |
| |
| def wait_for_handover(self): |
| """Waits for an incoming handover to be completed.""" |
| raise NotImplementedError() |
| |
| |
| class Cmw500LteHandoverManager(Cmw500HandoverManagerBase): |
| """Provides LTE-specific handover methods.""" |
| |
| ATTACHED_STATES = [cmw500.LTE_ATTACH_RESP] |
| |
| def __init__(self, cmw): |
| """Init method to setup handover controller. |
| |
| Args: |
| cmw: the CMW500 instrument. |
| """ |
| super().__init__(cmw, hs.CellularTechnology.LTE) |
| |
| def configure_incoming_handover_lte(self, mode, band, channel, bandwidth): |
| """Prepares the LTE simulator for an incoming handover. |
| |
| Args: |
| mode: the duplexing mode of the handover destination. |
| band: the band of the handover destination. |
| channel: the downlink channel of the handover destination. |
| bandwidth: the duplexing mode of the handover destination. |
| """ |
| if self.is_attached: |
| self.configure_handover(mode, band, channel, bandwidth) |
| else: |
| bts = self.cmw.get_base_station() |
| bts.duplex_mode = mode |
| bts.band = band |
| bts.dl_channel = channel |
| bts.bandwidth = bandwidth |
| |
| def initiate_handover(self): |
| """Initiates an outgoing handover.""" |
| self.cmw.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion HANDover') |
| |
| def wait_for_handover(self): |
| """Waits for an incoming handover to be completed.""" |
| self.cmw.wait_for_attached_state() |
| |
| def configure_handover(self, mode, band, channel, bandwidth, emit='NS01'): |
| """Configures the handover destination. |
| |
| Args: |
| mode: the duplexing mode of the handover destination. |
| band: the band of the handover destination. |
| channel: the downlink channel of the handover destination. |
| bandwidth: the downlink bandwidth of the handover destination. |
| emit: an additional optional spectrum emissions requirement. |
| """ |
| if not isinstance(bandwidth, cmw500.LteBandwidth): |
| raise ValueError('bandwidth should be an instance of ' |
| 'LteBandwidth.') |
| if not isinstance(mode, cmw500.DuplexMode): |
| raise ValueError('mode should be an instance of ' 'DuplexMode.') |
| self.cmw.send_and_recv( |
| 'PREPare:LTE:SIGN:HANDover:ENHanced {}, {}, {}, {}, {}'.format( |
| mode.value, band, channel, bandwidth.value, emit)) |
| |
| def get_attached_states(self): |
| """Gets a collection of valid attached states. |
| |
| Returns: |
| states: the list of valid attached states. |
| """ |
| return self.ATTACHED_STATES |
| |
| |
| class Cmw500WcdmaHandoverManager(Cmw500HandoverManagerBase): |
| """Provides WCDMA-specific handover methods.""" |
| |
| ATTACHED_STATES = set( |
| [cmw500.WCDMA_ATTACH_RESP, cmw500.WCDMA_CESTABLISHED_RESP]) |
| |
| def __init__(self, cmw): |
| """Init method to setup handover controller. |
| |
| Args: |
| cmw: the CMW500 instrument. |
| """ |
| super().__init__(cmw, hs.CellularTechnology.WCDMA) |
| self._stored_band = self._band |
| self._stored_channel = self._dl_channel |
| |
| @property |
| def _band(self): |
| """Sets the signalling application band.""" |
| self.cmw.send_and_recv('CONFigure:WCDMa:SIGN:CARRier:BAND?') |
| |
| @_band.setter |
| def _band(self, band): |
| """Sets the signalling application band. |
| |
| Args: |
| band: the band of the signalling application. |
| """ |
| cmd = 'CONFigure:WCDMa:SIGN:CARRier:BAND {}'.format(band) |
| self.cmw.send_and_recv(cmd) |
| |
| @property |
| def _dl_channel(self): |
| """Gets the signalling application dl channel.""" |
| cmd = 'CONFigure:WCDMa:SIGN:RFSettings:CARRier:CHANnel:DL?' |
| self.cmw.send_and_recv(cmd) |
| |
| @_dl_channel.setter |
| def _dl_channel(self, channel): |
| """Sets the signalling application dl channel. |
| |
| Args: |
| channel: the channel of the signalling application. |
| """ |
| cmd = 'CONFigure:WCDMa:SIGN:RFSettings:CARRier:CHANnel:DL {}'.format( |
| channel) |
| self.cmw.send_and_recv(cmd) |
| |
| def configure_incoming_handover_wcdma(self, band, channel): |
| """Prepares the WCDMA simulator for an incoming handover. |
| |
| Args: |
| band: the band of the handover destination. |
| channel: the downlink channel of the handover destination. |
| """ |
| # WCDMA has no dedicated configuration for handovers within the same |
| # signalling application instead, store the values for later and |
| # apply them when initiate_handover is called. |
| if not self.is_attached: |
| self._band = band |
| self._dl_channel = channel |
| |
| self._stored_band = band |
| self._stored_channel = channel |
| |
| def initiate_handover(self): |
| """Initiates an outgoing handover.""" |
| if self.is_internal_handover: |
| self._dl_channel = self._stored_channel |
| self._band = self._stored_band |
| else: |
| self.cmw.send_and_recv('CALL:WCDMA:SIGN:PSWitched:ACTion HANDover') |
| |
| def wait_for_handover(self): |
| """Waits for an incoming handover to be completed.""" |
| self.wait_for_pswitched_state( |
| [cmw500.WCDMA_ATTACH_RESP, cmw500.WCDMA_CESTABLISHED_RESP]) |
| |
| def get_attached_states(self): |
| """Gets a collection of valid attached states. |
| |
| Returns: |
| states: the list of valid attached states. |
| """ |
| return self.ATTACHED_STATES |