blob: 1e3ccb5483386a40049dc8e51846f25174b6829a [file] [log] [blame]
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Pigweed renode frontend."""
import socket
import time
import xmlrpc.client
from pathlib import Path
from typing import Optional, List, Any
from pw_emu.core import (
Connector,
Handles,
InvalidChannelType,
Launcher,
Error,
WrongEmulator,
)
class RenodeRobotError(Error):
"""Exception for Renode robot errors."""
def __init__(self, err: str):
super().__init__(err)
class RenodeLauncher(Launcher):
"""Start a new renode process for a given target and config file."""
def __init__(self, config_path: Optional[Path] = None):
super().__init__('renode', config_path)
self._start_cmd: List[str] = []
@staticmethod
def _allocate_port() -> int:
"""Allocate renode ports.
This is inherently racy but renode currently does not have proper
support for dynamic ports. It accecept 0 as a port and the OS allocates
a dynamic port but there is no API to retrive the port.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', 0))
port = sock.getsockname()[1]
sock.close()
return port
def _pre_start(
self,
target: str,
file: Optional[Path] = None,
pause: bool = False,
debug: bool = False,
args: Optional[str] = None,
) -> List[str]:
renode = self._config.get_target_emu(['executable'])
if not renode:
renode = self._config.get_emu(['executable'], optional=False)
self._start_cmd.extend([f'{renode}', '--disable-xwt'])
port = self._allocate_port()
self._start_cmd.extend(['--robot-server-port', str(port)])
self._handles.add_channel_tcp('robot', 'localhost', port)
machine = self._config.get_target_emu(['machine'], optional=False)
self._start_cmd.extend(['--execute', f'mach add "{target}"'])
self._start_cmd.extend(
['--execute', f'machine LoadPlatformDescription @{machine}']
)
terms = self._config.get_target_emu(
['channels', 'terminals'], entry_type=dict
)
for name in terms.keys():
port = self._allocate_port()
dev_path = self._config.get_target_emu(
['channels', 'terminals', name, 'device-path'],
optional=False,
entry_type=str,
)
term_type = self._config.get_target_emu(
['channels', 'terminals', name, 'type'],
entry_type=str,
)
if not term_type:
term_type = self._config.get_emu(
['channels', 'terminals', 'type'],
entry_type=str,
)
if not term_type:
term_type = 'tcp'
cmd = 'emulation '
if term_type == 'tcp':
cmd += f'CreateServerSocketTerminal {port} "{name}" false'
self._handles.add_channel_tcp(name, 'localhost', port)
elif term_type == 'pty':
path = self._path(name)
cmd += f'CreateUartPtyTerminal "{name}" "{path}"'
self._handles.add_channel_pty(name, str(path))
else:
raise InvalidChannelType(term_type)
self._start_cmd.extend(['--execute', cmd])
self._start_cmd.extend(
['--execute', f'connector Connect {dev_path} {name}']
)
port = self._allocate_port()
self._start_cmd.extend(['--execute', f'machine StartGdbServer {port}'])
self._handles.add_channel_tcp('gdb', 'localhost', port)
if file:
self._start_cmd.extend(['--execute', f'sysbus LoadELF @{file}'])
if not pause:
self._start_cmd.extend(['--execute', 'start'])
return self._start_cmd
def _post_start(self) -> None:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
robot = self._handles.channels['gdb']
assert isinstance(robot, Handles.TcpChannel)
# renode is slow to start especially during host load
deadline = time.monotonic() + 120
connected = False
err = None
while time.monotonic() < deadline:
try:
sock.connect((robot.host, robot.port))
connected = True
break
except OSError as exc:
err = exc
time.sleep(1)
if not connected:
msg = 'failed to connect to robot channel'
msg += f'({robot.host}:{robot.port}): {err}'
raise RenodeRobotError(msg)
sock.close()
def _get_connector(self, wdir: Path) -> Connector:
return RenodeConnector(wdir)
class RenodeConnector(Connector):
"""renode implementation for the emulator specific connector methods."""
def __init__(self, wdir: Path) -> None:
super().__init__(wdir)
if self.get_emu() != 'renode':
raise WrongEmulator('renode', self.get_emu())
robot = self._handles.channels['robot']
host = robot.host
port = robot.port
self._proxy = xmlrpc.client.ServerProxy(f'http://{host}:{port}/')
def _request(self, cmd: str, args: List[str]) -> Any:
"""Send a request using the robot interface.
Using the robot interface is not ideal since it is designed
for testing. However, it is more robust than the ANSI colored,
echoed, log mixed, telnet interface.
"""
resp = self._proxy.run_keyword(cmd, args)
if not isinstance(resp, dict):
raise RenodeRobotError('expected dictionary in response')
if resp['status'] != 'PASS':
raise RenodeRobotError(resp['error'])
if resp.get('return'):
return resp['return']
return None
def reset(self) -> None:
self._request('ResetEmulation', [])
def cont(self) -> None:
self._request('StartEmulation', [])
def list_properties(self, path: str) -> List[Any]:
return self._request('ExecuteCommand', [f'{path}'])
def get_property(self, path: str, prop: str) -> Any:
return self._request('ExecuteCommand', [f'{path} {prop}'])
def set_property(self, path: str, prop: str, value: Any) -> None:
return self._request('ExecuteCommand', [f'{path} {prop} {value}'])