Merge "Creates Cmx 500 controller with XLAPI Part II"
diff --git a/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500.py b/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500.py
index 6c406b3..18c5ab3 100644
--- a/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500.py
+++ b/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500.py
@@ -22,9 +22,10 @@
from os import path
from acts.controllers import abstract_inst
-DEFAULT_XLAPI_PATH = '/home/mobileharness/Rohde-Schwarz/XLAPI/7.51.21/venv/lib/python3.7/site-packages'
+DEFAULT_XLAPI_PATH = '/home/mobileharness/Rohde-Schwarz/XLAPI/latest/venv/lib/python3.7/site-packages'
DEFAULT_LTE_STATE_CHANGE_TIMER = 10
-Default_CELL_SWITCH_ON_TIMER = 60
+DEFAULT_CELL_SWITCH_ON_TIMER = 60
+DEFAULT_ENDC_TIMER = 300
logger = logging.getLogger('Xlapi_cmx500')
@@ -42,6 +43,13 @@
'p0_nominal_pusch',
]
+LTE_MHZ_UPPER_BOUND_TO_RB = [
+ (1.5, 6),
+ (4.0, 15),
+ (7.5, 25),
+ (12.5, 50),
+ (17.5, 75),
+]
class DciFormat(Enum):
"""Support DCI Formats for MIMOs."""
@@ -177,7 +185,7 @@
self._config_antenna_ports()
self.lte_rrc_state_change_timer = DEFAULT_LTE_STATE_CHANGE_TIMER
self.rrc_state_change_time_enable = False
- self.cell_switch_on_timer = Default_CELL_SWITCH_ON_TIMER
+ self.cell_switch_on_timer = DEFAULT_CELL_SWITCH_ON_TIMER
# _config_antenna_ports for the special RF connection with cmw500 + cmx500.
def _config_antenna_ports(self):
@@ -279,6 +287,10 @@
"""
return self.bts[bts_index]
+ def get_network(self):
+ """ Gets the network object from cmx500 object."""
+ return self._network
+
def init_lte_measurement(self):
"""Gets the class object for lte measurement which can be used to
initiate measurements.
@@ -332,6 +344,26 @@
logger.info(
'The LTE cell status is {} after stop'.format(bts.is_on()))
+ def switch_on_nsa_signalling(self):
+ if self.bts:
+ self.disconnect()
+ logger.info('Switches on NSA signalling')
+ self.bts.append(LteBaseStation(self, self.lte_cell))
+ self.bts.append(NrBaseStation(self, self.nr_cell))
+ self.bts[0].start()
+ lte_cell_status = self.bts[0].wait_cell_on(self.cell_switch_on_timer)
+ if lte_cell_status:
+ logger.info('The LTE pcell status is on')
+ else:
+ raise CmxError('The LTE pcell cannot be switched on')
+
+ self.bts[1].start()
+ nr_cell_status = self.bts[1].wait_cell_on(self.cell_switch_on_timer)
+ if nr_cell_status:
+ logger.info('The NR cell status is on')
+ else:
+ raise CmxError('The NR cell cannot be switched on')
+
def update_lte_cell_config(self, config):
"""Updates lte cell settings with config."""
set_counts = 0
@@ -409,6 +441,7 @@
self._cell = cell
self._cmx = cmx
self._cc = cmx.dut.cc(cell)
+ self._network = cmx.get_network()
@property
def band(self):
@@ -579,6 +612,8 @@
# Sets num of crs antenna ports back to previous value
self._cell.set_num_crs_antenna_ports(num_crs_antenna_ports)
+ self._network.apply_changes()
+
if is_on:
self._cell.start()
@@ -607,16 +642,19 @@
return self._cell.get_dl_earfcn().to_freq().in_units(
Frequency.Units.GHz)
+ def _to_rb_bandwidth(self, bandwidth):
+ for idx in range(5):
+ if bandwidth < LTE_MHZ_UPPER_BOUND_TO_RB[idx][0]:
+ return LTE_MHZ_UPPER_BOUND_TO_RB[idx][1]
+ return 100
+
def set_bandwidth(self, bandwidth):
"""Sets the channel bandwidth of the cell.
Args:
- bandwidth: channel bandwidth of cell.
+ bandwidth: channel bandwidth of cell in MHz.
"""
- if not isinstance(bandwidth, LteBandwidth):
- raise ValueError('bandwidth should be an instance of '
- 'LteBandwidth.')
- self._cell.set_bandwidth(bandwidth.value)
+ self._cell.set_bandwidth(self._to_rb_bandwidth(bandwidth))
def set_cell_frequency_band(self, tdd_cfg=None, ssf_cfg=None):
"""Sets cell frequency band with tdd and ssf config.
@@ -639,6 +677,7 @@
subframe_assignment=tdd_subframe,
special_subframe_pattern=ssf_pattern))
self._cell.stub.SetCellFrequencyBand(CellFrequencyBand(tdd=tdd))
+ self._network.apply_changes()
def set_cfi(self, cfi):
"""Sets number of pdcch symbols (cfi).
@@ -807,5 +846,240 @@
super().__init__(cmx, cell)
+ def _config_scheduler(self, dl_mcs=None, dl_mcs_table=None,
+ dl_rb_alloc=None, dl_mimo_mode=None,
+ ul_mcs=None, ul_mcs_table=None, ul_rb_alloc=None,
+ ul_mimo_mode=None):
+
+ from rs_mrt.testenvironment.signaling.sri.rat.nr import McsTable
+
+ log_list = []
+ if dl_mcs:
+ log_list.append('dl_mcs: {}'.format(dl_mcs))
+ if ul_mcs:
+ log_list.append('ul_mcs: {}'.format(ul_mcs))
+
+ # If rb alloc is not a tuple, add 0 as start RBs for XLAPI NR scheduler
+ if dl_rb_alloc:
+ if not isinstance(dl_rb_alloc, tuple):
+ dl_rb_alloc = (0, dl_rb_alloc)
+ log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc))
+ if ul_rb_alloc:
+ if not isinstance(ul_rb_alloc, tuple):
+ ul_rb_alloc = (0, ul_rb_alloc)
+ log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc))
+ if dl_mcs_table:
+ dl_mcs_table = McsTable(dl_mcs_table)
+ log_list.append('dl_mcs_table: {}'.format(dl_mcs_table))
+ if ul_mcs_table:
+ ul_mcs_table = McsTable(ul_mcs_table)
+ log_list.append('ul_mcs_table: {}'.format(ul_mcs_table))
+ if dl_mimo_mode:
+ log_list.append('dl_mimo_mode: {}'.format(dl_mimo_mode))
+ if ul_mimo_mode:
+ log_list.append('ul_mimo_mode: {}'.format(ul_mimo_mode))
+
+ is_on = self._cell.is_on()
+ if is_on:
+ self._cell.stop()
+ time.sleep(1)
+ scheduler = self._cmx.dut.get_scheduler(self._cell)
+ logger.info('configure scheduler for {}'.format(','.join(log_list)))
+
+ scheduler.configure_ue_scheduler(
+ dl_mcs=dl_mcs, dl_mcs_table=dl_mcs_table,
+ dl_rb_alloc=dl_rb_alloc, dl_mimo_mode=dl_mimo_mode,
+ ul_mcs=ul_mcs, ul_mcs_table=ul_mcs_table,
+ ul_rb_alloc=ul_rb_alloc, ul_mimo_mode=ul_mimo_mode)
+ logger.info('Configure scheduler succeeds')
+ self._network.apply_changes()
+
+ if is_on:
+ self._cell.start()
+
+ def attach_as_secondary_cell(self, endc_timer=DEFAULT_ENDC_TIMER):
+ """Enable endc mode for NR cell.
+
+ Args:
+ endc_timer: timeout in seconds for reaching the endc state
+ """
+ logger.info('enable endc mode for nsa dual connection')
+ self._cmx.dut.signaling.nsa_dual_connect(self._cell)
+ time_count = 0
+ while time_count < endc_timer:
+ if str(self._cmx.dut.state.radio_connectivity) == \
+ 'RadioConnectivityMode.EPS_LTE_NR':
+ logger.info('enter endc mode')
+ return
+ time.sleep(1)
+ time_count += 1
+ if time_count % 30 == 0:
+ logger.info('did not reach endc at {} s'.format(time_count))
+ raise CmxError('Cannot reach endc after {} s'.format(endc_timer))
+
+ @property
+ def dl_channel(self):
+ """Gets the downlink channel of cell.
+
+ Return:
+ the downlink channel (NR-ARFCN) as an int.
+ """
+ return int(self._cell.get_dl_ref_a())
+
+ def _bandwidth_to_carrier_bandwidth(self, bandwidth):
+ """Converts bandwidth in MHz to CarrierBandwidth.
+ CarrierBandwidth Enum in XLAPI:
+ MHZ_5 = 0
+ MHZ_10 = 1
+ MHZ_15 = 2
+ MHZ_20 = 3
+ MHZ_25 = 4
+ MHZ_30 = 5
+ MHZ_40 = 6
+ MHZ_50 = 7
+ MHZ_60 = 8
+ MHZ_70 = 9
+ MHZ_80 = 10
+ MHZ_90 = 11
+ MHZ_100 = 12
+ MHZ_200 = 13
+ MHZ_400 = 14
+ Args:
+ bandwidth: channel bandwidth in MHz.
+
+ Return:
+ the corresponding NR Carrier Bandwidth.
+ """
+ from mrtype.nr.frequency import CarrierBandwidth
+ if bandwidth > 100:
+ return CarrierBandwidth(12 + bandwidth // 200)
+ elif bandwidth > 30:
+ return CarrierBandwidth(2 + bandwidth // 10)
+ else:
+ return CarrierBandwidth(bandwidth // 5 - 1)
+
+ def set_band(self, band, frequency_range=None):
+ """Sets the Band of cell.
+
+ Args:
+ band: band of cell.
+ frequency_range: 'LOW', 'MID' or 'HIGH' (case-insensitive); LOW if unset
+ """
+ from mrtype.frequency import FrequencyRange
+ if not frequency_range or frequency_range.upper() == 'LOW':
+ frequency_range = FrequencyRange.LOW
+ elif frequency_range.upper() == 'MID':
+ frequency_range = FrequencyRange.MID
+ elif frequency_range.upper() == 'HIGH':
+ frequency_range = FrequencyRange.HIGH
+ else:
+ raise CmxError('Wrong type FrequencyRange')
+
+ self._cell.set_dl_ref_a_offset(band, frequency_range)
+ logger.info('The band is set to {} and is {} after setting'.format(
+ band, self.band))
+
+ def set_bandwidth(self, bandwidth, scs=None):
+ """Sets the channel bandwidth of the cell.
+
+ Args:
+ bandwidth: channel bandwidth of cell in MHz.
+ scs: subcarrier spacing (SCS) of resource grid 0
+ """
+ if not scs:
+ scs = self._cell.get_scs()
+ self._cell.set_carrier_bandwidth_and_scs(
+ self._bandwidth_to_carrier_bandwidth(bandwidth), scs)
+ logger.info('The bandwidth in MHz is {}. After setting, the value is {}'
+ .format(bandwidth, str(self._cell.get_carrier_bandwidth())))
+
+ def set_dl_channel(self, channel):
+ """Sets the downlink channel number of cell.
+
+ Args:
+ channel: downlink channel number of cell.
+ """
+ from mrtype.nr.frequency import NrArfcn
+ if self.dl_channel == channel:
+ logger.info('The dl_channel was at {}'.format(self.dl_channel))
+ return
+ self._cell.set_dl_ref_a_offset(self.band, NrArfcn(channel))
+ logger.info('The dl_channel was set to {}'.format(self.dl_channel))
+
+ def set_dl_modulation_table(self, modulation):
+ """Sets downlink modulation table.
+
+ Args:
+ modulation: modulation table setting (ModulationType).
+ """
+ if not isinstance(modulation, ModulationType):
+ raise CmxError('The modulation is not the type of Modulation')
+ self._config_scheduler(dl_mcs_table=modulation.value)
+
+ def set_mimo_mode(self, mimo):
+ """Sets mimo mode for NR nsa scenario.
+
+ Args:
+ mimo: the mimo mode.
+ """
+ from rs_mrt.testenvironment.signaling.sri.rat.nr import DownlinkMimoMode
+ if not isinstance(mimo, MimoModes):
+ raise CmxError("Wrong type of mimo mode")
+
+ is_on = self._cell.is_on()
+ if is_on:
+ self._cell.stop()
+ self._cc.set_dl_mimo_mode(DownlinkMimoMode.Enum(mimo.value))
+ if is_on:
+ self._cell.start()
+
+ def set_scheduling_mode(
+ self, mcs_dl=None, mcs_ul=None, nrb_dl=None, nrb_ul=None):
+ """Sets scheduling mode.
+
+ Args:
+ mcs_dl: Downlink MCS.
+ mcs_ul: Uplink MCS.
+ nrb_dl: Number of RBs for downlink.
+ nrb_ul: Number of RBs for uplink.
+ """
+ self._config_scheduler(dl_mcs=mcs_dl, ul_mcs=mcs_ul, dl_rb_alloc=nrb_dl,
+ ul_rb_alloc=nrb_ul)
+
+ def set_ssf_config(self, ssf_config):
+ """Sets the special subframe pattern with ssf_config.
+
+ Args:
+ ssf_config: the special subframe pattern config (from 1-9).
+ """
+ raise CmxError('the set ssf config for nr did not implemente yet')
+
+ def set_tdd_config(self, tdd_config):
+ """Sets tdd subframe assignment with tdd_config.
+
+ Args:
+ tdd_config: the subframe assignment config (from 0-6).
+ """
+ raise CmxError('the set tdd config for nr did not implemente yet')
+
+ def set_transmission_mode(self, transmission_mode):
+ """Sets transmission mode with scheduler.
+
+ Args:
+ transmission_mode: the downlink transmission mode.
+ """
+ logger.info('The set transmission mode for nr is set by mimo mode')
+
+ def set_ul_modulation_table(self, modulation):
+ """Sets uplink modulation table.
+
+ Args:
+ modulation: modulation table setting (ModulationType).
+ """
+ if not isinstance(modulation, ModulationType):
+ raise CmxError('The modulation is not the type of Modulation')
+ self._config_scheduler(ul_mcs_table=modulation.value)
+
+
class CmxError(Exception):
"""Class to raise exceptions related to cmx."""
diff --git a/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_cellular_simulator.py b/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_cellular_simulator.py
index bd01759..ca281d1 100644
--- a/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_cellular_simulator.py
+++ b/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_cellular_simulator.py
@@ -42,14 +42,6 @@
LteSimulation.MimoMode.MIMO_4x4: cmx500.MimoModes.MIMO4x4,
}
-MHZ_UPPER_BOUND_TO_RB = [
- (1.5, 6),
- (4.0, 15),
- (7.5, 25),
- (12.5, 50),
- (17.5, 75),
-]
-
class CMX500CellularSimulator(cc.AbstractCellularSimulator):
""" A cellular simulator for telephony simulations based on the CMX 500
@@ -87,7 +79,8 @@
def setup_nr_nsa_scenario(self):
""" Configures the equipment for an NR non stand alone simulation. """
- raise NotImplementedError()
+ self.log.info('setup nsa scenario (start lte cell and nr cell')
+ self.cmx.switch_on_nsa_signalling()
def set_band_combination(self, bands):
""" Prepares the test equipment for the indicated band combination.
@@ -110,7 +103,7 @@
self.cmx.lte_rrc_state_change_timer = time
- def set_band(self, bts_index, band):
+ def set_band(self, bts_index, band, frequency_range=None):
""" Sets the band for the indicated base station.
Args:
@@ -118,7 +111,11 @@
band: the new band
"""
self.log.info('set band to {}'.format(band))
- self.bts[bts_index].set_band(int(band))
+ if frequency_range:
+ self.bts[bts_index].set_band(
+ int(band), frequency_range=frequency_range)
+ else:
+ self.bts[bts_index].set_band(int(band))
def get_duplex_mode(self, band):
""" Determines if the band uses FDD or TDD duplex mode
@@ -177,24 +174,18 @@
ssf_config: the new ssf config number (from 0 to 9)
"""
self.log.info('set ssf config to {}'.format(ssf_config))
- self.bts[bts_index].set_tdd_config(ssf_config)
+ self.bts[bts_index].set_ssf_config(ssf_config)
def set_bandwidth(self, bts_index, bandwidth):
""" Sets the bandwidth for the indicated base station.
Args:
bts_index: the base station number
- bandwidth: the new bandwidth in rb
+ bandwidth: the new bandwidth in MHz
"""
self.log.info('set bandwidth of bts {} to {}'.format(
bts_index, bandwidth))
- self.bts[bts_index].set_bandwidth(self._to_rb_bandwidth(bandwidth))
-
- def _to_rb_bandwidth(self, bandwidth):
- for idx in range(5):
- if bandwidth < MHZ_UPPER_BOUND_TO_RB[idx][0]:
- return LteBandwidth(MHZ_UPPER_BOUND_TO_RB[idx][1])
- return LteBandwidth(100)
+ self.bts[bts_index].set_bandwidth(int(bandwidth))
def set_downlink_channel_number(self, bts_index, channel_number):
""" Sets the downlink channel number for the indicated base station.
@@ -338,7 +329,8 @@
ue_capability_enquiry: UE capability enquiry message to be sent to
the UE before starting carrier aggregation.
"""
- raise NotImplementedError()
+ self.wait_until_communication_state()
+ self.bts[1].attach_as_secondary_cell()
def wait_until_attached(self, timeout=120):
""" Waits until the DUT is attached to the primary carrier.