blob: 4d7978c0f0285c12d3ad8ebc874f432038a30412 [file] [log] [blame]
# Copyright 2014 Thomas Amland <thomas.amland@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import stat
import time
import pytest
import logging
from functools import partial
from queue import Queue, Empty
from typing import Type
from .shell import mkfile, mkdir, touch, mv, rm
from watchdog.utils import platform
from watchdog.events import (
FileDeletedEvent,
FileModifiedEvent,
FileCreatedEvent,
FileMovedEvent,
DirDeletedEvent,
DirModifiedEvent,
DirCreatedEvent,
DirMovedEvent,
FileClosedEvent,
FileOpenedEvent,
)
from watchdog.observers.api import EventEmitter, ObservedWatch
Emitter: Type[EventEmitter]
if platform.is_linux():
from watchdog.observers.inotify import (
InotifyEmitter as Emitter,
InotifyFullEmitter,
)
elif platform.is_darwin():
from watchdog.observers.fsevents import FSEventsEmitter as Emitter
elif platform.is_windows():
from watchdog.observers.read_directory_changes import WindowsApiEmitter as Emitter
elif platform.is_bsd():
from watchdog.observers.kqueue import ( # type: ignore[attr-defined,no-redef]
KqueueEmitter as Emitter
)
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
if platform.is_darwin():
# enable more verbose logs
fsevents_logger = logging.getLogger("fsevents")
fsevents_logger.setLevel(logging.DEBUG)
@pytest.fixture(autouse=True)
def setup_teardown(tmpdir):
global p, emitter, event_queue
p = partial(os.path.join, tmpdir)
event_queue = Queue()
yield
emitter.stop()
emitter.join(5)
assert not emitter.is_alive()
def start_watching(path=None, use_full_emitter=False, recursive=True):
# todo: check if other platforms expect the trailing slash (e.g. `p('')`)
path = p() if path is None else path
global emitter
if platform.is_linux() and use_full_emitter:
emitter = InotifyFullEmitter(
event_queue, ObservedWatch(path, recursive=recursive)
)
else:
emitter = Emitter(event_queue, ObservedWatch(path, recursive=recursive))
if platform.is_darwin():
emitter.suppress_history = True
emitter.start()
def rerun_filter(exc, *args):
time.sleep(5)
if issubclass(exc[0], Empty) and platform.is_windows():
return True
return False
def expect_event(expected_event, timeout=2):
"""Utility function to wait up to `timeout` seconds for an `event_type` for `path` to show up in the queue.
Provides some robustness for the otherwise flaky nature of asynchronous notifications.
"""
try:
event = event_queue.get(timeout=timeout)[0]
assert event == expected_event
except Empty:
raise
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_create():
start_watching()
open(p("a"), "a").close()
expect_event(FileCreatedEvent(p("a")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileOpenedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileClosedEvent)
@pytest.mark.skipif(
not platform.is_linux(), reason="FileCloseEvent only supported in GNU/Linux"
)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_close():
f_d = open(p("a"), "a")
start_watching()
f_d.close()
# After file creation/open in append mode
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileClosedEvent)
event = event_queue.get(timeout=5)[0]
assert os.path.normpath(event.src_path) == os.path.normpath(p(""))
assert isinstance(event, DirModifiedEvent)
# After read-only, only IN_CLOSE_NOWRITE is emitted but not caught for now #747
open(p("a"), "r").close()
assert event_queue.empty()
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
@pytest.mark.skipif(
platform.is_darwin() or platform.is_windows(),
reason="Windows and macOS enforce proper encoding",
)
def test_create_wrong_encoding():
start_watching()
open(p("a_\udce4"), "a").close()
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a_\udce4")
assert isinstance(event, FileCreatedEvent)
if not platform.is_windows():
event = event_queue.get(timeout=5)[0]
assert os.path.normpath(event.src_path) == os.path.normpath(p(""))
assert isinstance(event, DirModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_delete():
mkfile(p("a"))
start_watching()
rm(p("a"))
expect_event(FileDeletedEvent(p("a")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_modify():
mkfile(p("a"))
start_watching()
touch(p("a"))
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileOpenedEvent)
expect_event(FileModifiedEvent(p("a")))
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("a")
assert isinstance(event, FileClosedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_chmod():
mkfile(p("a"))
start_watching()
# Note: We use S_IREAD here because chmod on Windows only
# allows setting the read-only flag.
os.chmod(p("a"), stat.S_IREAD)
expect_event(FileModifiedEvent(p("a")))
# Reset permissions to allow cleanup.
os.chmod(p("a"), stat.S_IWRITE)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_move():
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching()
mv(p("dir1", "a"), p("dir2", "b"))
if not platform.is_windows():
expect_event(FileMovedEvent(p("dir1", "a"), p("dir2", "b")))
else:
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "a")
assert isinstance(event, FileDeletedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2", "b")
assert isinstance(event, FileCreatedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path in [p("dir1"), p("dir2")]
assert isinstance(event, DirModifiedEvent)
if not platform.is_windows():
event = event_queue.get(timeout=5)[0]
assert event.src_path in [p("dir1"), p("dir2")]
assert isinstance(event, DirModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_case_change():
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "file"))
start_watching()
mv(p("dir1", "file"), p("dir2", "FILE"))
if not platform.is_windows():
expect_event(FileMovedEvent(p("dir1", "file"), p("dir2", "FILE")))
else:
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "file")
assert isinstance(event, FileDeletedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2", "FILE")
assert isinstance(event, FileCreatedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path in [p("dir1"), p("dir2")]
assert isinstance(event, DirModifiedEvent)
if not platform.is_windows():
event = event_queue.get(timeout=5)[0]
assert event.src_path in [p("dir1"), p("dir2")]
assert isinstance(event, DirModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_move_to():
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching(p("dir2"))
mv(p("dir1", "a"), p("dir2", "b"))
expect_event(FileCreatedEvent(p("dir2", "b")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p("dir2")))
@pytest.mark.skipif(
not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux"
)
def test_move_to_full():
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching(p("dir2"), use_full_emitter=True)
mv(p("dir1", "a"), p("dir2", "b"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event, FileMovedEvent)
assert event.dest_path == p("dir2", "b")
assert event.src_path is None # Should equal None since the path was not watched
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_move_from():
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching(p("dir1"))
mv(p("dir1", "a"), p("dir2", "b"))
expect_event(FileDeletedEvent(p("dir1", "a")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p("dir1")))
@pytest.mark.skipif(
not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux"
)
def test_move_from_full():
mkdir(p("dir1"))
mkdir(p("dir2"))
mkfile(p("dir1", "a"))
start_watching(p("dir1"), use_full_emitter=True)
mv(p("dir1", "a"), p("dir2", "b"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event, FileMovedEvent)
assert event.src_path == p("dir1", "a")
assert event.dest_path is None # Should equal None since path not watched
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_separate_consecutive_moves():
mkdir(p("dir1"))
mkfile(p("dir1", "a"))
mkfile(p("b"))
start_watching(p("dir1"))
mv(p("dir1", "a"), p("c"))
mv(p("b"), p("dir1", "d"))
dir_modif = DirModifiedEvent(p("dir1"))
a_deleted = FileDeletedEvent(p("dir1", "a"))
d_created = FileCreatedEvent(p("dir1", "d"))
expected_events = [a_deleted, dir_modif, d_created, dir_modif]
if platform.is_windows():
expected_events = [a_deleted, d_created]
if platform.is_bsd():
# Due to the way kqueue works, we can't really order
# 'Created' and 'Deleted' events in time, so creation queues first
expected_events = [d_created, a_deleted, dir_modif, dir_modif]
for expected_event in expected_events:
expect_event(expected_event)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
@pytest.mark.skipif(
platform.is_bsd(), reason="BSD create another set of events for this test"
)
def test_delete_self():
mkdir(p("dir1"))
start_watching(p("dir1"))
rm(p("dir1"), True)
expect_event(DirDeletedEvent(p("dir1")))
emitter.join(5)
assert not emitter.is_alive()
@pytest.mark.skipif(
platform.is_windows() or platform.is_bsd(),
reason="Windows|BSD create another set of events for this test",
)
def test_fast_subdirectory_creation_deletion():
root_dir = p("dir1")
sub_dir = p("dir1", "subdir1")
times = 30
mkdir(root_dir)
start_watching(root_dir)
for _ in range(times):
mkdir(sub_dir)
rm(sub_dir, True)
time.sleep(0.1) # required for macOS emitter to catch up with us
count = {DirCreatedEvent: 0, DirModifiedEvent: 0, DirDeletedEvent: 0}
etype_for_dir = {
DirCreatedEvent: sub_dir,
DirModifiedEvent: root_dir,
DirDeletedEvent: sub_dir,
}
for _ in range(times * 4):
event = event_queue.get(timeout=5)[0]
logger.debug(event)
etype = type(event)
count[etype] += 1
assert event.src_path == etype_for_dir[etype]
assert count[DirCreatedEvent] >= count[DirDeletedEvent]
assert (
count[DirCreatedEvent] + count[DirDeletedEvent] >= count[DirModifiedEvent]
)
assert count == {
DirCreatedEvent: times,
DirModifiedEvent: times * 2,
DirDeletedEvent: times,
}
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_passing_unicode_should_give_unicode():
start_watching(str(p()))
mkfile(p("a"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event.src_path, str)
@pytest.mark.skipif(
platform.is_windows(),
reason="Windows ReadDirectoryChangesW supports only" " unicode for paths.",
)
def test_passing_bytes_should_give_bytes():
start_watching(p().encode())
mkfile(p("a"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event.src_path, bytes)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_recursive_on():
mkdir(p("dir1", "dir2", "dir3"), True)
start_watching()
touch(p("dir1", "dir2", "dir3", "a"))
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2", "dir3", "a")
assert isinstance(event, FileCreatedEvent)
if not platform.is_windows():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2", "dir3")
assert isinstance(event, DirModifiedEvent)
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2", "dir3", "a")
assert isinstance(event, FileOpenedEvent)
if not platform.is_bsd():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2", "dir3", "a")
assert isinstance(event, FileModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
def test_recursive_off():
mkdir(p("dir1"))
start_watching(recursive=False)
touch(p("dir1", "a"))
with pytest.raises(Empty):
event_queue.get(timeout=5)
mkfile(p("b"))
expect_event(FileCreatedEvent(p("b")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
if platform.is_linux():
expect_event(FileOpenedEvent(p("b")))
expect_event(FileClosedEvent(p("b")))
# currently limiting these additional events to macOS only, see https://github.com/gorakhargosh/watchdog/pull/779
if platform.is_darwin():
mkdir(p("dir1", "dir2"))
with pytest.raises(Empty):
event_queue.get(timeout=5)
mkfile(p("dir1", "dir2", "somefile"))
with pytest.raises(Empty):
event_queue.get(timeout=5)
mkdir(p("dir3"))
expect_event(
DirModifiedEvent(p())
) # the contents of the parent directory changed
mv(p("dir1", "dir2", "somefile"), p("somefile"))
expect_event(FileMovedEvent(p("dir1", "dir2", "somefile"), p("somefile")))
expect_event(DirModifiedEvent(p()))
mv(p("dir1", "dir2"), p("dir2"))
expect_event(DirMovedEvent(p("dir1", "dir2"), p("dir2")))
expect_event(DirModifiedEvent(p()))
@pytest.mark.skipif(
platform.is_windows(), reason="Windows create another set of events for this test"
)
def test_renaming_top_level_directory():
start_watching()
mkdir(p("a"))
expect_event(DirCreatedEvent(p("a")))
expect_event(DirModifiedEvent(p()))
mkdir(p("a", "b"))
expect_event(DirCreatedEvent(p("a", "b")))
expect_event(DirModifiedEvent(p("a")))
mv(p("a"), p("a2"))
expect_event(DirMovedEvent(p("a"), p("a2")))
expect_event(DirModifiedEvent(p()))
expect_event(DirModifiedEvent(p()))
expect_event(DirMovedEvent(p("a", "b"), p("a2", "b")))
if platform.is_bsd():
expect_event(DirModifiedEvent(p()))
open(p("a2", "b", "c"), "a").close()
# DirModifiedEvent may emitted, but sometimes after waiting time is out.
events = []
while True:
events.append(event_queue.get(timeout=5)[0])
if event_queue.empty():
break
assert all(
[
isinstance(
e,
(
FileCreatedEvent,
FileMovedEvent,
FileOpenedEvent,
DirModifiedEvent,
FileClosedEvent,
),
)
for e in events
]
)
for event in events:
if isinstance(event, FileCreatedEvent):
assert event.src_path == p("a2", "b", "c")
elif isinstance(event, FileMovedEvent):
assert event.dest_path == p("a2", "b", "c")
assert event.src_path == p("a", "b", "c")
elif isinstance(event, DirModifiedEvent):
assert event.src_path == p("a2", "b")
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
@pytest.mark.skipif(
not platform.is_windows(),
reason="Non-Windows create another set of events for this test",
)
def test_renaming_top_level_directory_on_windows():
start_watching()
mkdir(p("a"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event, DirCreatedEvent)
assert event.src_path == p("a")
mkdir(p("a", "b"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event, DirCreatedEvent)
assert event.src_path == p("a", "b")
event = event_queue.get(timeout=5)[0]
assert isinstance(event, DirCreatedEvent)
assert event.src_path == p("a", "b")
event = event_queue.get(timeout=5)[0]
assert isinstance(event, DirModifiedEvent)
assert event.src_path == p("a")
mv(p("a"), p("a2"))
event = event_queue.get(timeout=5)[0]
assert isinstance(event, DirMovedEvent)
assert event.src_path == p("a", "b")
open(p("a2", "b", "c"), "a").close()
events = []
while True:
events.append(event_queue.get(timeout=5)[0])
if event_queue.empty():
break
assert all(
[
isinstance(
e, (FileCreatedEvent, FileMovedEvent, DirMovedEvent, DirModifiedEvent)
)
for e in events
]
)
for event in events:
if isinstance(event, FileCreatedEvent):
assert event.src_path == p("a2", "b", "c")
elif isinstance(event, FileMovedEvent):
assert event.dest_path == p("a2", "b", "c")
assert event.src_path == p("a", "b", "c")
elif isinstance(event, DirMovedEvent):
assert event.dest_path == p("a2")
assert event.src_path == p("a")
elif isinstance(event, DirModifiedEvent):
assert event.src_path == p("a2", "b")
@pytest.mark.skipif(
platform.is_windows(), reason="Windows create another set of events for this test"
)
def test_move_nested_subdirectories():
mkdir(p("dir1/dir2/dir3"), parents=True)
mkfile(p("dir1/dir2/dir3", "a"))
start_watching()
mv(p("dir1/dir2"), p("dir2"))
expect_event(DirMovedEvent(p("dir1", "dir2"), p("dir2")))
expect_event(DirModifiedEvent(p("dir1")))
expect_event(DirModifiedEvent(p()))
expect_event(DirMovedEvent(p("dir1", "dir2", "dir3"), p("dir2", "dir3")))
expect_event(FileMovedEvent(p("dir1", "dir2", "dir3", "a"), p("dir2", "dir3", "a")))
if platform.is_bsd():
event = event_queue.get(timeout=5)[0]
assert p(event.src_path) == p()
assert isinstance(event, DirModifiedEvent)
event = event_queue.get(timeout=5)[0]
assert p(event.src_path) == p("dir1")
assert isinstance(event, DirModifiedEvent)
touch(p("dir2/dir3", "a"))
if platform.is_linux():
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2/dir3", "a")
assert isinstance(event, FileOpenedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2/dir3", "a")
assert isinstance(event, FileModifiedEvent)
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
@pytest.mark.skipif(
not platform.is_windows(),
reason="Non-Windows create another set of events for this test",
)
def test_move_nested_subdirectories_on_windows():
mkdir(p("dir1/dir2/dir3"), parents=True)
mkfile(p("dir1/dir2/dir3", "a"))
start_watching(p(""))
mv(p("dir1/dir2"), p("dir2"))
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir1", "dir2")
assert isinstance(event, FileDeletedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2")
assert isinstance(event, DirCreatedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2", "dir3")
assert isinstance(event, DirCreatedEvent)
event = event_queue.get(timeout=5)[0]
assert event.src_path == p("dir2", "dir3", "a")
assert isinstance(event, FileCreatedEvent)
touch(p("dir2/dir3", "a"))
events = []
while True:
events.append(event_queue.get(timeout=5)[0])
if event_queue.empty():
break
assert all([isinstance(e, (FileModifiedEvent, DirModifiedEvent)) for e in events])
for event in events:
if isinstance(event, FileModifiedEvent):
assert event.src_path == p("dir2", "dir3", "a")
elif isinstance(event, DirModifiedEvent):
assert event.src_path in [p("dir2"), p("dir2", "dir3")]
@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)
@pytest.mark.skipif(
platform.is_bsd(), reason="BSD create another set of events for this test"
)
def test_file_lifecyle():
start_watching()
mkfile(p("a"))
touch(p("a"))
mv(p("a"), p("b"))
rm(p("b"))
expect_event(FileCreatedEvent(p("a")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
if platform.is_linux():
expect_event(FileOpenedEvent(p("a")))
expect_event(FileClosedEvent(p("a")))
expect_event(DirModifiedEvent(p()))
expect_event(FileOpenedEvent(p("a")))
expect_event(FileModifiedEvent(p("a")))
if platform.is_linux():
expect_event(FileClosedEvent(p("a")))
expect_event(DirModifiedEvent(p()))
expect_event(FileMovedEvent(p("a"), p("b")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))
expect_event(DirModifiedEvent(p()))
expect_event(FileDeletedEvent(p("b")))
if not platform.is_windows():
expect_event(DirModifiedEvent(p()))