blob: a9f957947242834163c8d81f0827984ae1a22839 [file] [log] [blame]
import os
import pathlib
import re
import shutil
import tempfile
from collections import defaultdict
class AdrInfo:
"""Represent one ADR value
An ADR value is a decimal number range from 0 - 31
How to parse the ADR value:
First, transform the decimal number to binary then we will get a 5 bit number
The meaning of each bit is as follow:
0 0 0 0 0
HalfCycleReported HalfCycleResolved CycleSlip Reset Valid
Special rule:
For an ADR value in binary fits the pattern: * * 0 0 1, we call it a usable ADR
More insight of ADR value:
go/adrstates
Attributes:
is_valid: (bool)
is_reset: (bool)
is_cycle_slip: (bool)
is_half_cycle_resolved: (bool)
is_half_cycle_reported: (bool)
is_usable: (bool)
"""
def __init__(self, adr_value: int, count: int):
src = bin(int(adr_value))
self._valid = int(src[-1])
self._reset = int(src[-2])
self._cycle_slip = int(src[-3])
self._half_cycle_resolved = int(src[-4])
self._half_cycle_reported = int(src[-5])
self.count = count
@property
def is_usable(self):
return self.is_valid and not self.is_reset and not self.is_cycle_slip
@property
def is_valid(self):
return bool(self._valid)
@property
def is_reset(self):
return bool(self._reset)
@property
def is_cycle_slip(self):
return bool(self._cycle_slip)
@property
def is_half_cycle_resolved(self):
return bool(self._half_cycle_resolved)
@property
def is_half_cycle_reported(self):
return bool(self._half_cycle_reported)
class AdrStatistic:
"""Represent the ADR statistic
Attributes:
usable_count: (int)
valid_count: (int)
reset_count: (int)
cycle_slip_count: (int)
half_cycle_resolved_count: (int)
half_cycle_reported_count: (int)
total_count: (int)
usable_rate: (float)
usable_count / total_count
valid_rate: (float)
valid_count / total_count
"""
def __init__(self):
self.usable_count = 0
self.valid_count = 0
self.reset_count = 0
self.cycle_slip_count = 0
self.half_cycle_resolved_count = 0
self.half_cycle_reported_count = 0
self.total_count = 0
@property
def usable_rate(self):
denominator = max(1, self.total_count)
result = self.usable_count / denominator
return round(result, 3)
@property
def valid_rate(self):
denominator = max(1, self.total_count)
result = self.valid_count / denominator
return round(result, 3)
def add_adr_info(self, adr_info: AdrInfo):
"""Add ADR info record to increase the statistic
Args:
adr_info: AdrInfo object
"""
if adr_info.is_valid:
self.valid_count += adr_info.count
if adr_info.is_reset:
self.reset_count += adr_info.count
if adr_info.is_cycle_slip:
self.cycle_slip_count += adr_info.count
if adr_info.is_half_cycle_resolved:
self.half_cycle_resolved_count += adr_info.count
if adr_info.is_half_cycle_reported:
self.half_cycle_reported_count += adr_info.count
if adr_info.is_usable:
self.usable_count += adr_info.count
self.total_count += adr_info.count
class GnssMeasurement:
"""Represent the content of measurement file generated by gps tool"""
FILE_PATTERN = "/storage/emulated/0/Android/data/com.android.gpstool/files/MEAS*.txt"
def __init__(self, ad):
self.ad = ad
def _generate_local_temp_path(self, file_name="file.txt"):
"""Generate a file path for temporarily usage
Returns:
string: local file path
"""
folder = tempfile.mkdtemp()
file_path = os.path.join(folder, file_name)
return file_path
def _get_latest_measurement_file_path(self):
"""Get the latest measurement file path on device
Returns:
string: file path on device
"""
command = f"ls -tr {self.FILE_PATTERN} | tail -1"
result = self.ad.adb.shell(command)
return result
def get_latest_measurement_file(self):
"""Pull the latest measurement file from device to local
Returns:
string: local file path to the measurement file
Raise:
FileNotFoundError: can't get measurement file from device
"""
self.ad.log.info("Get measurement file from device")
dest = self._generate_local_temp_path(file_name="measurement.txt")
src = self._get_latest_measurement_file_path()
if not src:
raise FileNotFoundError(f"Can not find measurement file: pattern {self.FILE_PATTERN}")
self.ad.pull_files(src, dest)
return dest
def _get_adr_src_value(self):
"""Get ADR value from measurement file
Returns:
dict: {<ADR_value>: count, <ADR_value>: count...}
"""
try:
file_path = self.get_latest_measurement_file()
adr_src = defaultdict(int)
adr_src_regex = re.compile("=\s(\d*)")
with open(file_path) as f:
for line in f:
if "AccumulatedDeltaRangeState" in line:
value = re.search(adr_src_regex, line)
if not value:
self.ad.log.warn("Can't get ADR value %s" % line)
continue
key = value.group(1)
adr_src[key] += 1
return adr_src
finally:
folder = pathlib.PurePosixPath(file_path).parent
shutil.rmtree(folder, ignore_errors=True)
def get_adr_static(self):
"""Get ADR statistic
Summarize ADR value from measurement file
Returns:
AdrStatistic object
"""
self.ad.log.info("Get ADR statistic")
adr_src = self._get_adr_src_value()
adr_static = AdrStatistic()
for key, value in adr_src.items():
self.ad.log.debug("ADR value: %s - count: %s" % (key, value))
adr_info = AdrInfo(key, value)
adr_static.add_adr_info(adr_info)
return adr_static