blob: 7cde95d9d7aa704dc8500178a4632061a61f8123 [file] [log] [blame]
/*
* Copyright 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <grpc++/grpc++.h>
#include "os/log.h"
namespace bluetooth {
namespace grpc {
// To be passed to gRPC async invocations as tag.
// Function is called when the CompletionQueue.Next() returns this tag.
// Then, user needs to delete this object.
using GrpcAsyncEventCallback = std::function<void(bool)>;
template <typename REQ, typename RES>
class GrpcAsyncServerStreamingHandler {
public:
virtual ~GrpcAsyncServerStreamingHandler() = default;
// Implementation for requesting the next specific type RPC, using provided parameters.
virtual void OnReadyForNextRequest(::grpc::ServerContext*, REQ* req, ::grpc::ServerAsyncWriter<RES>* res,
::grpc::CompletionQueue* new_call_cq,
::grpc::ServerCompletionQueue* notification_cq, void* tag) = 0;
virtual void OnRpcRequestReceived(REQ req) = 0;
virtual void OnRpcRequestFailed() {}
virtual void OnRpcFinished() {}
virtual void OnWriteSuccess() {}
};
// Provides API to upper layer users to control (request, write, finish) a server-streaming asynchronous RPC.
// When each API is done, callback will be sent to the given GrpcAsyncServerStreamingHandler.
// Each control box can take one active RPC at one time.
// TODO: problems with this control box:
// 1. RequestNewRpc is async, but Write and Stop is blocking users. Do we want to do this?
// 2. Callback to user is done in the gRPC thread. Let's create a pool thread to give it to user?
// 3. Currently it uses promise to synchronize between events. If we use os/handler it should be easier.
template <typename REQ, typename RES>
class GrpcAsyncServerStreamingControlBox {
public:
GrpcAsyncServerStreamingControlBox(GrpcAsyncServerStreamingHandler<REQ, RES>* async_handler,
::grpc::ServerCompletionQueue* cq)
: async_handler_(async_handler), cq_(cq) {}
void RequestNewRpc() {
ASSERT(my_state_ == MyState::IDLE);
context_ = std::make_unique<::grpc::ServerContext>();
req_ = std::make_unique<REQ>();
res_ = std::make_unique<::grpc::ServerAsyncWriter<RES>>(context_.get());
request_done_ = std::make_unique<GrpcAsyncEventCallback>([this](bool ok) { this->RequestDone(ok); });
async_handler_->OnReadyForNextRequest(context_.get(), req_.get(), res_.get(), cq_, cq_, request_done_.get());
my_state_ = MyState::REQUESTING;
}
void Write(const RES& res) {
std::unique_lock<std::mutex> lock(mutex_);
if (my_state_ == MyState::IDLE || my_state_ == MyState::REQUESTING) {
LOG_INFO("stream already stopped");
return;
}
ASSERT(my_state_ == MyState::OPEN);
write_done_ = std::make_unique<GrpcAsyncEventCallback>([this](bool ok) { this->WriteDone(ok); });
my_state_ = MyState::WRITING;
res_->Write(res, write_done_.get());
promise_ = new std::promise<void>();
auto future = promise_->get_future();
future.wait();
}
void StopStreaming() {
std::unique_lock<std::mutex> lock(mutex_);
ASSERT(my_state_ == MyState::OPEN);
rpc_finish_ = std::make_unique<GrpcAsyncEventCallback>([this](bool ok) { this->RpcFinish(ok); });
my_state_ = MyState::FINISHING;
res_->Finish(::grpc::Status::OK, rpc_finish_.get());
promise_ = new std::promise<void>();
auto future = promise_->get_future();
future.wait();
}
private:
void RequestDone(bool ok) {
ASSERT(my_state_ == MyState::REQUESTING);
if (ok) {
async_handler_->OnRpcRequestReceived(*req_);
my_state_ = MyState::OPEN;
} else {
clean_up();
async_handler_->OnRpcRequestFailed();
my_state_ = MyState::IDLE;
}
}
void WriteDone(bool ok) {
ASSERT(my_state_ == MyState::WRITING);
if (ok) {
my_state_ = MyState::OPEN;
async_handler_->OnWriteSuccess();
} else {
clean_up();
my_state_ = MyState::IDLE;
async_handler_->OnRpcFinished();
}
promise_->set_value();
}
void RpcFinish(bool ok) {
ASSERT(ok);
ASSERT(my_state_ == MyState::FINISHING);
clean_up();
my_state_ = MyState::IDLE;
async_handler_->OnRpcFinished();
promise_->set_value();
}
void clean_up() {
context_ = nullptr;
req_ = nullptr;
res_ = nullptr;
}
mutable std::mutex mutex_;
std::promise<void>* promise_ = nullptr;
GrpcAsyncServerStreamingHandler<REQ, RES>* async_handler_;
::grpc::ServerCompletionQueue* cq_;
std::unique_ptr<::grpc::ServerContext> context_ = nullptr;
std::unique_ptr<REQ> req_ = nullptr;
std::unique_ptr<::grpc::ServerAsyncWriter<RES>> res_ = nullptr;
std::unique_ptr<GrpcAsyncEventCallback> request_done_ = nullptr;
std::unique_ptr<GrpcAsyncEventCallback> write_done_ = nullptr;
std::unique_ptr<GrpcAsyncEventCallback> rpc_finish_ = nullptr;
enum class MyState { IDLE, REQUESTING, OPEN, WRITING, FINISHING } my_state_ = MyState::IDLE;
};
} // namespace grpc
} // namespace bluetooth