| #!/usr/bin/env python3 |
| import subprocess |
| import pytest |
| import os |
| import stat |
| import time |
| from os.path import join as pjoin |
| import sys |
| import re |
| import itertools |
| |
# Parent of the directory that contains this file; the 'util' and
# 'example' paths used below are resolved relative to it.
basename = os.path.join(os.path.dirname(__file__), '..')
| |
def test_printcap():
    '''Run the ``printcap`` example and parse what it prints

    The program is started (prefixed with `base_cmdline`) and given 30
    seconds to finish. Its exit status must be zero.

    Returns a tuple ``(proto, caps)``. *proto* is the ``(major, minor)``
    integer pair taken from the last ``Protocol version: X.Y`` line, or
    `None` if there was no such line. *caps* is the set of all lines
    that start with a tab, with surrounding whitespace removed.

    Raises `subprocess.TimeoutExpired` if the program does not finish
    in time; the child is killed and reaped before the exception
    propagates.
    '''

    cmdline = base_cmdline + [pjoin(basename, 'example', 'printcap')]
    proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                            universal_newlines=True)
    try:
        (stdout, _) = proc.communicate(30)
    except subprocess.TimeoutExpired:
        # communicate() leaves the child running when it times out, so
        # kill it and collect the remaining output to avoid leaking the
        # process and its pipe.
        proc.kill()
        proc.communicate()
        raise
    assert proc.returncode == 0

    proto = None
    caps = set()
    for line in stdout.split('\n'):
        # Tab-indented lines are collected as capability names
        if line.startswith('\t'):
            caps.add(line.strip())
            continue

        hit = re.match(r'Protocol version: (\d+)\.(\d+)$', line)
        if hit:
            proto = (int(hit.group(1)), int(hit.group(2)))

    return (proto, caps)
| |
| |
def wait_for_mount(mount_process, mnt_dir,
                   test_fn=os.path.ismount):
    '''Wait until *test_fn(mnt_dir)* becomes true

    *test_fn* is polled roughly every 0.1 seconds for up to 30 seconds
    of real time. Returns `True` as soon as it reports success.

    Calls `pytest.fail` if *mount_process* exits (``poll()`` returns a
    value other than `None`) before that happens, or if the deadline
    passes without success.
    '''

    # Use a monotonic deadline so that time spent inside test_fn() and
    # poll() counts against the timeout, rather than summing up sleep
    # intervals.
    deadline = time.monotonic() + 30
    while True:
        if test_fn(mnt_dir):
            return True
        if mount_process.poll() is not None:
            pytest.fail('file system process terminated prematurely')
        if time.monotonic() >= deadline:
            break
        time.sleep(0.1)
    pytest.fail("mountpoint failed to come up")
| |
def cleanup(mount_process, mnt_dir):
    '''Best-effort teardown of a mount

    Runs a forced/lazy unmount command for *mnt_dir*, ignoring its
    output and exit status, then terminates *mount_process* and kills
    it if it has not exited within one second.
    '''

    # Don't bother trying Valgrind if things already went wrong

    on_bsd = any(tag in sys.platform for tag in ('bsd', 'dragonfly'))
    if on_bsd:
        unmount_cmd = ['umount', '-f', mnt_dir]
    else:
        fusermount = pjoin(basename, 'util', 'fusermount3')
        unmount_cmd = [fusermount, '-z', '-u', mnt_dir]

    # The return code is deliberately not checked here.
    subprocess.call(unmount_cmd,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.STDOUT)

    mount_process.terminate()
    try:
        mount_process.wait(1)
    except subprocess.TimeoutExpired:
        mount_process.kill()
| |
def umount(mount_process, mnt_dir):
    '''Unmount *mnt_dir* and wait for *mount_process* to exit cleanly

    The unmount command must succeed (`subprocess.check_call` raises
    `subprocess.CalledProcessError` otherwise) and *mnt_dir* must no
    longer be a mountpoint afterwards. *mount_process* then gets up to
    30 seconds to exit.

    Calls `pytest.fail` if the process exits with a non-zero code or
    does not exit in time.
    '''

    if 'bsd' in sys.platform or 'dragonfly' in sys.platform:
        cmdline = ['umount', mnt_dir]
    else:
        # fusermount3 will be setuid root, so we can only trace it with
        # valgrind if we're root
        if os.getuid() == 0:
            cmdline = base_cmdline
        else:
            cmdline = []
        # Concatenation builds a new list, so base_cmdline is not modified
        cmdline = cmdline + [pjoin(basename, 'util', 'fusermount3'),
                             '-z', '-u', mnt_dir]

    subprocess.check_call(cmdline)
    assert not os.path.ismount(mnt_dir)

    # Give mount process a little while to terminate.
    try:
        code = mount_process.wait(30)
    except subprocess.TimeoutExpired:
        code = None

    # Report failures outside the except block so that the pytest
    # failure is not chained to the TimeoutExpired exception.
    if code is None:
        pytest.fail('mount process did not terminate')
    if code != 0:
        pytest.fail('file system process terminated with code %s' % (code,))
| |
| |
def safe_sleep(secs):
    '''Like time.sleep(), but sleep for at least *secs*

    `time.sleep` may sleep less than the given period if a signal is
    received. This function ensures that we sleep for at least the
    desired time.

    Elapsed time is measured with `time.monotonic`, so adjustments of
    the system clock neither shorten nor prolong the sleep. Returns
    immediately if *secs* is zero or negative.
    '''

    now = time.monotonic()
    end = now + secs
    while now < end:
        # Sleep only for what is still missing, then re-check
        time.sleep(end - now)
        now = time.monotonic()
| |
def fuse_test_marker():
    '''Return a pytest mark that indicates FUSE availability

    If system/user/environment does not support FUSE, return
    a `pytest.mark.skip` object with more details. If FUSE is
    supported, return `pytest.mark.uses_fuse()`.

    On BSD and DragonFly platforms no checks are made. Elsewhere the
    following are checked in order: ``fusermount3`` is found on
    ``PATH``; ``/dev/fuse`` exists; and, unless running as root,
    ``fusermount3`` is setuid and ``/dev/fuse`` can be opened
    read-write.
    '''

    # Imported locally: only this function needs it.
    import shutil

    def skip(reason):
        return pytest.mark.skip(reason=reason)

    if 'bsd' in sys.platform or 'dragonfly' in sys.platform:
        return pytest.mark.uses_fuse()

    # Search PATH in-process instead of spawning an external `which`,
    # which would raise FileNotFoundError if that tool is not installed.
    fusermount_path = shutil.which('fusermount3')
    if not fusermount_path:
        return skip("Can't find fusermount executable")

    if not os.path.exists('/dev/fuse'):
        return skip("FUSE kernel module does not seem to be loaded")

    # The remaining checks are skipped when running as root
    if os.getuid() == 0:
        return pytest.mark.uses_fuse()

    mode = os.stat(fusermount_path).st_mode
    if mode & stat.S_ISUID == 0:
        return skip('fusermount executable not setuid, and we are not root.')

    try:
        fd = os.open('/dev/fuse', os.O_RDWR)
    except OSError as exc:
        return skip('Unable to open /dev/fuse: %s' % exc.strerror)
    else:
        os.close(fd)

    return pytest.mark.uses_fuse()
| |
def powerset(iterable):
    '''Return an iterator over all subsets of *iterable*

    Each subset is a tuple. Subsets are produced in order of
    increasing size, starting with the empty tuple.
    '''

    items = list(iterable)
    by_size = [itertools.combinations(items, size)
               for size in range(len(items) + 1)]
    return itertools.chain.from_iterable(by_size)
| |
| |
# Use valgrind if requested: any value of TEST_WITH_VALGRIND other than
# 'no', 'false' or '0' (case-insensitive, whitespace ignored) enables it.
base_cmdline = (
    []
    if os.environ.get('TEST_WITH_VALGRIND', 'no').lower().strip()
    in ('no', 'false', '0')
    else ['valgrind', '-q', '--']
)
| |
# Prepend the example binaries and the local fusermount3 directory to
# PATH, so that they take precedence over anything already there.
os.environ['PATH'] = ':'.join([
    pjoin(basename, 'example'),
    pjoin(basename, 'util'),
    os.environ['PATH'],
])
| |
# Probe the protocol version and capability set once at import time.
try:
    (fuse_proto, fuse_caps) = test_printcap()
except Exception:
    # Rely on test to raise error. Only ordinary errors are caught, so
    # KeyboardInterrupt and SystemExit still propagate.
    fuse_proto = (0, 0)
    fuse_caps = set()
| |