blob: 2d3e382698305168136b32d082603e74f6e150f1 [file] [log] [blame]
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test utility code"""
# pylint: disable=missing-docstring
import logging
import os
from socket import socket
import weakref
import pytest
from .util import (
ChainableFuture,
FdHolder,
MultiDict,
ReferenceCountTracker,
fdid,
unix_pipe,
weak_bind,
)
# Module-level logger named after this module.
log = logging.getLogger(__name__)
def test_fdholder_steal():
    """Check that closing a holder that stole a raw fd closes that fd."""
    raw_fd = os.open(os.devnull, os.O_RDONLY)
    inheritable = os.get_inheritable(raw_fd)
    assert not inheritable

    holder = FdHolder.steal(raw_fd)
    holder.close()

    # Querying the descriptor number must now fail.
    with pytest.raises(OSError):
        os.get_inheritable(raw_fd)
def test_fdholder_detach():
    """Check that a detached descriptor stays open after the holder closes.

    ``detach()`` is expected to return the original descriptor number, and
    a subsequent ``close()`` on the holder must leave that descriptor usable.
    """
    fd = os.open(os.devnull, os.O_RDONLY)
    assert not os.get_inheritable(fd)
    fdh = FdHolder.steal(fd)
    assert fdh.detach() == fd
    fdh.close()
    # Still queryable, so closing the holder did not close the descriptor.
    assert not os.get_inheritable(fd)
    # The descriptor is verified open just above and nothing else closes it,
    # so close it here to avoid leaking it into later tests.
    os.close(fd)
def test_fdholder_dup():
    """Check that dup_of() gives a distinct descriptor with the same fdid."""
    original_fd = os.open(os.devnull, os.O_RDONLY)
    original_id = fdid(original_fd)
    assert not os.get_inheritable(original_fd)

    holder = FdHolder.dup_of(original_fd)
    holder_fd = holder.fileno()
    # A duplicate uses a different descriptor number...
    assert holder_fd != original_fd

    # ...but fdid() reports the same identity for both, and the holder's
    # own fdid attribute agrees.
    duplicate_id = fdid(holder.fileno())
    assert original_id == duplicate_id
    assert duplicate_id == holder.fdid

    holder.close()
    os.close(original_fd)
def test_fdholder_socket_steal():
    """Check that closing a holder that stole a socket closes its descriptor."""
    sock = socket()
    # Record the descriptor number up front. If stealing detaches the socket,
    # sock.fileno() would report -1 afterwards and the final check would pass
    # without proving that the real descriptor was closed.
    sock_fileno = sock.fileno()
    assert not os.get_inheritable(sock_fileno)
    fdh = FdHolder.steal(sock)
    fdh.close()
    # The original descriptor number must no longer refer to an open file.
    with pytest.raises(OSError):
        os.get_inheritable(sock_fileno)
def test_fdholder_socket_detach():
    """Check that a descriptor detached from a socket-backed holder stays open."""
    stolen_socket = socket()
    raw_fd = stolen_socket.fileno()
    assert not os.get_inheritable(raw_fd)

    holder = FdHolder.steal(stolen_socket)
    detached_fd = holder.detach()
    assert detached_fd == raw_fd
    holder.close()

    # The descriptor is still queryable after the holder is closed...
    still_inheritable = os.get_inheritable(raw_fd)
    assert not still_inheritable
    # ...so the test closes it by hand.
    os.close(raw_fd)
def test_fdholder_socket_dup():
    """Check that dup_of() on a socket gives a distinct fd with the same fdid."""
    sock = socket()
    expected_id = fdid(sock.fileno())
    assert not os.get_inheritable(sock.fileno())
    fdh = FdHolder.dup_of(sock)
    assert fdh.fdid == expected_id
    # The holder's descriptor number differs from the socket's.
    assert fdh.fileno() != sock.fileno()
    dup_id = fdid(fdh.fileno())
    assert expected_id == dup_id
    fdh.close()
    # Close the original socket explicitly instead of leaving it to garbage
    # collection, matching the explicit cleanup in test_fdholder_dup.
    sock.close()
def test_fdholder_fileobj():
    """Exercise as_file() in its default mode and with steal=True on a pipe."""
    payload = "hello"
    reader, writer = unix_pipe()

    # Default mode: the holder stays valid and the file object uses a
    # different descriptor number.
    writer_file = writer.as_file("w")
    assert writer.valid
    assert writer_file.fileno() != writer.fileno()
    writer_file.write(payload)
    writer_file.flush()
    writer_file.close()
    writer.close()

    # steal=True: the holder becomes invalid and the file object reports the
    # holder's former descriptor number.
    reader_fd = reader.fileno()
    reader_file = reader.as_file(steal=True)
    assert not reader.valid
    reader.close()  # Just make sure it works
    assert reader_file.fileno() == reader_fd
    assert reader_file.read() == payload

    # Once the file object is dropped, the descriptor must be gone too.
    del reader_file
    with pytest.raises(OSError):
        os.get_inheritable(reader_fd)
def test_fdholder_borrow_sock():
    """Check borrow() on a socket: detach() is refused, close() closes the fd."""
    sock = socket()
    fdh = FdHolder.borrow(sock)
    # Drop the test's own reference; only the holder is used from here on.
    del sock
    # A borrowing holder is expected to reject detach().
    with pytest.raises(ValueError):
        fdh.detach()
    fileno = fdh.fileno()
    assert not os.get_inheritable(fileno)
    fdh.close()
    # The call itself must raise; wrapping it in an assert (as before) added
    # nothing and hid the intent.
    with pytest.raises(OSError):
        os.get_inheritable(fileno)
def test_fdholder_auto_close():
    """Check that dropping the last reference to a holder closes its fd."""
    holder = unix_pipe()[0]
    holder_ref = weakref.ref(holder)
    raw_fd = holder.fileno()

    # While the holder is alive, both the descriptor and the weakref resolve.
    assert fdid(raw_fd)
    assert holder_ref()

    del holder

    # The holder has been collected and the descriptor is no longer usable.
    assert not holder_ref()
    with pytest.raises(OSError):
        fdid(raw_fd)
def test_future_subclass():
    """Check subclass construction and that then() yields a plain base future."""

    class _TestFuture(ChainableFuture):
        foo = 1

        def __init__(self, foo=2):
            super().__init__()
            # The class attribute is still visible before it is shadowed.
            assert self.foo == 1
            self.foo = foo

    def _always_nine(x):
        return 9

    default_future = _TestFuture()
    assert default_future.foo == 2

    parent = _TestFuture(5)
    assert parent.foo == 5

    chained = parent.then(_always_nine)
    assert not isinstance(chained, _TestFuture)
    # pylint: disable=unidiomatic-typecheck
    assert type(chained) is ChainableFuture

    # Completing the parent completes the chained future with the
    # callback's return value.
    parent.set_result(99)
    assert chained.done()
    assert chained.result() == 9
def test_future_completes_once():
    """Check that only the first completion of a future takes effect."""
    future = ChainableFuture()
    invocations = []

    def _record_call(_ftr):
        invocations.append(None)

    future.add_done_callback(_record_call)
    assert not invocations
    assert not future.done()

    future.set_result(1)
    assert len(invocations) == 1
    assert future.result() == 1

    # Later completions, with a result or an exception, leave the first
    # result in place and do not run the callback again.
    future.set_result(2)
    assert future.result() == 1
    future.set_exception(ValueError("blarg"))
    assert future.result() == 1
    assert len(invocations) == 1
def test_future_of():
    """Check that ChainableFuture.of() returns an already-completed future."""
    ready = ChainableFuture.of(4)
    assert ready.done()
    assert ready.result() == 4
class _FakeFatalError(BaseException):
    """Marker exception raised by the patched fatal-error hook in tests."""
def test_future_callback_error(monkeypatch):
    """Check that set_result() raises the patched hook's error when a callback fails.

    ``util._do_die_due_to_fatal_exception`` is replaced with a function that
    raises ``_FakeFatalError``; completing a future whose done-callback
    raises ``ValueError`` must then raise ``_FakeFatalError``.
    """
    from . import util

    def _die_non_fatally(msg, ex):
        raise _FakeFatalError

    def _failing_callback(_future):
        raise ValueError("test")

    monkeypatch.setattr(util, "_do_die_due_to_fatal_exception", _die_non_fatally)

    future = ChainableFuture()
    future.add_done_callback(_failing_callback)
    with pytest.raises(_FakeFatalError):
        future.set_result(5)
def test_weak_bind():
    """Check that a weak_bind() wrapper forwards calls only while its target lives."""
    total = [0]

    class _Dummy:
        def foo(self, x):  # pylint: disable=no-self-use
            total[0] = total[0] + x
            return 4

    target = _Dummy()
    target_ref = weakref.ref(target)
    assert target_ref()

    # A direct call returns the method's value and updates the total.
    assert target.foo(1) == 4
    assert total == [1]

    # The wrapper still runs the method but returns None.
    wrapper = weak_bind(target.foo)
    assert wrapper(2) is None
    assert total == [3]

    # Once the target is gone, the wrapper returns None and the total is
    # unchanged.
    del target
    assert not target_ref()
    assert wrapper(3) is None
    assert total == [3]
def test_multi_dict():
    """Exercise MultiDict add/remove, set-valued lookup and key removal."""
    key = "foo"
    multi = MultiDict()

    for value in (1, 2):
        multi.add(key, value)
    assert multi[key] == {1, 2}

    # Adding a value that is already present changes nothing.
    multi.add(key, 2)
    assert multi[key] == {1, 2}

    multi.remove(key, 1)
    assert multi[key] == {2}
    # Removing a value that is no longer present raises KeyError.
    with pytest.raises(KeyError):
        multi.remove(key, 1)

    # Removing the last value removes the key itself.
    multi.remove(key, 2)
    assert key not in multi
    with pytest.raises(KeyError):
        del multi[key]
    assert not multi
def test_reference_count_tracker():
    """Exercise single-step addref/release on ReferenceCountTracker."""
    tracker = ReferenceCountTracker()
    tracked = "foo"
    untracked = "bar"

    assert tracked not in tracker
    assert untracked not in tracker

    # The first addref makes the key visible with a count of one.
    tracker.addref(tracked)
    assert tracked in tracker
    assert untracked not in tracker
    assert tracker[tracked] == 1

    tracker.addref(tracked)
    assert tracker[tracked] == 2

    tracker.release(tracked)
    assert tracker[tracked] == 1

    # Releasing the last reference removes the key.
    tracker.release(tracked)
    assert tracked not in tracker

    # Lookup and release of a missing key both raise KeyError.
    with pytest.raises(KeyError):
        tracker[tracked]  # pylint: disable=pointless-statement
    with pytest.raises(KeyError):
        tracker.release(tracked)
def test_reference_count_tracker_multi_release():
    """Exercise release() with an explicit count on ReferenceCountTracker."""
    tracker = ReferenceCountTracker()
    tracked = "foo"
    untracked = "bar"

    for _ in range(2):
        tracker.addref(tracked)
    assert tracker[tracked] == 2

    # Releasing a key that was never added raises KeyError.
    with pytest.raises(KeyError):
        tracker.release(untracked)

    # Releasing more than is held raises ValueError and leaves the count as is.
    with pytest.raises(ValueError):
        tracker.release(tracked, 3)
    assert tracker[tracked] == 2

    # Releasing exactly what is held removes the key.
    tracker.release(tracked, 2)
    assert tracked not in tracker

    # A partial multi-release leaves the remainder behind.
    for _ in range(3):
        tracker.addref(tracked)
    tracker.release(tracked, 2)
    assert tracker[tracked] == 1

    tracker.release(tracked)
    assert tracked not in tracker