blob: ea2dedcc94db9f05f9e148c9d32f7db3e793a14d [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime, timedelta
import logging
from queue import SimpleQueue, Empty
from mobly import asserts
from google.protobuf import text_format
class EventAsserts(object):
"""
A class that handles various asserts with respect to a gRPC unary stream
This class must be created before an event happens as events in a
EventCallbackStream is not sticky and will be lost if you don't subscribe
to them before generating those events.
When asserting on sequential events, a single EventAsserts object is enough
When asserting on simultaneous events, you would need multiple EventAsserts
objects as each EventAsserts object owns a separate queue that is actively
being popped as asserted events happen
"""
DEFAULT_TIMEOUT_SECONDS = 3
def __init__(self, event_callback_stream):
if event_callback_stream is None:
raise ValueError("event_callback_stream cannot be None")
self.event_callback_stream = event_callback_stream
self.event_queue = SimpleQueue()
self.callback = lambda event: self.event_queue.put(event)
self.event_callback_stream.register_callback(self.callback)
def __del__(self):
self.event_callback_stream.unregister_callback(self.callback)
def remaining_time_delta(self, end_time):
remaining = end_time - datetime.now()
if remaining < timedelta(milliseconds=0):
remaining = timedelta(milliseconds=0)
return remaining
def assert_none(self, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
"""
Assert no event happens within timeout period
:param timeout: a timedelta object
:return:
"""
logging.debug("assert_none %fs" % (timeout.total_seconds()))
try:
event = self.event_queue.get(timeout=timeout.total_seconds())
asserts.assert_true(
event is None,
msg=("Expected None, but got %s" % text_format.MessageToString(
event, as_one_line=True)))
except Empty:
return
def assert_none_matching(
self, match_fn, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
"""
Assert no events where match_fn(event) is True happen within timeout
period
:param match_fn: return True/False on match_fn(event)
:param timeout: a timedelta object
:return:
"""
logging.debug("assert_none_matching %fs" % (timeout.total_seconds()))
event = None
end_time = datetime.now() + timeout
while event is None and datetime.now() < end_time:
remaining = self.remaining_time_delta(end_time)
logging.debug("Waiting for event (%fs remaining)" %
(remaining.total_seconds()))
try:
current_event = self.event_queue.get(
timeout=remaining.total_seconds())
if match_fn(current_event):
event = current_event
except Empty:
continue
logging.debug("Done waiting for an event")
if event is None:
return # Avoid an assert in MessageToString(None, ...)
asserts.assert_true(
event is None,
msg=("Expected None matching, but got %s" %
text_format.MessageToString(event, as_one_line=True)))
def assert_event_occurs(self,
match_fn,
at_least_times=1,
timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
"""
Assert at least |at_least_times| instances of events happen where
match_fn(event) returns True within timeout period
:param match_fn: returns True/False on match_fn(event)
:param timeout: a timedelta object
:param at_least_times: how many times at least a matching event should
happen
:return:
"""
logging.debug("assert_event_occurs %d %fs" % (at_least_times,
timeout.total_seconds()))
event_list = []
end_time = datetime.now() + timeout
while len(event_list) < at_least_times and datetime.now() < end_time:
remaining = self.remaining_time_delta(end_time)
logging.debug("Waiting for event (%fs remaining)" %
(remaining.total_seconds()))
try:
current_event = self.event_queue.get(
timeout=remaining.total_seconds())
if match_fn(current_event):
event_list.append(current_event)
except Empty:
continue
logging.debug("Done waiting for event")
asserts.assert_true(
len(event_list) >= at_least_times,
msg=("Expected at least %d events, but got %d" % (at_least_times,
len(event_list))))
def assert_event_occurs_at_most(
self,
match_fn,
at_most_times,
timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
"""
Assert at most |at_most_times| instances of events happen where
match_fn(event) returns True within timeout period
:param match_fn: returns True/False on match_fn(event)
:param at_most_times: how many times at most a matching event should
happen
:param timeout:a timedelta object
:return:
"""
logging.debug("assert_event_occurs_at_most")
event_list = []
end_time = datetime.now() + timeout
while len(event_list) <= at_most_times and datetime.now() < end_time:
remaining = self.remaining_time_delta(end_time)
logging.debug("Waiting for event iteration (%fs remaining)" %
(remaining.total_seconds()))
try:
current_event = self.event_queue.get(
timeout=remaining.total_seconds())
if match_fn(current_event):
event_list.append(current_event)
except Empty:
continue
logging.debug("Done waiting, got %d events" % len(event_list))
asserts.assert_true(
len(event_list) <= at_most_times,
msg=("Expected at most %d events, but got %d" % (at_most_times,
len(event_list))))