blob: 55ae4fc14ee35f90f209dbf6f6c80ce82ceb164b [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A tool that can feed data into GNSS HAL over virtio-console
# This tool can be invoked as gnss_replay.py -s <socket> -i <input>
# where <socket> is the host-side of a UNIX domain socket that backs
# virtio-console GNSS vport and <input> is a log file generated by
# GnssLogger or equivalent tooling
import argparse
import sys
import socket
import time
class DataStore(object):
def __init__(self, prefix, lines):
self.idx = 0
self.lines = []
for line in lines:
if line.startswith(prefix):
self.lines.append(line.strip('\n'))
def next(self):
line = ''
if len(self.lines) > 0:
line = self.lines[self.idx]
self.idx = (self.idx + 1) % len(self.lines)
return line
def __str__(self):
return str(self.lines)
def load(input):
with open(input) as f:
lines = f.readlines()
return {'locations' : DataStore('Fix', lines),
'measurements' : DataStore('Raw', lines)}
class CommandLoop(object):
def __init__(self, socket):
self.socket = socket
self.commands = {}
def command(self, action, reaction):
self.commands[action] = reaction
def loop(self):
while True:
inp = self.socket.recv(128)
reaction = self.commands.get(inp, None)
if reaction:
response = reaction() + '\n\n\n\n'
self.socket.send(str.encode(response))
else:
print("Unknown command '%s'" % inp)
self.socket.send(b'\n\n\n\n')
def main():
parser = argparse.ArgumentParser(description="GNSS mock host agent")
parser.add_argument('--socket', '-s', action='store', required=True, help='The UNIX socket to interact with')
parser.add_argument('--input', '-i', action='store', required=True, help='The file with fix information to send over')
args = parser.parse_args()
sk = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
sk.connect(args.socket)
data = load(args.input)
loop = CommandLoop(sk)
loop.command(b'CMD_GET_LOCATION', lambda: data['locations'].next())
loop.command(b'CMD_GET_RAWMEASUREMENT', lambda: data['measurements'].next())
loop.loop()
if __name__ == "__main__":
main()