blob: eb54b55fbb9da7028bc8e8e900918c276e1ada4a [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from enum import Enum
from acts.controllers import abstract_inst
# Reply to 'FETCh:LTE:SIGN:PSWitched:STATe?' that wait_for_attached_state
# polls for.
LTE_ATTACH_RESP = 'ATT'
# Replies to 'SENSe:LTE:SIGN:RRCState?' accepted by wait_for_rrc_state.
LTE_CONN_RESP = 'CONN'
LTE_IDLE_RESP = 'IDLE'
# Replies to 'FETCh:LTE:SIGN:PSWitched:STATe?' that end the polling loop in
# wait_for_pswitched_state.
LTE_PSWITCHED_ON_RESP = 'ON'
LTE_PSWITCHED_OFF_RESP = 'OFF'
# WCDMA response strings; not referenced by the code visible in this review.
WCDMA_ATTACH_RESP = 'ATT'
WCDMA_CESTABLISHED_RESP = 'CEST'
WCDMA_PSWITCHED_ON_RESP = 'ON'
# Polling budget, in seconds, used by switch_lte_signalling and send_sms.
STATE_CHANGE_TIMEOUT = 20
class IPAddressType(Enum):
    """IP version choices; each value is the token sent to the callbox."""

    IPV4 = 'IPV4'
    IPV6 = 'IPV6'
    # Dual stack.
    IPV4V6 = 'IPV46'
class SignallingState(Enum):
    """Signalling states shared by every RAT."""

    ON = "ON"
    OFF = "OFF"
    # Abbreviated on the wire as 'RFH'.
    ReadyForHandover = "RFH"
class SccActivationMode(Enum):
    """How secondary component carriers (SCCs) get activated."""

    AUTO = "AUTO"
    MANUAL = "MAN"
    SEMI_AUTO = "SEM"
class SccState(Enum):
    """States a secondary component carrier can be in."""

    ON = "ON"
    OFF = "OFF"
    RRC = "RRC"
    MAC = "MAC"
class LteState(Enum):
    """Target states for the LTE signalling cell (on or off)."""

    LTE_ON = "ON"
    LTE_OFF = "OFF"
class BtsNumber(Enum):
    """Identifiers of the base stations (component carriers)."""

    # Primary component carrier.
    BTS1 = "PCC"
    # Secondary component carriers.
    BTS2 = "SCC1"
    BTS3 = "SCC2"
    BTS4 = "SCC3"
    BTS5 = "SCC4"
    # NOTE(review): the sequence jumps from SCC4 to SCC6, so there is no
    # member for 'SCC5' -- confirm whether this is intentional.
    BTS6 = "SCC6"
    BTS7 = "SCC7"
class LteBandwidth(Enum):
    """LTE channel bandwidths and the tokens that select them."""

    # 1.4 MHz (see BandwidthFromFloat).
    BANDWIDTH_1MHz = "B014"
    BANDWIDTH_3MHz = "B030"
    BANDWIDTH_5MHz = "B050"
    BANDWIDTH_10MHz = "B100"
    BANDWIDTH_15MHz = "B150"
    BANDWIDTH_20MHz = "B200"
def BandwidthFromFloat(bw):
    """Converts a bandwidth in MHz to the matching LteBandwidth member.

    Args:
        bw: channel bandwidth in MHz; one of 1.4, 3, 5, 10, 15 or 20.

    Returns:
        The LteBandwidth member for the requested bandwidth.

    Raises:
        ValueError: if bw is not one of the supported LTE bandwidths.
    """
    if bw == 20:
        return LteBandwidth.BANDWIDTH_20MHz
    if bw == 15:
        return LteBandwidth.BANDWIDTH_15MHz
    if bw == 10:
        return LteBandwidth.BANDWIDTH_10MHz
    if bw == 5:
        return LteBandwidth.BANDWIDTH_5MHz
    if bw == 3:
        return LteBandwidth.BANDWIDTH_3MHz
    if bw == 1.4:
        return LteBandwidth.BANDWIDTH_1MHz
    # The message must be built from the actual argument name; formatting an
    # undefined name here would surface as NameError instead of ValueError.
    raise ValueError('Bandwidth {} MHz is not valid for LTE'.format(bw))
class DrxMode(Enum):
    """Connected-mode DRX settings accepted by the DRX enable command."""

    DRXS = "DRXS"
    DRXL = "DRXL"
    USER_DEFINED = "UDEF"
    ON = "ON"
    OFF = "OFF"
class DuplexMode(Enum):
    """Duplexing scheme of a cell."""

    FDD = "FDD"
    TDD = "TDD"
class SchedulingMode(Enum):
    """Scheduling types that can be selected for a cell."""

    RMC = "RMC"
    # User-defined channels.
    USERDEFINEDCH = "UDCHannels"
class TransmissionModes(Enum):
    """Transmission modes (TM) that can be configured for a cell."""

    TM1 = "TM1"
    TM2 = "TM2"
    TM3 = "TM3"
    TM4 = "TM4"
    # TM5 and TM6 are not offered by this enum.
    TM7 = "TM7"
    TM8 = "TM8"
    TM9 = "TM9"
class UseCarrierSpecific(Enum):
    """Switch for the carrier-specific configuration."""

    UCS_ON = "ON"
    UCS_OFF = "OFF"
class RbPosition(Enum):
    """Positions of the allocated resource blocks within the channel."""

    LOW = "LOW"
    HIGH = "HIGH"
    P5 = "P5"
    P10 = "P10"
    P23 = "P23"
    P35 = "P35"
    P48 = "P48"
class ModulationType(Enum):
    """Modulation schemes that can be requested."""

    QPSK = "QPSK"
    Q16 = "Q16"
    Q64 = "Q64"
    Q256 = "Q256"
class DciFormat(Enum):
    """Downlink control information (DCI) formats used with MIMO."""

    D1 = "D1"
    D1A = "D1A"
    D1B = "D1B"
    D2 = "D2"
    D2A = "D2A"
    D2B = "D2B"
    D2C = "D2C"
class MimoModes(Enum):
    """Number of downlink antennas, spelled the way the callbox expects."""

    MIMO1x1 = "ONE"
    MIMO2x2 = "TWO"
    MIMO4x4 = "FOUR"
class MimoScenario(Enum):
    """Supported MIMO scenarios.

    Each value is appended to 'ROUTe:LTE:SIGN:SCENario:' by
    Cmw500.configure_mimo_settings, so it must start with the remaining
    command header ('<scenario>:FLEXible'), followed by a space and the
    routing parameters.
    """
    SCEN1x1 = 'SCELl:FLEXible SUA1,RF1C,RX1,RF1C,TX1'
    SCEN2x2 = 'TRO:FLEXible SUA1,RF1C,RX1,RF1C,TX1,RF3C,TX2'
    # The header separator is ':' as in the two scenarios above; a space
    # here would make 'FLEXible' part of the parameter list.
    SCEN4x4 = 'FRO:FLEXible SUA1,RF1C,RX1,RF1C,TX1,RF3C,TX2,RF2C,TX3,RF4C,TX4'
class RrcState(Enum):
    """Whether the RRC connection is kept (ON) or released (OFF)."""

    RRC_ON = "ON"
    RRC_OFF = "OFF"
class MacPadding(Enum):
    """Switch for padding at the MAC layer."""

    ON = "ON"
    OFF = "OFF"
class ConnectionType(Enum):
    """Connection types that can be applied on the callbox."""

    TEST = "TESTmode"
    DAU = "DAPPlication"
class RepetitionMode(Enum):
    """Repetition mode of an LTE measurement."""

    SINGLESHOT = "SINGleshot"
    CONTINUOUS = "CONTinuous"
class TpcPowerControl(Enum):
    """Uplink power control (TPC) setups selectable on a cell."""

    MIN_POWER = "MINPower"
    MAX_POWER = "MAXPower"
    CONSTANT = "CONStant"
    SINGLE = "SINGle"
    UDSINGLE = "UDSingle"
    UDCONTINUOUS = "UDContinuous"
    ALTERNATE = "ALT0"
    CLOSED_LOOP = "CLOop"
    RP_CONTROL = "RPControl"
    FLEX_POWER = "FULPower"
class ReducedPdcch(Enum):
    """Switch for the reduction of PDCCH resources."""

    ON = "ON"
    OFF = "OFF"
class Cmw500(abstract_inst.SocketInstrument):
    """Remote-control interface to a CMW500 callbox over a socket.

    Commands without a '?' are only sent; commands containing a '?' are
    treated as queries and their reply is read back (see send_and_recv).
    """

    def __init__(self, ip_addr, port):
        """Connects to the callbox and sends the initial setup commands.

        Args:
            ip_addr: Controller's ip address.
            port: Port
        """
        super(Cmw500, self).__init__(ip_addr, port)
        self._connect_socket()
        self._send('*CLS')
        self._send('*ESE 0;*SRE 0')
        self._send('*CLS')
        self._send('*ESE 1;*SRE 4')
        self._send('SYST:DISP:UPD ON')

    def switch_lte_signalling(self, state):
        """Turns LTE signalling ON/OFF and waits until the cell reports it.

        Args:
            state: an instance of LteState indicating the state to which LTE
                signal has to be set.

        Raises:
            ValueError: if state is not an LteState.
            CmwError: if the cell has not reported '<state>,ADJ' after
                STATE_CHANGE_TIMEOUT polls (one poll per second).
        """
        if not isinstance(state, LteState):
            raise ValueError('state should be the instance of LteState.')

        state = state.value

        cmd = 'SOURce:LTE:SIGN:CELL:STATe {}'.format(state)
        self.send_and_recv(cmd)

        time_elapsed = 0
        while time_elapsed < STATE_CHANGE_TIMEOUT:
            response = self.send_and_recv('SOURce:LTE:SIGN:CELL:STATe:ALL?')

            if response == state + ',ADJ':
                self._logger.info('LTE signalling is now {}.'.format(state))
                break

            # Wait for a second and increase time count by one
            time.sleep(1)
            time_elapsed += 1
        else:
            # Only reached when the loop ran out without hitting `break`.
            raise CmwError('Failed to turn {} LTE signalling.'.format(state))

    def switch_scc_state(self, scc_index, state):
        """Changes the SCC to the requested state and waits for it.

        Args:
            scc_index: the SCC number to modify.
            state: an instance of SccState indicating which SCC state to set.

        Raises:
            CmwError: if the SCC does not reach the state in time.
        """
        cmd = 'CALL:LTE:SIGN:SCC{}:ACTion {}'.format(scc_index, state.value)
        self.send_and_recv(cmd)
        self.wait_for_scc_state(scc_index, [state])

    def wait_for_scc_state(self, scc_index, allowed, timeout=120):
        """Polls for a SCC to reach one of the requested states.

        Args:
            scc_index: an integer defining the scc number.
            allowed: a list of SccStates defining the allowed states.
            timeout: the maximum amount of time to wait for the requested
                state.

        Raises:
            CmwError: if none of the allowed states is reported in time.
        """
        self.wait_for_response(
            'FETCh:LTE:SIGN:SCC{}:STATe?'.format(scc_index),
            [a.value for a in allowed],
            timeout,
        )

    def wait_for_response(self, cmd, allowed, timeout=120):
        """Polls a query command until an allowed response is returned.

        The query is repeated once per second; `timeout` therefore counts
        polls rather than wall-clock seconds.

        Args:
            cmd: the query command string.
            allowed: a collection of allowed string responses.
            timeout: the maximum amount of time to wait for an allowed
                response.

        Raises:
            CmwError: if no allowed response arrives before the timeout.
        """
        time_elapsed = 0
        while time_elapsed < timeout:
            response = self.send_and_recv(cmd)

            if response in allowed:
                return

            time.sleep(1)
            time_elapsed += 1

        raise CmwError('Failed to wait for valid response.')

    def enable_packet_switching(self):
        """Enable packet switching in call box."""
        self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion CONNect')
        self.wait_for_pswitched_state()

    def disable_packet_switching(self):
        """Disable packet switching in call box."""
        self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion DISConnect')
        self.wait_for_pswitched_state()

    @property
    def use_carrier_specific(self):
        """Gets current status of carrier specific duplex configuration."""
        return self.send_and_recv('CONFigure:LTE:SIGN:DMODe:UCSPECific?')

    @use_carrier_specific.setter
    def use_carrier_specific(self, state):
        """Sets the carrier specific duplex configuration.

        Args:
            state: ON/OFF UCS configuration, either as a UseCarrierSpecific
                member or as the raw string to send.
        """
        # Formatting an enum member directly would send its repr-like name
        # ('UseCarrierSpecific.UCS_ON') instead of the 'ON'/'OFF' token.
        if isinstance(state, UseCarrierSpecific):
            state = state.value
        cmd = 'CONFigure:LTE:SIGN:DMODe:UCSPECific {}'.format(state)
        self.send_and_recv(cmd)

    def send_and_recv(self, cmd):
        """Sends a command and, for queries, reads back the reply.

        Args:
            cmd: Command to send.

        Returns:
            The reply received from the callbox if cmd contains a '?',
            otherwise None.
        """
        self._send(cmd)
        if '?' in cmd:
            return self._recv()
        return None

    def configure_mimo_settings(self, mimo):
        """Sets the mimo scenario for the test.

        Args:
            mimo: mimo scenario to set; its `value` is appended to the
                scenario command.
        """
        cmd = 'ROUTe:LTE:SIGN:SCENario:{}'.format(mimo.value)
        self.send_and_recv(cmd)

    def wait_for_pswitched_state(self, timeout=10):
        """Waits until the packet-switched state is reported as ON or OFF.

        Args:
            timeout: timeout for lte pswitched state, counted in polls of
                one second each.

        Raises:
            CmwError on timeout.
        """
        while timeout > 0:
            state = self.send_and_recv('FETCh:LTE:SIGN:PSWitched:STATe?')
            if state == LTE_PSWITCHED_ON_RESP:
                self._logger.debug('Connection to setup initiated.')
                break
            elif state == LTE_PSWITCHED_OFF_RESP:
                self._logger.debug('Connection to setup detached.')
                break

            # Wait for a second and decrease count by one
            time.sleep(1)
            timeout -= 1
        else:
            raise CmwError('Failure in setting up/detaching connection')

    def wait_for_attached_state(self, timeout=120):
        """Waits until the packet-switched state is reported as attached.

        Args:
            timeout: timeout for phone to get attached, counted in polls of
                one second each.

        Raises:
            CmwError on time out.
        """
        while timeout > 0:
            state = self.send_and_recv('FETCh:LTE:SIGN:PSWitched:STATe?')

            if state == LTE_ATTACH_RESP:
                self._logger.debug('Call box attached with device')
                break

            # Wait for a second and decrease count by one
            time.sleep(1)
            timeout -= 1
        else:
            raise CmwError('Device could not be attached')

    def wait_for_rrc_state(self, state, timeout=120):
        """Waits until a certain RRC state is set.

        Args:
            state: the RRC state that is being waited for; must be
                LTE_CONN_RESP or LTE_IDLE_RESP.
            timeout: timeout for phone to be in connected state, counted in
                polls of one second each.

        Raises:
            ValueError: if state is not one of the two allowed values.
            CmwError on time out.
        """
        if state not in [LTE_CONN_RESP, LTE_IDLE_RESP]:
            raise ValueError(
                'The allowed values for state are {} and {}.'.format(
                    LTE_CONN_RESP, LTE_IDLE_RESP))

        while timeout > 0:
            new_state = self.send_and_recv('SENSe:LTE:SIGN:RRCState?')

            if new_state == state:
                self._logger.debug('The RRC state is {}.'.format(new_state))
                break

            # Wait for a second and decrease count by one
            time.sleep(1)
            timeout -= 1
        else:
            raise CmwError('Timeout before RRC state was {}.'.format(state))

    def stop_all_signalling(self):
        """Turns off all signaling applications, generators, or measurements.
        """
        self.send_and_recv('SYSTem:SIGNaling:ALL:OFF')
        self.wait_until_quiet()

    def wait_until_quiet(self):
        """Waits for all pending operations to stop on the callbox."""
        self.send_and_recv('*OPC?')

    def reset(self):
        """System level reset"""
        self.send_and_recv('*RST; *OPC')

    @property
    def get_instrument_id(self):
        """Gets instrument identification number"""
        return self.send_and_recv('*IDN?')

    def disconnect(self):
        """Disconnect controller from device and switch to local mode.

        The socket is closed even when switching the signalling off or
        leaving remote mode fails; the original error still propagates.
        """
        try:
            self.switch_lte_signalling(LteState.LTE_OFF)
            self.close_remote_mode()
        finally:
            self._close_socket()

    def close_remote_mode(self):
        """Exits remote mode to local mode."""
        self.send_and_recv('&GTL')

    def detach(self):
        """Detach callbox and controller."""
        self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion DETach')

    @property
    def rrc_connection(self):
        """Gets the RRC connection state."""
        return self.send_and_recv('CONFigure:LTE:SIGN:CONNection:KRRC?')

    @rrc_connection.setter
    def rrc_connection(self, state):
        """Selects whether the RRC connection is kept or released after
        attach.

        Args:
            state: RrcState member (ON/OFF).

        Raises:
            ValueError: if state is not an RrcState.
        """
        if not isinstance(state, RrcState):
            raise ValueError('state should be the instance of RrcState.')

        cmd = 'CONFigure:LTE:SIGN:CONNection:KRRC {}'.format(state.value)
        self.send_and_recv(cmd)

    @property
    def rrc_connection_timer(self):
        """Gets the inactivity timeout for disabled rrc connection."""
        return self.send_and_recv('CONFigure:LTE:SIGN:CONNection:RITimer?')

    @rrc_connection_timer.setter
    def rrc_connection_timer(self, time_in_secs):
        """Sets the inactivity timeout for disabled rrc connection. By
        default the timeout is set to 5.

        Args:
            time_in_secs: timeout of inactivity in rrc connection.
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:RITimer {}'.format(time_in_secs)
        self.send_and_recv(cmd)

    @property
    def scc_activation_mode(self):
        """Gets the activation mode to use for SCCs when establishing a
        connection.
        """
        return self.send_and_recv('CONFigure:LTE:SIGN:SCC:AMODe?')

    @scc_activation_mode.setter
    def scc_activation_mode(self, activation_mode):
        """Sets the activation mode to use with SCCs when establishing a
        connection.

        Args:
            activation_mode: the scc activation mode to use.

        Raises:
            ValueError: if activation_mode is not a SccActivationMode.
        """
        if not isinstance(activation_mode, SccActivationMode):
            raise ValueError('activation_mode should be the instance of '
                             'SccActivationMode.')

        cmd = 'CONFigure:LTE:SIGN:SCC:AMODe {}'.format(activation_mode.value)
        self.send_and_recv(cmd)

    @property
    def dl_mac_padding(self):
        """Gets the state of mac padding."""
        return self.send_and_recv('CONFigure:LTE:SIGN:CONNection:DLPadding?')

    @dl_mac_padding.setter
    def dl_mac_padding(self, state):
        """Enables/Disables downlink padding at the mac layer.

        Args:
            state: ON/OFF, given as an object whose `value` is the token to
                send (e.g. a MacPadding member).
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:DLPadding {}'.format(state.value)
        self.send_and_recv(cmd)

    @property
    def connection_type(self):
        """Gets the connection type applied in callbox."""
        return self.send_and_recv('CONFigure:LTE:SIGN:CONNection:CTYPe?')

    @connection_type.setter
    def connection_type(self, ctype):
        """Sets the connection type to be applied.

        Args:
            ctype: Connection type, given as an object whose `value` is the
                token to send (e.g. a ConnectionType member).
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CTYPe {}'.format(ctype.value)
        self.send_and_recv(cmd)

    @property
    def drx_connected_mode(self):
        """Gets the Connected DRX LTE cell parameter.

        Returns:
            The callbox reply to the connected-DRX enable query.
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ENABle?'
        return self.send_and_recv(cmd)

    @drx_connected_mode.setter
    def drx_connected_mode(self, mode):
        """Sets the Connected DRX LTE cell parameter.

        Args:
            mode: DRX mode, a DrxMode member.

        Raises:
            ValueError: if mode is not a DrxMode.
        """
        if not isinstance(mode, DrxMode):
            raise ValueError('state should be the instance of DrxMode.')

        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ENABle {}'.format(mode.value)
        self.send_and_recv(cmd)

    @property
    def drx_on_duration_timer(self):
        """Gets the amount of PDCCH subframes to wait for data after
        waking up from a DRX cycle.

        Returns:
            DRX mode duration timer
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ODTimer?'
        return self.send_and_recv(cmd)

    @drx_on_duration_timer.setter
    def drx_on_duration_timer(self, time):
        """Sets the amount of PDCCH subframes to wait for data after
        waking up from a DRX cycle.

        Args:
            time: Length of interval to wait for user data to be
                transmitted. (The name shadows the `time` module inside
                this setter only.)
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ODTimer {}'.format(time)
        self.send_and_recv(cmd)

    @property
    def drx_inactivity_timer(self):
        """Gets the number of PDCCH subframes to wait before entering DRX
        mode.

        Returns:
            DRX mode inactivity timer
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ITIMer?'
        return self.send_and_recv(cmd)

    @drx_inactivity_timer.setter
    def drx_inactivity_timer(self, time):
        """Sets the number of PDCCH subframes to wait before entering DRX
        mode.

        Args:
            time: Length of the interval to wait
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:ITIMer {}'.format(time)
        self.send_and_recv(cmd)

    @property
    def drx_retransmission_timer(self):
        """Gets the number of consecutive PDCCH subframes to wait
        for retransmission.

        Returns:
            Number of PDCCH subframes to wait for retransmission
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:RTIMer?'
        # The reply has to be returned, otherwise the property is always None.
        return self.send_and_recv(cmd)

    @drx_retransmission_timer.setter
    def drx_retransmission_timer(self, time):
        """Sets the number of consecutive PDCCH subframes to wait
        for retransmission.

        Args:
            time: Number of PDCCH subframes to wait
                for retransmission
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:RTIMer {}'.format(time)
        self.send_and_recv(cmd)

    @property
    def drx_long_cycle(self):
        """Gets the amount of subframes representing a DRX long cycle.

        Returns:
            The amount of subframes representing one long DRX cycle.
            One cycle consists of DRX sleep + DRX on duration
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:LDCYcle?'
        return self.send_and_recv(cmd)

    @drx_long_cycle.setter
    def drx_long_cycle(self, long_cycle):
        """Sets the amount of subframes representing a DRX long cycle.

        Args:
            long_cycle: The amount of subframes representing one long DRX
                cycle. One cycle consists of DRX sleep + DRX on duration
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:LDCYcle {}'.format(
            long_cycle)
        self.send_and_recv(cmd)

    @property
    def drx_long_cycle_offset(self):
        """Gets the offset used to determine long cycle starting
        subframe.

        Returns:
            Long cycle offset
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:SOFFset?'
        return self.send_and_recv(cmd)

    @drx_long_cycle_offset.setter
    def drx_long_cycle_offset(self, offset):
        """Sets the offset used to determine long cycle starting
        subframe.

        Args:
            offset: Number in range 0...(long cycle - 1)
        """
        cmd = 'CONFigure:LTE:SIGN:CONNection:CDRX:SOFFset {}'.format(offset)
        self.send_and_recv(cmd)

    @property
    def apn(self):
        """Gets the callbox network Access Point Name."""
        cmd = 'CONFigure:LTE:SIGNaling:CONNection:APN?'
        return self.send_and_recv(cmd)

    @apn.setter
    def apn(self, apn):
        """Sets the callbox network Access Point Name.

        Args:
            apn: the APN name
        """
        cmd = 'CONFigure:LTE:SIGNaling:CONNection:APN {}'.format(apn)
        self.send_and_recv(cmd)

    @property
    def ip_type(self):
        """Gets the callbox network IP type."""
        cmd = 'CONFigure:LTE:SIGNaling:CONNection:IPVersion?'
        return self.send_and_recv(cmd)

    @ip_type.setter
    def ip_type(self, ip_type):
        """Configures the callbox network IP type.

        Args:
            ip_type: the network type to use, an IPAddressType member.

        Raises:
            ValueError: if ip_type is not an IPAddressType.
        """
        if not isinstance(ip_type, IPAddressType):
            raise ValueError('state should be the instance of IPAddressType.')

        cmd = 'CONFigure:LTE:SIGNaling:CONNection:IPVersion {}'.format(
            ip_type.value)
        self.send_and_recv(cmd)

    @property
    def mtu(self):
        """Gets the callbox network Maximum Transmission Unit."""
        cmd = 'CONFigure:DATA:CONTrol:MTU?'
        return self.send_and_recv(cmd)

    @mtu.setter
    def mtu(self, mtu):
        """Sets the callbox network Maximum Transmission Unit.

        Args:
            mtu: the MTU size.
        """
        cmd = 'CONFigure:DATA:CONTrol:MTU {}'.format(mtu)
        self.send_and_recv(cmd)

    def get_base_station(self, bts_num=BtsNumber.BTS1):
        """Gets the base station object based on bts num. By default
        bts_num set to PCC

        Args:
            bts_num: base station identifier

        Returns:
            base station object.
        """
        return BaseStation(self, bts_num)

    def init_lte_measurement(self):
        """Gets the class object for lte measurement which can be used to
        initiate measurements.

        Returns:
            lte measurement object.
        """
        return LteMeasurement(self)

    def set_sms(self, message):
        """Sets the SMS message to be sent to the DUT.

        Args:
            message: the SMS message to send.
        """
        cmd = 'CONFigure:LTE:SIGN:SMS:OUTGoing:INTernal "{}"'.format(message)
        self.send_and_recv(cmd)

    def send_sms(self):
        """Sends the currently set SMS message.

        Raises:
            CmwError: if the callbox does not report 'SUCC' for the last
                sent message within STATE_CHANGE_TIMEOUT seconds.
        """
        self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion SMS;*OPC?')
        timeout = time.time() + STATE_CHANGE_TIMEOUT
        while time.time() < timeout:
            state = self.send_and_recv(
                'SENSe:LTE:SIGN:SMS:OUTGoing:INFO:LMSent?')
            if state == "SUCC":
                return
            time.sleep(1)

        raise CmwError('Failed to send SMS message')
class BaseStation(object):
"""Class to interact with different base stations"""
def __init__(self, cmw, bts_num):
if not isinstance(bts_num, BtsNumber):
raise ValueError('bts_num should be an instance of BtsNumber.')
self._bts = bts_num.value
self._cmw = cmw
@property
def scc_state(self):
"""Gets the scc state of cell."""
cmd = 'FETCh:LTE:SIGN:{}:STATe?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@property
def duplex_mode(self):
"""Gets current duplex of cell."""
cmd = 'CONFigure:LTE:SIGN:{}:DMODe?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@duplex_mode.setter
def duplex_mode(self, mode):
"""Sets the Duplex mode of cell.
Args:
mode: String indicating FDD or TDD.
"""
if not isinstance(mode, DuplexMode):
raise ValueError('mode should be an instance of DuplexMode.')
cmd = 'CONFigure:LTE:SIGN:{}:DMODe {}'.format(self._bts, mode.value)
self._cmw.send_and_recv(cmd)
@property
def band(self):
"""Gets the current band of cell."""
cmd = 'CONFigure:LTE:SIGN:{}:BAND?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@band.setter
def band(self, band):
"""Sets the Band of cell.
Args:
band: band of cell.
"""
cmd = 'CONFigure:LTE:SIGN:{}:BAND {}'.format(self._bts, band)
self._cmw.send_and_recv(cmd)
@property
def dl_channel(self):
"""Gets the downlink channel of cell."""
cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:DL?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@dl_channel.setter
def dl_channel(self, channel):
"""Sets the downlink channel number of cell.
Args:
channel: downlink channel number of cell.
"""
cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:DL {}'.format(
self._bts, channel)
self._cmw.send_and_recv(cmd)
@property
def ul_channel(self):
"""Gets the uplink channel of cell."""
cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:UL?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@ul_channel.setter
def ul_channel(self, channel):
"""Sets the up link channel number of cell.
Args:
channel: up link channel number of cell.
"""
cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:UL {}'.format(
self._bts, channel)
self._cmw.send_and_recv(cmd)
@property
def bandwidth(self):
"""Get the channel bandwidth of the cell."""
cmd = 'CONFigure:LTE:SIGN:CELL:BANDwidth:{}:DL?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@bandwidth.setter
def bandwidth(self, bandwidth):
"""Sets the channel bandwidth of the cell.
Args:
bandwidth: channel bandwidth of cell.
"""
if not isinstance(bandwidth, LteBandwidth):
raise ValueError('bandwidth should be an instance of '
'LteBandwidth.')
cmd = 'CONFigure:LTE:SIGN:CELL:BANDwidth:{}:DL {}'.format(
self._bts, bandwidth.value)
self._cmw.send_and_recv(cmd)
@property
def ul_frequency(self):
"""Get the uplink frequency of the cell."""
cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:UL? MHZ'.format(
self._bts)
return self._cmw.send_and_recv(cmd)
@ul_frequency.setter
def ul_frequency(self, freq):
"""Get the uplink frequency of the cell.
Args:
freq: uplink frequency of the cell.
"""
cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:UL {} MHZ'.format(
self._bts, freq)
self._cmw.send_and_recv(cmd)
@property
def dl_frequency(self):
"""Get the downlink frequency of the cell"""
cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:DL? MHZ'.format(
self._bts)
return self._cmw.send_and_recv(cmd)
@dl_frequency.setter
def dl_frequency(self, freq):
"""Get the downlink frequency of the cell.
Args:
freq: downlink frequency of the cell.
"""
cmd = 'CONFigure:LTE:SIGN:RFSettings:{}:CHANnel:DL {} MHZ'.format(
self._bts, freq)
self._cmw.send_and_recv(cmd)
@property
def transmode(self):
"""Gets the TM of cell."""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:TRANsmission?'.format(
self._bts)
return self._cmw.send_and_recv(cmd)
@transmode.setter
def transmode(self, tm_mode):
"""Sets the TM of cell.
Args:
tm_mode: TM of cell.
"""
if not isinstance(tm_mode, TransmissionModes):
raise ValueError('tm_mode should be an instance of '
'Transmission modes.')
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:TRANsmission {}'.format(
self._bts, tm_mode.value)
self._cmw.send_and_recv(cmd)
@property
def downlink_power_level(self):
"""Gets RSPRE level."""
cmd = 'CONFigure:LTE:SIGN:DL:{}:RSEPre:LEVel?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@downlink_power_level.setter
def downlink_power_level(self, pwlevel):
"""Modifies RSPRE level.
Args:
pwlevel: power level in dBm.
"""
cmd = 'CONFigure:LTE:SIGN:DL:{}:RSEPre:LEVel {}'.format(
self._bts, pwlevel)
self._cmw.send_and_recv(cmd)
@property
def uplink_power_control(self):
"""Gets open loop nominal power directly."""
cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:OLNPower?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@uplink_power_control.setter
def uplink_power_control(self, ul_power):
"""Sets open loop nominal power directly.
Args:
ul_power: uplink power level.
"""
cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:OLNPower {}'.format(
self._bts, ul_power)
self._cmw.send_and_recv(cmd)
@property
def uldl_configuration(self):
"""Gets uldl configuration of the cell."""
cmd = 'CONFigure:LTE:SIGN:CELL:{}:ULDL?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@uldl_configuration.setter
def uldl_configuration(self, uldl):
"""Sets the ul-dl configuration.
Args:
uldl: Configuration value ranging from 0 to 6.
"""
if uldl not in range(0, 7):
raise ValueError('uldl configuration value should be between'
' 0 and 6 inclusive.')
cmd = 'CONFigure:LTE:SIGN:CELL:{}:ULDL {}'.format(self._bts, uldl)
self._cmw.send_and_recv(cmd)
@property
def tdd_special_subframe(self):
"""Gets special subframe of the cell."""
cmd = 'CONFigure:LTE:SIGN:CELL:{}:SSUBframe?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@tdd_special_subframe.setter
def tdd_special_subframe(self, sframe):
"""Sets the tdd special subframe of the cell.
Args:
sframe: Integer value ranging from 1 to 9.
"""
if sframe not in range(0, 10):
raise ValueError('tdd special subframe should be between 0 and 9'
' inclusive.')
cmd = 'CONFigure:LTE:SIGN:CELL:{}:SSUBframe {}'.format(
self._bts, sframe)
self._cmw.send_and_recv(cmd)
@property
def scheduling_mode(self):
"""Gets the current scheduling mode."""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:STYPe?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@scheduling_mode.setter
def scheduling_mode(self, mode):
"""Sets the scheduling type for the cell.
Args:
mode: Selects the channel mode to be scheduled.
"""
if not isinstance(mode, SchedulingMode):
raise ValueError('mode should be the instance of scheduling mode.')
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:STYPe {}'.format(
self._bts, mode.value)
self._cmw.send_and_recv(cmd)
@property
def rb_configuration_dl(self):
"""Gets rmc's rb configuration for down link. This function returns
Number of Resource blocks, Resource block position and Modulation type.
"""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:{}:DL?'.format(
self._bts, self.scheduling_mode)
return self._cmw.send_and_recv(cmd)
@rb_configuration_dl.setter
def rb_configuration_dl(self, rb_config):
"""Sets the rb configuration for down link for scheduling type.
Args:
rb_config: Tuple containing Number of resource blocks, resource
block position and modulation type.
Raises:
ValueError: If tuple unpacking fails.
"""
if self.scheduling_mode == 'RMC':
rb, rb_pos, modulation = rb_config
cmd = ('CONFigure:LTE:SIGN:CONNection:{}:RMC:DL {},{},'
'{}'.format(self._bts, rb, rb_pos, modulation))
self._cmw.send_and_recv(cmd)
elif self.scheduling_mode == 'UDCH':
rb, start_rb, modulation, tbs = rb_config
self.validate_rb(rb)
if not isinstance(modulation, ModulationType):
raise ValueError('Modulation should be of type '
'ModulationType.')
cmd = ('CONFigure:LTE:SIGN:CONNection:{}:UDCHannels:DL {},{},'
'{},{}'.format(self._bts, rb, start_rb, modulation.value,
tbs))
self._cmw.send_and_recv(cmd)
@property
def rb_configuration_ul(self):
"""Gets rb configuration for up link. This function returns
Number of Resource blocks, Resource block position and Modulation type.
"""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:{}:UL?'.format(
self._bts, self.scheduling_mode)
return self._cmw.send_and_recv(cmd)
@rb_configuration_ul.setter
def rb_configuration_ul(self, rb_config):
"""Sets the rb configuration for down link for scheduling mode.
Args:
rb_config: Tuple containing Number of resource blocks, resource
block position and modulation type.
Raises:
ValueError: If tuple unpacking fails.
"""
if self.scheduling_mode == 'RMC':
rb, rb_pos, modulation = rb_config
cmd = ('CONFigure:LTE:SIGN:CONNection:{}:RMC:UL {},{},'
'{}'.format(self._bts, rb, rb_pos, modulation))
self._cmw.send_and_recv(cmd)
elif self.scheduling_mode == 'UDCH':
rb, start_rb, modulation, tbs = rb_config
self.validate_rb(rb)
if not isinstance(modulation, ModulationType):
raise ValueError('Modulation should be of type '
'ModulationType.')
cmd = ('CONFigure:LTE:SIGN:CONNection:{}:UDCHannels:UL {},{},'
'{},{}'.format(self._bts, rb, start_rb, modulation.value,
tbs))
self._cmw.send_and_recv(cmd)
def validate_rb(self, rb):
"""Validates if rb is within the limits for bandwidth set.
Args:
rb: No. of resource blocks.
Raises:
ValueError if rb out of range.
"""
bandwidth = self.bandwidth
if bandwidth == LteBandwidth.BANDWIDTH_1MHz.value:
if not 0 <= rb <= 6:
raise ValueError('RB should be between 0 to 6 inclusive'
' for 1.4Mhz.')
elif bandwidth == LteBandwidth.BANDWIDTH_3MHz.value:
if not 0 <= rb <= 10:
raise ValueError('RB should be between 0 to 10 inclusive'
' for 3 Mhz.')
elif bandwidth == LteBandwidth.BANDWIDTH_5MHz.value:
if not 0 <= rb <= 25:
raise ValueError('RB should be between 0 to 25 inclusive'
' for 5 Mhz.')
elif bandwidth == LteBandwidth.BANDWIDTH_10MHz.value:
if not 0 <= rb <= 50:
raise ValueError('RB should be between 0 to 50 inclusive'
' for 10 Mhz.')
elif bandwidth == LteBandwidth.BANDWIDTH_15MHz.value:
if not 0 <= rb <= 75:
raise ValueError('RB should be between 0 to 75 inclusive'
' for 15 Mhz.')
elif bandwidth == LteBandwidth.BANDWIDTH_20MHz.value:
if not 0 <= rb <= 100:
raise ValueError('RB should be between 0 to 100 inclusive'
' for 20 Mhz.')
@property
def rb_position_dl(self):
"""Gets the position of the allocated down link resource blocks within
the channel band-width.
"""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:RMC:RBPosition:DL?'.format(
self._bts)
return self._cmw.send_and_recv(cmd)
@rb_position_dl.setter
def rb_position_dl(self, rbpos):
"""Selects the position of the allocated down link resource blocks
within the channel band-width
Args:
rbpos: position of resource blocks.
"""
if not isinstance(rbpos, RbPosition):
raise ValueError('rbpos should be the instance of RbPosition.')
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:RMC:RBPosition:DL {}'.format(
self._bts, rbpos.value)
self._cmw.send_and_recv(cmd)
@property
def rb_position_ul(self):
"""Gets the position of the allocated up link resource blocks within
the channel band-width.
"""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:RMC:RBPosition:UL?'.format(
self._bts)
return self._cmw.send_and_recv(cmd)
@rb_position_ul.setter
def rb_position_ul(self, rbpos):
"""Selects the position of the allocated up link resource blocks
within the channel band-width.
Args:
rbpos: position of resource blocks.
"""
if not isinstance(rbpos, RbPosition):
raise ValueError('rbpos should be the instance of RbPosition.')
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:RMC:RBPosition:UL {}'.format(
self._bts, rbpos.value)
self._cmw.send_and_recv(cmd)
@property
def dci_format(self):
"""Gets the downlink control information (DCI) format."""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:DCIFormat?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@dci_format.setter
def dci_format(self, dci_format):
"""Selects the downlink control information (DCI) format.
Args:
dci_format: supported dci.
"""
if not isinstance(dci_format, DciFormat):
raise ValueError('dci_format should be the instance of DciFormat.')
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:DCIFormat {}'.format(
self._bts, dci_format.value)
self._cmw.send_and_recv(cmd)
@property
def dl_antenna(self):
"""Gets dl antenna count of cell."""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:NENBantennas?'.format(
self._bts)
return self._cmw.send_and_recv(cmd)
@dl_antenna.setter
def dl_antenna(self, num_antenna):
"""Sets the dl antenna count of cell.
Args:
num_antenna: Count of number of dl antennas to use.
"""
if not isinstance(num_antenna, MimoModes):
raise ValueError('num_antenna should be an instance of MimoModes.')
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:NENBantennas {}'.format(
self._bts, num_antenna.value)
self._cmw.send_and_recv(cmd)
@property
def reduced_pdcch(self):
"""Gets the reduction of PDCCH resources state."""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:PDCCh:RPDCch?'.format(
self._bts)
return self._cmw.send_and_recv(cmd)
@reduced_pdcch.setter
def reduced_pdcch(self, state):
"""Sets the reduction of PDCCH resources state.
Args:
state: ON/OFF.
"""
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:PDCCh:RPDCch {}'.format(
self._bts, state.value)
self._cmw.send_and_recv(cmd)
@property
def tpc_power_control(self):
"""Gets the type of uplink power control used."""
cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:SET?'.format(self._bts)
return self._cmw.send_and_recv(cmd)
@tpc_power_control.setter
def tpc_power_control(self, set_type):
"""Set and execute the Up Link Power Control via TPC.
Args:
set_type: Type of tpc power control.
"""
if not isinstance(set_type, TpcPowerControl):
raise ValueError('set_type should be the instance of '
'TpCPowerControl.')
cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:SET {}'.format(
self._bts, set_type.value)
self._cmw.send_and_recv(cmd)
cmd = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:PEXecute'.format(self._bts)
self._cmw.send_and_recv(cmd)
@property
def tpc_closed_loop_target_power(self):
    """Queries the target power used for power control with the TPC setup.

    Returns:
        The reply to the CLTPower query, exactly as handed back by the
        instrument handle's send_and_recv.
    """
    bts = self._bts
    return self._cmw.send_and_recv(
        'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:CLTPower?'.format(bts))

@tpc_closed_loop_target_power.setter
def tpc_closed_loop_target_power(self, cltpower):
    """Sets the target power used for power control with the TPC setup.

    Args:
        cltpower: Target power; formatted into the command as-is.
    """
    template = 'CONFigure:LTE:SIGN:UL:{}:PUSCh:TPC:CLTPower {}'
    command = template.format(self._bts, cltpower)
    self._cmw.send_and_recv(command)
class LteMeasurementState(Enum):
    """States the LTE measurement can be in.

    Each member's value is the string from which LteMeasurement.state
    builds the member, i.e. the reply to the measurement state query.
    """
    # State awaited after a measurement has been aborted.
    OFF = 'OFF'
    # State awaited after a measurement has been stopped or has completed.
    READY = 'RDY'
    # State awaited right after a measurement has been initiated.
    RUN = 'RUN'
def _try_parse(s, fun):
try:
return fun(s)
except ValueError:
return None
class LteMeasurement(object):
    """Class for measuring LTE performance.

    Wraps the LTE multi-evaluation (MEValuation) commands of an instrument
    handle: configuring the measurement, starting/stopping it, polling its
    state and fetching the modulation results.
    """

    # Positions of the Tx power fields inside the parsed result lists:
    # INDEX_TX is used with the `average` and `stdev` results, while
    # INDEX_TX_MIN / INDEX_TX_MAX are used with the `extrema` results.
    INDEX_TX = 17
    INDEX_TX_MIN = 17
    INDEX_TX_MAX = 18

    def __init__(self, cmw):
        """Stores the instrument handle used for all commands.

        Args:
            cmw: handle exposing send_and_recv(cmd).
        """
        self._cmw = cmw

    @property
    def sample_count(self):
        """Gets the number of samples to use when calculating results."""
        cmd = 'CONFigure:LTE:MEAS:MEValuation:SCOunt:MODulation?'
        return self._cmw.send_and_recv(cmd)

    @sample_count.setter
    def sample_count(self, sample_count):
        """Sets the number of samples to use when calculating results.

        Args:
            sample_count: the sample count, formatted into the command as-is.
        """
        cmd = 'CONFigure:LTE:MEAS:MEValuation:SCOunt:MODulation {}'.format(
            sample_count)
        self._cmw.send_and_recv(cmd)

    def _fetch_modulation_results(self, statistic):
        """Fetches one modulation result set and parses its fields.

        Args:
            statistic: command suffix selecting the result set
                ('AVERage', 'EXTReme' or 'SDEV').

        Returns:
            A list with one entry per comma-separated field of the reply.
            The first two fields are parsed as int, every later field as
            float; fields that cannot be parsed are None.
        """
        cmd = 'FETch:LTE:MEAS:MEValuation:MODulation:{}?'.format(statistic)
        fields = self._cmw.send_and_recv(cmd).split(',')
        # Parsing by position (instead of indexing fields 0 and 1 directly)
        # keeps a reply with fewer than two fields from raising IndexError.
        return [
            _try_parse(field, int if position < 2 else float)
            for position, field in enumerate(fields)
        ]

    @property
    def average(self):
        """Gets the average values of the most recent measurement.

        Invalid measurements are set to None.
        """
        return self._fetch_modulation_results('AVERage')

    @property
    def extrema(self):
        """Gets the extrema values of the most recent measurement.

        Invalid measurements are set to None.
        """
        return self._fetch_modulation_results('EXTReme')

    @property
    def stdev(self):
        """Gets the standard deviation of the most recent measurement.

        Invalid measurements are set to None.
        """
        return self._fetch_modulation_results('SDEV')

    @property
    def tx_average(self):
        """Gets the measured average Tx power (in dBm).

        Raises:
            ValueError: if the fetched result holds no value for it.
        """
        value = self.average[self.INDEX_TX]
        if value is None:
            raise ValueError("LteMeasurement has no value for tx_average")
        return value

    @property
    def tx_min(self):
        """Gets the measured minimum Tx power (in dBm).

        Raises:
            ValueError: if the fetched result holds no value for it.
        """
        value = self.extrema[self.INDEX_TX_MIN]
        if value is None:
            raise ValueError("LteMeasurement has no value for tx_min")
        return value

    @property
    def tx_max(self):
        """Gets the measured maximum Tx power (in dBm).

        Raises:
            ValueError: if the fetched result holds no value for it.
        """
        value = self.extrema[self.INDEX_TX_MAX]
        if value is None:
            raise ValueError("LteMeasurement has no value for tx_max")
        return value

    @property
    def tx_stdev(self):
        """Gets the measured Tx power standard deviation (in dBm).

        Raises:
            ValueError: if the fetched result holds no value for it.
        """
        value = self.stdev[self.INDEX_TX]
        if value is None:
            raise ValueError(
                "LteMeasurement has no value for standard_deviation")
        return value

    @property
    def measurement_repetition(self):
        """Returns the measurement repetition mode that has been set."""
        return self._cmw.send_and_recv(
            'CONFigure:LTE:MEAS:MEValuation:REPetition?')

    @measurement_repetition.setter
    def measurement_repetition(self, mode):
        """Sets the mode for measuring power levels.

        Args:
            mode: Single shot/continuous (a RepetitionMode member).

        Raises:
            ValueError: if mode is not a RepetitionMode instance.
        """
        if not isinstance(mode, RepetitionMode):
            raise ValueError('mode should be the instance of Repetition Mode')
        cmd = 'CONFigure:LTE:MEAS:MEValuation:REPetition {}'.format(mode.value)
        self._cmw.send_and_recv(cmd)

    @property
    def query_measurement_state(self):
        """Returns the states and sub states of measurement."""
        return self._cmw.send_and_recv('FETCh:LTE:MEAS:MEValuation:STATe:ALL?')

    @property
    def measure_tx_power(self):
        """Return the current Tx power measurement."""
        return self._cmw.send_and_recv(
            'FETCh:LTE:MEAS:MEValuation:PMONitor:AVERage?')

    @property
    def state(self):
        """Gets the state of the measurement as a LteMeasurementState."""
        cmd = 'FETCh:LTE:MEAS:MEValuation:STATe?'
        return LteMeasurementState(self._cmw.send_and_recv(cmd))

    def initialize_measurement(self):
        """Initialize measurement modules.

        Enables the result selection, routes the measurement to
        "LTE Sig1", initiates it and waits for the RUN state.

        Raises:
            CmwError: if the RUN state is not reached in time.
        """
        self._cmw.send_and_recv(
            'CONF:LTE:MEAS:MEV:RES:ALL ON,ON,ON,ON,ON,ON,ON,ON,ON,ON,ON,ON')
        self._cmw.send_and_recv('ROUTe:LTE:MEAS:SCENario:CSPath "LTE Sig1"')
        self._cmw.send_and_recv('INIT:LTE:MEAS:MEValuation;*OPC?')
        self._wait_for_state({LteMeasurementState.RUN})

    def run_measurement(self):
        """Runs a single Tx multievaluation measurement to completion.

        Raises:
            CmwError: if any of the awaited states is not reached in time.
        """
        self.stop_measurement()
        self.measurement_repetition = RepetitionMode.SINGLESHOT
        self.initialize_measurement()
        self._wait_for_state({LteMeasurementState.READY}, timeout=120)

    def stop_measurement(self):
        """Stops the on-going measurement.

        This function call does not free up resources allocated for
        measurement. Instead it moves from RUN to RDY state.

        Raises:
            CmwError: if neither OFF nor READY is reached in time.
        """
        self._cmw.send_and_recv('STOP:LTE:MEAS:MEValuation')
        self._wait_for_state(
            {LteMeasurementState.OFF, LteMeasurementState.READY})

    def abort_measurement(self):
        """Aborts the measurement abruptly.

        This function call will free up the resources allocated for
        measurement and all the results will be wiped off.

        Raises:
            CmwError: if the OFF state is not reached in time.
        """
        self._cmw.send_and_recv('ABORt:LTE:MEAS:MEValuation')
        self._wait_for_state({LteMeasurementState.OFF})

    def _wait_for_state(self, states, timeout=10):
        """Polls the measurement state until it reaches an allowable state.

        The state is read once up front and once more after every
        one-second wait, so it is always checked at least once (even with
        a non-positive timeout) and the last wait is never wasted.

        Args:
            states: the allowed states
            timeout: the maximum amount time to wait (in seconds)

        Raises:
            CmwError: if none of the allowed states was observed.
        """
        remaining = timeout
        while True:
            if self.state in states:
                return
            if remaining <= 0:
                break
            time.sleep(1)
            remaining -= 1

        raise CmwError(
            'Failed to wait for LTE measurement state: {}.'.format(states))
class CmwError(Exception):
    """Exception raised for errors related to the cmw.

    For example, LteMeasurement._wait_for_state raises it when none of the
    allowed measurement states is reached.
    """