blob: 7d2e09f65dcbe7d5df797cab2ed3fa755d9f5ed8 [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
RTT_REGEX = re.compile(r'^\[(?P<timestamp>\S+)\] .*? time=(?P<rtt>\S+)')
LOSS_REGEX = re.compile(r'(?P<loss>\S+)% packet loss')
class PingResult(object):
"""An object that contains the results of running ping command.
Attributes:
connected: True if a connection was made. False otherwise.
packet_loss_percentage: The total percentage of packets lost.
transmission_times: The list of PingTransmissionTimes containing the
timestamps gathered for transmitted packets.
rtts: An list-like object enumerating all round-trip-times of
transmitted packets.
timestamps: A list-like object enumerating the beginning timestamps of
each packet transmission.
ping_interarrivals: A list-like object enumerating the amount of time
between the beginning of each subsequent transmission.
"""
def __init__(self, ping_output):
self.packet_loss_percentage = 100
self.transmission_times = []
self.rtts = _ListWrap(self.transmission_times, lambda entry: entry.rtt)
self.timestamps = _ListWrap(self.transmission_times,
lambda entry: entry.timestamp)
self.ping_interarrivals = _PingInterarrivals(self.transmission_times)
self.start_time = 0
for line in ping_output:
if 'loss' in line:
match = re.search(LOSS_REGEX, line)
self.packet_loss_percentage = float(match.group('loss'))
if 'time=' in line:
match = re.search(RTT_REGEX, line)
if self.start_time == 0:
self.start_time = float(match.group('timestamp'))
self.transmission_times.append(
PingTransmissionTimes(
float(match.group('timestamp')) - self.start_time,
float(match.group('rtt'))))
self.connected = len(
ping_output) > 1 and self.packet_loss_percentage < 100
def __getitem__(self, item):
if item == 'rtt':
return self.rtts
if item == 'connected':
return self.connected
if item == 'packet_loss_percentage':
return self.packet_loss_percentage
raise ValueError('Invalid key. Please use an attribute instead.')
def as_dict(self):
return {
'connected': 1 if self.connected else 0,
'rtt': list(self.rtts),
'time_stamp': list(self.timestamps),
'ping_interarrivals': list(self.ping_interarrivals),
'packet_loss_percentage': self.packet_loss_percentage
}
class PingTransmissionTimes(object):
"""A class that holds the timestamps for a packet sent via the ping command.
Attributes:
rtt: The round trip time for the packet sent.
timestamp: The timestamp the packet started its trip.
"""
def __init__(self, timestamp, rtt):
self.rtt = rtt
self.timestamp = timestamp
class _ListWrap(object):
"""A convenient helper class for treating list iterators as native lists."""
def __init__(self, wrapped_list, func):
self.__wrapped_list = wrapped_list
self.__func = func
def __getitem__(self, key):
return self.__func(self.__wrapped_list[key])
def __iter__(self):
for item in self.__wrapped_list:
yield self.__func(item)
def __len__(self):
return len(self.__wrapped_list)
class _PingInterarrivals(object):
"""A helper class for treating ping interarrivals as a native list."""
def __init__(self, ping_entries):
self.__ping_entries = ping_entries
def __getitem__(self, key):
return (self.__ping_entries[key + 1].timestamp -
self.__ping_entries[key].timestamp)
def __iter__(self):
for index in range(len(self.__ping_entries) - 1):
yield self[index]
def __len__(self):
return max(0, len(self.__ping_entries) - 1)