blob: 34ce092c6d08d9cdc2704840b7539de7b5ae1dcc [file] [log] [blame]
# util.py
import warnings
import types
import collections
import itertools
from functools import lru_cache
from typing import List, Union, Iterable
_bslash = chr(92)
class __config_flags:
"""Internal class for defining compatibility and debugging flags"""
_all_names: List[str] = []
_fixed_names: List[str] = []
_type_desc = "configuration"
@classmethod
def _set(cls, dname, value):
if dname in cls._fixed_names:
warnings.warn(
"{}.{} {} is {} and cannot be overridden".format(
cls.__name__,
dname,
cls._type_desc,
str(getattr(cls, dname)).upper(),
)
)
return
if dname in cls._all_names:
setattr(cls, dname, value)
else:
raise ValueError("no such {} {!r}".format(cls._type_desc, dname))
enable = classmethod(lambda cls, name: cls._set(name, True))
disable = classmethod(lambda cls, name: cls._set(name, False))
@lru_cache(maxsize=128)
def col(loc: int, strg: str) -> int:
"""
Returns current column within a string, counting newlines as line separators.
The first column is number 1.
Note: the default parsing behavior is to expand tabs in the input string
before starting the parsing process. See
:class:`ParserElement.parseString` for more
information on parsing strings containing ``<TAB>`` s, and suggested
methods to maintain a consistent view of the parsed string, the parse
location, and line and column positions within the parsed string.
"""
s = strg
return 1 if 0 < loc < len(s) and s[loc - 1] == "\n" else loc - s.rfind("\n", 0, loc)
@lru_cache(maxsize=128)
def lineno(loc: int, strg: str) -> int:
"""Returns current line number within a string, counting newlines as line separators.
The first line is number 1.
Note - the default parsing behavior is to expand tabs in the input string
before starting the parsing process. See :class:`ParserElement.parseString`
for more information on parsing strings containing ``<TAB>`` s, and
suggested methods to maintain a consistent view of the parsed string, the
parse location, and line and column positions within the parsed string.
"""
return strg.count("\n", 0, loc) + 1
@lru_cache(maxsize=128)
def line(loc: int, strg: str) -> str:
"""
Returns the line of text containing loc within a string, counting newlines as line separators.
"""
last_cr = strg.rfind("\n", 0, loc)
next_cr = strg.find("\n", loc)
return strg[last_cr + 1 : next_cr] if next_cr >= 0 else strg[last_cr + 1 :]
class _UnboundedCache:
def __init__(self):
cache = {}
cache_get = cache.get
self.not_in_cache = not_in_cache = object()
def get(_, key):
return cache_get(key, not_in_cache)
def set_(_, key, value):
cache[key] = value
def clear(_):
cache.clear()
self.size = None
self.get = types.MethodType(get, self)
self.set = types.MethodType(set_, self)
self.clear = types.MethodType(clear, self)
class _FifoCache:
def __init__(self, size):
self.not_in_cache = not_in_cache = object()
cache = collections.OrderedDict()
cache_get = cache.get
def get(_, key):
return cache_get(key, not_in_cache)
def set_(_, key, value):
cache[key] = value
while len(cache) > size:
cache.popitem(last=False)
def clear(_):
cache.clear()
self.size = size
self.get = types.MethodType(get, self)
self.set = types.MethodType(set_, self)
self.clear = types.MethodType(clear, self)
class LRUMemo:
"""
A memoizing mapping that retains `capacity` deleted items
The memo tracks retained items by their access order; once `capacity` items
are retained, the least recently used item is discarded.
"""
def __init__(self, capacity):
self._capacity = capacity
self._active = {}
self._memory = collections.OrderedDict()
def __getitem__(self, key):
try:
return self._active[key]
except KeyError:
self._memory.move_to_end(key)
return self._memory[key]
def __setitem__(self, key, value):
self._memory.pop(key, None)
self._active[key] = value
def __delitem__(self, key):
try:
value = self._active.pop(key)
except KeyError:
pass
else:
while len(self._memory) >= self._capacity:
self._memory.popitem(last=False)
self._memory[key] = value
def clear(self):
self._active.clear()
self._memory.clear()
class UnboundedMemo(dict):
"""
A memoizing mapping that retains all deleted items
"""
def __delitem__(self, key):
pass
def _escape_regex_range_chars(s: str) -> str:
# escape these chars: ^-[]
for c in r"\^-[]":
s = s.replace(c, _bslash + c)
s = s.replace("\n", r"\n")
s = s.replace("\t", r"\t")
return str(s)
def _collapse_string_to_ranges(
s: Union[str, Iterable[str]], re_escape: bool = True
) -> str:
def is_consecutive(c):
c_int = ord(c)
is_consecutive.prev, prev = c_int, is_consecutive.prev
if c_int - prev > 1:
is_consecutive.value = next(is_consecutive.counter)
return is_consecutive.value
is_consecutive.prev = 0
is_consecutive.counter = itertools.count()
is_consecutive.value = -1
def escape_re_range_char(c):
return "\\" + c if c in r"\^-][" else c
def no_escape_re_range_char(c):
return c
if not re_escape:
escape_re_range_char = no_escape_re_range_char
ret = []
s = "".join(sorted(set(s)))
if len(s) > 3:
for _, chars in itertools.groupby(s, key=is_consecutive):
first = last = next(chars)
last = collections.deque(
itertools.chain(iter([last]), chars), maxlen=1
).pop()
if first == last:
ret.append(escape_re_range_char(first))
else:
sep = "" if ord(last) == ord(first) + 1 else "-"
ret.append(
"{}{}{}".format(
escape_re_range_char(first), sep, escape_re_range_char(last)
)
)
else:
ret = [escape_re_range_char(c) for c in s]
return "".join(ret)
def _flatten(ll: list) -> list:
ret = []
for i in ll:
if isinstance(i, list):
ret.extend(_flatten(i))
else:
ret.append(i)
return ret