blob: 5c3578f9e6381ed36ad22604792aa3c47ed03900 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2016 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from time import sleep
from acts.controllers.relay_lib.errors import RelayConfigError
class RelayState(Enum):
"""Enum for possible Relay States."""
# Pretend this means 'OFF'
NO = 'NORMALLY_OPEN'
# Pretend this means 'ON'
NC = 'NORMALLY_CLOSED'
class SynchronizeRelays:
"""A class that allows for relays to change state nearly simultaneously.
Can be used with the 'with' statement in Python:
with SynchronizeRelays():
relay1.set_no()
relay2.set_nc()
Note that the thread will still wait for RELAY_TRANSITION_WAIT_TIME
after execution leaves the 'with' statement.
"""
_sync_sleep_flag = False
def __enter__(self):
self.prev_toggle_time = Relay.transition_wait_time
self.prev_sync_flag = SynchronizeRelays._sync_sleep_flag
Relay.transition_wait_time = 0
SynchronizeRelays._sync_sleep_flag = False
def __exit__(self, type, value, traceback):
if SynchronizeRelays._sync_sleep_flag:
sleep(Relay.transition_wait_time)
Relay.transition_wait_time = self.prev_toggle_time
SynchronizeRelays._sync_sleep_flag = self.prev_sync_flag
class Relay(object):
"""A class representing a single relay switch on a RelayBoard.
References to these relays are stored in both the RelayBoard and the
RelayDevice classes under the variable "relays". GenericRelayDevice can also
access these relays through the subscript ([]) operator.
At the moment, relays only have a valid state of 'ON' or 'OFF'. This may be
extended in a subclass if needed. Keep in mind that if this is done, changes
will also need to be made in the RelayRigParser class to initialize the
relays.
"""
"""How long to wait for relays to transition state."""
transition_wait_time = .2
button_press_time = .25
def __init__(self, relay_board, position):
self.relay_board = relay_board
self.position = position
self._original_state = None
self.relay_id = "%s/%s" % (self.relay_board.name, self.position)
def set_no(self):
"""Sets the relay to the 'NO' state. Shorthand for set(RelayState.NO).
Blocks the thread for Relay.transition_wait_time.
"""
self.set(RelayState.NO)
def set_nc(self):
"""Sets the relay to the 'NC' state. Shorthand for set(RelayState.NC).
Blocks the thread for Relay.transition_wait_time.
"""
self.set(RelayState.NC)
def toggle(self):
"""Swaps the state from 'NO' to 'NC' or 'NC' to 'NO'.
Blocks the thread for Relay.transition_wait_time.
"""
if self.get_status() == RelayState.NO:
self.set(RelayState.NC)
else:
self.set(RelayState.NO)
def set(self, state):
"""Sets the relay to the 'NO' or 'NC' state.
Blocks the thread for Relay.transition_wait_time.
Args:
state: either 'NO' or 'NC'.
Raises:
ValueError if state is not 'NO' or 'NC'.
"""
if self._original_state is None:
self._original_state = self.relay_board.get_relay_status(
self.position)
if state is not RelayState.NO and state is not RelayState.NC:
raise ValueError(
'Invalid state. Received "%s". Expected any of %s.' %
(state, [state for state in RelayState]))
if self.get_status() != state:
self.relay_board.set(self.position, state)
SynchronizeRelays._sync_sleep_flag = True
sleep(Relay.transition_wait_time)
def set_no_for(self, seconds=button_press_time):
"""Sets the relay to 'NORMALLY_OPEN' for seconds. Blocks the thread.
Args:
seconds: The number of seconds to sleep for.
"""
self.set_no()
sleep(seconds)
self.set_nc()
def set_nc_for(self, seconds=button_press_time):
"""Sets the relay to 'NORMALLY_CLOSED' for seconds. Blocks the thread.
Respects Relay.transition_wait_time for toggling state.
Args:
seconds: The number of seconds to sleep for.
"""
self.set_nc()
sleep(seconds)
self.set_no()
def get_status(self):
return self.relay_board.get_relay_status(self.position)
def clean_up(self):
"""Does any clean up needed to allow the next series of tests to run.
For now, all this does is switches to its previous state. Inheriting
from this class and overriding this method would be the best course of
action to allow a more complex clean up to occur. If you do this, be
sure to make the necessary modifications in RelayRig.initialize_relay
and RelayRigParser.parse_json_relays.
"""
if self._original_state is not None:
self.set(self._original_state)
def is_dirty(self):
return self._original_state is not None
class RelayDict(object):
"""A wrapped dictionary that gives config errors upon failure.
Has the same interface as a dictionary, but when getting the key fails, the
dictionary returns a RelayConfigError, letting the user know that the reason
the dict failed to return a relay is because the relay was not found in the
config.
Also prevents modification of elements, because changing the relays here
does not change what they are in hardware.
"""
ERROR_MESSAGE = ('Error: Attempted to get relay "%s" in %s "%s" but the '
'relay does not exist.\nExisting relays are: %s.\nMake '
'sure the missing relay is added to the config file, and '
'is properly setup.')
def __init__(self, relay_device, input_dict):
self.relay_device = relay_device
self._store = input_dict
def __getitem__(self, key):
try:
return self._store[key]
except KeyError:
raise RelayConfigError(self.ERROR_MESSAGE %
(key, type(self.relay_device),
self.relay_device.name, self._store))
def __iter__(self):
return iter(self._store)
def __len__(self):
return len(self._store)
def __repr__(self):
return repr(self._store)