blob: 00ea9cc83fb20eb45852545753f3ba6ab5835aa3 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent.futures import ThreadPoolExecutor
from grpc import RpcError
import logging
class EventCallbackStream(object):
"""
A an object that translate a gRPC stream of events to a Python stream of
callbacks.
All callbacks are non-sticky. This means that user will only receive callback
generated after EventCallbackStream is registered and will not receive any
callback after EventCallbackStream is unregistered
You would need a new EventCallbackStream and anything that depends on this
object once shutdown() is called
"""
def __init__(self, server_stream_call):
"""
Construct this object, call the |grpc_lambda| and trigger event_callback on
the thread used to create this object until |destroy| is called when this
object can no longer be used
:param server_stream_call: A server stream call object returned from
calling a gRPC server stream RPC API. The
object must support iterator interface (i.e.
next() method) and the grpc.Call interface
so that we can cancel it
:param event_callback: callback to be invoked with the only argument as
the generated event. The callback will be invoked
on a separate thread created within this object
"""
if server_stream_call is None:
raise ValueError("server_stream_call must not be None")
self.server_stream_call = server_stream_call
self.handlers = []
self.executor = ThreadPoolExecutor()
self.future = self.executor.submit(EventCallbackStream._event_loop,
self)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.shutdown()
if traceback is None:
return True
else:
return False
def __del__(self):
self.shutdown()
def register_callback(self, callback, matcher_fn=None):
"""
Register a callback to handle events. Event will be handled by callback
if matcher_fn(event) returns True
callback and matcher are registered as a tuple. Hence the same callback
with different matcher are considered two different handler units. Same
matcher, but different callback are also considered different handling
unit
Callback will be invoked on a ThreadPoolExecutor owned by this
EventCallbackStream
:param callback: Will be called as callback(event)
:param matcher_fn: A boolean function that returns True or False when
calling matcher_fn(event), if None, all event will
be matched
"""
if callback is None:
raise ValueError("callback must not be None")
self.handlers.append((callback, matcher_fn))
def unregister_callback(self, callback, matcher_fn=None):
"""
Unregister callback and matcher_fn from the event stream. Both objects
must match exactly the ones when calling register_callback()
:param callback: callback used in register_callback()
:param matcher_fn: matcher_fn used in register_callback()
:raises ValueError when (callback, matcher_fn) tuple is not found
"""
if callback is None:
raise ValueError("callback must not be None")
self.handlers.remove((callback, matcher_fn))
def shutdown(self):
"""
Stop the gRPC lambda so that event_callback will not be invoked after th
method returns.
This object will be useless after this call as there is no way to restart
the gRPC callback. You would have to create a new EventCallbackStream
:return: None on success, exception object on failure
"""
while not self.server_stream_call.done():
self.server_stream_call.cancel()
exception_for_return = None
try:
result = self.future.result()
if result:
logging.warning("Inner loop error %s" % result)
raise result
except Exception as exp:
logging.warning("Exception: %s" % (exp))
exception_for_return = exp
self.executor.shutdown()
return exception_for_return
def _event_loop(self):
"""
Main loop for consuming the gRPC stream events.
Blocks until computation is cancelled
:return: None on success, exception object on failure
"""
try:
for event in self.server_stream_call:
for (callback, matcher_fn) in self.handlers:
if not matcher_fn or matcher_fn(event):
callback(event)
return None
except RpcError as exp:
if self.server_stream_call.cancelled():
logging.debug("Cancelled")
return None
else:
logging.warning("Some RPC error not due to cancellation")
return exp