blob: 7fb16209470c3d8fbdc83afaa5ce79b5aa322618 [file] [log] [blame]
# Copyright 2018, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import queue
import threading
import mock
import pytest
try:
import grpc
except ImportError:
pytest.skip("No GRPC", allow_module_level=True)
from google.api_core import bidi
from google.api_core import exceptions
class Test_RequestQueueGenerator(object):
def test_bounded_consume(self):
call = mock.create_autospec(grpc.Call, instance=True)
call.is_active.return_value = True
def queue_generator(rpc):
yield mock.sentinel.A
yield queue.Empty()
yield mock.sentinel.B
rpc.is_active.return_value = False
yield mock.sentinel.C
q = mock.create_autospec(queue.Queue, instance=True)
q.get.side_effect = queue_generator(call)
generator = bidi._RequestQueueGenerator(q)
generator.call = call
items = list(generator)
assert items == [mock.sentinel.A, mock.sentinel.B]
def test_yield_initial_and_exit(self):
q = mock.create_autospec(queue.Queue, instance=True)
q.get.side_effect = queue.Empty()
call = mock.create_autospec(grpc.Call, instance=True)
call.is_active.return_value = False
generator = bidi._RequestQueueGenerator(q, initial_request=mock.sentinel.A)
generator.call = call
items = list(generator)
assert items == [mock.sentinel.A]
def test_yield_initial_callable_and_exit(self):
q = mock.create_autospec(queue.Queue, instance=True)
q.get.side_effect = queue.Empty()
call = mock.create_autospec(grpc.Call, instance=True)
call.is_active.return_value = False
generator = bidi._RequestQueueGenerator(
q, initial_request=lambda: mock.sentinel.A
)
generator.call = call
items = list(generator)
assert items == [mock.sentinel.A]
def test_exit_when_inactive_with_item(self):
q = mock.create_autospec(queue.Queue, instance=True)
q.get.side_effect = [mock.sentinel.A, queue.Empty()]
call = mock.create_autospec(grpc.Call, instance=True)
call.is_active.return_value = False
generator = bidi._RequestQueueGenerator(q)
generator.call = call
items = list(generator)
assert items == []
# Make sure it put the item back.
q.put.assert_called_once_with(mock.sentinel.A)
def test_exit_when_inactive_empty(self):
q = mock.create_autospec(queue.Queue, instance=True)
q.get.side_effect = queue.Empty()
call = mock.create_autospec(grpc.Call, instance=True)
call.is_active.return_value = False
generator = bidi._RequestQueueGenerator(q)
generator.call = call
items = list(generator)
assert items == []
def test_exit_with_stop(self):
q = mock.create_autospec(queue.Queue, instance=True)
q.get.side_effect = [None, queue.Empty()]
call = mock.create_autospec(grpc.Call, instance=True)
call.is_active.return_value = True
generator = bidi._RequestQueueGenerator(q)
generator.call = call
items = list(generator)
assert items == []
class Test_Throttle(object):
def test_repr(self):
delta = datetime.timedelta(seconds=4.5)
instance = bidi._Throttle(access_limit=42, time_window=delta)
assert repr(instance) == "_Throttle(access_limit=42, time_window={})".format(
repr(delta)
)
def test_raises_error_on_invalid_init_arguments(self):
with pytest.raises(ValueError) as exc_info:
bidi._Throttle(access_limit=10, time_window=datetime.timedelta(seconds=0.0))
assert "time_window" in str(exc_info.value)
assert "must be a positive timedelta" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
bidi._Throttle(access_limit=0, time_window=datetime.timedelta(seconds=10))
assert "access_limit" in str(exc_info.value)
assert "must be positive" in str(exc_info.value)
def test_does_not_delay_entry_attempts_under_threshold(self):
throttle = bidi._Throttle(
access_limit=3, time_window=datetime.timedelta(seconds=1)
)
entries = []
for _ in range(3):
with throttle as time_waited:
entry_info = {
"entered_at": datetime.datetime.now(),
"reported_wait": time_waited,
}
entries.append(entry_info)
# check the reported wait times ...
assert all(entry["reported_wait"] == 0.0 for entry in entries)
# .. and the actual wait times
delta = entries[1]["entered_at"] - entries[0]["entered_at"]
assert delta.total_seconds() < 0.1
delta = entries[2]["entered_at"] - entries[1]["entered_at"]
assert delta.total_seconds() < 0.1
def test_delays_entry_attempts_above_threshold(self):
throttle = bidi._Throttle(
access_limit=3, time_window=datetime.timedelta(seconds=1)
)
entries = []
for _ in range(6):
with throttle as time_waited:
entry_info = {
"entered_at": datetime.datetime.now(),
"reported_wait": time_waited,
}
entries.append(entry_info)
# For each group of 4 consecutive entries the time difference between
# the first and the last entry must have been greater than time_window,
# because a maximum of 3 are allowed in each time_window.
for i, entry in enumerate(entries[3:], start=3):
first_entry = entries[i - 3]
delta = entry["entered_at"] - first_entry["entered_at"]
assert delta.total_seconds() > 1.0
# check the reported wait times
# (NOTE: not using assert all(...), b/c the coverage check would complain)
for i, entry in enumerate(entries):
if i != 3:
assert entry["reported_wait"] == 0.0
# The delayed entry is expected to have been delayed for a significant
# chunk of the full second, and the actual and reported delay times
# should reflect that.
assert entries[3]["reported_wait"] > 0.7
delta = entries[3]["entered_at"] - entries[2]["entered_at"]
assert delta.total_seconds() > 0.7
class _CallAndFuture(grpc.Call, grpc.Future):
pass
def make_rpc():
"""Makes a mock RPC used to test Bidi classes."""
call = mock.create_autospec(_CallAndFuture, instance=True)
rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
def rpc_side_effect(request, metadata=None):
call.is_active.return_value = True
call.request = request
call.metadata = metadata
return call
rpc.side_effect = rpc_side_effect
def cancel_side_effect():
call.is_active.return_value = False
call.cancel.side_effect = cancel_side_effect
return rpc, call
class ClosedCall(object):
def __init__(self, exception):
self.exception = exception
def __next__(self):
raise self.exception
def is_active(self):
return False
class TestBidiRpc(object):
def test_initial_state(self):
bidi_rpc = bidi.BidiRpc(None)
assert bidi_rpc.is_active is False
def test_done_callbacks(self):
bidi_rpc = bidi.BidiRpc(None)
callback = mock.Mock(spec=["__call__"])
bidi_rpc.add_done_callback(callback)
bidi_rpc._on_call_done(mock.sentinel.future)
callback.assert_called_once_with(mock.sentinel.future)
def test_metadata(self):
rpc, call = make_rpc()
bidi_rpc = bidi.BidiRpc(rpc, metadata=mock.sentinel.A)
assert bidi_rpc._rpc_metadata == mock.sentinel.A
bidi_rpc.open()
assert bidi_rpc.call == call
assert bidi_rpc.call.metadata == mock.sentinel.A
def test_open(self):
rpc, call = make_rpc()
bidi_rpc = bidi.BidiRpc(rpc)
bidi_rpc.open()
assert bidi_rpc.call == call
assert bidi_rpc.is_active
call.add_done_callback.assert_called_once_with(bidi_rpc._on_call_done)
def test_open_error_already_open(self):
rpc, _ = make_rpc()
bidi_rpc = bidi.BidiRpc(rpc)
bidi_rpc.open()
with pytest.raises(ValueError):
bidi_rpc.open()
def test_close(self):
rpc, call = make_rpc()
bidi_rpc = bidi.BidiRpc(rpc)
bidi_rpc.open()
bidi_rpc.close()
call.cancel.assert_called_once()
assert bidi_rpc.call == call
assert bidi_rpc.is_active is False
# ensure the request queue was signaled to stop.
assert bidi_rpc.pending_requests == 1
assert bidi_rpc._request_queue.get() is None
def test_close_no_rpc(self):
bidi_rpc = bidi.BidiRpc(None)
bidi_rpc.close()
def test_send(self):
rpc, call = make_rpc()
bidi_rpc = bidi.BidiRpc(rpc)
bidi_rpc.open()
bidi_rpc.send(mock.sentinel.request)
assert bidi_rpc.pending_requests == 1
assert bidi_rpc._request_queue.get() is mock.sentinel.request
def test_send_not_open(self):
rpc, call = make_rpc()
bidi_rpc = bidi.BidiRpc(rpc)
with pytest.raises(ValueError):
bidi_rpc.send(mock.sentinel.request)
def test_send_dead_rpc(self):
error = ValueError()
bidi_rpc = bidi.BidiRpc(None)
bidi_rpc.call = ClosedCall(error)
with pytest.raises(ValueError) as exc_info:
bidi_rpc.send(mock.sentinel.request)
assert exc_info.value == error
def test_recv(self):
bidi_rpc = bidi.BidiRpc(None)
bidi_rpc.call = iter([mock.sentinel.response])
response = bidi_rpc.recv()
assert response == mock.sentinel.response
def test_recv_not_open(self):
rpc, call = make_rpc()
bidi_rpc = bidi.BidiRpc(rpc)
with pytest.raises(ValueError):
bidi_rpc.recv()
class CallStub(object):
def __init__(self, values, active=True):
self.values = iter(values)
self._is_active = active
self.cancelled = False
def __next__(self):
item = next(self.values)
if isinstance(item, Exception):
self._is_active = False
raise item
return item
def is_active(self):
return self._is_active
def add_done_callback(self, callback):
pass
def cancel(self):
self.cancelled = True
class TestResumableBidiRpc(object):
def test_ctor_defaults(self):
start_rpc = mock.Mock()
should_recover = mock.Mock()
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
assert bidi_rpc.is_active is False
assert bidi_rpc._finalized is False
assert bidi_rpc._start_rpc is start_rpc
assert bidi_rpc._should_recover is should_recover
assert bidi_rpc._should_terminate is bidi._never_terminate
assert bidi_rpc._initial_request is None
assert bidi_rpc._rpc_metadata is None
assert bidi_rpc._reopen_throttle is None
def test_ctor_explicit(self):
start_rpc = mock.Mock()
should_recover = mock.Mock()
should_terminate = mock.Mock()
initial_request = mock.Mock()
metadata = {"x-foo": "bar"}
bidi_rpc = bidi.ResumableBidiRpc(
start_rpc,
should_recover,
should_terminate=should_terminate,
initial_request=initial_request,
metadata=metadata,
throttle_reopen=True,
)
assert bidi_rpc.is_active is False
assert bidi_rpc._finalized is False
assert bidi_rpc._should_recover is should_recover
assert bidi_rpc._should_terminate is should_terminate
assert bidi_rpc._initial_request is initial_request
assert bidi_rpc._rpc_metadata == metadata
assert isinstance(bidi_rpc._reopen_throttle, bidi._Throttle)
def test_done_callbacks_terminate(self):
cancellation = mock.Mock()
start_rpc = mock.Mock()
should_recover = mock.Mock(spec=["__call__"], return_value=True)
should_terminate = mock.Mock(spec=["__call__"], return_value=True)
bidi_rpc = bidi.ResumableBidiRpc(
start_rpc, should_recover, should_terminate=should_terminate
)
callback = mock.Mock(spec=["__call__"])
bidi_rpc.add_done_callback(callback)
bidi_rpc._on_call_done(cancellation)
should_terminate.assert_called_once_with(cancellation)
should_recover.assert_not_called()
callback.assert_called_once_with(cancellation)
assert not bidi_rpc.is_active
def test_done_callbacks_recoverable(self):
start_rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
should_recover = mock.Mock(spec=["__call__"], return_value=True)
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
callback = mock.Mock(spec=["__call__"])
bidi_rpc.add_done_callback(callback)
bidi_rpc._on_call_done(mock.sentinel.future)
callback.assert_not_called()
start_rpc.assert_called_once()
should_recover.assert_called_once_with(mock.sentinel.future)
assert bidi_rpc.is_active
def test_done_callbacks_non_recoverable(self):
start_rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
should_recover = mock.Mock(spec=["__call__"], return_value=False)
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
callback = mock.Mock(spec=["__call__"])
bidi_rpc.add_done_callback(callback)
bidi_rpc._on_call_done(mock.sentinel.future)
callback.assert_called_once_with(mock.sentinel.future)
should_recover.assert_called_once_with(mock.sentinel.future)
assert not bidi_rpc.is_active
def test_send_terminate(self):
cancellation = ValueError()
call_1 = CallStub([cancellation], active=False)
call_2 = CallStub([])
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
)
should_recover = mock.Mock(spec=["__call__"], return_value=False)
should_terminate = mock.Mock(spec=["__call__"], return_value=True)
bidi_rpc = bidi.ResumableBidiRpc(
start_rpc, should_recover, should_terminate=should_terminate
)
bidi_rpc.open()
bidi_rpc.send(mock.sentinel.request)
assert bidi_rpc.pending_requests == 1
assert bidi_rpc._request_queue.get() is None
should_recover.assert_not_called()
should_terminate.assert_called_once_with(cancellation)
assert bidi_rpc.call == call_1
assert bidi_rpc.is_active is False
assert call_1.cancelled is True
def test_send_recover(self):
error = ValueError()
call_1 = CallStub([error], active=False)
call_2 = CallStub([])
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
)
should_recover = mock.Mock(spec=["__call__"], return_value=True)
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
bidi_rpc.open()
bidi_rpc.send(mock.sentinel.request)
assert bidi_rpc.pending_requests == 1
assert bidi_rpc._request_queue.get() is mock.sentinel.request
should_recover.assert_called_once_with(error)
assert bidi_rpc.call == call_2
assert bidi_rpc.is_active is True
def test_send_failure(self):
error = ValueError()
call = CallStub([error], active=False)
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, return_value=call
)
should_recover = mock.Mock(spec=["__call__"], return_value=False)
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
bidi_rpc.open()
with pytest.raises(ValueError) as exc_info:
bidi_rpc.send(mock.sentinel.request)
assert exc_info.value == error
should_recover.assert_called_once_with(error)
assert bidi_rpc.call == call
assert bidi_rpc.is_active is False
assert call.cancelled is True
assert bidi_rpc.pending_requests == 1
assert bidi_rpc._request_queue.get() is None
def test_recv_terminate(self):
cancellation = ValueError()
call = CallStub([cancellation])
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, return_value=call
)
should_recover = mock.Mock(spec=["__call__"], return_value=False)
should_terminate = mock.Mock(spec=["__call__"], return_value=True)
bidi_rpc = bidi.ResumableBidiRpc(
start_rpc, should_recover, should_terminate=should_terminate
)
bidi_rpc.open()
bidi_rpc.recv()
should_recover.assert_not_called()
should_terminate.assert_called_once_with(cancellation)
assert bidi_rpc.call == call
assert bidi_rpc.is_active is False
assert call.cancelled is True
def test_recv_recover(self):
error = ValueError()
call_1 = CallStub([1, error])
call_2 = CallStub([2, 3])
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
)
should_recover = mock.Mock(spec=["__call__"], return_value=True)
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
bidi_rpc.open()
values = []
for n in range(3):
values.append(bidi_rpc.recv())
assert values == [1, 2, 3]
should_recover.assert_called_once_with(error)
assert bidi_rpc.call == call_2
assert bidi_rpc.is_active is True
def test_recv_recover_already_recovered(self):
call_1 = CallStub([])
call_2 = CallStub([])
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
)
callback = mock.Mock()
callback.return_value = True
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, callback)
bidi_rpc.open()
bidi_rpc._reopen()
assert bidi_rpc.call is call_1
assert bidi_rpc.is_active is True
def test_recv_failure(self):
error = ValueError()
call = CallStub([error])
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, return_value=call
)
should_recover = mock.Mock(spec=["__call__"], return_value=False)
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
bidi_rpc.open()
with pytest.raises(ValueError) as exc_info:
bidi_rpc.recv()
assert exc_info.value == error
should_recover.assert_called_once_with(error)
assert bidi_rpc.call == call
assert bidi_rpc.is_active is False
assert call.cancelled is True
def test_close(self):
call = mock.create_autospec(_CallAndFuture, instance=True)
def cancel_side_effect():
call.is_active.return_value = False
call.cancel.side_effect = cancel_side_effect
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, return_value=call
)
should_recover = mock.Mock(spec=["__call__"], return_value=False)
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
bidi_rpc.open()
bidi_rpc.close()
should_recover.assert_not_called()
call.cancel.assert_called_once()
assert bidi_rpc.call == call
assert bidi_rpc.is_active is False
# ensure the request queue was signaled to stop.
assert bidi_rpc.pending_requests == 1
assert bidi_rpc._request_queue.get() is None
assert bidi_rpc._finalized
def test_reopen_failure_on_rpc_restart(self):
error1 = ValueError("1")
error2 = ValueError("2")
call = CallStub([error1])
# Invoking start RPC a second time will trigger an error.
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, side_effect=[call, error2]
)
should_recover = mock.Mock(spec=["__call__"], return_value=True)
callback = mock.Mock(spec=["__call__"])
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
bidi_rpc.add_done_callback(callback)
bidi_rpc.open()
with pytest.raises(ValueError) as exc_info:
bidi_rpc.recv()
assert exc_info.value == error2
should_recover.assert_called_once_with(error1)
assert bidi_rpc.call is None
assert bidi_rpc.is_active is False
callback.assert_called_once_with(error2)
def test_using_throttle_on_reopen_requests(self):
call = CallStub([])
start_rpc = mock.create_autospec(
grpc.StreamStreamMultiCallable, instance=True, return_value=call
)
should_recover = mock.Mock(spec=["__call__"], return_value=True)
bidi_rpc = bidi.ResumableBidiRpc(
start_rpc, should_recover, throttle_reopen=True
)
patcher = mock.patch.object(bidi_rpc._reopen_throttle.__class__, "__enter__")
with patcher as mock_enter:
bidi_rpc._reopen()
mock_enter.assert_called_once()
def test_send_not_open(self):
bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)
with pytest.raises(ValueError):
bidi_rpc.send(mock.sentinel.request)
def test_recv_not_open(self):
bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)
with pytest.raises(ValueError):
bidi_rpc.recv()
def test_finalize_idempotent(self):
error1 = ValueError("1")
error2 = ValueError("2")
callback = mock.Mock(spec=["__call__"])
should_recover = mock.Mock(spec=["__call__"], return_value=False)
bidi_rpc = bidi.ResumableBidiRpc(mock.sentinel.start_rpc, should_recover)
bidi_rpc.add_done_callback(callback)
bidi_rpc._on_call_done(error1)
bidi_rpc._on_call_done(error2)
callback.assert_called_once_with(error1)
class TestBackgroundConsumer(object):
def test_consume_once_then_exit(self):
bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
bidi_rpc.is_active = True
bidi_rpc.recv.side_effect = [mock.sentinel.response_1]
recved = threading.Event()
def on_response(response):
assert response == mock.sentinel.response_1
bidi_rpc.is_active = False
recved.set()
consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
consumer.start()
recved.wait()
bidi_rpc.recv.assert_called_once()
assert bidi_rpc.is_active is False
consumer.stop()
bidi_rpc.close.assert_called_once()
assert consumer.is_active is False
def test_pause_resume_and_close(self):
# This test is relatively complex. It attempts to start the consumer,
# consume one item, pause the consumer, check the state of the world,
# then resume the consumer. Doing this in a deterministic fashion
# requires a bit more mocking and patching than usual.
bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
bidi_rpc.is_active = True
def close_side_effect():
bidi_rpc.is_active = False
bidi_rpc.close.side_effect = close_side_effect
# These are used to coordinate the two threads to ensure deterministic
# execution.
should_continue = threading.Event()
responses_and_events = {
mock.sentinel.response_1: threading.Event(),
mock.sentinel.response_2: threading.Event(),
}
bidi_rpc.recv.side_effect = [mock.sentinel.response_1, mock.sentinel.response_2]
recved_responses = []
consumer = None
def on_response(response):
if response == mock.sentinel.response_1:
consumer.pause()
recved_responses.append(response)
responses_and_events[response].set()
should_continue.wait()
consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
consumer.start()
# Wait for the first response to be recved.
responses_and_events[mock.sentinel.response_1].wait()
# Ensure only one item has been recved and that the consumer is paused.
assert recved_responses == [mock.sentinel.response_1]
assert consumer.is_paused is True
assert consumer.is_active is True
# Unpause the consumer, wait for the second item, then close the
# consumer.
should_continue.set()
consumer.resume()
responses_and_events[mock.sentinel.response_2].wait()
assert recved_responses == [mock.sentinel.response_1, mock.sentinel.response_2]
consumer.stop()
assert consumer.is_active is False
def test_wake_on_error(self):
should_continue = threading.Event()
bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
bidi_rpc.is_active = True
bidi_rpc.add_done_callback.side_effect = lambda _: should_continue.set()
consumer = bidi.BackgroundConsumer(bidi_rpc, mock.sentinel.on_response)
# Start the consumer paused, which should immediately put it into wait
# state.
consumer.pause()
consumer.start()
# Wait for add_done_callback to be called
should_continue.wait()
bidi_rpc.add_done_callback.assert_called_once_with(consumer._on_call_done)
# The consumer should now be blocked on waiting to be unpaused.
assert consumer.is_active
assert consumer.is_paused
# Trigger the done callback, it should unpause the consumer and cause
# it to exit.
bidi_rpc.is_active = False
consumer._on_call_done(bidi_rpc)
# It may take a few cycles for the thread to exit.
while consumer.is_active:
pass
def test_consumer_expected_error(self, caplog):
caplog.set_level(logging.DEBUG)
bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
bidi_rpc.is_active = True
bidi_rpc.recv.side_effect = exceptions.ServiceUnavailable("Gone away")
on_response = mock.Mock(spec=["__call__"])
consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
consumer.start()
# Wait for the consumer's thread to exit.
while consumer.is_active:
pass
on_response.assert_not_called()
bidi_rpc.recv.assert_called_once()
assert "caught error" in caplog.text
def test_consumer_unexpected_error(self, caplog):
caplog.set_level(logging.DEBUG)
bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
bidi_rpc.is_active = True
bidi_rpc.recv.side_effect = ValueError()
on_response = mock.Mock(spec=["__call__"])
consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
consumer.start()
# Wait for the consumer's thread to exit.
while consumer.is_active:
pass # pragma: NO COVER (race condition)
on_response.assert_not_called()
bidi_rpc.recv.assert_called_once()
assert "caught unexpected exception" in caplog.text
def test_double_stop(self, caplog):
caplog.set_level(logging.DEBUG)
bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
bidi_rpc.is_active = True
on_response = mock.Mock(spec=["__call__"])
def close_side_effect():
bidi_rpc.is_active = False
bidi_rpc.close.side_effect = close_side_effect
consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
consumer.start()
assert consumer.is_active is True
consumer.stop()
assert consumer.is_active is False
# calling stop twice should not result in an error.
consumer.stop()