"""Blueberry gRPC device controller.

This is a server to act as a mock device for testing the Blueberry gRPC
interface.
"""
| |
| from concurrent import futures |
| from absl import app |
| from absl import flags |
| |
| import grpc |
| |
| # Internal import |
| from blueberry.grpc import blueberry_device_controller_service |
| from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc |
| |
| |
# Host part of the listen address; main() joins it with --port.
_HOST = '[::]'

# Command-line configuration, parsed by absl before main() runs.
FLAGS = flags.FLAGS
flags.DEFINE_integer(
    name='port',
    default=10000,
    help='port to listen on')
flags.DEFINE_integer(
    name='threads',
    default=10,
    help='number of worker threads in thread pool')
| |
| |
def main(unused_argv):
  """Starts the mock Blueberry device controller gRPC server and blocks.

  Builds a gRPC server backed by a thread pool of `--threads` workers,
  registers a BlueberryDeviceControllerServicer on it, binds a secure port at
  `<_HOST>:<--port>`, starts the server, and then waits for it to terminate.

  Args:
    unused_argv: Positional command-line arguments handed over by `app.run`.
      Ignored.
  """
  # The listening port is bound exactly once, by add_secure_port() below.
  # The public grpc.server() factory has no `ports` keyword (passing one
  # raises TypeError), so only the thread pool is supplied here.
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=FLAGS.threads))
  servicer = (
      blueberry_device_controller_service.BlueberryDeviceControllerServicer())
  blueberry_device_controller_pb2_grpc.add_BlueberryDeviceControllerServicer_to_server(
      servicer, server)
  # NOTE(review): `loas2` is not imported anywhere in the visible file --
  # presumably it came from the stripped "Internal import" line. Confirm it is
  # in scope (or choose replacement server credentials) before running.
  server_creds = loas2.loas2_server_credentials()
  server.add_secure_port(f'{_HOST}:{FLAGS.port}', server_creds)
  server.start()
  # Block the main thread so the process stays alive while serving RPCs.
  server.wait_for_termination()
| |
| |
# Script entry point: hand control to absl, which parses the command-line
# flags and then calls main().
if __name__ == '__main__':
  app.run(main)