blob: 430f396495b847f9db0454f22fe72093ae285e6a [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import itertools
import pyvisa
import time
from acts import logger
from acts import asserts as acts_asserts
from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
SHORT_SLEEP = 1
VERY_SHORT_SLEEP = 0.1
MEDIUM_SLEEP = 5
SUBFRAME_DURATION = 0.001
VISA_QUERY_DELAY = 0.01
class Keysight5GTestApp(object):
"""Controller for the Keysight 5G NR Test Application.
This controller enables interacting with a Keysight Test Application
running on a remote test PC and implements many of the configuration
parameters supported in test app.
"""
VISA_LOCATION = '/opt/keysight/iolibs/libktvisa32.so'
def __init__(self, config):
self.config = config
self.test_app_settings = {
'lte_cell_count': 0,
'nr_cell_count': 0,
'lte_cell_configs': [],
'nr_cell_configs': []
}
self.log = logger.create_tagged_trace_logger("{}{}".format(
self.config['brand'], self.config['model']))
self.resource_manager = pyvisa.ResourceManager(self.VISA_LOCATION)
self.test_app = self.resource_manager.open_resource(
'TCPIP0::{}::{}::INSTR'.format(self.config['ip_address'],
self.config['hislip_interface']))
self.test_app.timeout = 200000
self.test_app.write_termination = '\n'
self.test_app.read_termination = '\n'
self.test_app.query_delay = VISA_QUERY_DELAY
self.last_loaded_scpi = None
inst_id = self.send_cmd('*IDN?', 1)
if 'Keysight' not in inst_id[0]:
self.log.error(
'Failed to connect to Keysight Test App: {}'.format(inst_id))
else:
self.log.info("Test App ID: {}".format(inst_id))
self.test_app_settings['lte_cell_count'] = self.get_cell_count('LTE')
self.test_app_settings['nr_cell_count'] = self.get_cell_count('NR5G')
def destroy(self):
self.test_app.close()
### Programming Utilities
@staticmethod
def _format_cells(cells):
"Helper function to format list of cells."
if isinstance(cells, int):
return 'CELL{}'.format(cells)
elif isinstance(cells, str):
return cells
elif isinstance(cells, list):
cell_list = [
Keysight5GTestApp._format_cells(cell) for cell in cells
]
cell_list = ','.join(cell_list)
return cell_list
@staticmethod
def _format_response(response):
"Helper function to format test app response."
def _format_response_entry(entry):
try:
formatted_entry = float(entry)
except:
formatted_entry = entry
return formatted_entry
if ',' not in response:
return _format_response_entry(response)
response = response.split(',')
formatted_response = [
_format_response_entry(entry) for entry in response
]
return formatted_response
def send_cmd(self, command, read_response=0, check_errors=1):
"Helper function to write to or query test app."
if read_response:
try:
response = Keysight5GTestApp._format_response(
self.test_app.query(command))
time.sleep(VISA_QUERY_DELAY)
if check_errors:
error = self.test_app.query('SYSTem:ERRor?')
time.sleep(VISA_QUERY_DELAY)
if 'No error' not in error:
self.log.warning("Command: {}. Error: {}".format(
command, error))
return response
except:
raise RuntimeError('Lost connection to test app.')
else:
try:
self.test_app.write(command)
time.sleep(VISA_QUERY_DELAY)
if check_errors:
error = self.test_app.query('SYSTem:ERRor?')
if 'No error' not in error:
self.log.warning("Command: {}. Error: {}".format(
command, error))
self.send_cmd('*OPC?', 1, check_errors)
time.sleep(VISA_QUERY_DELAY)
except:
raise RuntimeError('Lost connection to test app.')
return None
def check_error(self):
error = self.test_app.query('SYSTem:ERRor?')
if 'No error' not in error:
self.log.warning("Error: {}".format(error))
return True
else:
return False
def import_scpi_file(self, file_name, check_last_loaded=0):
"""Function to import SCPI file specified in file_name.
Args:
file_name: name of SCPI file to run
check_last_loaded: flag to check last loaded scpi and
only load if different.
"""
if file_name == self.last_loaded_scpi and check_last_loaded:
self.log.info('Skipping SCPI import.')
self.send_cmd("SYSTem:SCPI:IMPort '{}'".format(file_name))
while int(self.send_cmd('SYSTem:SCPI:IMPort:STATus?', 1)):
self.send_cmd('*OPC?', 1)
self.log.info('Done with SCPI import')
### Configure Cells
def assert_cell_off_decorator(func):
"Decorator function that ensures cells or off when configuring them"
def inner(self, *args, **kwargs):
if "nr" in func.__name__:
cell_type = 'NR5G'
else:
cell_type = kwargs.get('cell_type', args[0])
cell = kwargs.get('cell', args[1])
cell_state = self.get_cell_state(cell_type, cell)
if cell_state:
self.log.error('Cell must be off when calling {}'.format(
func.__name__))
return (func(self, *args, **kwargs))
return inner
### Configure Cells
def skip_config_if_none_decorator(func):
"Decorator function that skips the config function if any args are none"
def inner(self, *args, **kwargs):
none_arg = False
for arg in args:
if arg is None:
none_arg = True
for key, value in kwargs.items():
if value is None:
none_arg = True
if none_arg:
self.log.warning(
'Skipping {}. Received incomplete arguments.'.format(
func.__name__))
return
return (func(self, *args, **kwargs))
return inner
def assert_cell_off(self, cell_type, cell):
cell_state = self.get_cell_state(cell_type, cell)
if cell_state:
self.log.error('Cell must be off')
def select_cell(self, cell_type, cell):
"""Function to select active cell.
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
"""
self.send_cmd('BSE:SELected:CELL {},{}'.format(
cell_type, Keysight5GTestApp._format_cells(cell)))
def select_display_tab(self, cell_type, cell, tab, subtab):
"""Function to select display tab.
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
tab: tab to display for the selected cell
"""
supported_tabs = {
'PHY': [
'BWP', 'HARQ', 'PDSCH', 'PDCCH', 'PRACH', 'PUSCH', 'PUCCH',
'SRSC'
],
'BTHR': ['SUMMARY', 'OTAGRAPH', 'ULOTA', 'DLOTA'],
'CSI': []
}
if (tab not in supported_tabs) or (subtab not in supported_tabs[tab]):
return
self.select_cell(cell_type, cell)
self.send_cmd('DISPlay:{} {},{}'.format(cell_type, tab, subtab))
def get_cell_count(self, cell_type):
"""Function to get cell count
Args:
cell_type: LTE or NR5G cell
Returns:
cell_count: number of cells of cell_type supported.
"""
cell_count = int(
self.send_cmd('BSE:CONFig:{}:CELL:COUNt?'.format(cell_type), 1))
return cell_count
def get_cell_state(self, cell_type, cell):
"""Function to get cell on/off state.
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
Returns:
cell_state: boolean. True if cell on
"""
cell_state = int(
self.send_cmd(
'BSE:CONFig:{}:{}:ACTive:STATe?'.format(
cell_type, Keysight5GTestApp._format_cells(cell)), 1))
return cell_state
def wait_for_cell_status(self,
cell_type,
cell,
states,
timeout,
polling_interval=SHORT_SLEEP):
"""Function to wait for a specific cell status
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
states: list of acceptable states (ON, CONN, AGG, ACT, etc)
timeout: amount of time to wait for requested status
Returns:
True if one of the listed states is achieved
False if timed out waiting for acceptable state.
"""
states = [states] if isinstance(states, str) else states
for i in range(int(timeout / polling_interval)):
current_state = self.send_cmd(
'BSE:STATus:{}:{}?'.format(
cell_type, Keysight5GTestApp._format_cells(cell)), 1)
if current_state in states:
return True
time.sleep(polling_interval)
self.log.warning('Timeout waiting for {} {} {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), states))
return False
def set_cell_state(self, cell_type, cell, state):
"""Function to set cell state
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
state: requested state
"""
self.send_cmd('BSE:CONFig:{}:{}:ACTive:STATe {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), state))
def turn_all_cells_off(self):
for cell in range(self.test_app_settings['lte_cell_count']):
self.set_cell_state('LTE', cell + 1, 0)
for cell in range(self.test_app_settings['nr_cell_count']):
self.set_cell_state('NR5G', cell + 1, 0)
def set_nr_cell_type(self, cell_type, cell, nr_cell_type):
"""Function to set cell duplex mode
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
nr_cell_type: SA or NSA
"""
self.assert_cell_off(cell_type, cell)
self.send_cmd('BSE:CONFig:{}:{}:TYPE {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), nr_cell_type))
def set_cell_duplex_mode(self, cell_type, cell, duplex_mode):
"""Function to set cell duplex mode
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
duplex_mode: TDD or FDD
"""
self.assert_cell_off(cell_type, cell)
self.send_cmd('BSE:CONFig:{}:{}:DUPLEX:MODe {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), duplex_mode))
def set_cell_band(self, cell_type, cell, band):
"""Function to set cell band
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
band: LTE or NR band (e.g. 1,3,N260, N77)
"""
self.assert_cell_off(cell_type, cell)
self.send_cmd('BSE:CONFig:{}:{}:BAND {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), band))
def set_cell_channel(self, cell_type, cell, channel, arfcn=1):
"""Function to set cell frequency/channel
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
channel: requested channel (ARFCN) or frequency in MHz
"""
self.assert_cell_off(cell_type, cell)
if cell_type == 'NR5G' and isinstance(
channel, str) and channel.lower() in ['low', 'mid', 'high']:
self.send_cmd('BSE:CONFig:{}:{}:TESTChanLoc {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
channel.upper()))
elif arfcn == 1:
self.send_cmd('BSE:CONFig:{}:{}:DL:CHANnel {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), channel))
else:
self.send_cmd('BSE:CONFig:{}:{}:DL:FREQuency:MAIN {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
channel * 1e6))
def toggle_contiguous_nr_channels(self, force_contiguous):
self.assert_cell_off('NR5G', 1)
self.log.warning(
'Forcing contiguous NR channels overrides channel config.')
self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
if force_contiguous:
self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1')
def configure_contiguous_nr_channels(self, cell, band, channel):
"""Function to set cell frequency/channel
Args:
cell: cell/carrier number
band: band to set channel in (only required for preset)
channel_preset: frequency in MHz or preset in [low, mid, or high]
"""
self.assert_cell_off('NR5G', cell)
self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
if channel.lower() in ['low', 'mid', 'high']:
pcc_arfcn = cputils.PCC_PRESET_MAPPING[band][channel]
self.set_cell_channel('NR5G', cell, pcc_arfcn, 1)
else:
self.set_cell_channel('NR5G', cell, channel, 0)
self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1')
def configure_noncontiguous_nr_channels(self, cells, band, channels):
"""Function to set cell frequency/channel
Args:
cell: cell/carrier number
band: band number
channel: frequency in MHz
"""
for cell in cells:
self.assert_cell_off('NR5G', cell)
self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
for cell, channel in zip(cells, channels):
self.set_cell_channel('NR5G', cell, channel, arfcn=0)
def set_cell_bandwidth(self, cell_type, cell, bandwidth):
"""Function to set cell bandwidth
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
bandwidth: requested bandwidth
"""
self.assert_cell_off(cell_type, cell)
self.send_cmd('BSE:CONFig:{}:{}:DL:BW {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), bandwidth))
def set_nr_subcarrier_spacing(self, cell, subcarrier_spacing):
"""Function to set cell bandwidth
Args:
cell: cell/carrier number
subcarrier_spacing: requested SCS
"""
self.assert_cell_off('NR5G', cell)
self.send_cmd('BSE:CONFig:NR5G:{}:SUBCarrier:SPACing:COMMon {}'.format(
Keysight5GTestApp._format_cells(cell), subcarrier_spacing))
def set_cell_mimo_config(self, cell_type, cell, link, mimo_config):
"""Function to set cell mimo config.
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
link: uplink or downlink
mimo_config: requested mimo configuration (refer to SCPI
documentation for allowed range of values)
"""
self.assert_cell_off(cell_type, cell)
if cell_type == 'NR5G':
self.send_cmd('BSE:CONFig:{}:{}:{}:MIMO:CONFig {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), link,
mimo_config))
else:
self.send_cmd('BSE:CONFig:{}:{}:PHY:DL:ANTenna:CONFig {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), mimo_config))
def set_lte_cell_transmission_mode(self, cell, transmission_mode):
"""Function to set LTE cell transmission mode.
Args:
cell: cell/carrier number
transmission_mode: one of TM1, TM2, TM3, TM4 ...
"""
self.assert_cell_off('LTE', cell)
self.send_cmd('BSE:CONFig:LTE:{}:RRC:TMODe {}'.format(
Keysight5GTestApp._format_cells(cell), transmission_mode))
@skip_config_if_none_decorator
def set_lte_cell_num_layers(self, cell, num_layers):
"""Function to set LTE cell number of layers."""
self.assert_cell_off('LTE', cell)
self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:NUMLayers {}'.format(
Keysight5GTestApp._format_cells(cell), num_layers))
@skip_config_if_none_decorator
def set_lte_cell_num_codewords(self, cell, num_codewords):
"""Function to set LTE number of codewords."""
self.assert_cell_off('LTE', cell)
self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:NUMCodewords {}'.format(
Keysight5GTestApp._format_cells(cell), num_codewords))
@skip_config_if_none_decorator
def set_lte_cell_dl_subframe_allocation(self,
cell,
dl_subframe_allocation=[1] * 10):
"""Function to set LTE downlink subrframe allocation.
Args:
cell: cell/carrier number
dl_subframe_allocation: string or bool list indicating allocation
(1 enabled, 0 disabled)
"""
if isinstance(dl_subframe_allocation, list):
dl_subframe_allocation = str(dl_subframe_allocation)[1:-1].replace(
'\'', '')
self.assert_cell_off('LTE', cell)
self.send_cmd(
'BSE:CONFig:LTE:{}:PHY:DL:SFRame:ALLocation:ALL {}'.format(
Keysight5GTestApp._format_cells(cell), dl_subframe_allocation))
def set_cell_dl_power(self, cell_type, cell, power, full_bw):
"""Function to set cell power
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
power: requested power
full_bw: boolean controlling if requested power is per channel
or subcarrier
"""
if full_bw:
self.send_cmd('BSE:CONFig:{}:{}:DL:POWer:CHANnel {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), power))
else:
self.send_cmd('BSE:CONFig:{}:{}:DL:POWer:EPRE {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), power))
time.sleep(VERY_SHORT_SLEEP)
self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
def set_cell_ul_power_control(self, cell_type, cell, mode, target_power=0):
"""Function configure UL power control
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
mode: UL power control mode. One of [TARget | MANual | UP | DOWN | DISabled]
target_power: target power for PUSCH
"""
self.send_cmd('BSE:CONFig:{}:{}:UL:PUSCh:CLPControl:MODE {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), mode))
if cell_type == 'NR5G' and mode == 'TARget':
self.send_cmd(
'BSE:CONFig:{}:{}:UL:PUSCh:CLPControl:TARGet:POWer {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
target_power))
elif cell_type == 'LTE' and mode == 'TARget':
self.send_cmd(
'BSE:CONFig:{}:{}:UL:CLPControl:TARGet:POWer:PUSCH {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
target_power))
self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
def set_cell_input_power(self, cell_type, cell, power):
"""Function to set cell input power
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
power: expected input power
"""
if power == "AUTO" and cell_type == "LTE":
self.send_cmd('BSE:CONFig:{}:{}:CONTrol:POWer:AUTO ON'.format(
cell_type, Keysight5GTestApp._format_cells(cell)))
elif cell_type == "LTE":
self.send_cmd('BSE:CONFig:{}:{}:CONTrol:POWer:AUTO OFF'.format(
cell_type, Keysight5GTestApp._format_cells(cell)))
self.send_cmd('BSE:CONFig:{}:{}:MANual:POWer {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), power))
if power == "AUTO" and cell_type == "NR5G":
self.send_cmd('BSE:CONFig:{}:UL:EIP:AUTO ON'.format(cell_type))
elif cell_type == "NR5G":
self.send_cmd('BSE:CONFig:{}:UL:EIP:AUTO OFF'.format(cell_type))
self.send_cmd('BSE:CONFig:{}:{}:MANual:POWer {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), power))
self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
def set_cell_duplex_mode(self, cell_type, cell, duplex_mode):
"""Function to set cell power
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
duplex mode: TDD or FDD
"""
self.assert_cell_off(cell_type, cell)
self.send_cmd('BSE:CONFig:{}:{}:DUPLEX:MODe {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), duplex_mode))
def set_dl_carriers(self, cells):
"""Function to set aggregated DL NR5G carriers
Args:
cells: list of DL cells/carriers to aggregate with LTE (e.g. [1,2])
"""
self.send_cmd('BSE:CONFig:NR5G:CELL1:CAGGregation:NRCC:DL {}'.format(
Keysight5GTestApp._format_cells(cells)))
def set_ul_carriers(self, cells):
"""Function to set aggregated UL NR5G carriers
Args:
cells: list of DL cells/carriers to aggregate with LTE (e.g. [1,2])
"""
self.send_cmd('BSE:CONFig:NR5G:CELL1:CAGGregation:NRCC:UL {}'.format(
Keysight5GTestApp._format_cells(cells)))
def set_nr_cell_schedule_scenario(self, cell, scenario):
"""Function to set NR schedule to one of predefince quick configs.
Args:
cell: cell number to address. schedule will apply to all cells
scenario: one of the predefined test app schedlue quick configs
(e.g. FULL_TPUT, BASIC).
"""
self.assert_cell_off('NR5G', cell)
self.send_cmd(
'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:SCENario {}'.format(
Keysight5GTestApp._format_cells(cell), scenario))
self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
def set_nr_schedule_slot_ratio(self, cell, slot_ratio):
"""Function to set NR schedule to one of predefince quick configs.
Args:
cell: cell number to address. schedule will apply to all cells
slot_ratio: downlink slot ratio
"""
self.assert_cell_off('NR5G', cell)
self.send_cmd('BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:RATIo {}'.format(
Keysight5GTestApp._format_cells(cell), slot_ratio))
self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
def set_nr_schedule_tdd_pattern(self, cell, tdd_pattern):
"""Function to set NR schedule to one of predefince quick configs.
Args:
cell: cell number to address. schedule will apply to all cells
tdd_pattern: 0 for disabled, 1/enabled, or current
"""
tdd_pattern_mapping = {
0: 'DISabled',
1: 'ENABled',
'current': 'CURRent'
}
self.assert_cell_off('NR5G', cell)
self.send_cmd(
'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:TDD:PATTern {}'.format(
Keysight5GTestApp._format_cells(cell),
tdd_pattern_mapping[tdd_pattern]))
self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
def set_nr_cell_mcs(self, cell, dl_mcs, ul_mcs):
"""Function to set NR cell DL & UL MCS
Args:
cell: cell number to address. MCS will apply to all cells
dl_mcs: mcs index to use on DL
ul_mcs: mcs index to use on UL
"""
self.assert_cell_off('NR5G', cell)
frame_config_count = 5
slot_config_count = 8
if isinstance(dl_mcs, dict):
self.configure_nr_link_adaptation(cell, link_config=dl_mcs)
else:
for frame, slot in itertools.product(range(frame_config_count),
range(slot_config_count)):
self.send_cmd(
'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:RRESource:APOLicy FIXed'
.format(Keysight5GTestApp._format_cells(cell), frame,
slot))
self.send_cmd(
'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "DL:IMCS", "{}"'
.format(dl_mcs))
self.send_cmd(
'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "UL:IMCS", "{}"'
.format(ul_mcs))
def configure_nr_link_adaptation(self, cell, link_config):
frame_config_count = 5
slot_config_count = 8
for frame, slot in itertools.product(range(frame_config_count),
range(slot_config_count)):
self.send_cmd(
'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:RRESource:APOLicy {}'
.format(Keysight5GTestApp._format_cells(cell), frame, slot,
link_config['link_policy']))
self.send_cmd(
'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:IMCS {}'.
format(Keysight5GTestApp._format_cells(cell), frame, slot,
link_config['initial_mcs']))
self.send_cmd(
'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:MAXimum:IMCS {}'
.format(Keysight5GTestApp._format_cells(cell), frame, slot,
link_config['maximum_mcs']))
self.send_cmd(
'BSE:CONFig:NR5G:{}:MAC:LADaptation:NTX:BEValuation {}'.format(
Keysight5GTestApp._format_cells(cell),
link_config.get('adaptation_interval', 10000)))
self.send_cmd(
'BSE:CONFig:NR5G:{}:MAC:LADaptation:TARGet:NACK:COUNt {}'.format(
Keysight5GTestApp._format_cells(cell),
link_config.get('target_nack_count', 1000)))
self.send_cmd(
'BSE:CONFig:NR5G:{}:MAC:LADaptation:TARGet:NACK:MARGin {}'.format(
Keysight5GTestApp._format_cells(cell),
link_config.get('target_nack_margin', 100)))
self.send_cmd(
'BSE:CONFig:NR5G:{}:MAC:DL:LADaptation:MCS:INCRement {}'.format(
Keysight5GTestApp._format_cells(cell),
link_config.get('mcs_step', 1)))
def set_lte_cell_mcs(
self,
cell,
dl_mcs_table,
dl_mcs,
ul_mcs_table,
ul_mcs,
):
"""Function to set NR cell DL & UL MCS
Args:
cell: cell number to address. MCS will apply to all cells
dl_mcs: mcs index to use on DL
ul_mcs: mcs index to use on UL
"""
if dl_mcs_table == 'QAM256':
dl_mcs_table_formatted = 'ASUBframe'
elif dl_mcs_table == 'QAM1024':
dl_mcs_table_formatted = 'ASUB1024'
elif dl_mcs_table == 'QAM64':
dl_mcs_table_formatted = 'DISabled'
self.assert_cell_off('LTE', cell)
self.send_cmd(
'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "DL:MCS:TABle", "{}"'
.format(dl_mcs_table_formatted))
self.configure_lte_periodic_csi_reporting(cell, 1)
if dl_mcs == 'WCQI':
self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:IMCS:MODE WCQI'.format(
Keysight5GTestApp._format_cells(cell)))
else:
self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:IMCS:MODE EXPLicit'.format(
Keysight5GTestApp._format_cells(cell)))
self.send_cmd(
'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL:CWALL", "DL:IMCS", "{}"'
.format(dl_mcs))
self.send_cmd(
'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MCS:TABle", "{}"'
.format(ul_mcs_table))
self.send_cmd(
'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL", "UL:IMCS", "{}"'
.format(ul_mcs))
def configure_lte_periodic_csi_reporting(self, cell, enable):
"""Function to enable/disable LTE CSI reporting."""
self.send_cmd('BSE:CONFig:LTE:{}:PHY:CSI:PERiodic:STATe {}'.format(
Keysight5GTestApp._format_cells(cell), enable))
def set_lte_control_region_size(self, cell, num_symbols):
self.assert_cell_off('LTE', cell)
self.send_cmd('BSE:CONFig:LTE:{}:PHY:PCFich:CFI {}'.format(
Keysight5GTestApp._format_cells(cell), num_symbols))
def set_lte_ul_mac_padding(self, mac_padding):
self.assert_cell_off('LTE', 'CELL1')
padding_str = 'TRUE' if mac_padding else 'FALSE'
self.send_cmd(
'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MAC:PADDING", "{}"'
.format(padding_str))
def set_nr_ul_dft_precoding(self, cell, precoding):
"""Function to configure DFT-precoding on uplink.
Args:
cell: cell number to address. MCS will apply to all cells
precoding: 0/1 to disable/enable precoding
"""
self.assert_cell_off('NR5G', cell)
precoding_str = "ENABled" if precoding else "DISabled"
self.send_cmd(
'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:UL:TRANsform:PRECoding {}'.
format(Keysight5GTestApp._format_cells(cell), precoding_str))
precoding_str = "True" if precoding else "False"
self.send_cmd(
'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL", "UL:TPEnabled", "{}"'
.format(precoding_str))
def configure_ul_clpc(self, channel, mode, target):
"""Function to configure UL power control on all cells/carriers
Args:
channel: physical channel must be PUSCh or PUCCh
mode: mode supported by test app (all up/down bits, target, etc)
target: target power if mode is set to target
"""
self.send_cmd('BSE:CONFig:NR5G:UL:{}:CLPControl:MODE:ALL {}'.format(
channel, mode))
if "tar" in mode.lower():
self.send_cmd(
'BSE:CONFig:NR5G:UL:{}:CLPControl:TARGet:POWer:ALL {}'.format(
channel, target))
def configure_channel_emulator(self, cell_type, cell, fading_model):
if cell_type == 'LTE':
self.send_cmd('BSE:CONFig:{}:{}:CMODel {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
fading_model['channel_model']))
self.send_cmd('BSE:CONFig:{}:{}:CMATrix {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
fading_model['correlation_matrix']))
self.send_cmd('BSE:CONFig:{}:{}:MDSHift {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
fading_model['max_doppler']))
elif cell_type == 'NR5G':
#TODO: check that this is FR1
self.send_cmd('BSE:CONFig:{}:{}:FRANge1:CMODel {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
fading_model['channel_model']))
self.send_cmd('BSE:CONFig:{}:{}:FRANge1:CMATrix {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
fading_model['correlation_matrix']))
self.send_cmd('BSE:CONFig:{}:{}:FRANge1:MDSHift {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
fading_model['max_doppler']))
def set_channel_emulator_state(self, state):
self.send_cmd('BSE:CONFig:FADing:ENABle {}'.format(int(state)))
def apply_lte_carrier_agg(self, cells):
"""Function to start LTE carrier aggregation on already configured cells"""
if self.wait_for_cell_status('LTE', 'CELL1', 'CONN', 60):
self.send_cmd(
'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:SCC {}'.format(
Keysight5GTestApp._format_cells(cells)))
self.send_cmd(
'BSE:CONFig:LTE:CELL1:CAGGregation:ACTivate:SCC {}'.format(
Keysight5GTestApp._format_cells(cells)))
def apply_carrier_agg(self):
"""Function to start carrier aggregation on already configured cells"""
if not self.wait_for_cell_status('LTE', 'CELL1', 'CONN', 60):
raise RuntimeError('LTE must be connected to start aggregation.')
# Continue if LTE connected
self.send_cmd('BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly',
0, 0)
time.sleep(MEDIUM_SLEEP)
error = self.check_error()
if error:
acts_asserts.fail('Failed to apply NR carrier aggregation.')
def get_ip_throughput(self, cell_type):
"""Function to query IP layer throughput on LTE or NR
Args:
cell_type: LTE or NR5G
Returns:
dict containing DL and UL IP-layer throughput
"""
#Tester reply format
#{ report-count, total-bytes, current transfer-rate, average transfer-rate, peak transfer-rate }
dl_tput = self.send_cmd(
'BSE:MEASure:{}:BTHRoughput:DL:THRoughput:IP?'.format(cell_type),
1)
ul_tput = self.send_cmd(
'BSE:MEASure:{}:BTHRoughput:UL:THRoughput:IP?'.format(cell_type),
1)
return {'dl_tput': dl_tput, 'ul_tput': ul_tput}
def _get_throughput(self, cell_type, link, cell):
"""Helper function to get PHY layer throughput on single cell"""
if cell_type == 'LTE':
tput_response = self.send_cmd(
'BSE:MEASure:LTE:BTHRoughput:{}:THRoughput:OTA:{}?'.format(
link, Keysight5GTestApp._format_cells(cell)), 1)
elif cell_type == 'NR5G':
# Tester reply format
#progress-count, ack-count, ack-ratio, nack-count, nack-ratio, statdtx-count, statdtx-ratio, pdschBlerCount, pdschBlerRatio, pdschTputRatio.
tput_response = self.send_cmd(
'BSE:MEASure:NR5G:BTHRoughput:{}:THRoughput:OTA:{}?'.format(
link, Keysight5GTestApp._format_cells(cell)), 1)
tput_result = {
'frame_count': tput_response[0] / 1e6,
'current_tput': tput_response[1] / 1e6,
'min_tput': tput_response[2] / 1e6,
'max_tput': tput_response[3] / 1e6,
'average_tput': tput_response[4] / 1e6,
'theoretical_tput': tput_response[5] / 1e6,
}
return tput_result
def get_throughput(self, cell_type, dl_cells, ul_cells):
"""Function to get PHY layer throughput on on or more cells
This function returns the throughput data on the requested cells
during the last BLER test run, i.e., throughpt data must be fetch at
the end/after a BLE test run on the Keysight Test App.
Args:
cell_type: LTE or NR5G
cells: list of cells to query for throughput data
Returns:
tput_result: dict containing all throughput statistics in Mbps
"""
if not isinstance(dl_cells, list):
dl_cells = [dl_cells]
if not isinstance(ul_cells, list):
ul_cells = [ul_cells]
tput_result = collections.OrderedDict()
for cell in dl_cells:
tput_result.setdefault(cell, {})
tput_result[cell]['DL'] = self._get_throughput(
cell_type, 'DL', cell)
frame_count = tput_result[cell]['DL']['frame_count']
for cell in ul_cells:
tput_result.setdefault(cell, {})
tput_result[cell]['UL'] = self._get_throughput(
cell_type, 'UL', cell)
agg_tput = {
'DL': {
'frame_count': frame_count,
'current_tput': 0,
'min_tput': 0,
'max_tput': 0,
'average_tput': 0,
'theoretical_tput': 0
},
'UL': {
'frame_count': frame_count,
'current_tput': 0,
'min_tput': 0,
'max_tput': 0,
'average_tput': 0,
'theoretical_tput': 0
}
}
for cell, cell_tput in tput_result.items():
for link, link_tput in cell_tput.items():
for key, value in link_tput.items():
if 'tput' in key:
agg_tput[link][key] = agg_tput[link][key] + value
tput_result['total'] = agg_tput
return tput_result
def _clear_bler_measurement(self, cell_type):
"""Helper function to clear BLER results."""
if cell_type == 'LTE':
self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CLEar')
elif cell_type == 'NR5G':
self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CLEar')
def _configure_bler_measurement(self, cell_type, continuous, length):
"""Helper function to configure BLER results."""
if continuous:
if cell_type == 'LTE':
self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CONTinuous 1')
elif cell_type == 'NR5G':
self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CONTinuous 1')
elif length > 1:
if cell_type == 'LTE':
self.send_cmd(
'BSE:MEASure:LTE:CELL1:BTHRoughput:LENGth {}'.format(
length))
self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CONTinuous 0')
elif cell_type == 'NR5G':
self.send_cmd(
'BSE:MEASure:NR5G:BTHRoughput:LENGth {}'.format(length))
self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CONTinuous 0')
def _set_bler_measurement_state(self, cell_type, state):
"""Helper function to start or stop BLER measurement."""
if cell_type == 'LTE':
self.send_cmd(
'BSE:MEASure:LTE:CELL1:BTHRoughput:STATe {}'.format(state))
elif cell_type == 'NR5G':
self.send_cmd(
'BSE:MEASure:NR5G:BTHRoughput:STATe {}'.format(state))
def start_bler_measurement(self, cell_type, cells, length):
"""Function to kick off a BLER measurement
Args:
cell_type: LTE or NR5G
length: integer length of BLER measurements in subframes
"""
self._clear_bler_measurement(cell_type)
self._set_bler_measurement_state(cell_type, 0)
self._configure_bler_measurement(cell_type, 0, length)
self._set_bler_measurement_state(cell_type, 1)
time.sleep(0.1)
#bler_check = self.get_bler_result(cell_type, cells, length, 0)
#if bler_check['total']['DL']['frame_count'] == 0:
# self.log.warning('BLER measurement did not start. Retrying')
# self.start_bler_measurement(cell_type, cells, length)
def _get_bler(self, cell_type, link, cell):
"""Helper function to get single-cell BLER measurement results."""
if cell_type == 'LTE':
bler_response = self.send_cmd(
'BSE:MEASure:LTE:BTHRoughput:{}:BLER:{}?'.format(
link, Keysight5GTestApp._format_cells(cell)), 1)
bler_items = [
'frame_count', 'ack_count', 'ack_ratio', 'nack_count',
'nack_ratio', 'statDtx_count', 'statDtx_ratio',
'nackStatDtx_count', 'nackStatDtx_ratio', 'pdschBler_count',
'pdschBler_ratio', 'any_count', 'any_ratio'
]
bler_result = {
bler_items[x]: bler_response[x]
for x in range(len(bler_response))
}
elif cell_type == 'NR5G':
bler_response = self.send_cmd(
'BSE:MEASure:NR5G:BTHRoughput:{}:BLER:{}?'.format(
link, Keysight5GTestApp._format_cells(cell)), 1)
bler_items = [
'frame_count', 'ack_count', 'ack_ratio', 'nack_count',
'nack_ratio', 'statDtx_count', 'statDtx_ratio',
'pdschBler_count', 'pdschBler_ratio', 'pdschTputRatio'
]
bler_result = {
bler_items[x]: bler_response[x]
for x in range(len(bler_response))
}
return bler_result
def get_bler_result(self,
cell_type,
dl_cells,
ul_cells,
length,
wait_for_length=1,
polling_interval=SHORT_SLEEP):
"""Function to get BLER results.
This function gets the BLER measurements results on one or more
requested cells. The function can either return BLER statistics
immediately or wait until a certain number of subframes have been
counted (e.g. if the BLER measurement is done)
Args:
cell_type: LTE or NR5G
cells: list of cells for which to get BLER
length: number of subframes to wait for (typically set to the
configured length of the BLER measurements)
wait_for_length: boolean to block/wait till length subframes have
been counted.
Returns:
bler_result: dict containing per-cell and aggregate BLER results
"""
if not isinstance(dl_cells, list):
dl_cells = [dl_cells]
if not isinstance(ul_cells, list):
ul_cells = [ul_cells]
while wait_for_length:
dl_bler = self._get_bler(cell_type, 'DL', dl_cells[0])
if dl_bler['frame_count'] < length:
time.sleep(polling_interval)
else:
break
bler_result = collections.OrderedDict()
for cell in dl_cells:
bler_result.setdefault(cell, {})
bler_result[cell]['DL'] = self._get_bler(cell_type, 'DL', cell)
for cell in ul_cells:
bler_result.setdefault(cell, {})
bler_result[cell]['UL'] = self._get_bler(cell_type, 'UL', cell)
agg_bler = {
'DL': {
'frame_count': length,
'ack_count': 0,
'ack_ratio': 0,
'nack_count': 0,
'nack_ratio': 0
},
'UL': {
'frame_count': length,
'ack_count': 0,
'ack_ratio': 0,
'nack_count': 0,
'nack_ratio': 0
}
}
for cell, cell_bler in bler_result.items():
for link, link_bler in cell_bler.items():
for key, value in link_bler.items():
if 'ack_count' in key:
agg_bler[link][key] = agg_bler[link][key] + value
dl_ack_nack = agg_bler['DL']['ack_count'] + agg_bler['DL']['nack_count']
ul_ack_nack = agg_bler['UL']['ack_count'] + agg_bler['UL']['nack_count']
try:
agg_bler['DL'][
'ack_ratio'] = agg_bler['DL']['ack_count'] / dl_ack_nack
agg_bler['DL'][
'nack_ratio'] = agg_bler['DL']['nack_count'] / dl_ack_nack
agg_bler['UL'][
'ack_ratio'] = agg_bler['UL']['ack_count'] / ul_ack_nack
agg_bler['UL'][
'nack_ratio'] = agg_bler['UL']['nack_count'] / ul_ack_nack
except:
self.log.debug(bler_result)
agg_bler['DL']['ack_ratio'] = 0
agg_bler['DL']['nack_ratio'] = 1
agg_bler['UL']['ack_ratio'] = 0
agg_bler['UL']['nack_ratio'] = 1
bler_result['total'] = agg_bler
return bler_result
def measure_bler(self, cell_type, cells, length):
"""Function to start and wait for BLER results.
This function starts a BLER test on a number of cells and waits for the
test to complete before returning the BLER measurements.
Args:
cell_type: LTE or NR5G
cells: list of cells for which to get BLER
length: number of subframes to wait for (typically set to the
configured length of the BLER measurements)
Returns:
bler_result: dict containing per-cell and aggregate BLER results
"""
self.start_bler_measurement(cell_type, cells, length)
time.sleep(length * SUBFRAME_DURATION)
bler_result = self.get_bler_result(cell_type, cells, length, 1)
return bler_result
def start_nr_rsrp_measurement(self, cells, length):
"""Function to start 5G NR RSRP measurement.
Args:
cells: list of NR cells to get RSRP on
length: length of RSRP measurement in milliseconds
Returns:
rsrp_result: dict containing per-cell and aggregate BLER results
"""
for cell in cells:
self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:STOP'.format(
Keysight5GTestApp._format_cells(cell)))
for cell in cells:
self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:LENGth {}'.format(
Keysight5GTestApp._format_cells(cell), length))
for cell in cells:
self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:STARt'.format(
Keysight5GTestApp._format_cells(cell)))
def get_nr_rsrp_measurement_state(self, cells):
for cell in cells:
self.log.info(
self.send_cmd(
'BSE:MEASure:NR5G:{}:L1:RSRPower:STATe?'.format(
Keysight5GTestApp._format_cells(cell)), 1))
def get_nr_rsrp_measurement_results(self, cells):
for cell in cells:
self.log.info(
self.send_cmd(
'BSE:MEASure:NR5G:{}:L1:RSRPower:REPorts:JSON?'.format(
Keysight5GTestApp._format_cells(cell)), 1))
def release_rrc_connection(self, cell_type, cell):
if cell_type == 'LTE':
self.send_cmd('BSE:FUNCtion:LTE:{}:RELease:SEND'.format(
Keysight5GTestApp._format_cells(cell)))
elif cell_type == 'NR5G':
self.send_cmd(
'BSE:CONFig:NR5G:{}:RCONtrol:RRC:STARt RRELease'.format(
Keysight5GTestApp._format_cells(cell)))
def send_rrc_paging(self, cell_type, cell):
if cell_type == 'LTE':
self.send_cmd('BSE:FUNCtion:LTE:{}:PAGing:PAGE'.format(
Keysight5GTestApp._format_cells(cell)))
elif cell_type == 'NR5G':
self.send_cmd(
'BSE:CONFig:NR5G:{}:RCONtrol:RRC:STARt PAGing'.format(
Keysight5GTestApp._format_cells(cell)))
def enable_rach(self, cell_type, cell, enabled):
self.send_cmd('BSE:CONFig:{}:{}:MAC:RACH:IGNore {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), int(enabled)))
self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
def enable_preamble_report(self, cell_type, enable):
self.send_cmd('BSE:CONFig:{}:PReamble:REPort:STATe {}'.format(
cell_type, int(enable)))
def fetch_preamble_report(self, cell_type, cell, num_reports=10):
report = self.send_cmd(
'BSE:CONFig:{}:{}:PReamble:REPort:FETCh:JSON? {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), num_reports),
read_response=1)
self.send_cmd('BSE:CONFig:{}:PReamble:REPort:CLEAr'.format(cell_type))
if 'No Data' in report:
report = None
return report