blob: 0b273a62903e11bafe43338ebae25904cff4904c [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2016 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from future.moves.urllib.request import urlopen
import re
from acts.controllers.relay_lib.errors import RelayDeviceConnectionError
from acts.controllers.relay_lib.helpers import validate_key
from acts.controllers.relay_lib.relay import RelayState
from acts.controllers.relay_lib.relay_board import RelayBoard
BASE_URL = 'http://192.168.1.4/30000/'
class SainSmartBoard(RelayBoard):
"""Controls and queries SainSmart Web Relay Board.
Controls and queries SainSmart Web Relay Board, found here:
http://www.sainsmart.com/sainsmart-rj45-tcp-ip-remote-controller-board-with-8-channels-relay-integrated.html
this uses a web interface to toggle relays.
There is an unmentioned hidden status page that can be found at <root>/99/.
"""
# No longer used. Here for debugging purposes.
#
# Old status pages. Used before base_url/99 was found.
# STATUS_1 = '40'
# STATUS_2 = '43'
#
# This is the regex used to parse the old status pages:
# r'y-\d(?P<relay>\d).+?> (?:&nbsp)?(?P<status>.*?)&'
#
# Pages that will turn all switches on or off, even the ghost switches.
# ALL_RELAY_OFF = '44'
# ALL_RELAY_ON = '45'
HIDDEN_STATUS_PAGE = '99'
VALID_RELAY_POSITIONS = [0, 1, 2, 3, 4, 5, 6, 7]
NUM_RELAYS = 8
def __init__(self, config):
# This will be lazy loaded
self.status_dict = None
self.base_url = validate_key('base_url', config, str, 'config')
if not self.base_url.endswith('/'):
self.base_url += '/'
RelayBoard.__init__(self, config)
def get_relay_position_list(self):
return self.VALID_RELAY_POSITIONS
def _load_page(self, relative_url):
"""Loads a web page at self.base_url + relative_url.
Properly opens and closes the web page.
Args:
relative_url: The string appended to the base_url
Returns:
the contents of the web page.
Raises:
A RelayDeviceConnectionError is raised if the page cannot be loaded.
"""
try:
page = urlopen(self.base_url + relative_url)
result = page.read().decode("utf-8")
page.close()
except:
raise RelayDeviceConnectionError(
'Unable to connect to board "{}" through {}'.format(
self.name, self.base_url + relative_url))
return result
def _sync_status_dict(self):
"""Returns a dictionary of relays and there current state."""
result = self._load_page(self.HIDDEN_STATUS_PAGE)
status_string = re.search(r'">([01]*)TUX', result).group(1)
self.status_dict = dict()
for index, char in enumerate(status_string):
self.status_dict[index] = \
RelayState.NC if char == '1' else RelayState.NO
def _print_status(self):
"""Prints out the list of relays and their current state."""
for i in range(0, 8):
print('Relay {}: {}'.format(i, self.status_dict[i]))
def get_relay_status(self, relay_position):
"""Returns the current status of the passed in relay."""
if self.status_dict is None:
self._sync_status_dict()
return self.status_dict[relay_position]
def set(self, relay_position, value):
"""Sets the given relay to be either ON or OFF, indicated by value."""
if self.status_dict is None:
self._sync_status_dict()
self._load_page(self._get_relay_url_code(relay_position, value))
self.status_dict[relay_position] = value
@staticmethod
def _get_relay_url_code(relay_position, no_or_nc):
"""Returns the two digit code corresponding to setting the relay."""
if no_or_nc == RelayState.NC:
on_modifier = 1
else:
on_modifier = 0
return '{:02d}'.format(relay_position * 2 + on_modifier)