| // Copyright 2021 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <grpc/impl/codegen/port_platform.h> |
| |
| #include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h" |
| |
| #include <functional> |
| #include <limits> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/memory/memory.h" |
| #include "absl/status/statusor.h" |
| |
| #include <grpc/support/log.h> |
| |
| #include "src/core/ext/transport/binder/utils/transport_stream_receiver.h" |
| #include "src/core/ext/transport/binder/wire_format/binder.h" |
| #include "src/core/ext/transport/binder/wire_format/wire_writer.h" |
| |
// Evaluates `expr` exactly once, converts the result to absl::Status and, when
// that status is not OK, returns it from the enclosing function.
//
// The temporary uses a deliberately unusual name so that it can neither shadow
// a caller's own `status` variable nor be captured by an `expr` that happens to
// mention `status`. The do/while(0) wrapper makes the expansion behave as a
// single statement (e.g. directly after an unbraced `if`).
#define RETURN_IF_ERROR(expr)                              \
  do {                                                     \
    const absl::Status return_if_error_status_ = (expr);   \
    if (!return_if_error_status_.ok()) {                   \
      return return_if_error_status_;                      \
    }                                                      \
  } while (0)
| |
| namespace grpc_binder { |
| namespace { |
| |
| absl::StatusOr<Metadata> parse_metadata(const ReadableParcel* reader) { |
| int num_header; |
| RETURN_IF_ERROR(reader->ReadInt32(&num_header)); |
| gpr_log(GPR_INFO, "num_header = %d", num_header); |
| if (num_header < 0) { |
| return absl::InvalidArgumentError("num_header cannot be negative"); |
| } |
| std::vector<std::pair<std::string, std::string>> ret; |
| for (int i = 0; i < num_header; i++) { |
| int count; |
| RETURN_IF_ERROR(reader->ReadInt32(&count)); |
| gpr_log(GPR_INFO, "count = %d", count); |
| std::string key{}; |
| if (count > 0) RETURN_IF_ERROR(reader->ReadByteArray(&key)); |
| gpr_log(GPR_INFO, "key = %s", key.c_str()); |
| RETURN_IF_ERROR(reader->ReadInt32(&count)); |
| gpr_log(GPR_INFO, "count = %d", count); |
| std::string value{}; |
| if (count > 0) RETURN_IF_ERROR(reader->ReadByteArray(&value)); |
| gpr_log(GPR_INFO, "value = %s", value.c_str()); |
| ret.emplace_back(key, value); |
| } |
| return ret; |
| } |
| |
| } // namespace |
| |
| WireReaderImpl::WireReaderImpl( |
| std::shared_ptr<TransportStreamReceiver> transport_stream_receiver, |
| bool is_client, std::function<void()> on_destruct_callback) |
| : transport_stream_receiver_(std::move(transport_stream_receiver)), |
| is_client_(is_client), |
| on_destruct_callback_(on_destruct_callback) {} |
| |
| WireReaderImpl::~WireReaderImpl() { |
| if (on_destruct_callback_) { |
| on_destruct_callback_(); |
| } |
| } |
| |
| std::shared_ptr<WireWriter> WireReaderImpl::SetupTransport( |
| std::unique_ptr<Binder> binder) { |
| gpr_log(GPR_INFO, "Setting up transport"); |
| if (!is_client_) { |
| SendSetupTransport(binder.get()); |
| { |
| grpc_core::MutexLock lock(&mu_); |
| connected_ = true; |
| wire_writer_ = std::make_shared<WireWriterImpl>(std::move(binder)); |
| } |
| return wire_writer_; |
| } else { |
| SendSetupTransport(binder.get()); |
| auto other_end_binder = RecvSetupTransport(); |
| { |
| grpc_core::MutexLock lock(&mu_); |
| connected_ = true; |
| wire_writer_ = |
| std::make_shared<WireWriterImpl>(std::move(other_end_binder)); |
| } |
| return wire_writer_; |
| } |
| } |
| |
| void WireReaderImpl::SendSetupTransport(Binder* binder) { |
| binder->Initialize(); |
| gpr_log(GPR_INFO, "prepare transaction = %d", |
| binder->PrepareTransaction().ok()); |
| WritableParcel* writable_parcel = binder->GetWritableParcel(); |
| gpr_log(GPR_INFO, "data position = %d", writable_parcel->GetDataPosition()); |
| // gpr_log(GPR_INFO, "set data position to 0 = %d", |
| // writer->SetDataPosition(0)); |
| gpr_log(GPR_INFO, "data position = %d", writable_parcel->GetDataPosition()); |
| int32_t version = 77; |
| gpr_log(GPR_INFO, "write int32 = %d", |
| writable_parcel->WriteInt32(version).ok()); |
| gpr_log(GPR_INFO, "data position = %d", writable_parcel->GetDataPosition()); |
| // The lifetime of the transaction receiver is the same as the wire writer's. |
| // The transaction receiver is responsible for not calling the on-transact |
| // callback when it's dead. |
| // Give TransactionReceiver a Ref() since WireReader cannot be destructed |
| // during callback execution. TransactionReceiver should make sure that the |
| // callback owns a Ref() when it's being invoked. |
| tx_receiver_ = binder->ConstructTxReceiver( |
| /*wire_reader_ref=*/Ref(), |
| [this](transaction_code_t code, const ReadableParcel* readable_parcel) { |
| return this->ProcessTransaction(code, readable_parcel); |
| }); |
| |
| gpr_log(GPR_INFO, "tx_receiver = %p", tx_receiver_->GetRawBinder()); |
| gpr_log(GPR_INFO, "AParcel_writeStrongBinder = %d", |
| writable_parcel->WriteBinder(tx_receiver_.get()).ok()); |
| gpr_log(GPR_INFO, "AIBinder_transact = %d", |
| binder->Transact(BinderTransportTxCode::SETUP_TRANSPORT).ok()); |
| } |
| |
| std::unique_ptr<Binder> WireReaderImpl::RecvSetupTransport() { |
| // TODO(b/191941760): avoid blocking, handle wire_writer_noti lifetime |
| // better |
| gpr_log(GPR_INFO, "start waiting for noti"); |
| connection_noti_.WaitForNotification(); |
| gpr_log(GPR_INFO, "end waiting for noti"); |
| return std::move(other_end_binder_); |
| } |
| |
| absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
| const ReadableParcel* parcel) { |
| gpr_log(GPR_INFO, __func__); |
| gpr_log(GPR_INFO, "tx code = %u", code); |
| if (code >= static_cast<unsigned>(kFirstCallId)) { |
| gpr_log(GPR_INFO, "This is probably a Streaming Tx"); |
| return ProcessStreamingTransaction(code, parcel); |
| } |
| |
| if (!(code >= static_cast<transaction_code_t>( |
| BinderTransportTxCode::SETUP_TRANSPORT) && |
| code <= static_cast<transaction_code_t>( |
| BinderTransportTxCode::PING_RESPONSE))) { |
| gpr_log(GPR_INFO, |
| "Received unknown control message. Shutdown transport gracefully."); |
| // TODO(waynetu): Shutdown transport gracefully. |
| return absl::OkStatus(); |
| } |
| |
| grpc_core::MutexLock lock(&mu_); |
| |
| if (BinderTransportTxCode(code) != BinderTransportTxCode::SETUP_TRANSPORT && |
| !connected_) { |
| return absl::InvalidArgumentError("Transports not connected yet"); |
| } |
| |
| switch (BinderTransportTxCode(code)) { |
| case BinderTransportTxCode::SETUP_TRANSPORT: { |
| if (recvd_setup_transport_) { |
| return absl::InvalidArgumentError( |
| "Already received a SETUP_TRANSPORT request"); |
| } |
| recvd_setup_transport_ = true; |
| // int datasize; |
| int version; |
| // getDataSize not supported until 31 |
| // gpr_log(GPR_INFO, "getDataSize = %d", AParcel_getDataSize(in, |
| // &datasize)); |
| RETURN_IF_ERROR(parcel->ReadInt32(&version)); |
| // gpr_log(GPR_INFO, "data size = %d", datasize); |
| gpr_log(GPR_INFO, "version = %d", version); |
| std::unique_ptr<Binder> binder{}; |
| RETURN_IF_ERROR(parcel->ReadBinder(&binder)); |
| if (!binder) { |
| return absl::InternalError("Read NULL binder from the parcel"); |
| } |
| binder->Initialize(); |
| other_end_binder_ = std::move(binder); |
| connection_noti_.Notify(); |
| break; |
| } |
| case BinderTransportTxCode::SHUTDOWN_TRANSPORT: { |
| gpr_log(GPR_ERROR, |
| "Received SHUTDOWN_TRANSPORT request but not implemented yet."); |
| return absl::UnimplementedError("SHUTDOWN_TRANSPORT"); |
| } |
| case BinderTransportTxCode::ACKNOWLEDGE_BYTES: { |
| int num_bytes = -1; |
| RETURN_IF_ERROR(parcel->ReadInt32(&num_bytes)); |
| gpr_log(GPR_INFO, "received acknowledge bytes = %d", num_bytes); |
| break; |
| } |
| case BinderTransportTxCode::PING: { |
| if (is_client_) { |
| return absl::FailedPreconditionError("Receive PING request in client"); |
| } |
| int ping_id = -1; |
| RETURN_IF_ERROR(parcel->ReadInt32(&ping_id)); |
| gpr_log(GPR_INFO, "received ping id = %d", ping_id); |
| // TODO(waynetu): Ping back. |
| break; |
| } |
| case BinderTransportTxCode::PING_RESPONSE: { |
| int value = -1; |
| RETURN_IF_ERROR(parcel->ReadInt32(&value)); |
| gpr_log(GPR_INFO, "received ping response = %d", value); |
| break; |
| } |
| } |
| return absl::OkStatus(); |
| } |
| |
| absl::Status WireReaderImpl::ProcessStreamingTransaction( |
| transaction_code_t code, const ReadableParcel* parcel) { |
| grpc_core::MutexLock lock(&mu_); |
| if (!connected_) { |
| return absl::InvalidArgumentError("Transports not connected yet"); |
| } |
| |
| // Indicate which callbacks should be cancelled. It will be initialized as the |
| // flags the in-coming transaction carries, and when a particular callback is |
| // completed, the corresponding bit in cancellation_flag will be set to 0 so |
| // that we won't cancel it afterward. |
| int cancellation_flags = 0; |
| absl::Status status = |
| ProcessStreamingTransactionImpl(code, parcel, &cancellation_flags); |
| if (!status.ok()) { |
| gpr_log(GPR_ERROR, "Failed to process streaming transaction: %s", |
| status.ToString().c_str()); |
| // Something went wrong when receiving transaction. Cancel failed requests. |
| if (cancellation_flags & kFlagPrefix) { |
| gpr_log(GPR_INFO, "cancelling initial metadata"); |
| transport_stream_receiver_->NotifyRecvInitialMetadata(code, status); |
| } |
| if (cancellation_flags & kFlagMessageData) { |
| gpr_log(GPR_INFO, "cancelling message data"); |
| transport_stream_receiver_->NotifyRecvMessage(code, status); |
| } |
| if (cancellation_flags & kFlagSuffix) { |
| gpr_log(GPR_INFO, "cancelling trailing metadata"); |
| transport_stream_receiver_->NotifyRecvTrailingMetadata(code, status, 0); |
| } |
| } |
| if ((num_incoming_bytes_ - num_acknowledged_bytes_) >= kFlowControlAckBytes) { |
| absl::Status ack_status = wire_writer_->Ack(num_incoming_bytes_); |
| if (status.ok()) { |
| status = ack_status; |
| } |
| num_acknowledged_bytes_ = num_incoming_bytes_; |
| } |
| return status; |
| } |
| |
| absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
| transaction_code_t code, const ReadableParcel* parcel, |
| int* cancellation_flags) { |
| GPR_ASSERT(cancellation_flags); |
| |
| int flags; |
| RETURN_IF_ERROR(parcel->ReadInt32(&flags)); |
| gpr_log(GPR_INFO, "flags = %d", flags); |
| *cancellation_flags = flags; |
| |
| // Ignore in-coming transaction with flag = 0 to match with Java |
| // implementation. |
| // TODO(waynetu): Check with grpc-java team to see whether this is the |
| // intended behavior. |
| // TODO(waynetu): What should be returned here? |
| if (flags == 0) { |
| gpr_log(GPR_INFO, "[WARNING] Receive empty transaction. Ignored."); |
| return absl::OkStatus(); |
| } |
| |
| int status = flags >> 16; |
| gpr_log(GPR_INFO, "status = %d", status); |
| gpr_log(GPR_INFO, "FLAG_PREFIX = %d", (flags & kFlagPrefix)); |
| gpr_log(GPR_INFO, "FLAG_MESSAGE_DATA = %d", (flags & kFlagMessageData)); |
| gpr_log(GPR_INFO, "FLAG_SUFFIX = %d", (flags & kFlagSuffix)); |
| int seq_num; |
| RETURN_IF_ERROR(parcel->ReadInt32(&seq_num)); |
| // TODO(waynetu): For now we'll just assume that the transactions commit in |
| // the same order they're issued. The following assertion detects |
| // out-of-order or missing transactions. WireReaderImpl should be fixed if |
| // we indeed found such behavior. |
| int32_t& expectation = expected_seq_num_[code]; |
| if (seq_num < 0 || seq_num != expectation) { |
| // Unexpected sequence number. |
| return absl::InternalError("Unexpected sequence number"); |
| } |
| // TODO(waynetu): According to the protocol, "The sequence number will wrap |
| // around to 0 if more than 2^31 messages are sent." For now we'll just |
| // assert that it never reach such circumstances. |
| GPR_ASSERT(expectation < std::numeric_limits<int32_t>::max() && |
| "Sequence number too large"); |
| expectation++; |
| gpr_log(GPR_INFO, "sequence number = %d", seq_num); |
| if (flags & kFlagPrefix) { |
| char method_ref[111]; |
| memset(method_ref, 0, sizeof(method_ref)); |
| if (!is_client_) { |
| RETURN_IF_ERROR(parcel->ReadString(method_ref)); |
| } |
| absl::StatusOr<Metadata> initial_metadata_or_error = parse_metadata(parcel); |
| if (!initial_metadata_or_error.ok()) { |
| return initial_metadata_or_error.status(); |
| } |
| if (!is_client_) { |
| initial_metadata_or_error->emplace_back(":path", |
| std::string("/") + method_ref); |
| } |
| transport_stream_receiver_->NotifyRecvInitialMetadata( |
| code, *initial_metadata_or_error); |
| *cancellation_flags &= ~kFlagPrefix; |
| } |
| if (flags & kFlagMessageData) { |
| int count; |
| RETURN_IF_ERROR(parcel->ReadInt32(&count)); |
| gpr_log(GPR_INFO, "count = %d", count); |
| std::string msg_data{}; |
| if (count > 0) { |
| RETURN_IF_ERROR(parcel->ReadByteArray(&msg_data)); |
| } |
| gpr_log(GPR_INFO, "msg_data = %s", msg_data.c_str()); |
| message_buffer_[code] += msg_data; |
| num_incoming_bytes_ += count; |
| if ((flags & kFlagMessageDataIsPartial) == 0) { |
| std::string s = std::move(message_buffer_[code]); |
| message_buffer_.erase(code); |
| transport_stream_receiver_->NotifyRecvMessage(code, std::move(s)); |
| } |
| *cancellation_flags &= ~kFlagMessageData; |
| } |
| if (flags & kFlagSuffix) { |
| if (flags & kFlagStatusDescription) { |
| // FLAG_STATUS_DESCRIPTION set |
| char desc[111]; |
| memset(desc, 0, sizeof(desc)); |
| RETURN_IF_ERROR(parcel->ReadString(desc)); |
| gpr_log(GPR_INFO, "description = %s", desc); |
| } |
| Metadata trailing_metadata; |
| if (is_client_) { |
| absl::StatusOr<Metadata> trailing_metadata_or_error = |
| parse_metadata(parcel); |
| if (!trailing_metadata_or_error.ok()) { |
| return trailing_metadata_or_error.status(); |
| } |
| trailing_metadata = *trailing_metadata_or_error; |
| } |
| transport_stream_receiver_->NotifyRecvTrailingMetadata( |
| code, std::move(trailing_metadata), status); |
| *cancellation_flags &= ~kFlagSuffix; |
| } |
| return absl::OkStatus(); |
| } |
| |
| } // namespace grpc_binder |