| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Test utility code""" |
| |
| # pylint: disable=missing-docstring |
| |
| import logging |
| import os |
| from socket import socket |
| import weakref |
| |
| import pytest |
| |
| from .util import ( |
| ChainableFuture, |
| FdHolder, |
| MultiDict, |
| ReferenceCountTracker, |
| fdid, |
| unix_pipe, |
| weak_bind, |
| ) |
| |
| log = logging.getLogger(__name__) |
| |
def test_fdholder_steal():
    """Closing a holder that stole a raw fd invalidates that fd number."""
    raw_fd = os.open(os.devnull, os.O_RDONLY)
    inheritable = os.get_inheritable(raw_fd)
    assert not inheritable
    holder = FdHolder.steal(raw_fd)
    holder.close()
    # The raw number must no longer refer to an open descriptor.
    with pytest.raises(OSError):
        os.get_inheritable(raw_fd)
| |
def test_fdholder_detach():
    """After detach(), closing the holder leaves the original fd open."""
    fd = os.open(os.devnull, os.O_RDONLY)
    assert not os.get_inheritable(fd)
    fdh = FdHolder.steal(fd)
    # detach() hands back the very same fd number that was stolen.
    assert fdh.detach() == fd
    fdh.close()
    # Still queryable, so close() on the detached holder did not close it.
    assert not os.get_inheritable(fd)
    # The test owns the descriptor again after detach(); close it explicitly
    # so it does not leak into the rest of the test session.
    os.close(fd)
| |
def test_fdholder_dup():
    """dup_of() gives a new fd number whose fdid matches the source fd."""
    source_fd = os.open(os.devnull, os.O_RDONLY)
    source_id = fdid(source_fd)
    assert not os.get_inheritable(source_fd)

    holder = FdHolder.dup_of(source_fd)
    # A duplicate lives under a different descriptor number...
    assert holder.fileno() != source_fd
    # ...but reports the same fdid, both via fdid() and via the holder.
    duplicate_id = fdid(holder.fileno())
    assert source_id == duplicate_id
    assert duplicate_id == holder.fdid

    # Both descriptors are independently owned, so both get closed.
    holder.close()
    os.close(source_fd)
| |
def test_fdholder_socket_steal():
    """Closing a holder that stole a socket closes the socket's descriptor."""
    sock = socket()
    # Record the fd number before stealing.  If steal() detaches the socket
    # (presumably it does -- the sibling detach test also records the number
    # up front), sock.fileno() would return -1 afterwards and querying -1
    # raises OSError whether or not the real descriptor was closed.
    sock_fileno = sock.fileno()
    assert not os.get_inheritable(sock_fileno)
    fdh = FdHolder.steal(sock)
    fdh.close()
    # The descriptor the socket originally wrapped must now be closed.
    with pytest.raises(OSError):
        os.get_inheritable(sock_fileno)
| |
def test_fdholder_socket_detach():
    """A socket's fd survives close() of a holder it was detached from."""
    conn = socket()
    raw_fd = conn.fileno()
    assert not os.get_inheritable(conn.fileno())

    holder = FdHolder.steal(conn)
    detached = holder.detach()
    # detach() returns the number the socket originally wrapped.
    assert detached == raw_fd
    holder.close()

    # The descriptor is still open after the holder is closed...
    assert not os.get_inheritable(raw_fd)
    # ...so the test is responsible for closing it.
    os.close(raw_fd)
| |
def test_fdholder_socket_dup():
    """dup_of() on a socket gives a new fd number with the same fdid."""
    sock = socket()
    expected_id = fdid(sock.fileno())
    assert not os.get_inheritable(sock.fileno())
    fdh = FdHolder.dup_of(sock)
    assert fdh.fdid == expected_id
    # The socket keeps its own descriptor; the holder has a different number.
    assert fdh.fileno() != sock.fileno()
    dup_id = fdid(fdh.fileno())
    assert expected_id == dup_id
    fdh.close()
    # The socket still owns its own descriptor (the holder only has a
    # duplicate), so close it explicitly instead of leaving it to garbage
    # collection, mirroring the os.close() in test_fdholder_dup.
    sock.close()
| |
def test_fdholder_fileobj():
    """Exercise as_file() both without and with steal=True on a pipe."""
    reader, writer = unix_pipe()

    # Without steal: the file object has its own fd number and the holder
    # remains valid.
    out_file = writer.as_file("w")
    assert writer.valid
    assert out_file.fileno() != writer.fileno()
    out_file.write("hello")
    out_file.flush()
    out_file.close()
    writer.close()

    # With steal=True: the file object takes over the holder's fd number and
    # the holder becomes invalid.
    reader_fd = reader.fileno()
    in_file = reader.as_file(steal=True)
    assert not reader.valid
    reader.close()  # Just make sure it works
    assert in_file.fileno() == reader_fd
    assert in_file.read() == "hello"

    # Dropping the only reference to the file object must close the fd.
    del in_file
    with pytest.raises(OSError):
        os.get_inheritable(reader_fd)
| |
def test_fdholder_borrow_sock():
    """A borrowing holder cannot detach, and its fd is invalid after close()."""
    sock = socket()
    fdh = FdHolder.borrow(sock)
    # Drop the test's own reference to the socket; the holder must keep
    # working without it.
    del sock
    # A borrowed descriptor cannot be detached.
    with pytest.raises(ValueError):
        fdh.detach()
    fileno = fdh.fileno()
    assert not os.get_inheritable(fileno)
    fdh.close()
    # Only the OSError matters here.  The original wrapped this call in a
    # stray `assert not`, which could only turn a "did not raise" failure
    # into an unrelated AssertionError.
    with pytest.raises(OSError):
        os.get_inheritable(fileno)
| |
def test_fdholder_auto_close():
    """Dropping the last reference to a holder closes its descriptor."""
    holder = unix_pipe()[0]
    holder_ref = weakref.ref(holder)
    raw_fd = holder.fileno()

    # While the holder is alive the fd is usable and the weakref resolves.
    assert fdid(raw_fd)
    assert holder_ref()

    del holder

    # The holder is gone and the fd went with it.
    assert not holder_ref()
    with pytest.raises(OSError):
        fdid(raw_fd)
| |
def test_future_subclass():
    """then() on a ChainableFuture subclass yields a plain ChainableFuture."""

    class _TestFuture(ChainableFuture):
        foo = 1

        def __init__(self, foo=2):
            super().__init__()
            # The class-level value is still visible until the instance
            # attribute shadows it.
            assert self.foo == 1
            self.foo = foo

    assert _TestFuture().foo == 2
    parent = _TestFuture(5)
    assert parent.foo == 5

    child = parent.then(lambda x: 9)
    # The chained future is exactly the base class, not the subclass.
    assert not isinstance(child, _TestFuture)
    # pylint: disable=unidiomatic-typecheck
    assert type(child) is ChainableFuture

    # Completing the parent completes the child with the callback's value.
    parent.set_result(99)
    assert child.done()
    assert child.result() == 9
| |
def test_future_completes_once():
    """Only the first completion of a future takes effect."""
    future = ChainableFuture()
    invocations = []

    def _record_call(_ftr):
        invocations.append(1)

    future.add_done_callback(_record_call)
    assert not invocations
    assert not future.done()

    # First completion: callback runs once, result is stored.
    future.set_result(1)
    assert len(invocations) == 1
    assert future.result() == 1

    # Later completions, by value or by exception, change nothing.
    future.set_result(2)
    assert future.result() == 1
    future.set_exception(ValueError("blarg"))
    assert future.result() == 1
    assert len(invocations) == 1
| |
def test_future_of():
    """ChainableFuture.of() returns a future already done with the value."""
    ready = ChainableFuture.of(4)
    is_done = ready.done()
    assert is_done
    assert ready.result() == 4
| |
class _FakeFatalError(BaseException):
    """Marker exception raised by the test's replacement fatal-error hook.

    Derives from BaseException rather than Exception -- presumably so that
    `except Exception` handlers in the code under test cannot swallow it
    (not verifiable from this file).
    """
| |
def test_future_callback_error(monkeypatch):
    """A raising done-callback ends up in the module's fatal-error hook."""
    from . import util

    def _raise_fake_fatal(msg, ex):
        raise _FakeFatalError

    def _failing_callback(_future):
        raise ValueError("test")

    # Swap the fatal-error hook for one that raises a catchable marker.
    monkeypatch.setattr(
        util, "_do_die_due_to_fatal_exception", _raise_fake_fatal)

    future = ChainableFuture()
    future.add_done_callback(_failing_callback)
    # Completing the future runs the callback; its ValueError must surface
    # as the marker raised by the patched hook.
    with pytest.raises(_FakeFatalError):
        future.set_result(5)
| |
def test_weak_bind():
    """weak_bind() forwards calls while the target lives, then does nothing."""
    total = [0]

    class _Dummy:
        def foo(self, x):  # pylint: disable=no-self-use
            total[0] += x
            return 4

    target = _Dummy()
    target_ref = weakref.ref(target)
    assert target_ref()
    assert target.foo(1) == 4
    assert total == [1]

    wrapper = weak_bind(target.foo)
    # The call is forwarded (total grows) but the return value is dropped.
    assert wrapper(2) is None
    assert total == [3]

    # The wrapper must not keep the target alive.
    del target
    assert not target_ref()

    # With the target gone, calling the wrapper has no effect.
    assert wrapper(3) is None
    assert total == [3]
| |
def test_multi_dict():
    """MultiDict keeps a set of values per key and drops emptied keys."""
    multi = MultiDict()
    key = "foo"

    multi.add(key, 1)
    multi.add(key, 2)
    assert multi[key] == {1, 2}

    # Adding a value that is already present changes nothing.
    multi.add(key, 2)
    assert multi[key] == {1, 2}

    multi.remove(key, 1)
    assert multi[key] == {2}
    # Removing a value that is no longer there is an error.
    with pytest.raises(KeyError):
        multi.remove(key, 1)

    # Removing the last value removes the key itself.
    multi.remove(key, 2)
    assert key not in multi
    with pytest.raises(KeyError):
        del multi[key]
    assert not multi
| |
def test_reference_count_tracker():
    """Counts go up with addref(), down with release(), and vanish at zero."""
    tracker = ReferenceCountTracker()
    held, untouched = "foo", "bar"

    assert held not in tracker
    assert untouched not in tracker

    # The first reference makes only that key appear.
    tracker.addref(held)
    assert held in tracker
    assert untouched not in tracker
    assert tracker[held] == 1

    tracker.addref(held)
    assert tracker[held] == 2

    tracker.release(held)
    assert tracker[held] == 1

    # Releasing the last reference removes the key entirely.
    tracker.release(held)
    assert held not in tracker
    with pytest.raises(KeyError):
        tracker[held]  # pylint: disable=pointless-statement
    with pytest.raises(KeyError):
        tracker.release(held)
| |
def test_reference_count_tracker_multi_release():
    """release(key, n) drops n references at once and rejects over-release."""
    tracker = ReferenceCountTracker()
    held, unknown = "foo", "bar"

    for _ in range(2):
        tracker.addref(held)
    assert tracker[held] == 2

    # An unknown key cannot be released.
    with pytest.raises(KeyError):
        tracker.release(unknown)

    # Releasing more than is held fails and leaves the count untouched.
    with pytest.raises(ValueError):
        tracker.release(held, 3)
    assert tracker[held] == 2

    # Releasing exactly what is held removes the key.
    tracker.release(held, 2)
    assert held not in tracker

    # A partial multi-release leaves the remainder in place.
    for _ in range(3):
        tracker.addref(held)
    tracker.release(held, 2)
    assert tracker[held] == 1
    tracker.release(held)
    assert held not in tracker