| # Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com> |
| # Copyright 2012 Google, Inc & contributors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from watchdog.events import ( |
| FileDeletedEvent, |
| FileModifiedEvent, |
| FileCreatedEvent, |
| DirDeletedEvent, |
| DirModifiedEvent, |
| DirCreatedEvent, |
| FileMovedEvent, |
| DirMovedEvent, |
| RegexMatchingEventHandler, |
| LoggingEventHandler, |
| EVENT_TYPE_MODIFIED, |
| EVENT_TYPE_CREATED, |
| EVENT_TYPE_DELETED, |
| EVENT_TYPE_MOVED, |
| ) |
| |
| path_1 = "/path/xyz" |
| path_2 = "/path/abc" |
| g_allowed_regexes = [r".*\.py", r".*\.txt"] |
| g_allowed_str_regexes = r".*\.py" |
| g_ignore_regexes = [r".*\.pyc"] |
| |
| |
| def test_dispatch(): |
| # Utilities. |
| regexes = [r".*\.py", r".*\.txt"] |
| ignore_regexes = [r".*\.pyc"] |
| |
| def assert_regexes(handler, event): |
| if hasattr(event, "dest_path"): |
| paths = [event.src_path, event.dest_path] |
| else: |
| paths = [event.src_path] |
| filtered_paths = set() |
| for p in paths: |
| if any(r.match(p) for r in handler.regexes): |
| filtered_paths.add(p) |
| assert filtered_paths |
| |
| dir_del_event_match = DirDeletedEvent("/path/blah.py") |
| dir_del_event_not_match = DirDeletedEvent("/path/foobar") |
| dir_del_event_ignored = DirDeletedEvent("/path/foobar.pyc") |
| file_del_event_match = FileDeletedEvent("/path/blah.txt") |
| file_del_event_not_match = FileDeletedEvent("/path/foobar") |
| file_del_event_ignored = FileDeletedEvent("/path/blah.pyc") |
| |
| dir_cre_event_match = DirCreatedEvent("/path/blah.py") |
| dir_cre_event_not_match = DirCreatedEvent("/path/foobar") |
| dir_cre_event_ignored = DirCreatedEvent("/path/foobar.pyc") |
| file_cre_event_match = FileCreatedEvent("/path/blah.txt") |
| file_cre_event_not_match = FileCreatedEvent("/path/foobar") |
| file_cre_event_ignored = FileCreatedEvent("/path/blah.pyc") |
| |
| dir_mod_event_match = DirModifiedEvent("/path/blah.py") |
| dir_mod_event_not_match = DirModifiedEvent("/path/foobar") |
| dir_mod_event_ignored = DirModifiedEvent("/path/foobar.pyc") |
| file_mod_event_match = FileModifiedEvent("/path/blah.txt") |
| file_mod_event_not_match = FileModifiedEvent("/path/foobar") |
| file_mod_event_ignored = FileModifiedEvent("/path/blah.pyc") |
| |
| dir_mov_event_match = DirMovedEvent("/path/blah.py", "/path/blah") |
| dir_mov_event_not_match = DirMovedEvent("/path/foobar", "/path/blah") |
| dir_mov_event_ignored = DirMovedEvent("/path/foobar.pyc", "/path/blah") |
| file_mov_event_match = FileMovedEvent("/path/blah.txt", "/path/blah") |
| file_mov_event_not_match = FileMovedEvent("/path/foobar", "/path/blah") |
| file_mov_event_ignored = FileMovedEvent("/path/blah.pyc", "/path/blah") |
| |
| all_dir_events = [ |
| dir_mod_event_match, |
| dir_mod_event_not_match, |
| dir_mod_event_ignored, |
| dir_del_event_match, |
| dir_del_event_not_match, |
| dir_del_event_ignored, |
| dir_cre_event_match, |
| dir_cre_event_not_match, |
| dir_cre_event_ignored, |
| dir_mov_event_match, |
| dir_mov_event_not_match, |
| dir_mov_event_ignored, |
| ] |
| all_file_events = [ |
| file_mod_event_match, |
| file_mod_event_not_match, |
| file_mod_event_ignored, |
| file_del_event_match, |
| file_del_event_not_match, |
| file_del_event_ignored, |
| file_cre_event_match, |
| file_cre_event_not_match, |
| file_cre_event_ignored, |
| file_mov_event_match, |
| file_mov_event_not_match, |
| file_mov_event_ignored, |
| ] |
| all_events = all_file_events + all_dir_events |
| |
| def assert_check_directory(handler, event): |
| assert not (handler.ignore_directories and event.is_directory) |
| |
| class TestableEventHandler(RegexMatchingEventHandler): |
| def on_any_event(self, event): |
| assert_check_directory(self, event) |
| |
| def on_modified(self, event): |
| assert_check_directory(self, event) |
| assert event.event_type == EVENT_TYPE_MODIFIED |
| assert_regexes(self, event) |
| |
| def on_deleted(self, event): |
| assert_check_directory(self, event) |
| assert event.event_type == EVENT_TYPE_DELETED |
| assert_regexes(self, event) |
| |
| def on_moved(self, event): |
| assert_check_directory(self, event) |
| assert event.event_type == EVENT_TYPE_MOVED |
| assert_regexes(self, event) |
| |
| def on_created(self, event): |
| assert_check_directory(self, event) |
| assert event.event_type == EVENT_TYPE_CREATED |
| assert_regexes(self, event) |
| |
| no_dirs_handler = TestableEventHandler( |
| regexes=regexes, ignore_regexes=ignore_regexes, ignore_directories=True |
| ) |
| handler = TestableEventHandler( |
| regexes=regexes, ignore_regexes=ignore_regexes, ignore_directories=False |
| ) |
| |
| for event in all_events: |
| no_dirs_handler.dispatch(event) |
| for event in all_events: |
| handler.dispatch(event) |
| |
| |
| def test_handler(): |
| handler1 = RegexMatchingEventHandler(g_allowed_regexes, g_ignore_regexes, True) |
| handler2 = RegexMatchingEventHandler(g_allowed_regexes, g_ignore_regexes, False) |
| assert [r.pattern for r in handler1.regexes] == g_allowed_regexes |
| assert [r.pattern for r in handler1.ignore_regexes] == g_ignore_regexes |
| assert handler1.ignore_directories |
| assert not handler2.ignore_directories |
| |
| |
| def test_ignore_directories(): |
| handler1 = RegexMatchingEventHandler(g_allowed_regexes, g_ignore_regexes, True) |
| handler2 = RegexMatchingEventHandler(g_allowed_regexes, g_ignore_regexes, False) |
| assert handler1.ignore_directories |
| assert not handler2.ignore_directories |
| |
| |
| def test_ignore_regexes(): |
| handler1 = RegexMatchingEventHandler(g_allowed_regexes, g_ignore_regexes, True) |
| assert [r.pattern for r in handler1.ignore_regexes] == g_ignore_regexes |
| |
| |
| def test_regexes(): |
| handler1 = RegexMatchingEventHandler(g_allowed_regexes, g_ignore_regexes, True) |
| assert [r.pattern for r in handler1.regexes] == g_allowed_regexes |
| |
| |
| def test_str_regexes(): |
| handler1 = RegexMatchingEventHandler(g_allowed_str_regexes, g_ignore_regexes, True) |
| assert [r.pattern for r in handler1.regexes] == [g_allowed_str_regexes] |
| |
| |
| def test_logging_event_handler_dispatch(): |
| class _TestableEventHandler(LoggingEventHandler): |
| def on_any_event(self, event): |
| assert True |
| |
| def on_modified(self, event): |
| super().on_modified(event) |
| assert event.event_type == EVENT_TYPE_MODIFIED |
| |
| def on_deleted(self, event): |
| super().on_deleted(event) |
| assert event.event_type == EVENT_TYPE_DELETED |
| |
| def on_moved(self, event): |
| super().on_moved(event) |
| assert event.event_type == EVENT_TYPE_MOVED |
| |
| def on_created(self, event): |
| super().on_created(event) |
| assert event.event_type == EVENT_TYPE_CREATED |
| |
| # Utilities. |
| dir_del_event = DirDeletedEvent("/path/blah.py") |
| file_del_event = FileDeletedEvent("/path/blah.txt") |
| dir_cre_event = DirCreatedEvent("/path/blah.py") |
| file_cre_event = FileCreatedEvent("/path/blah.txt") |
| dir_mod_event = DirModifiedEvent("/path/blah.py") |
| file_mod_event = FileModifiedEvent("/path/blah.txt") |
| dir_mov_event = DirMovedEvent("/path/blah.py", "/path/blah") |
| file_mov_event = FileMovedEvent("/path/blah.txt", "/path/blah") |
| |
| all_events = [ |
| dir_mod_event, |
| dir_del_event, |
| dir_cre_event, |
| dir_mov_event, |
| file_mod_event, |
| file_del_event, |
| file_cre_event, |
| file_mov_event, |
| ] |
| |
| handler = _TestableEventHandler() |
| for event in all_events: |
| handler.dispatch(event) |