[BinderTransport] Fix BinderTransport server race conditions (#31654)

* Set `connected_` to true before `SendSetupTransport()`

A race happens when the server has already sent the setup transport to the
client while it's actually not ready. So move `connected_ = true` to before
`SendSetupTransport()`.

However, this may cause another race where `wire_writer_` is used before
it has actually been constructed. So add another Notification for others
to wait on until it is ready.

Note: simply moving `SendSetupTransport()` into the mutex section could
potentially cause a deadlock, since it's possible that
`ProcessTransaction()` is called on the same call chain.

This solution is considered a workaround until we figure out a better
workable solution. It's not perfect: if the first request sends, e.g.,
64K of data, we'll try to send an ACK and the system will still
deadlock. Furthermore, this solution makes threads wait (although the
wait should be very short).

* Store `accept_stream_fn` call count and call it later when possible

We might be signaled to call `accept_stream_fn` before it has been set.
We should remember this case and call it later once it has been set, or
the stream will never be accepted.

* Add comments and polish the change
diff --git a/src/core/ext/transport/binder/transport/binder_transport.cc b/src/core/ext/transport/binder/transport/binder_transport.cc
index e8553ae..a1a2665 100644
--- a/src/core/ext/transport/binder/transport/binder_transport.cc
+++ b/src/core/ext/transport/binder/transport/binder_transport.cc
@@ -370,6 +370,19 @@
 }  // namespace
 }  // namespace grpc_binder
 
+static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) {
+  grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
+  if (gbt->accept_stream_fn) {
+    gpr_log(GPR_INFO, "Accepting a stream");
+    // must pass in a non-null value.
+    (*gbt->accept_stream_fn)(gbt->accept_stream_user_data, &gbt->base, gbt);
+  } else {
+    ++gbt->accept_stream_fn_called_count_;
+    gpr_log(GPR_INFO, "accept_stream_fn not set, current count = %d",
+            gbt->accept_stream_fn_called_count_);
+  }
+}
+
 static void perform_stream_op_locked(void* stream_op,
                                      grpc_error_handle /*error*/) {
   grpc_transport_stream_op_batch* op =
@@ -595,8 +608,16 @@
     gbt->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
   }
   if (op->set_accept_stream) {
-    gbt->accept_stream_fn = op->set_accept_stream_fn;
     gbt->accept_stream_user_data = op->set_accept_stream_user_data;
+    gbt->accept_stream_fn = op->set_accept_stream_fn;
+    gpr_log(GPR_DEBUG, "accept_stream_fn_called_count_ = %d",
+            gbt->accept_stream_fn_called_count_);
+    while (gbt->accept_stream_fn_called_count_ > 0) {
+      --gbt->accept_stream_fn_called_count_;
+      gbt->combiner->Run(
+          GRPC_CLOSURE_CREATE(accept_stream_locked, gbt, nullptr),
+          absl::OkStatus());
+    }
   }
   if (op->on_consumed) {
     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
@@ -684,14 +705,6 @@
 
 static const grpc_transport_vtable* get_vtable() { return &vtable; }
 
-static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) {
-  grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
-  if (gbt->accept_stream_fn) {
-    // must pass in a non-null value.
-    (*gbt->accept_stream_fn)(gbt->accept_stream_user_data, &gbt->base, gbt);
-  }
-}
-
 grpc_binder_transport::grpc_binder_transport(
     std::unique_ptr<grpc_binder::Binder> binder, bool is_client,
     std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)
diff --git a/src/core/ext/transport/binder/transport/binder_transport.h b/src/core/ext/transport/binder/transport/binder_transport.h
index 574ab62..9205886 100644
--- a/src/core/ext/transport/binder/transport/binder_transport.h
+++ b/src/core/ext/transport/binder/transport/binder_transport.h
@@ -74,6 +74,10 @@
   void (*accept_stream_fn)(void* user_data, grpc_transport* transport,
                            const void* server_data) = nullptr;
   void* accept_stream_user_data = nullptr;
+  // `accept_stream_locked()` could be called before `accept_stream_fn` has been
+  // set. We need to remember the requests that come too early and replay them
+  // later once the callback is available.
+  int accept_stream_fn_called_count_{0};
 
   grpc_core::ConnectivityStateTracker state_tracker;
   grpc_core::RefCount refs;
diff --git a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
index 1deb3a4..6688cc2 100644
--- a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
+++ b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
@@ -81,12 +81,13 @@
 std::shared_ptr<WireWriter> WireReaderImpl::SetupTransport(
     std::unique_ptr<Binder> binder) {
   if (!is_client_) {
+    connected_ = true;
     SendSetupTransport(binder.get());
     {
       grpc_core::MutexLock lock(&mu_);
-      connected_ = true;
       wire_writer_ = std::make_shared<WireWriterImpl>(std::move(binder));
     }
+    wire_writer_ready_notification_.Notify();
     return wire_writer_;
   } else {
     SendSetupTransport(binder.get());
@@ -97,6 +98,7 @@
       wire_writer_ =
           std::make_shared<WireWriterImpl>(std::move(other_end_binder));
     }
+    wire_writer_ready_notification_.Notify();
     return wire_writer_;
   }
 }
@@ -213,6 +215,11 @@
       int64_t num_bytes = -1;
       GRPC_RETURN_IF_ERROR(parcel->ReadInt64(&num_bytes));
       gpr_log(GPR_DEBUG, "received acknowledge bytes = %" PRId64, num_bytes);
+      if (!wire_writer_ready_notification_.WaitForNotificationWithTimeout(
+              absl::Seconds(5))) {
+        return absl::DeadlineExceededError(
+            "wire_writer_ is not ready in time!");
+      }
       wire_writer_->OnAckReceived(num_bytes);
       break;
     }
@@ -282,6 +289,10 @@
     }
   }
   if (need_to_send_ack) {
+    if (!wire_writer_ready_notification_.WaitForNotificationWithTimeout(
+            absl::Seconds(5))) {
+      return absl::DeadlineExceededError("wire_writer_ is not ready in time!");
+    }
     GPR_ASSERT(wire_writer_);
     // wire_writer_ should not be accessed while holding mu_!
     // Otherwise, it is possible that
diff --git a/src/core/ext/transport/binder/wire_format/wire_reader_impl.h b/src/core/ext/transport/binder/wire_format/wire_reader_impl.h
index e1a47b4..02f8eee 100644
--- a/src/core/ext/transport/binder/wire_format/wire_reader_impl.h
+++ b/src/core/ext/transport/binder/wire_format/wire_reader_impl.h
@@ -87,7 +87,7 @@
   ///
   /// This is the other half of the SETUP_TRANSPORT process. We wait for
   /// in-coming SETUP_TRANSPORT request with the "sending" part of a binder from
-  /// the other end. For client, the message is coming from the trasnaction
+  /// the other end. For client, the message is coming from the transaction
   /// receiver we just constructed in SendSetupTransport(). For server, we
   /// assume that this step is already completed.
   // TODO(waynetu): In the testing environment, we still use this method (on
@@ -107,7 +107,7 @@
   std::shared_ptr<TransportStreamReceiver> transport_stream_receiver_;
   grpc_core::Notification connection_noti_;
   grpc_core::Mutex mu_;
-  bool connected_ ABSL_GUARDED_BY(mu_) = false;
+  std::atomic_bool connected_{false};
   bool recvd_setup_transport_ ABSL_GUARDED_BY(mu_) = false;
   // NOTE: other_end_binder_ will be moved out when RecvSetupTransport() is
   // called. Be cautious not to access it afterward.
@@ -130,6 +130,25 @@
 
   // Used to send ACK.
   std::shared_ptr<WireWriter> wire_writer_;
+
+  // Workaround for race condition.
+  //
+  // In `SetupTransport()`, we set `connected_` to true, call
+  // `SendSetupTransport()`, and construct `wire_writer_`. There is a potential
+  // race condition between calling `SendSetupTransport()` and constructing
+  // `wire_writer_`. So use this notification to wait. This should be very fast
+  // and waiting is acceptable.
+  //
+  // The original problem was that we can't move `connected_ = true` and
+  // `SendSetupTransport()` into the mutex, as it will deadlock if
+  // `ProcessTransaction()` is called in the same call chain.
+  //
+  // Note: this is not a perfect solution; the system will still deadlock if,
+  // e.g., the first request is 64K and we enter the sending-ACK code path.
+  //
+  // TODO(littlecvr): Figure out a better solution that avoids any potential
+  // deadlock and does not require waiting.
+  grpc_core::Notification wire_writer_ready_notification_;
 };
 
 }  // namespace grpc_binder