| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2022 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # A tool that can feed data into GNSS HAL over virtio-console |
| # This tool can be invoked as gnss_replay.py -s <socket> -i <input> |
| # where <socket> is the host-side of a UNIX domain socket that backs |
| # virtio-console GNSS vport and <input> is a log file generated by |
| # GnssLogger or equivalent tooling |
| |
| import argparse |
| import sys |
| import socket |
| import time |
| |
class DataStore(object):
    """Round-robin store of the input lines that begin with a given prefix.

    Only lines starting with ``prefix`` are kept, in their original order,
    with leading and trailing newline characters removed.
    """

    def __init__(self, prefix, lines):
        # Position of the entry that the next call to next() will return.
        self.idx = 0
        self.lines = [entry.strip('\n') for entry in lines
                      if entry.startswith(prefix)]

    def next(self):
        """Return the current line and advance, wrapping around at the end.

        An empty store always yields the empty string.
        """
        if not self.lines:
            return ''
        current = self.lines[self.idx]
        self.idx = (self.idx + 1) % len(self.lines)
        return current

    def __str__(self):
        """Return the string form of the list of stored lines."""
        return str(self.lines)
| |
def load(input):
    """Read the log file at ``input`` and split it into data stores.

    Returns a dict with two DataStore entries built from the same lines:
    'locations' holds the lines starting with 'Fix' and 'measurements'
    holds the lines starting with 'Raw'.
    """
    with open(input) as log_file:
        log_lines = log_file.readlines()
    stores = {}
    stores['locations'] = DataStore('Fix', log_lines)
    stores['measurements'] = DataStore('Raw', log_lines)
    return stores
| |
class CommandLoop(object):
    """Serve request/response commands arriving on a connected socket.

    Handlers are registered with command() and dispatched by loop() on an
    exact match of the bytes read from the socket.
    """

    def __init__(self, socket):
        # Connected socket object; loop() only uses its recv() and send().
        self.socket = socket
        # Maps a command (bytes) to the callable producing its reply.
        self.commands = {}

    def command(self, action, reaction):
        """Register ``reaction`` as the handler for ``action``.

        ``action`` is compared against the raw bytes read from the socket.
        ``reaction`` is called with no arguments and must return a str.
        Registering the same action again replaces the earlier handler.
        """
        self.commands[action] = reaction

    def loop(self):
        """Answer incoming commands until the peer closes the connection.

        Each read of up to 128 bytes is treated as one command. A known
        command is answered with its handler's result followed by four
        newlines; an unknown one is logged to stdout and answered with the
        four newlines alone. Returns when recv() yields no data.
        """
        while True:
            inp = self.socket.recv(128)
            if not inp:
                # recv() returns b'' once the peer has closed or shut down
                # its end. Without this check the empty read was handled as
                # an unknown command and the loop kept logging and sending
                # to a peer that had already gone away.
                return
            reaction = self.commands.get(inp)
            if reaction is not None:
                response = reaction() + '\n\n\n\n'
                self.socket.send(response.encode())
            else:
                print("Unknown command '%s'" % inp)
                self.socket.send(b'\n\n\n\n')
| |
def main():
    """Parse the command line, connect to the socket and serve GNSS data.

    Requires --socket/-s (path of the UNIX socket to connect to) and
    --input/-i (path of the log file to replay). Location and raw
    measurement requests are answered from the loaded file for as long as
    the command loop runs.
    """
    parser = argparse.ArgumentParser(description="GNSS mock host agent")
    parser.add_argument('--socket', '-s', action='store', required=True,
                        help='The UNIX socket to interact with')
    parser.add_argument('--input', '-i', action='store', required=True,
                        help='The file with fix information to send over')

    args = parser.parse_args()

    # The with-block closes the socket on every exit path, including a
    # failed connect, an unreadable input file or an error in the loop.
    with socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) as sk:
        sk.connect(args.socket)

        data = load(args.input)

        loop = CommandLoop(sk)
        # Each request returns the next stored line of the matching kind.
        loop.command(b'CMD_GET_LOCATION', data['locations'].next)
        loop.command(b'CMD_GET_RAWMEASUREMENT', data['measurements'].next)

        loop.loop()
| |
# Run the agent only when executed as a script, not when imported.
if __name__ == '__main__':
    main()