# blob: f8cfe319ad34cf5c1687c5bb9ad90d5e0dd90631 [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Device classes to interact with targets via RPC."""
import logging
from pathlib import Path
from types import ModuleType
from typing import Any, Callable, List, Union, Optional
from pw_hdlc.rpc import HdlcRpcClient, channel_output
from pw_hdlc.rpc import NoEncodingSingleChannelRpcClient, RpcClient
from pw_log.log_decoder import (
Log,
LogStreamDecoder,
log_decoded_log,
timestamp_parser_ns_since_boot,
)
from pw_log_rpc.rpc_log_stream import LogStreamHandler
from pw_metric import metric_parser
from pw_rpc import callback_client, Channel, console_tools
from pw_thread.thread_analyzer import ThreadSnapshotAnalyzer
from pw_thread_protos import thread_pb2
from pw_tokenizer import detokenize
from pw_tokenizer.proto import decode_optionally_tokenized
from pw_unit_test.rpc import run_tests as pw_unit_test_run_tests, TestRecord
# Internal log for troubleshooting this tool (the console).
_LOG = logging.getLogger('tools')
# Default logger that Device instances use to emit the output and logs they
# receive from the device.
DEFAULT_DEVICE_LOGGER = logging.getLogger('rpc_device')
class Device:
    """Represents an RPC client for a device running a Pigweed target.

    The target must have RPC support. It must also support RPC logging when
    ``use_rpc_logging`` is enabled, since log streaming starts as soon as the
    device object is constructed.

    Instances can be used as context managers; leaving the ``with`` block
    closes the underlying RPC client.

    Note: use this class as a base for specialized device representations.
    """

    def __init__(
        self,
        channel_id: int,
        read,
        write,
        proto_library: List[Union[ModuleType, Path]],
        detokenizer: Optional[detokenize.Detokenizer] = None,
        timestamp_decoder: Optional[Callable[[int], str]] = None,
        rpc_timeout_s: float = 5,
        use_rpc_logging: bool = True,
        use_hdlc_encoding: bool = True,
    ):
        """Creates the RPC client and, optionally, starts streaming logs.

        Args:
          channel_id: ID of the RPC channel created for this device.
          read: Handed to the RPC client as its source of incoming data (the
            expected signature is defined by the pw_hdlc RPC clients).
          write: Used to send outgoing data. It is wrapped with
            ``channel_output`` when HDLC encoding is enabled and used directly
            as the channel output otherwise.
          proto_library: Proto modules or paths passed to the RPC client.
          detokenizer: Optional detokenizer used to decode device output and
            device logs.
          timestamp_decoder: Optional function that converts a log timestamp
            to a string. Defaults to ``timestamp_parser_ns_since_boot``.
          rpc_timeout_s: Default timeout, in seconds, for unary RPCs. It is
            also used when fetching metrics. Streaming RPCs get no default
            timeout.
          use_rpc_logging: If true, creates ``self.log_stream_handler`` and
            starts listening to device logs immediately.
          use_hdlc_encoding: If true, uses ``HdlcRpcClient``; otherwise uses
            ``NoEncodingSingleChannelRpcClient``.
        """
        self.channel_id = channel_id
        self.protos = proto_library
        self.detokenizer = detokenizer
        self.rpc_timeout_s = rpc_timeout_s

        self.logger = DEFAULT_DEVICE_LOGGER
        self.logger.setLevel(logging.DEBUG)  # Allow all device logs through.

        callback_client_impl = callback_client.Impl(
            default_unary_timeout_s=self.rpc_timeout_s,
            default_stream_timeout_s=None,
        )

        def detokenize_and_log_output(data: bytes, _detokenizer=None):
            """Logs each line of the given output through the device logger.

            ``_detokenizer`` is unused; it is kept so the callback signature
            stays unchanged. ``self.detokenizer`` is consulted on every call
            so a detokenizer assigned later is honored.
            """
            if self.detokenizer:
                log_messages = decode_optionally_tokenized(
                    self.detokenizer, data
                )
            else:
                # Only decode as plain text when there is no detokenizer;
                # 'surrogateescape' keeps undecodable bytes from raising.
                log_messages = data.decode(
                    encoding='utf-8', errors='surrogateescape'
                )

            for line in log_messages.splitlines():
                self.logger.info(line)

        self.client: RpcClient
        if use_hdlc_encoding:
            channels = [Channel(self.channel_id, channel_output(write))]
            self.client = HdlcRpcClient(
                read,
                self.protos,
                channels,
                detokenize_and_log_output,
                client_impl=callback_client_impl,
            )
        else:
            channel = Channel(self.channel_id, write)
            self.client = NoEncodingSingleChannelRpcClient(
                read,
                self.protos,
                channel,
                client_impl=callback_client_impl,
            )

        # NOTE: self._log_decoder and self.log_stream_handler are only defined
        # when RPC logging is enabled.
        if use_rpc_logging:
            # Create the log decoder used by the LogStreamHandler.
            def decoded_log_handler(log: Log) -> None:
                log_decoded_log(log, self.logger)

            self._log_decoder = LogStreamDecoder(
                decoded_log_handler=decoded_log_handler,
                detokenizer=self.detokenizer,
                source_name='RpcDevice',
                timestamp_parser=(
                    timestamp_decoder or timestamp_parser_ns_since_boot
                ),
            )

            # Start listening to logs as soon as possible.
            self.log_stream_handler = LogStreamHandler(
                self.rpcs, self._log_decoder
            )
            self.log_stream_handler.listen_to_logs()

    def __enter__(self):
        """Returns this device so it can be bound in a ``with`` statement."""
        return self

    def __exit__(self, *exc_info):
        """Closes the device; exceptions from the ``with`` body propagate."""
        self.close()

    def close(self) -> None:
        """Closes the underlying RPC client."""
        self.client.close()

    def info(self) -> console_tools.ClientInfo:
        """Returns a ``ClientInfo`` named 'device' for this device's client."""
        return console_tools.ClientInfo('device', self.rpcs, self.client.client)

    @property
    def rpcs(self) -> Any:
        """Returns an object for accessing services on the device's channel.

        This is the ``rpcs`` object of the first channel reported by the RPC
        client.
        """
        return next(iter(self.client.client.channels())).rpcs

    def run_tests(self, timeout_s: Optional[float] = 5) -> TestRecord:
        """Runs the unit tests on this device."""
        return pw_unit_test_run_tests(self.rpcs, timeout_s=timeout_s)

    def get_and_log_metrics(self) -> dict:
        """Retrieves the parsed metrics, logs them, and returns them.

        Each leaf metric is logged as ``<path>/<name>: <value>``, where the
        path is built from the names of the enclosing dictionaries.
        """
        metrics = metric_parser.parse_metrics(
            self.rpcs, self.detokenizer, self.rpc_timeout_s
        )

        def log_metrics(group, path):
            """Traverses dictionaries, until a non-dict value is reached."""
            for name, metric in group.items():
                if isinstance(metric, dict):
                    log_metrics(metric, path + '/' + name)
                else:
                    # Let logging apply %s lazily instead of calling str().
                    _LOG.info('%s/%s: %s', path, name, metric)

        log_metrics(metrics, '')
        return metrics

    def snapshot_peak_stack_usage(
        self, thread_name: Optional[str] = None
    ) -> None:
        """Requests peak stack usage from the device and logs the analysis.

        Args:
          thread_name: Passed as the ``name`` field of the request. None is
            presumably "all threads"; confirm against ThreadSnapshotService.
        """
        snapshot_service = self.rpcs.pw.thread.proto.ThreadSnapshotService
        # NOTE(review): the returned status is ignored, as in the original.
        _, rsp = snapshot_service.GetPeakStackUsage(name=thread_name)

        # Merge the threads from every response block into a single snapshot.
        thread_info = thread_pb2.SnapshotThreadInfo()
        for thread_info_block in rsp:
            thread_info.threads.extend(thread_info_block.threads)

        for line in str(ThreadSnapshotAnalyzer(thread_info)).splitlines():
            _LOG.info('%s', line)