pw_console: Log filtering backend impl
Non-ui implementation to filter incoming and histoical log messages.
Bug: 410
No-Docs-Update-Reason: Docs coming in followup to filtering
Change-Id: I557a4383fb6eb42171dd2c8641b60775ff5fd426
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/53400
Pigweed-Auto-Submit: Anthony DiGirolamo <tonymd@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_console/py/BUILD.gn b/pw_console/py/BUILD.gn
index 79e4f13..610c94c 100644
--- a/pw_console/py/BUILD.gn
+++ b/pw_console/py/BUILD.gn
@@ -24,6 +24,7 @@
"pw_console/console_app.py",
"pw_console/help_window.py",
"pw_console/key_bindings.py",
+ "pw_console/log_filter.py",
"pw_console/log_line.py",
"pw_console/log_pane.py",
"pw_console/log_pane_toolbars.py",
@@ -43,6 +44,7 @@
tests = [
"console_app_test.py",
"help_window_test.py",
+ "log_filter_test.py",
"log_store_test.py",
"log_view_test.py",
"repl_pane_test.py",
diff --git a/pw_console/py/log_filter_test.py b/pw_console/py/log_filter_test.py
new file mode 100644
index 0000000..d885560
--- /dev/null
+++ b/pw_console/py/log_filter_test.py
@@ -0,0 +1,215 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests for pw_console.log_view"""
+
+import logging
+import re
+import unittest
+from parameterized import parameterized # type: ignore
+
+from prompt_toolkit.document import Document
+from prompt_toolkit.validation import ValidationError
+
+from pw_console.log_line import LogLine
+from pw_console.log_filter import (
+ LogFilter,
+ RegexValidator,
+ SearchMatcher,
+ preprocess_search_regex,
+)
+
+
+class TestLogFilter(unittest.TestCase):
+ """Tests for LogFilter."""
+ def setUp(self):
+ self.maxDiff = None # pylint: disable=invalid-name
+
+ # pylint: disable=anomalous-backslash-in-string
+ @parameterized.expand([
+ (
+ 'raw string',
+ SearchMatcher.STRING,
+ 'f(x)',
+ 'f\(x\)',
+ re.IGNORECASE,
+ ),
+ (
+ 'simple regex',
+ SearchMatcher.REGEX,
+ 'f(x)',
+ 'f(x)',
+ re.IGNORECASE,
+ ),
+ (
+ 'regex with case sensitivity',
+ SearchMatcher.REGEX,
+ 'f(X)',
+ 'f(X)',
+ re.RegexFlag(0),
+ ),
+ (
+ 'regex with error',
+ SearchMatcher.REGEX,
+ 'f of (x', # Un-terminated open paren
+ 'f of (x',
+ re.IGNORECASE,
+ True, # fails_validation
+ ),
+ (
+ 'simple fuzzy',
+ SearchMatcher.FUZZY,
+ 'f x y',
+ '(f)(.*?)(x)(.*?)(y)',
+ re.IGNORECASE,
+ ),
+ (
+ 'fuzzy with case sensitivity',
+ SearchMatcher.FUZZY,
+ 'f X y',
+ '(f)(.*?)(X)(.*?)(y)',
+ re.RegexFlag(0),
+ ),
+ ]) # yapf: disable
+ def test_preprocess_search_regex(
+ self,
+ _name,
+ input_matcher,
+ input_text,
+ expected_regex,
+ expected_re_flag,
+ should_fail_validation=False,
+ ) -> None:
+ """Test preprocess_search_regex returns the expected regex settings."""
+ result_text, re_flag = preprocess_search_regex(input_text,
+ input_matcher)
+ self.assertEqual(expected_regex, result_text)
+ self.assertEqual(expected_re_flag, re_flag)
+
+ if should_fail_validation:
+ document = Document(text=input_text)
+ with self.assertRaisesRegex(ValidationError,
+ r'Regex Error.*at position [0-9]+'):
+ RegexValidator().validate(document)
+
+ def _create_logs(self, log_messages):
+ test_log = logging.getLogger('log_filter.test')
+ with self.assertLogs(test_log, level='DEBUG') as log_context:
+ for log, extra_arg in log_messages:
+ test_log.debug('%s', log, extra=extra_arg)
+
+ return log_context
+
+ @parameterized.expand([
+ (
+ 'simple fuzzy',
+ SearchMatcher.FUZZY,
+ 'log item',
+ [
+ ('Log some item', {'planet': 'Jupiter'}),
+ ('Log another item', {'planet': 'Earth'}),
+ ('Some exception', {'planet': 'Earth'}),
+ ],
+ [
+ 'Log some item',
+ 'Log another item',
+ ],
+ None, # field
+ False, # invert
+ ),
+ (
+ 'simple fuzzy inverted',
+ SearchMatcher.FUZZY,
+ 'log item',
+ [
+ ('Log some item', dict()),
+ ('Log another item', dict()),
+ ('Some exception', dict()),
+ ],
+ [
+ 'Some exception',
+ ],
+ None, # field
+ True, # invert
+ ),
+ (
+ 'regex with field',
+ SearchMatcher.REGEX,
+ 'earth',
+ [
+ ('Log some item',
+ dict(extra_metadata_fields={'planet': 'Jupiter'})),
+ ('Log another item',
+ dict(extra_metadata_fields={'planet': 'Earth'})),
+ ('Some exception',
+ dict(extra_metadata_fields={'planet': 'Earth'})),
+ ],
+ [
+ 'Log another item',
+ 'Some exception',
+ ],
+ 'planet', # field
+ False, # invert
+ ),
+ (
+ 'regex with field inverted',
+ SearchMatcher.REGEX,
+ 'earth',
+ [
+ ('Log some item',
+ dict(extra_metadata_fields={'planet': 'Jupiter'})),
+ ('Log another item',
+ dict(extra_metadata_fields={'planet': 'Earth'})),
+ ('Some exception',
+ dict(extra_metadata_fields={'planet': 'Earth'})),
+ ],
+ [
+ 'Log some item',
+ ],
+ 'planet', # field
+ True, # invert
+ ),
+ ]) # yapf: disable
+ def test_log_filter_matches(
+ self,
+ _name,
+ input_matcher,
+ input_text,
+ input_lines,
+ expected_matched_lines,
+ field=None,
+ invert=False,
+ ) -> None:
+ """Test log filter matches expected lines."""
+ result_text, re_flag = preprocess_search_regex(input_text,
+ input_matcher)
+ log_filter = LogFilter(
+ regex=re.compile(result_text, re_flag),
+ input_text=input_text,
+ invert=invert,
+ field=field,
+ )
+
+ matched_lines = []
+ logs = self._create_logs(input_lines)
+
+ for record in logs.records:
+ if log_filter.matches(
+ LogLine(record, record.message, record.message)):
+ matched_lines.append(record.message)
+
+ self.assertEqual(expected_matched_lines, matched_lines)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_console/py/log_view_test.py b/pw_console/py/log_view_test.py
index d2c1482..0084bfd 100644
--- a/pw_console/py/log_view_test.py
+++ b/pw_console/py/log_view_test.py
@@ -15,16 +15,21 @@
import logging
import time
+import sys
import unittest
from datetime import datetime
from unittest.mock import MagicMock, patch
from parameterized import parameterized # type: ignore
-
from prompt_toolkit.data_structures import Point
from prompt_toolkit.formatted_text import FormattedText
from pw_console.log_view import LogView
+_PYTHON_3_8 = sys.version_info >= (
+ 3,
+ 8,
+)
+
def _create_log_view():
log_pane = MagicMock()
@@ -264,5 +269,101 @@
expected_line_cache)
+if _PYTHON_3_8:
+ # pylint: disable=no-name-in-module
+ from unittest import IsolatedAsyncioTestCase # type: ignore
+
+ class TestLogViewFiltering(IsolatedAsyncioTestCase): # pylint: disable=undefined-variable
+ """Test LogView log filtering capabilities."""
+ def _create_log_view_from_list(self, log_messages):
+ log_view, log_pane = _create_log_view()
+
+ test_log = logging.getLogger('log_view.test')
+ with self.assertLogs(test_log, level='DEBUG') as _log_context:
+ test_log.addHandler(log_view.log_store)
+ for log, extra_arg in log_messages:
+ test_log.debug('%s', log, extra=extra_arg)
+
+ return log_view, log_pane
+
+ @parameterized.expand([
+ (
+ 'regex filter',
+ 'log.*item',
+ [
+ ('Log some item', dict()),
+ ('Log another item', dict()),
+ ('Some exception', dict()),
+ ],
+ [
+ 'Log some item',
+ 'Log another item',
+ ],
+ None, # field
+ False, # invert
+ ),
+ (
+ 'regex filter with field',
+ 'earth',
+ [
+ ('Log some item',
+ dict(extra_metadata_fields={'planet': 'Jupiter'})),
+ ('Log another item',
+ dict(extra_metadata_fields={'planet': 'Earth'})),
+ ('Some exception',
+ dict(extra_metadata_fields={'planet': 'Earth'})),
+ ],
+ [
+ 'Log another item',
+ 'Some exception',
+ ],
+ 'planet', # field
+ False, # invert
+ ),
+ (
+ 'regex filter with field inverted',
+ 'earth',
+ [
+ ('Log some item',
+ dict(extra_metadata_fields={'planet': 'Jupiter'})),
+ ('Log another item',
+ dict(extra_metadata_fields={'planet': 'Earth'})),
+ ('Some exception',
+ dict(extra_metadata_fields={'planet': 'Earth'})),
+ ],
+ [
+ 'Log some item',
+ ],
+ 'planet', # field
+ True, # invert
+ ),
+ ]) # yapf: disable
+ async def test_log_filtering(
+ self,
+ _name,
+ input_text,
+ input_lines,
+ expected_matched_lines,
+ field=None,
+ invert=False,
+ ) -> None:
+ """Test run log view filtering."""
+ log_view, _log_pane = self._create_log_view_from_list(input_lines)
+ self.assertEqual(log_view.get_total_count(), len(input_lines))
+
+ log_view.new_search(input_text, invert=invert, field=field)
+ log_view.apply_filter()
+ await log_view.filter_existing_logs_task
+
+ self.assertEqual(log_view.get_total_count(),
+ len(expected_matched_lines))
+ self.assertEqual(
+ [log.record.message for log in log_view.filtered_logs],
+ expected_matched_lines)
+
+ log_view.clear_filters()
+ self.assertEqual(log_view.get_total_count(), len(input_lines))
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/pw_console/py/pw_console/__main__.py b/pw_console/py/pw_console/__main__.py
index f54e1d8..049abf7 100644
--- a/pw_console/py/pw_console/__main__.py
+++ b/pw_console/py/pw_console/__main__.py
@@ -87,7 +87,7 @@
log_file=args.logfile)
if args.console_debug_log_file:
- pw_cli.log.install(level=args.loglevel,
+ pw_cli.log.install(level=logging.DEBUG,
use_color=True,
hide_timestamp=False,
log_file=args.console_debug_log_file,
diff --git a/pw_console/py/pw_console/log_filter.py b/pw_console/py/pw_console/log_filter.py
new file mode 100644
index 0000000..d7ece0b
--- /dev/null
+++ b/pw_console/py/pw_console/log_filter.py
@@ -0,0 +1,139 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""LogFilters define how to search log lines in LogViews."""
+
+from __future__ import annotations
+import logging
+import re
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional
+
+from prompt_toolkit.formatted_text.utils import fragment_list_to_text
+from prompt_toolkit.layout.utils import explode_text_fragments
+from prompt_toolkit.validation import ValidationError, Validator
+
+from pw_console.log_line import LogLine
+
+_LOG = logging.getLogger(__package__)
+
+_UPPERCASE_REGEX = re.compile(r'[A-Z]')
+
+
+class SearchMatcher(Enum):
+ """Possible search match methods."""
+ FUZZY = 'FUZZY'
+ REGEX = 'REGEX'
+ STRING = 'STRING'
+
+
+DEFAULT_SEARCH_MATCHER = SearchMatcher.REGEX
+
+
+def preprocess_search_regex(text,
+ matcher: SearchMatcher = DEFAULT_SEARCH_MATCHER):
+ # Ignorecase unless the text has capital letters in it.
+ regex_flags = re.IGNORECASE
+ if _UPPERCASE_REGEX.search(text):
+ regex_flags = re.RegexFlag(0)
+
+ if matcher == SearchMatcher.FUZZY:
+ # Fuzzy match replace spaces with .*
+ text_tokens = text.split(' ')
+ if len(text_tokens) > 1:
+ text = '(.*?)'.join(
+ ['({})'.format(re.escape(text)) for text in text_tokens])
+ elif matcher == SearchMatcher.STRING:
+ # Escape any regex specific characters to match the string literal.
+ text = re.escape(text)
+ elif matcher == SearchMatcher.REGEX:
+ # Don't modify search text input.
+ pass
+
+ return text, regex_flags
+
+
+class RegexValidator(Validator):
+ """Validation of regex input."""
+ def validate(self, document):
+ """Check search input for regex syntax errors."""
+ regex_text, regex_flags = preprocess_search_regex(document.text)
+ try:
+ re.compile(regex_text, regex_flags)
+ except re.error as error:
+ raise ValidationError(error.pos,
+ "Regex Error: %s" % error) from error
+
+
+@dataclass
+class LogFilter:
+ """Log Filter Dataclass."""
+ regex: re.Pattern
+ input_text: Optional[str] = None
+ invert: bool = False
+ field: Optional[str] = None
+
+ def pattern(self):
+ return self.regex.pattern
+
+ def matches(self, log: LogLine):
+ field = log.ansi_stripped_log
+ if self.field:
+ if hasattr(log, 'metadata') and hasattr(log.metadata, 'fields'):
+ field = log.metadata.fields.get(self.field,
+ log.ansi_stripped_log)
+ if hasattr(log.record, 'extra_metadata_fields'): # type: ignore
+ field = log.record.extra_metadata_fields.get( # type: ignore
+ self.field, log.ansi_stripped_log)
+ if self.field == 'lvl':
+ field = log.record.levelname
+ elif self.field == 'time':
+ field = log.record.asctime
+
+ match = self.regex.search(field)
+
+ if self.invert:
+ return not match
+ return match
+
+ def highlight_search_matches(self, line_fragments, selected=False):
+ """Highlight search matches in the current line_fragment."""
+ line_text = fragment_list_to_text(line_fragments)
+ exploded_fragments = explode_text_fragments(line_fragments)
+
+ def apply_highlighting(fragments, i):
+ # Expand all fragments and apply the highlighting style.
+ old_style, _text, *_ = fragments[i]
+ if selected:
+ fragments[i] = (
+ old_style + ' class:search.current ',
+ fragments[i][1],
+ )
+ else:
+ fragments[i] = (
+ old_style + ' class:search ',
+ fragments[i][1],
+ )
+
+ if self.invert:
+ # Highlight the whole line
+ for i, _fragment in enumerate(exploded_fragments):
+ apply_highlighting(exploded_fragments, i)
+ else:
+ # Highlight each non-overlapping search match.
+ for match in self.regex.finditer(line_text):
+ for fragment_i in range(match.start(), match.end()):
+ apply_highlighting(exploded_fragments, fragment_i)
+
+ return exploded_fragments
diff --git a/pw_console/py/pw_console/log_view.py b/pw_console/py/pw_console/log_view.py
index 4e02e33..af18eea 100644
--- a/pw_console/py/pw_console/log_view.py
+++ b/pw_console/py/pw_console/log_view.py
@@ -16,13 +16,12 @@
from __future__ import annotations
import asyncio
import collections
+import copy
import logging
import re
import time
from typing import List, Optional, TYPE_CHECKING
-from prompt_toolkit.formatted_text.utils import fragment_list_to_text
-from prompt_toolkit.layout.utils import explode_text_fragments
from prompt_toolkit.data_structures import Point
from prompt_toolkit.formatted_text import (
to_formatted_text,
@@ -32,14 +31,20 @@
import pw_console.text_formatting
from pw_console.log_store import LogStore
+from pw_console.log_filter import (
+ DEFAULT_SEARCH_MATCHER,
+ LogFilter,
+ RegexValidator,
+ SearchMatcher,
+ preprocess_search_regex,
+)
if TYPE_CHECKING:
+ from pw_console.log_line import LogLine
from pw_console.log_pane import LogPane
_LOG = logging.getLogger(__package__)
-_UPPERCASE_REGEX = re.compile(r'[A-Z]')
-
class LogView:
"""Viewing window into a LogStore."""
@@ -56,19 +61,21 @@
# Search variables
self.search_text = None
- self.search_re_flags = None
- self.search_regex = None
+ self.search_filter = None
self.search_highlight = False
+ self.search_matcher = DEFAULT_SEARCH_MATCHER
+ self.search_validator = RegexValidator()
# Filter
self.filtering_on = False
- self.filter_text = None
- self.filter_regex = None
+ self.filters: 'collections.OrderedDict[str, LogFilter]' = (
+ collections.OrderedDict())
self.filtered_logs: collections.deque = collections.deque()
self.filter_existing_logs_task = None
# Current log line index state variables:
- self.line_index = 0
+ self._line_index = 0
+ self._filtered_line_index = 0
self._last_start_index = 0
self._last_end_index = 0
self._current_start_index = 0
@@ -82,6 +89,7 @@
# log lines.
self._ui_update_frequency = 0.1
self._last_ui_update_time = time.time()
+ self._last_log_store_index = 0
# Should new log lines be tailed?
self.follow = True
@@ -90,98 +98,189 @@
# rendering by `get_cursor_position()`.
self._line_fragment_cache: collections.deque = collections.deque()
+ @property
+ def line_index(self):
+ if self.filtering_on:
+ return self._filtered_line_index
+ return self._line_index
+
+ @line_index.setter
+ def line_index(self, line_index):
+ if self.filtering_on:
+ self._filtered_line_index = line_index
+ else:
+ self._line_index = line_index
+
def _set_match_position(self, position: int):
self.follow = False
self.line_index = position
self.log_pane.application.redraw_ui()
+ def select_next_search_matcher(self):
+ matchers = list(SearchMatcher)
+ index = matchers.index(self.search_matcher)
+ new_index = (index + 1) % len(matchers)
+ self.search_matcher = matchers[new_index]
+
def search_forwards(self):
- if not self.search_regex:
+ if not self.search_filter:
return
self.search_highlight = True
- starting_index = self.get_current_line() + 1
- if starting_index > self.log_store.get_last_log_line_index():
+ starting_index = self.line_index + 1
+ if starting_index > self.get_last_log_line_index():
starting_index = 0
+ logs = self._get_log_lines()
+
# From current position +1 and down
- for i in range(starting_index,
- self.log_store.get_last_log_line_index() + 1):
- if self.search_regex.search(
- self.log_store.logs[i].ansi_stripped_log):
+ for i in range(starting_index, self.get_last_log_line_index() + 1):
+ if self.search_filter.matches(logs[i]):
self._set_match_position(i)
return
# From the beginning to the original start
for i in range(0, starting_index):
- if self.search_regex.search(
- self.log_store.logs[i].ansi_stripped_log):
+ if self.search_filter.matches(logs[i]):
self._set_match_position(i)
return
def search_backwards(self):
- if not self.search_regex:
+ if not self.search_filter:
return
self.search_highlight = True
- starting_index = self.get_current_line() - 1
+ starting_index = self.line_index - 1
if starting_index < 0:
- starting_index = self.log_store.get_last_log_line_index()
+ starting_index = self.get_last_log_line_index()
+
+ logs = self._get_log_lines()
# From current position - 1 and up
for i in range(starting_index, -1, -1):
- if self.search_regex.search(
- self.log_store.logs[i].ansi_stripped_log):
+ if self.search_filter.matches(logs[i]):
self._set_match_position(i)
return
# From the end to the original start
- for i in range(self.log_store.get_last_log_line_index(),
- starting_index, -1):
- if self.search_regex.search(
- self.log_store.logs[i].ansi_stripped_log):
+ for i in range(self.get_last_log_line_index(), starting_index, -1):
+ if self.search_filter.matches(logs[i]):
self._set_match_position(i)
return
- def _set_search_regex(self, text):
- # Reset search text
- self.search_text = text
+ def _set_search_regex(self, text, invert, field):
+ regex_text, regex_flags = preprocess_search_regex(
+ text, matcher=self.search_matcher)
+
+ try:
+ compiled_regex = re.compile(regex_text, regex_flags)
+ self.search_filter = LogFilter(
+ regex=compiled_regex,
+ input_text=text,
+ invert=invert,
+ field=field,
+ )
+ except re.error as error:
+ _LOG.debug(error)
+ return False
+
self.search_highlight = True
+ self.search_text = regex_text
+ return True
- # Ignorecase unless the text has capital letters in it.
- if _UPPERCASE_REGEX.search(text):
- self.search_re_flags = re.RegexFlag(0)
- else:
- self.search_re_flags = re.IGNORECASE
-
- self.search_regex = re.compile(re.escape(self.search_text),
- self.search_re_flags)
-
- def new_search(self, text):
+ def new_search(self,
+ text,
+ invert=False,
+ field: Optional[str] = None) -> bool:
"""Start a new search for the given text."""
- self._set_search_regex(text)
- # Default search direction when hitting enter in the search bar.
- self.search_backwards()
+ if self._set_search_regex(text, invert, field):
+ # Default search direction when hitting enter in the search bar.
+ self.search_backwards()
+ return True
+ return False
def disable_search_highlighting(self):
self.log_pane.log_view.search_highlight = False
- def apply_filter(self, text=None):
- """Set a filter."""
- if not text:
- text = self.search_text
- self._set_search_regex(text)
- self.filter_text = text
- self.filter_regex = self.search_regex
+ def _restart_filtering(self):
+ # Turn on follow
+ if not self.follow:
+ self.toggle_follow()
+
+ # Reset filtered logs.
+ self.filtered_logs.clear()
+
+ # Start filtering existing log lines.
+ self.filter_existing_logs_task = asyncio.create_task(
+ self.filter_past_logs())
+
+ # Reset existing search
+ self.clear_search()
+
+ # Redraw the UI
+ self.log_pane.application.redraw_ui()
+
+ def apply_filter(self):
+ """Set a filter using the current search_regex."""
+ if not self.search_filter:
+ return
self.search_highlight = False
- self.filter_existing_logs_task = asyncio.create_task(
- self.filter_logs())
+ self.filtering_on = True
+ self.filters[self.search_text] = copy.deepcopy(self.search_filter)
- async def filter_logs(self):
- """Filter"""
- # TODO(tonymd): Filter existing lines here.
- await asyncio.sleep(.3)
+ self._restart_filtering()
+
+ def clear_search(self):
+ self.search_text = None
+ self.search_filter = None
+ self.search_highlight = False
+
+ def _get_log_lines(self):
+ if self.filtering_on:
+ return self.filtered_logs
+ return self.log_store.logs
+
+ def delete_filter(self, filter_text):
+ if filter_text not in self.filters:
+ return
+
+ # Delete this filter
+ del self.filters[filter_text]
+
+ # If no filters left, stop filtering.
+ if len(self.filters) == 0:
+ self.clear_filters()
+ else:
+ # Erase existing filtered lines.
+ self._restart_filtering()
+
+ def clear_filters(self):
+ if not self.filtering_on:
+ return
+ self.clear_search()
+ self.filtering_on = False
+ self.filters: 'collections.OrderedDict[str, re.Pattern]' = (
+ collections.OrderedDict())
+ self.filtered_logs.clear()
+ if not self.follow:
+ self.toggle_follow()
+
+ async def filter_past_logs(self):
+ """Filter past log lines."""
+ starting_index = self.log_store.get_last_log_line_index()
+ ending_index = -1
+
+ # From the end of the log store to the beginning.
+ for i in range(starting_index, ending_index, -1):
+ # Is this log a match?
+ if self.filter_scan(self.log_store.logs[i]):
+ # Add to the beginning of the deque.
+ self.filtered_logs.appendleft(self.log_store.logs[i])
+ # TODO(tonymd): Tune these values.
+ # Pause every 100 lines or so
+ if i % 100 == 0:
+ await asyncio.sleep(.1)
def set_log_pane(self, log_pane: 'LogPane'):
"""Set the parent LogPane instance."""
@@ -193,8 +292,14 @@
def get_total_count(self):
"""Total size of the logs store."""
+ if self.filtering_on:
+ return len(self.filtered_logs)
return self.log_store.get_total_count()
+ def get_last_log_line_index(self):
+ total = self.get_total_count()
+ return 0 if total < 0 else total - 1
+
def clear_scrollback(self):
"""Hide log lines before the max length of the stored logs."""
# TODO(tonymd): Should the LogStore be erased?
@@ -218,9 +323,30 @@
return self.log_store.longest_channel_prefix_width
return 0
+ def filter_scan(self, log: 'LogLine'):
+ filter_match_count = 0
+ for _filter_text, log_filter in self.filters.items():
+ if log_filter.matches(log):
+ filter_match_count += 1
+ else:
+ break
+
+ if filter_match_count == len(self.filters):
+ return True
+ return False
+
def new_logs_arrived(self):
# If follow is on, scroll to the last line.
- # TODO(tonymd): Filter new lines here.
+ latest_total = self.log_store.get_total_count()
+
+ if self.filtering_on:
+ # Scan newly arived log lines
+ for i in range(self._last_log_store_index, latest_total):
+ if self.filter_scan(self.log_store.logs[i]):
+ self.filtered_logs.append(self.log_store.logs[i])
+
+ self._last_log_store_index = latest_total
+
if self.follow:
self.scroll_to_bottom()
@@ -270,7 +396,9 @@
def scroll_to_bottom(self):
"""Move selected index to the end."""
# Don't change following state like scroll_to_top.
- self.line_index = max(0, self.log_store.get_last_log_line_index())
+ self.line_index = max(0, self.get_last_log_line_index())
+ # Sticky follow mode
+ self.follow = True
def scroll(self, lines):
"""Scroll up or down by plus or minus lines.
@@ -280,13 +408,18 @@
# If the user starts scrolling, stop auto following.
self.follow = False
+ last_index = self.get_last_log_line_index()
+
# If scrolling to an index below zero, set to zero.
new_line_index = max(0, self.line_index + lines)
# If past the end, set to the last index of self.logs.
- if new_line_index >= self.log_store.get_total_count():
- new_line_index = self.log_store.get_last_log_line_index()
+ if new_line_index >= self.get_total_count():
+ new_line_index = last_index
# Set the new selected line index.
self.line_index = new_line_index
+ # Sticky follow mode
+ if self.line_index == last_index:
+ self.follow = True
def scroll_to_position(self, mouse_position: Point):
"""Set the selected log line to the mouse_position."""
@@ -348,8 +481,8 @@
# Use the current_window_height if line_index is less
ending_index = max(self.line_index, max_window_row_index)
- if ending_index > self.log_store.get_last_log_line_index():
- ending_index = self.log_store.get_last_log_line_index()
+ if ending_index > self.get_last_log_line_index():
+ ending_index = self.get_last_log_line_index()
# Save start and end index.
self._current_start_index = starting_index
@@ -368,8 +501,11 @@
the current log line position and the given window size. It also sets
the cursor position depending on which line is selected.
"""
+
+ logs = self._get_log_lines()
+
# Reset _line_fragment_cache ( used in self.get_cursor_position )
- self._line_fragment_cache = collections.deque()
+ self._line_fragment_cache.clear()
# Track used lines.
total_used_lines = 0
@@ -377,7 +513,7 @@
# If we have no logs add one with at least a single space character for
# the cursor to land on. Otherwise the cursor will be left on the line
# above the log pane container.
- if self.log_store.get_total_count() < 1:
+ if self.get_total_count() < 1:
return [(
'[SetCursorPosition]', '\n' * self._window_height
# LogContentControl.mouse_handler will handle focusing the log
@@ -399,9 +535,8 @@
# Grab the rendered log line using the table or standard view.
line_fragments: StyleAndTextTuples = (
- self.log_store.table.formatted_row(self.log_store.logs[i])
- if self.log_pane.table_view else
- self.log_store.logs[i].get_fragments())
+ self.log_store.table.formatted_row(logs[i])
+ if self.log_pane.table_view else logs[i].get_fragments())
# Get the width, height and remaining width.
fragment_width = fragment_list_width(line_fragments)
@@ -419,7 +554,7 @@
used_lines = line_height
# Count the number of line breaks are included in the log line.
- line_breaks = self.log_store.logs[i].ansi_stripped_log.count('\n')
+ line_breaks = logs[i].ansi_stripped_log.count('\n')
used_lines += line_breaks
# If this is the selected line apply a style class for highlighting.
@@ -441,10 +576,9 @@
line_fragments, style='class:selected-log-line')
# Apply search term highlighting.
- if self.search_regex and self.search_highlight and (
- self.search_regex.search(
- self.log_store.logs[i].ansi_stripped_log)):
- line_fragments = self._highlight_search_matches(
+ if self.search_filter and self.search_highlight and (
+ self.search_filter.matches(logs[i])):
+ line_fragments = self.search_filter.highlight_search_matches(
line_fragments, selected)
# Save this line to the beginning of the cache.
@@ -460,25 +594,3 @@
return pw_console.text_formatting.flatten_formatted_text_tuples(
self._line_fragment_cache)
-
- def _highlight_search_matches(self, line_fragments, selected=False):
- """Highlight search matches in the current line_fragment."""
- line_text = fragment_list_to_text(line_fragments)
- exploded_fragments = explode_text_fragments(line_fragments)
-
- # Loop through each non-overlapping search match.
- for match in self.search_regex.finditer(line_text):
- for fragment_i in range(match.start(), match.end()):
- # Expand all fragments and apply the highlighting style.
- old_style, _text, *_ = exploded_fragments[fragment_i]
- if selected:
- exploded_fragments[fragment_i] = (
- old_style + ' class:search.current ',
- exploded_fragments[fragment_i][1],
- )
- else:
- exploded_fragments[fragment_i] = (
- old_style + ' class:search ',
- exploded_fragments[fragment_i][1],
- )
- return exploded_fragments