blob: adf61a554be82bc464e86bb1a52069f730d73038 [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright 2016 - Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import acts.jsonrpc as jsonrpc
from acts.test_utils.wifi.wifi_test_utils import WifiEnums
ACTS_CONTROLLER_CONFIG_NAME = "AP"
ACTS_CONTROLLER_REFERENCE_NAME = "access_points"
def create(configs):
results = []
for c in configs:
addr = c[Config.key_address.value]
port = 80
if Config.key_port.value in c:
port = c[Config.key_port.value]
results.append(AP(addr, port))
return results
def destroy(objs):
return
class ServerError(Exception):
pass
class ClientError(Exception):
pass
"""
Controller for OpenWRT routers.
"""
class AP():
"""Interface to OpenWRT using the LuCI interface.
Works via JSON-RPC over HTTP. A generic access method is provided, as well
as more specialized methods.
Can also call LuCI methods generically:
ap_instance.sys.loadavg()
ap_instance.sys.dmesg()
ap_instance.fs.stat("/etc/hosts")
"""
IFACE_DEFAULTS = {"mode": "ap",
"disabled": "0",
"encryption": "psk2",
"network": "lan"}
RADIO_DEFAULTS = {"disabled": "0"}
def __init__(self, addr, port=80):
self._client = jsonrpc.JSONRPCClient("http://"
"{}:{}/cgi-bin/luci/rpc/".format(
addr, port))
self.RADIO_NAMES = []
keys = self._client.get_all("wireless").keys()
if "radio0" in keys:
self.RADIO_NAMES.append("radio0")
if "radio1" in keys:
self.RADIO_NAMES.append("radio1")
def section_id_lookup(self, cfg_name, key, value):
"""Looks up the section id of a section.
Finds the section ids of the sections that have the specified key:value
pair in them.
Args:
cfg_name: Name of the configuration file to look in.
key: Key of the pair.
value: Value of the pair.
Returns:
A list of the section ids found.
"""
section_ids = []
sections = self._client.get_all(cfg_name)
for section_id, section_cfg in sections.items():
if key in section_cfg and section_cfg[key] == value:
section_ids.append(section_id)
return section_ids
def _section_option_lookup(self, cfg_name, conditions, *target_keys):
"""Looks up values of options in sections that match the conditions.
To match a condition, a section needs to have all the key:value pairs
specified in conditions.
Args:
cfg_name: Name of the configuration file to look in.
key: Key of the pair.
value: Value of the pair.
target_key: Key of the options we want to retrieve values from.
Returns:
A list of the values found.
"""
results = []
sections = self._client.get_all(cfg_name)
for section_cfg in sections.values():
if self._match_conditions(conditions, section_cfg):
r = {}
for k in target_keys:
if k not in section_cfg:
break
r[k] = section_cfg[k]
if r:
results.append(r)
return results
@staticmethod
def _match_conditions(conds, cfg):
for cond in conds:
key, value = cond
if key not in cfg or cfg[key] != value:
return False
return True
def run(self, *cmd):
"""Executes a terminal command on the AP.
Args:
cmd: A tuple of command strings.
Returns:
The terminal output of the command.
"""
return self._client.sys("exec", *cmd)
def apply_configs(self, ap_config):
"""Applies configurations to the access point.
Reads the configuration file, adds wifi interfaces, and sets parameters
based on the configuration file.
Args:
ap_config: A dict containing the configurations for the AP.
"""
self.reset()
for k, v in ap_config.items():
if "radio" in k:
self._apply_radio_configs(k, v)
if "network" in k:
# TODO(angli) Implement this.
pass
self.apply_wifi_changes()
def _apply_radio_configs(self, radio_id, radio_config):
"""Applies conigurations on a radio of the AP.
Sets the options in the radio config.
Adds wifi-ifaces to this radio based on the configurations.
"""
for k, v in radio_config.items():
if k == "settings":
self._set_options('wireless', radio_id, v, self.RADIO_DEFAULTS)
if k == "wifi-iface":
for cfg in v:
cfg["device"] = radio_id
self._add_ifaces(v)
def reset(self):
"""Resets the AP to a clean state.
Deletes all wifi-ifaces.
Enable all the radios.
"""
sections = self._client.get_all("wireless")
to_be_deleted = []
for section_id in sections.keys():
if section_id not in self.RADIO_NAMES:
to_be_deleted.append(section_id)
self.delete_ifaces_by_ids(to_be_deleted)
for r in self.RADIO_NAMES:
self.toggle_radio_state(r, True)
def toggle_radio_state(self, radio_name, state=None):
"""Toggles the state of a radio.
If input state is None, toggle the state of the radio.
Otherwise, set the radio's state to input state.
State True is equivalent to 'disabled':'0'
Args:
radio_name: Name of the radio to change state.
state: State to set to, default is None.
Raises:
ClientError: If the radio specified does not exist on the AP.
"""
if radio_name not in self.RADIO_NAMES:
raise ClientError("Trying to change none-existent radio's state")
cur_state = self._client.get("wireless", radio_name, "disabled")
cur_state = True if cur_state == '0' else False
if state == cur_state:
return
new_state = '1' if cur_state else '0'
self._set_option("wireless", radio_name, "disabled", new_state)
self.apply_wifi_changes()
return
def set_ssid_state(self, ssid, state):
"""Sets the state of ssid (turns on/off).
Args:
ssid: The ssid whose state is being changed.
state: State to set the ssid to. Enable the ssid if True, disable
otherwise.
"""
new_state = '0' if state else '1'
section_ids = self.section_id_lookup("wireless", "ssid", ssid)
for s_id in section_ids:
self._set_option("wireless", s_id, "disabled", new_state)
def get_ssids(self, conds):
"""Gets all the ssids that match the conditions.
Params:
conds: An iterable of tuples, each representing a key:value pair
an ssid must have to be included.
Returns:
A list of ssids that contain all the specified key:value pairs.
"""
results = []
for s in self._section_option_lookup("wireless", conds, "ssid"):
results.append(s["ssid"])
return results
def get_active_ssids(self):
"""Gets the ssids that are currently not disabled.
Returns:
A list of ssids that are currently active.
"""
conds = (("disabled", "0"), )
return self.get_ssids(conds)
def get_active_ssids_info(self, *keys):
"""Gets the specified info of currently active ssids
If frequency is requested, it'll be retrieved from the radio section
associated with this ssid.
Params:
keys: Names of the fields to include in the returned info.
e.g. "frequency".
Returns:
Values of the requested info.
"""
conds = (("disabled", "0"), )
keys = [w.replace("frequency", "device") for w in keys]
if "device" not in keys:
keys.append("device")
info = self._section_option_lookup("wireless", conds, "ssid", *keys)
results = []
for i in info:
radio = i["device"]
# Skip this info the radio its ssid is on is disabled.
disabled = self._client.get("wireless", radio, "disabled")
if disabled != '0':
continue
c = int(self._client.get("wireless", radio, "channel"))
if radio == "radio0":
i["frequency"] = WifiEnums.channel_2G_to_freq[c]
elif radio == "radio1":
i["frequency"] = WifiEnums.channel_5G_to_freq[c]
results.append(i)
return results
def get_radio_option(self, key, idx=0):
"""Gets an option from the configured settings of a radio.
Params:
key: Name of the option to retrieve.
idx: Index of the radio to retrieve the option from. Default is 0.
Returns:
The value of the specified option and radio.
"""
r = None
if idx == 0:
r = "radio0"
elif idx == 1:
r = "radio1"
return self._client.get("wireless", r, key)
def apply_wifi_changes(self):
"""Applies committed wifi changes by restarting wifi.
Raises:
ServerError: Something funny happened restarting wifi on the AP.
"""
s = self._client.commit('wireless')
resp = self.run('wifi')
return resp
# if resp != '' or not s:
# raise ServerError(("Exception in refreshing wifi changes, commit"
# " status: ") + str(s) + ", wifi restart response: "
# + str(resp))
def set_wifi_channel(self, channel, device='radio0'):
self.set('wireless', device, 'channel', channel)
def _add_ifaces(self, configs):
"""Adds wifi-ifaces in the AP's wireless config based on a list of
configuration dict.
Args:
configs: A list of dicts each representing a wifi-iface config.
"""
for config in configs:
self._add_cfg_section('wireless', 'wifi-iface', config,
self.IFACE_DEFAULTS)
def _add_cfg_section(self, cfg_name, section, options, defaults=None):
"""Adds a section in a configuration file.
Args:
cfg_name: Name of the config file to add a section to.
e.g. 'wireless'.
section: Type of the secion to add. e.g. 'wifi-iface'.
options: A dict containing all key:value pairs of the options.
e.g. {'ssid': 'test', 'mode': 'ap'}
Raises:
ServerError: Uci add call returned False.
"""
section_id = self._client.add(cfg_name, section)
if not section_id:
raise ServerError(' '.join(("Failed adding", section, "in",
cfg_name)))
self._set_options(cfg_name, section_id, options, defaults)
def _set_options(self, cfg_name, section_id, options, defaults):
"""Sets options in a section.
Args:
cfg_name: Name of the config file to add a section to.
e.g. 'wireless'.
section_id: ID of the secion to add options to. e.g. 'cfg000864'.
options: A dict containing all key:value pairs of the options.
e.g. {'ssid': 'test', 'mode': 'ap'}
Raises:
ServerError: Uci set call returned False.
"""
# Fill the fields not defined in config with default values.
if defaults:
for k, v in defaults.items():
if k not in options:
options[k] = v
# Set value pairs defined in config.
for k, v in options.items():
self._set_option(cfg_name, section_id, k, v)
def _set_option(self, cfg_name, section_id, k, v):
"""Sets an option in a config section.
Args:
cfg_name: Name of the config file the section is in.
e.g. 'wireless'.
section_id: ID of the secion to set option in. e.g. 'cfg000864'.
k: Name of the option.
v: Value to set the option to.
Raises:
ServerError: If the rpc called returned False.
"""
status = self._client.set(cfg_name, section_id, k, v)
if not status:
# Delete whatever was added.
raise ServerError(' '.join(("Failed adding option", str(k), ':',
str(d), "to", str(section_id))))
def delete_ifaces_by_ids(self, ids):
"""Delete wifi-ifaces that are specified by the ids from the AP's
wireless config.
Args:
ids: A list of ids whose wifi-iface sections to be deleted.
"""
for i in ids:
self._delete_cfg_section_by_id('wireless', i)
def delete_ifaces(self, key, value):
"""Delete wifi-ifaces that contain the specified key:value pair.
Args:
key: Key of the pair.
value: Value of the pair.
"""
self._delete_cfg_sections('wireless', key, value)
def _delete_cfg_sections(self, cfg_name, key, value):
"""Deletes config sections that have the specified key:value pair.
Finds the ids of sections that match a key:value pair in the specified
config file and delete the section.
Args:
cfg_name: Name of the config file to delete sections from.
e.g. 'wireless'.
key: Name of the option to be matched.
value: Value of the option to be matched.
Raises:
ClientError: Could not find any section that has the key:value
pair.
"""
section_ids = self.section_id_lookup(cfg_name, key, value)
if not section_ids:
raise ClientError(' '.join(("Could not find any section that has ",
key, ":", value)))
for section_id in section_ids:
self._delete_cfg_section_by_id(cfg_name, section_id)
def _delete_cfg_section_by_id(self, cfg_name, section_id):
"""Deletes the config section with specified id.
Args:
cfg_name: Name of the config file to the delete a section from.
e.g. 'wireless'.
section_id: ID of the section to be deleted. e.g. 'cfg0d3777'.
Raises:
ServerError: Uci delete call returned False.
"""
self._client.delete(cfg_name, section_id)
def _get_iw_info(self):
results = []
text = self.run("iw dev").replace('\t', '')
interfaces = text.split("Interface")
for intf in interfaces:
if len(intf.strip()) < 6:
# This is a PHY mark.
continue
# This is an interface line.
intf = intf.replace(', ', '\n')
lines = intf.split('\n')
r = {}
for l in lines:
if ' ' in l:
# Only the lines with space are processed.
k, v = l.split(' ', 1)
if k == "addr":
k = "bssid"
if "wlan" in v:
k = "interface"
if k == "channel":
vs = v.split(' ', 1)
v = int(vs[0])
r["frequency"] = int(vs[1].split(' ', 1)[0][1:5])
if k[-1] == ':':
k = k[:-1]
r[k] = v
results.append(r)
return results
def get_active_bssids_info(self, radio, *args):
wlan = None
if radio == "radio0":
wlan = "wlan0"
if radio == "radio1":
wlan = "wlan1"
infos = self._get_iw_info()
bssids = []
for i in infos:
if wlan in i["interface"]:
r = {}
for k, v in i.items():
if k in args:
r[k] = v
r["bssid"] = i["bssid"].upper()
bssids.append(r)
return bssids
def toggle_bssid_state(self, bssid):
if bssid == self.get_bssid("radio0"):
self.toggle_radio_state("radio0")
return True
elif bssid == self.get_bssid("radio1"):
self.toggle_radio_state("radio1")
return True
return False
def __getattr__(self, name):
return _LibCaller(self._client, name)
class _LibCaller:
def __init__(self, client, *args):
self._client = client
self._args = args
def __getattr__(self, name):
return _LibCaller(self._client, *self._args + (name, ))
def __call__(self, *args):
return self._client.call("/".join(self._args[:-1]), self._args[-1],
*args)