blob: 02f8eee4356a02f122cd3f22165fdc38ace65187 [file] [log] [blame]
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_READER_IMPL_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_READER_IMPL_H
#include <grpc/support/port_platform.h>
#include <memory>
#include <utility>
#include "absl/container/flat_hash_map.h"
#include <grpcpp/security/binder_security_policy.h>
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
#include "src/core/lib/gprpp/notification.h"
namespace grpc_binder {
class WireReaderImpl : public WireReader {
public:
WireReaderImpl(
std::shared_ptr<TransportStreamReceiver> transport_stream_receiver,
bool is_client,
std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
security_policy,
std::function<void()> on_destruct_callback = nullptr);
~WireReaderImpl() override;
void Orphan() override { Unref(); }
/// Setup the transport between endpoint binders.
///
/// The client and the server both call SetupTransport() when constructing
/// transport.
///
/// High-level overview of transaction setup:
/// 0. Client obtains an |endpoint_binder| from the server (in the Android
/// setting, this can be achieved by "binding" to the server APK).
/// 1. Client creates a binder |client_binder| and hook its on-transaction
/// callback to client's ProcessTransaction(). Client then sends
/// |client_binder| through |endpoint_binder| to server.
/// 2. Server receives |client_binder| via |endpoint_binder|.
/// 3. Server creates a binder |server_binder| and hook its on-transaction
/// callback to server's ProcessTransaction(). Server then sends
/// |server_binder| through |client_binder| back to the client.
/// 4. Client receives |server_binder| via |client_binder|'s on-transaction
/// callback.
///
/// The parameter \p binder here means different things for client nad server.
/// For client, \p binder refers to |endpoint_binder|, and for server, \p
/// binder refers to |client_binder|. That is, for server-side transport
/// setup, we assume that the first half of SETUP_TRANSPORT (up to step 2) is
/// already done somewhere else (see test/end2end/binder_transport_test.cc for
/// how it's handled in the testing environment).
std::shared_ptr<WireWriter> SetupTransport(
std::unique_ptr<Binder> binder) override;
absl::Status ProcessTransaction(transaction_code_t code,
ReadableParcel* parcel, int uid);
/// Send SETUP_TRANSPORT request through \p binder.
///
/// This is the one half (for client it's the first half, and for server it's
/// the second) of the SETUP_TRANSPORT negotiation process. First, a new
/// binder is created. We take its "receiving" part and construct the
/// transaction receiver with it, and sends the "sending" part along with the
/// SETUP_TRANSPORT message through \p binder.
void SendSetupTransport(Binder* binder);
/// Recv SETUP_TRANSPORT request.
///
/// This is the other half of the SETUP_TRANSPORT process. We wait for
/// in-coming SETUP_TRANSPORT request with the "sending" part of a binder from
/// the other end. For client, the message is coming from the transaction
/// receiver we just constructed in SendSetupTransport(). For server, we
/// assume that this step is already completed.
// TODO(waynetu): In the testing environment, we still use this method (on
// another WireReader instance) for server-side transport setup, and thus it
// is marked as public. Try moving this method back to private, and hopefully
// we can also avoid moving |other_end_binder_| out in the implementation.
std::unique_ptr<Binder> RecvSetupTransport();
private:
absl::Status ProcessStreamingTransaction(transaction_code_t code,
ReadableParcel* parcel);
absl::Status ProcessStreamingTransactionImpl(transaction_code_t code,
ReadableParcel* parcel,
int* cancellation_flags)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
std::shared_ptr<TransportStreamReceiver> transport_stream_receiver_;
grpc_core::Notification connection_noti_;
grpc_core::Mutex mu_;
std::atomic_bool connected_{false};
bool recvd_setup_transport_ ABSL_GUARDED_BY(mu_) = false;
// NOTE: other_end_binder_ will be moved out when RecvSetupTransport() is
// called. Be cautious not to access it afterward.
std::unique_ptr<Binder> other_end_binder_;
absl::flat_hash_map<transaction_code_t, int32_t> expected_seq_num_
ABSL_GUARDED_BY(mu_);
absl::flat_hash_map<transaction_code_t, std::string> message_buffer_
ABSL_GUARDED_BY(mu_);
std::unique_ptr<TransactionReceiver> tx_receiver_;
bool is_client_;
std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy_;
// When WireReaderImpl gets destructed, call on_destruct_callback_. This is
// mostly for decrementing the reference count of its transport.
std::function<void()> on_destruct_callback_;
// ACK every 16k bytes.
static constexpr int64_t kFlowControlAckBytes = 16 * 1024;
int64_t num_incoming_bytes_ ABSL_GUARDED_BY(mu_) = 0;
int64_t num_acknowledged_bytes_ ABSL_GUARDED_BY(mu_) = 0;
// Used to send ACK.
std::shared_ptr<WireWriter> wire_writer_;
// Workaround for race condition.
//
// In `SetupTransport()`, we set `connected_` to true, call
// `SendSetupTransport()`, and construct `wire_writer_`. There is a potential
// race condition between calling `SendSetupTransport()` and constructing
// `wire_writer_`. So use this notification to wait. This should be very fast
// and waiting is acceptable.
//
// The original problem was that we can't move `connected_ = true` and
// `SendSetupTransport()` into the mutex, as it will deadlock if
// `ProcessTransaction()` is called in the same call chain.
//
// Note: this is not the perfect solution, the system will still deadlock if,
// e.g., the first request is 64K and we entered the sending ACK code path.
//
// TODO(littlecvr): Figure out a better solution to not causing any potential
// deadlock and not having to wait.
grpc_core::Notification wire_writer_ready_notification_;
};
} // namespace grpc_binder
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_READER_IMPL_H