blob: 3a271466846d9a38d62a60ee3fc0a115641e83ca [file] [log] [blame]
# Copyright 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import codecs
import logging
import struct
import sys
import time
import hardware as hw
import pyudev
import serial
import serial.tools.list_ports
ARDUINO_ANGLE_MAX = 180.0 # degrees
ARDUINO_ANGLES = [0]*5 + range(0, 90, 3) + [90]*5 + range(90, -1, -3)
ARDUINO_BAUDRATE = 9600
ARDUINO_CMD_LENGTH = 3
ARDUINO_CMD_TIME = 2.0 * ARDUINO_CMD_LENGTH / ARDUINO_BAUDRATE # round trip
ARDUINO_DEFAULT_CH = '1'
ARDUINO_MOVE_TIME = 0.06 - ARDUINO_CMD_TIME # seconds
ARDUINO_START_BYTE = 255
ARDUINO_START_NUM_TRYS = 3
ARDUINO_TEST_CMD = [b'\x01', b'\x02', b'\x03']
ARDUINO_VALID_CH = ['1', '2', '3', '4', '5', '6']
CANAKIT_BAUDRATE = 115200
CANAKIT_COM_SLEEP = 0.05
CANAKIT_DATA_DELIMITER = '\r\n'
CANAKIT_DEFAULT_CH = '1'
CANAKIT_DEVICE = 'relay'
CANAKIT_PID = 'fc73'
CANAKIT_SET_CMD = 'REL'
CANAKIT_SLEEP_TIME = 2 # seconds
CANAKIT_VALID_CMD = ['ON', 'OFF']
CANAKIT_VALID_CH = ['1', '2', '3', '4']
CANAKIT_VID = '04d8'
HS755HB_ANGLE_MAX = 202.0 # degrees
NUM_ROTATIONS = 10
SERIAL_SEND_TIMEOUT = 0.02
def get_cmd_line_args():
"""Get command line arguments.
Args:
None, but gets sys.argv()
Returns:
rotate_cntl: str; 'arduino' or 'canakit'
rotate_ch: dict; arduino -> {'ch': str}
canakit --> {'vid': str, 'pid': str, 'ch': str}
num_rotations: int; number of rotations
"""
num_rotations = NUM_ROTATIONS
rotate_cntl = 'canakit'
rotate_ch = {}
for s in sys.argv[1:]:
if s[:8] == 'rotator=':
if len(s) > 8:
rotator_ids = s[8:].split(':')
if len(rotator_ids) == 1:
# 'rotator=default'
if rotator_ids[0] == 'default':
print ('Using default values %s:%s:%s for VID:PID:CH '
'of rotator' % (CANAKIT_VID, CANAKIT_PID,
CANAKIT_DEFAULT_CH))
vid = '0x' + CANAKIT_VID
pid = '0x' + CANAKIT_PID
ch = CANAKIT_DEFAULT_CH
rotate_ch = {'vid': vid, 'pid': pid, 'ch': ch}
# 'rotator=$ch'
elif rotator_ids[0] in CANAKIT_VALID_CH:
print ('Using default values %s:%s for VID:PID '
'of rotator' % (CANAKIT_VID, CANAKIT_PID))
vid = '0x' + CANAKIT_VID
pid = '0x' + CANAKIT_PID
ch = rotator_ids[0]
rotate_ch = {'vid': vid, 'pid': pid, 'ch': ch}
# 'rotator=arduino'
elif rotator_ids[0] == 'arduino':
rotate_cntl = 'arduino'
rotate_ch = {'ch': ARDUINO_DEFAULT_CH}
# 'rotator=arduino:$ch'
elif len(rotator_ids) == 2:
rotate_cntl = 'arduino'
ch = rotator_ids[1]
rotate_ch = {'ch': ch}
if ch not in ARDUINO_VALID_CH:
print 'Invalid arduino ch: %s' % ch
print 'Valid channels:', ARDUINO_VALID_CH
sys.exit()
# 'rotator=$vid:$pid:$ch'
elif len(rotator_ids) == 3:
vid = '0x' + rotator_ids[0]
pid = '0x' + rotator_ids[1]
rotate_ch = {'vid': vid, 'pid': pid, 'ch': rotator_ids[2]}
else:
err_string = 'Rotator ID (if entered) must be of form: '
err_string += 'rotator=default or rotator=CH or '
err_string += 'rotator=VID:PID:CH or '
err_string += 'rotator=arduino or rotator=arduino:CH'
print err_string
sys.exit()
if (rotate_cntl == 'canakit' and
rotate_ch['ch'] not in CANAKIT_VALID_CH):
print 'Invalid canakit ch: %s' % rotate_ch['ch']
print 'Valid channels:', CANAKIT_VALID_CH
sys.exit()
if s[:14] == 'num_rotations=':
num_rotations = int(s[14:])
return rotate_cntl, rotate_ch, num_rotations
def serial_port_def(name):
"""Determine the serial port and open.
Args:
name: str; device to locate (ie. 'Arduino')
Returns:
serial port object
"""
devices = pyudev.Context()
for device in devices.list_devices(subsystem='tty', ID_BUS='usb'):
if name in device['ID_VENDOR']:
arduino_port = device['DEVNAME']
break
return serial.Serial(arduino_port, ARDUINO_BAUDRATE, timeout=1)
def arduino_read_cmd(port):
"""Read back Arduino command from serial port."""
cmd = []
for _ in range(ARDUINO_CMD_LENGTH):
cmd.append(port.read())
return cmd
def arduino_send_cmd(port, cmd):
"""Send command to serial port."""
for i in range(ARDUINO_CMD_LENGTH):
port.write(cmd[i])
def arduino_loopback_cmd(port, cmd):
"""Send command to serial port."""
arduino_send_cmd(port, cmd)
time.sleep(ARDUINO_CMD_TIME)
return arduino_read_cmd(port)
def establish_serial_comm(port):
"""Establish connection with serial port."""
print 'Establishing communication with %s' % port.name
trys = 1
hex_test = convert_to_hex(ARDUINO_TEST_CMD)
logging.info(' test tx: %s %s %s', hex_test[0], hex_test[1], hex_test[2])
while trys <= ARDUINO_START_NUM_TRYS:
cmd_read = arduino_loopback_cmd(port, ARDUINO_TEST_CMD)
hex_read = convert_to_hex(cmd_read)
logging.info(' test rx: %s %s %s',
hex_read[0], hex_read[1], hex_read[2])
if cmd_read != ARDUINO_TEST_CMD:
trys += 1
else:
logging.info(' Arduino comm established after %d try(s)', trys)
break
def convert_to_hex(cmd):
# compatible with both python 2 and python 3
return [('%0.2x' % int(codecs.encode(x, 'hex_codec'), 16) if x else '--')
for x in cmd]
def arduino_rotate_servo_to_angle(ch, angle, serial_port, delay=0):
"""Rotate servo to the specified angle.
Args:
ch: str; servo to rotate in ARDUINO_VALID_CH
angle: int; servo angle to move to
serial_port: object; serial port
delay: int; time in seconds
"""
err_msg = 'Angle must be between 0 and %d.' % (ARDUINO_ANGLE_MAX)
if angle < 0:
print err_msg
angle = 0
elif angle > ARDUINO_ANGLE_MAX:
print err_msg
angle = ARDUINO_ANGLE_MAX
cmd = [struct.pack('B', i) for i in [ARDUINO_START_BYTE, int(ch), angle]]
arduino_send_cmd(serial_port, cmd)
time.sleep(delay)
def arduino_rotate_servo(ch, serial_port):
"""Rotate servo between 0 --> 90 --> 0.
Args:
ch: str; servo to rotate
serial_port: object; serial port
"""
for angle in ARDUINO_ANGLES:
angle_norm = int(round(angle*ARDUINO_ANGLE_MAX/HS755HB_ANGLE_MAX, 0))
arduino_rotate_servo_to_angle(
ch, angle_norm, serial_port, ARDUINO_MOVE_TIME)
def canakit_cmd_send(vid, pid, cmd_str):
"""Wrapper for sending serial command.
Args:
vid: str; vendor ID
pid: str; product ID
cmd_str: str; value to send to device.
"""
hw_list = hw.Device(CANAKIT_DEVICE, vid, pid, '1', '0')
relay_port = hw_list.get_tty_path('relay')
relay_ser = serial.Serial(relay_port, CANAKIT_BAUDRATE,
timeout=SERIAL_SEND_TIMEOUT,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS)
try:
relay_ser.write(CANAKIT_DATA_DELIMITER)
time.sleep(CANAKIT_COM_SLEEP) # This is critical for relay.
relay_ser.write(cmd_str)
relay_ser.close()
except ValueError:
print 'Port %s:%s is not open' % (vid, pid)
sys.exit()
def set_relay_channel_state(vid, pid, ch, relay_state):
"""Set relay channel and state.
Args:
vid: str; vendor ID
pid: str; product ID
ch: str; channel number of relay to set. '1', '2', '3', or '4'
relay_state: str; either 'ON' or 'OFF'
Returns:
None
"""
if ch in CANAKIT_VALID_CH and relay_state in CANAKIT_VALID_CMD:
canakit_cmd_send(
vid, pid, CANAKIT_SET_CMD + ch + '.' + relay_state + '\r\n')
else:
print 'Invalid channel or command, no command sent.'
def main():
"""Main function.
expected rotator string is vid:pid:ch or arduino:ch.
Canakit vid:pid can be found through lsusb on the host.
ch is hard wired and must be determined from the controller.
"""
# set up logging for debug info
logging.basicConfig(level=logging.INFO)
# get cmd line args
rotate_cntl, rotate_ch, num_rotations = get_cmd_line_args()
ch = rotate_ch['ch']
print 'Controller: %s, ch: %s' % (rotate_cntl, ch)
# initialize port and cmd strings
if rotate_cntl == 'arduino':
# initialize Arduino port
serial_port = serial_port_def('Arduino')
# send test cmd to Arduino until cmd returns properly
establish_serial_comm(serial_port)
# initialize servo at origin
print 'Moving servo to origin'
arduino_rotate_servo_to_angle(ch, 0, serial_port, 1)
else:
vid = rotate_ch['vid']
pid = rotate_ch['pid']
# rotate phone
print 'Rotating phone %dx' % num_rotations
for _ in xrange(num_rotations):
if rotate_cntl == 'arduino':
arduino_rotate_servo(ch, serial_port)
else:
set_relay_channel_state(vid, pid, ch, 'ON')
time.sleep(CANAKIT_SLEEP_TIME)
set_relay_channel_state(vid, pid, ch, 'OFF')
time.sleep(CANAKIT_SLEEP_TIME)
print 'Finished rotations'
if __name__ == '__main__':
main()