[binder] Fix WireReaderImpl bugs & races (#27303)

There was a bug found by the fuzzer where we might access wire_writer_ before
finishing SETUP_TRANSPORT (and thus constructing wire_writer_). This PR
fixes such issue by making sure that we won't proceed with any requests
until the connection is fully established.

Since binder transactions may be coming from multiple different threads,
this PRs guard some of the WireReaderImpl's member with a mutex to make
sure there's no races between threads.
diff --git a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
index e39e070..62f2507 100644
--- a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
+++ b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
@@ -85,12 +85,21 @@
   gpr_log(GPR_INFO, "Setting up transport");
   if (!is_client_) {
     SendSetupTransport(binder.get());
-    return absl::make_unique<WireWriterImpl>(std::move(binder));
+    {
+      grpc_core::MutexLock lock(&mu_);
+      connected_ = true;
+      wire_writer_ = std::make_shared<WireWriterImpl>(std::move(binder));
+    }
+    return wire_writer_;
   } else {
     SendSetupTransport(binder.get());
     auto other_end_binder = RecvSetupTransport();
-    wire_writer_ =
-        std::make_shared<WireWriterImpl>(std::move(other_end_binder));
+    {
+      grpc_core::MutexLock lock(&mu_);
+      connected_ = true;
+      wire_writer_ =
+          std::make_shared<WireWriterImpl>(std::move(other_end_binder));
+    }
     return wire_writer_;
   }
 }
@@ -155,12 +164,20 @@
     return absl::OkStatus();
   }
 
+  grpc_core::MutexLock lock(&mu_);
+
+  if (BinderTransportTxCode(code) != BinderTransportTxCode::SETUP_TRANSPORT &&
+      !connected_) {
+    return absl::InvalidArgumentError("Transports not connected yet");
+  }
+
   switch (BinderTransportTxCode(code)) {
     case BinderTransportTxCode::SETUP_TRANSPORT: {
-      if (connected_) {
-        return absl::InvalidArgumentError("Already connected");
+      if (recvd_setup_transport_) {
+        return absl::InvalidArgumentError(
+            "Already received a SETUP_TRANSPORT request");
       }
-      connected_ = true;
+      recvd_setup_transport_ = true;
       // int datasize;
       int version;
       // getDataSize not supported until 31
@@ -212,6 +229,11 @@
 
 absl::Status WireReaderImpl::ProcessStreamingTransaction(
     transaction_code_t code, const ReadableParcel* parcel) {
+  grpc_core::MutexLock lock(&mu_);
+  if (!connected_) {
+    return absl::InvalidArgumentError("Transports not connected yet");
+  }
+
   // Indicate which callbacks should be cancelled. It will be initialized as the
   // flags the in-coming transaction carries, and when a particular callback is
   // completed, the corresponding bit in cancellation_flag will be set to 0 so
diff --git a/src/core/ext/transport/binder/wire_format/wire_reader_impl.h b/src/core/ext/transport/binder/wire_format/wire_reader_impl.h
index 89c4132..db1f7d5 100644
--- a/src/core/ext/transport/binder/wire_format/wire_reader_impl.h
+++ b/src/core/ext/transport/binder/wire_format/wire_reader_impl.h
@@ -96,16 +96,21 @@
                                            const ReadableParcel* parcel);
   absl::Status ProcessStreamingTransactionImpl(transaction_code_t code,
                                                const ReadableParcel* parcel,
-                                               int* cancellation_flags);
+                                               int* cancellation_flags)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   std::shared_ptr<TransportStreamReceiver> transport_stream_receiver_;
   absl::Notification connection_noti_;
-  bool connected_ = false;
+  grpc_core::Mutex mu_;
+  bool connected_ ABSL_GUARDED_BY(mu_) = false;
+  bool recvd_setup_transport_ ABSL_GUARDED_BY(mu_) = false;
   // NOTE: other_end_binder_ will be moved out when RecvSetupTransport() is
   // called. Be cautious not to access it afterward.
   std::unique_ptr<Binder> other_end_binder_;
-  absl::flat_hash_map<transaction_code_t, int32_t> expected_seq_num_;
-  absl::flat_hash_map<transaction_code_t, std::string> message_buffer_;
+  absl::flat_hash_map<transaction_code_t, int32_t> expected_seq_num_
+      ABSL_GUARDED_BY(mu_);
+  absl::flat_hash_map<transaction_code_t, std::string> message_buffer_
+      ABSL_GUARDED_BY(mu_);
   std::unique_ptr<TransactionReceiver> tx_receiver_;
   bool is_client_;
   // When WireReaderImpl gets destructed, call on_destruct_callback_. This is
@@ -114,8 +119,8 @@
 
   // ACK every 16k bytes.
   static constexpr int64_t kFlowControlAckBytes = 16 * 1024;
-  int64_t num_incoming_bytes_ = 0;
-  int64_t num_acknowledged_bytes_ = 0;
+  int64_t num_incoming_bytes_ ABSL_GUARDED_BY(mu_) = 0;
+  int64_t num_acknowledged_bytes_ ABSL_GUARDED_BY(mu_) = 0;
 
   // Used to send ACK.
   std::shared_ptr<WireWriter> wire_writer_;
diff --git a/test/core/transport/binder/wire_reader_test.cc b/test/core/transport/binder/wire_reader_test.cc
index ede9086..d20ec35 100644
--- a/test/core/transport/binder/wire_reader_test.cc
+++ b/test/core/transport/binder/wire_reader_test.cc
@@ -20,6 +20,7 @@
 // receiver are correct in all possible situations.
 #include <memory>
 #include <string>
+#include <thread>
 #include <utility>
 
 #include <gtest/gtest.h>
@@ -63,6 +64,14 @@
     }
   }
 
+  void UnblockSetupTransport() {
+    // SETUP_TRANSPORT should finish before we can proceed with any other
+    // requests and streaming calls. The MockBinder will construct a
+    // MockTransactionReceiver, which will then sends SETUP_TRANSPORT request
+    // back to us.
+    wire_reader_.SetupTransport(absl::make_unique<MockBinder>());
+  }
+
   template <typename T>
   absl::Status CallProcessTransaction(T tx_code) {
     return wire_reader_.ProcessTransaction(
@@ -117,23 +126,12 @@
 
 TEST_F(WireReaderTest, ProcessTransactionControlMessageSetupTransport) {
   ::testing::InSequence sequence;
-
-  EXPECT_CALL(mock_readable_parcel_, ReadInt32);
-  EXPECT_CALL(mock_readable_parcel_, ReadBinder)
-      .WillOnce([&](std::unique_ptr<Binder>* binder) {
-        auto mock_binder = absl::make_unique<MockBinder>();
-        // binder that is read from the output parcel must first be initialized
-        // before it can be used.
-        EXPECT_CALL(*mock_binder, Initialize);
-        *binder = std::move(mock_binder);
-        return absl::OkStatus();
-      });
-
-  EXPECT_TRUE(
-      CallProcessTransaction(BinderTransportTxCode::SETUP_TRANSPORT).ok());
+  UnblockSetupTransport();
 }
 
 TEST_F(WireReaderTest, ProcessTransactionControlMessagePingResponse) {
+  ::testing::InSequence sequence;
+  UnblockSetupTransport();
   EXPECT_CALL(mock_readable_parcel_, ReadInt32);
   EXPECT_TRUE(
       CallProcessTransaction(BinderTransportTxCode::PING_RESPONSE).ok());
@@ -141,6 +139,7 @@
 
 TEST_F(WireReaderTest, ProcessTransactionServerRpcDataEmptyFlagIgnored) {
   ::testing::InSequence sequence;
+  UnblockSetupTransport();
 
   // first transaction: empty flag
   ExpectReadInt32(0);
@@ -151,6 +150,7 @@
 TEST_F(WireReaderTest,
        ProcessTransactionServerRpcDataFlagPrefixWithoutMetadata) {
   ::testing::InSequence sequence;
+  UnblockSetupTransport();
 
   // flag
   ExpectReadInt32(kFlagPrefix);
@@ -168,6 +168,7 @@
 
 TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagPrefixWithMetadata) {
   ::testing::InSequence sequence;
+  UnblockSetupTransport();
 
   // flag
   ExpectReadInt32(kFlagPrefix);
@@ -200,6 +201,7 @@
 
 TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataNonEmpty) {
   ::testing::InSequence sequence;
+  UnblockSetupTransport();
 
   // flag
   ExpectReadInt32(kFlagMessageData);
@@ -218,6 +220,7 @@
 
 TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataEmpty) {
   ::testing::InSequence sequence;
+  UnblockSetupTransport();
 
   // flag
   ExpectReadInt32(kFlagMessageData);
@@ -236,6 +239,7 @@
 
 TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithStatus) {
   ::testing::InSequence sequence;
+  UnblockSetupTransport();
 
   constexpr int kStatus = 0x1234;
   // flag
@@ -255,6 +259,7 @@
 
 TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithoutStatus) {
   ::testing::InSequence sequence;
+  UnblockSetupTransport();
 
   // flag
   ExpectReadInt32(kFlagSuffix);
@@ -272,6 +277,7 @@
 
 TEST_F(WireReaderTest, InBoundFlowControl) {
   ::testing::InSequence sequence;
+  UnblockSetupTransport();
 
   // flag
   ExpectReadInt32(kFlagMessageData | kFlagMessageDataIsPartial);