Move pc.transport_controller_ to be network thread only

A pointer to the transport controller is now maintained on
both the network thread and the signaling thread. We use
thread specific accessors to make it explicit which copy we
are accessing at any given time.

We also move the initial offerer value to the SDP offer/answer
class; this is determined on the basis of SDP offer/answer, so
there is no need to hop to the network thread for that.

Work in progress.

Bug: webrtc:9987
Change-Id: Idbe5a7fbf44f667adcd119e486133cf6e43ab1f5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/251382
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35965}
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index 4e951a5..b80725f 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -567,6 +567,7 @@
 
   // port_allocator_ and transport_controller_ live on the network thread and
   // should be destroyed there.
+  transport_controller_copy_ = nullptr;
   network_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
     RTC_DCHECK_RUN_ON(network_thread());
     TeardownDataChannelTransport_n();
@@ -615,20 +616,20 @@
   }
 
   // Network thread initialization.
-  network_thread()->Invoke<void>(RTC_FROM_HERE, [this, &stun_servers,
-                                                 &turn_servers, &configuration,
-                                                 &dependencies] {
-    RTC_DCHECK_RUN_ON(network_thread());
-    network_thread_safety_ = PendingTaskSafetyFlag::Create();
-    InitializePortAllocatorResult pa_result =
-        InitializePortAllocator_n(stun_servers, turn_servers, configuration);
-    // Send information about IPv4/IPv6 status.
-    PeerConnectionAddressFamilyCounter address_family =
-        pa_result.enable_ipv6 ? kPeerConnection_IPv6 : kPeerConnection_IPv4;
-    RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family,
-                              kPeerConnectionAddressFamilyCounter_Max);
-    InitializeTransportController_n(configuration, dependencies);
-  });
+  transport_controller_copy_ =
+      network_thread()->Invoke<JsepTransportController*>(RTC_FROM_HERE, [&] {
+        RTC_DCHECK_RUN_ON(network_thread());
+        network_thread_safety_ = PendingTaskSafetyFlag::Create();
+        InitializePortAllocatorResult pa_result = InitializePortAllocator_n(
+            stun_servers, turn_servers, configuration);
+        // Send information about IPv4/IPv6 status.
+        PeerConnectionAddressFamilyCounter address_family =
+            pa_result.enable_ipv6 ? kPeerConnection_IPv6 : kPeerConnection_IPv4;
+        RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics",
+                                  address_family,
+                                  kPeerConnectionAddressFamilyCounter_Max);
+        return InitializeTransportController_n(configuration, dependencies);
+      });
 
   configuration_ = configuration;
 
@@ -674,7 +675,7 @@
   return RTCError::OK();
 }
 
-void PeerConnection::InitializeTransportController_n(
+JsepTransportController* PeerConnection::InitializeTransportController_n(
     const RTCConfiguration& configuration,
     const PeerConnectionDependencies& dependencies) {
   JsepTransportController::Config config;
@@ -793,6 +794,7 @@
       });
 
   transport_controller_->SetIceConfig(ParseIceConfig(configuration));
+  return transport_controller_.get();
 }
 
 rtc::scoped_refptr<StreamCollectionInterface> PeerConnection::local_streams() {
@@ -914,9 +916,12 @@
 }
 
 RtpTransportInternal* PeerConnection::GetRtpTransport(const std::string& mid) {
+  // TODO(bugs.webrtc.org/9987): Avoid the thread jump.
+  // This might be done by caching the value on the signaling thread.
   RTC_DCHECK_RUN_ON(signaling_thread());
   return network_thread()->Invoke<RtpTransportInternal*>(
       RTC_FROM_HERE, [this, &mid] {
+        RTC_DCHECK_RUN_ON(network_thread());
         auto rtp_transport = transport_controller_->GetRtpTransport(mid);
         RTC_DCHECK(rtp_transport);
         return rtp_transport;
@@ -1506,6 +1511,7 @@
           RTC_FROM_HERE,
           [this, needs_ice_restart, &ice_config, &stun_servers, &turn_servers,
            &modified_config, has_local_description] {
+            RTC_DCHECK_RUN_ON(network_thread());
             // As described in JSEP, calling setConfiguration with new ICE
             // servers or candidate policy must set a "needs-ice-restart" bit so
             // that the next offer triggers an ICE restart which will pick up
@@ -1528,9 +1534,12 @@
 
   if (configuration_.active_reset_srtp_params !=
       modified_config.active_reset_srtp_params) {
-    // TODO(tommi): move to the network thread - this hides an invoke.
-    transport_controller_->SetActiveResetSrtpParams(
-        modified_config.active_reset_srtp_params);
+    // TODO(tommi): merge invokes
+    network_thread()->Invoke<void>(RTC_FROM_HERE, [this, &modified_config] {
+      RTC_DCHECK_RUN_ON(network_thread());
+      transport_controller_->SetActiveResetSrtpParams(
+          modified_config.active_reset_srtp_params);
+    });
   }
 
   if (modified_config.allow_codec_switching.has_value()) {
@@ -1693,7 +1702,13 @@
 rtc::scoped_refptr<DtlsTransport>
 PeerConnection::LookupDtlsTransportByMidInternal(const std::string& mid) {
   RTC_DCHECK_RUN_ON(signaling_thread());
-  return transport_controller_->LookupDtlsTransportByMid(mid);
+  // TODO(bugs.webrtc.org/9987): Avoid the thread jump.
+  // This might be done by caching the value on the signaling thread.
+  return network_thread()->Invoke<rtc::scoped_refptr<DtlsTransport>>(
+      RTC_FROM_HERE, [this, mid]() {
+        RTC_DCHECK_RUN_ON(network_thread());
+        return transport_controller_->LookupDtlsTransportByMid(mid);
+      });
 }
 
 rtc::scoped_refptr<SctpTransportInterface> PeerConnection::GetSctpTransport()
@@ -1793,6 +1808,7 @@
     // TODO(tommi): ^^ That's not exactly optimal since this is yet another
     // blocking hop to the network thread during Close(). Further still, the
     // voice/video/data channels will be cleared on the worker thread.
+    RTC_DCHECK_RUN_ON(network_thread());
     transport_controller_.reset();
     port_allocator_->DiscardCandidatePool();
     if (network_thread_safety_) {
@@ -2146,7 +2162,11 @@
 
   absl::optional<rtc::SSLRole> dtls_role;
   if (sctp_mid_s_) {
-    dtls_role = transport_controller_->GetDtlsRole(*sctp_mid_s_);
+    dtls_role = network_thread()->Invoke<absl::optional<rtc::SSLRole>>(
+        RTC_FROM_HERE, [this] {
+          RTC_DCHECK_RUN_ON(network_thread());
+          return transport_controller_->GetDtlsRole(*sctp_mid_n_);
+        });
     if (!dtls_role && sdp_handler_->is_caller().has_value()) {
       dtls_role =
           *sdp_handler_->is_caller() ? rtc::SSL_SERVER : rtc::SSL_CLIENT;
@@ -2167,7 +2187,11 @@
     return false;
   }
 
-  auto dtls_role = transport_controller_->GetDtlsRole(content_name);
+  auto dtls_role = network_thread()->Invoke<absl::optional<rtc::SSLRole>>(
+      RTC_FROM_HERE, [this, content_name]() {
+        RTC_DCHECK_RUN_ON(network_thread());
+        return transport_controller_->GetDtlsRole(content_name);
+      });
   if (dtls_role) {
     *role = *dtls_role;
     return true;
@@ -2198,7 +2222,7 @@
 
 absl::optional<std::string> PeerConnection::sctp_transport_name() const {
   RTC_DCHECK_RUN_ON(signaling_thread());
-  if (sctp_mid_s_ && transport_controller_)
+  if (sctp_mid_s_ && transport_controller_copy_)
     return sctp_transport_name_s_;
   return absl::optional<std::string>();
 }
@@ -2864,7 +2888,7 @@
       network_thread_safety_,
       [this, mid = *sctp_mid_s_, local_port, remote_port, max_message_size] {
         rtc::scoped_refptr<SctpTransport> sctp_transport =
-            transport_controller()->GetSctpTransport(mid);
+            transport_controller_n()->GetSctpTransport(mid);
         if (sctp_transport)
           sctp_transport->Start(local_port, remote_port, max_message_size);
       }));
diff --git a/pc/peer_connection.h b/pc/peer_connection.h
index 653f5f5..4855d32 100644
--- a/pc/peer_connection.h
+++ b/pc/peer_connection.h
@@ -278,7 +278,7 @@
 
   bool initial_offerer() const override {
     RTC_DCHECK_RUN_ON(signaling_thread());
-    return transport_controller_ && transport_controller_->initial_offerer();
+    return sdp_handler_->initial_offerer();
   }
 
   std::vector<
@@ -357,7 +357,12 @@
   }
   cricket::ChannelManager* channel_manager();
 
-  JsepTransportController* transport_controller() override {
+  JsepTransportController* transport_controller_s() override {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    return transport_controller_copy_;
+  }
+  JsepTransportController* transport_controller_n() override {
+    RTC_DCHECK_RUN_ON(network_thread());
     return transport_controller_.get();
   }
   cricket::PortAllocator* port_allocator() override {
@@ -463,7 +468,7 @@
   RTCError Initialize(
       const PeerConnectionInterface::RTCConfiguration& configuration,
       PeerConnectionDependencies dependencies);
-  void InitializeTransportController_n(
+  JsepTransportController* InitializeTransportController_n(
       const RTCConfiguration& configuration,
       const PeerConnectionDependencies& dependencies)
       RTC_RUN_ON(network_thread());
@@ -664,9 +669,14 @@
 
   const std::string session_id_;
 
-  std::unique_ptr<JsepTransportController>
-      transport_controller_;  // TODO(bugs.webrtc.org/9987): Accessed on both
-                              // signaling and network thread.
+  // The transport controller is set and used on the network thread.
+  // Some functions pass the value of the transport_controller_ pointer
+  // around as arguments while running on the signaling thread; these
+  // use the transport_controller_copy.
+  std::unique_ptr<JsepTransportController> transport_controller_
+      RTC_GUARDED_BY(network_thread());
+  JsepTransportController* transport_controller_copy_
+      RTC_GUARDED_BY(signaling_thread()) = nullptr;
 
   // `sctp_mid_` is the content name (MID) in SDP.
   // Note: this is used as the data channel MID by both SCTP and data channel
diff --git a/pc/peer_connection_internal.h b/pc/peer_connection_internal.h
index 38a1fd1..16caade 100644
--- a/pc/peer_connection_internal.h
+++ b/pc/peer_connection_internal.h
@@ -71,7 +71,8 @@
   // return the RTCConfiguration.crypto_options if set and will only default
   // back to the PeerConnectionFactory settings if nothing was set.
   virtual CryptoOptions GetCryptoOptions() = 0;
-  virtual JsepTransportController* transport_controller() = 0;
+  virtual JsepTransportController* transport_controller_s() = 0;
+  virtual JsepTransportController* transport_controller_n() = 0;
   virtual DataChannelController* data_channel_controller() = 0;
   virtual cricket::PortAllocator* port_allocator() = 0;
   virtual StatsCollector* stats() = 0;
diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc
index 586c0e5..8acc19b 100644
--- a/pc/sdp_offer_answer.cc
+++ b/pc/sdp_offer_answer.cc
@@ -1228,7 +1228,8 @@
           pc_->dtls_enabled(), std::move(dependencies.cert_generator),
           certificate,
           [this](const rtc::scoped_refptr<rtc::RTCCertificate>& certificate) {
-            transport_controller()->SetLocalCertificate(certificate);
+            RTC_DCHECK_RUN_ON(signaling_thread());
+            transport_controller_s()->SetLocalCertificate(certificate);
           });
 
   if (pc_->options()->disable_encryption) {
@@ -1265,12 +1266,19 @@
   }
   return pc_->rtp_manager()->transceivers();
 }
-JsepTransportController* SdpOfferAnswerHandler::transport_controller() {
-  return pc_->transport_controller();
+JsepTransportController* SdpOfferAnswerHandler::transport_controller_s() {
+  return pc_->transport_controller_s();
 }
-const JsepTransportController* SdpOfferAnswerHandler::transport_controller()
+JsepTransportController* SdpOfferAnswerHandler::transport_controller_n() {
+  return pc_->transport_controller_n();
+}
+const JsepTransportController* SdpOfferAnswerHandler::transport_controller_s()
     const {
-  return pc_->transport_controller();
+  return pc_->transport_controller_s();
+}
+const JsepTransportController* SdpOfferAnswerHandler::transport_controller_n()
+    const {
+  return pc_->transport_controller_n();
 }
 DataChannelController* SdpOfferAnswerHandler::data_channel_controller() {
   return pc_->data_channel_controller();
@@ -1314,6 +1322,10 @@
   return context_->signaling_thread();
 }
 
+rtc::Thread* SdpOfferAnswerHandler::network_thread() const {
+  return context_->network_thread();
+}
+
 void SdpOfferAnswerHandler::CreateOffer(
     CreateSessionDescriptionObserver* observer,
     const PeerConnectionInterface::RTCOfferAnswerOptions& options) {
@@ -1502,6 +1514,9 @@
     replaced_local_description = std::move(pending_local_description_);
     pending_local_description_ = std::move(desc);
   }
+  if (!initial_offerer_) {
+    initial_offerer_.emplace(type == SdpType::kOffer);
+  }
   // The session description to apply now must be accessed by
   // `local_description()`.
   RTC_DCHECK(local_description());
@@ -1547,7 +1562,7 @@
       // information about DTLS transports.
       if (transceiver->mid()) {
         auto dtls_transport = LookupDtlsTransportByMid(
-            context_->network_thread(), transport_controller(),
+            context_->network_thread(), transport_controller_s(),
             *transceiver->mid());
         transceiver->sender_internal()->set_transport(dtls_transport);
         transceiver->receiver_internal()->set_transport(dtls_transport);
@@ -1792,7 +1807,7 @@
                             *session_desc);
 
   // NOTE: This will perform an Invoke() to the network thread.
-  return transport_controller()->SetRemoteDescription(sdp_type, session_desc);
+  return transport_controller_s()->SetRemoteDescription(sdp_type, session_desc);
 }
 
 void SdpOfferAnswerHandler::ApplyRemoteDescription(
@@ -1982,7 +1997,7 @@
       // 2.2.8.1.11.[3-6]: Set the transport internal slots.
       if (transceiver->mid()) {
         auto dtls_transport = LookupDtlsTransportByMid(
-            context_->network_thread(), transport_controller(),
+            context_->network_thread(), transport_controller_s(),
             *transceiver->mid());
         transceiver->sender_internal()->set_transport(dtls_transport);
         transceiver->receiver_internal()->set_transport(dtls_transport);
@@ -2190,7 +2205,7 @@
   // MaybeStartGathering needs to be called after informing the observer so that
   // we don't signal any candidates before signaling that SetLocalDescription
   // completed.
-  transport_controller()->MaybeStartGathering();
+  transport_controller_s()->MaybeStartGathering();
 }
 
 void SdpOfferAnswerHandler::DoCreateOffer(
@@ -2572,7 +2587,7 @@
   }
 
   // Remove the candidates from the transport controller.
-  RTCError error = transport_controller()->RemoveRemoteCandidates(candidates);
+  RTCError error = transport_controller_s()->RemoveRemoteCandidates(candidates);
   if (!error.ok()) {
     RTC_LOG(LS_ERROR)
         << "RemoveIceCandidates: Error when removing remote candidates: "
@@ -2920,7 +2935,7 @@
     transceiver->internal()->set_mid(state.mid());
     transceiver->internal()->set_mline_index(state.mline_index());
   }
-  RTCError e = transport_controller()->RollbackTransports();
+  RTCError e = transport_controller_s()->RollbackTransports();
   if (!e.ok()) {
     return e;
   }
@@ -2995,7 +3010,8 @@
 
 absl::optional<rtc::SSLRole> SdpOfferAnswerHandler::GetDtlsRole(
     const std::string& mid) const {
-  return transport_controller()->GetDtlsRole(mid);
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return transport_controller_s()->GetDtlsRole(mid);
 }
 
 void SdpOfferAnswerHandler::UpdateNegotiationNeeded() {
@@ -3565,8 +3581,11 @@
         return RTCError(RTCErrorType::INTERNAL_ERROR,
                         "Failed to create channel for mid=" + content.name);
       }
+      // Note: this is a thread hop; the lambda will be executed
+      // on the network thread.
       transceiver->internal()->SetChannel(channel, [&](const std::string& mid) {
-        return transport_controller()->GetRtpTransport(mid);
+        RTC_DCHECK_RUN_ON(network_thread());
+        return transport_controller_n()->GetRtpTransport(mid);
       });
     }
   }
@@ -4491,13 +4510,13 @@
   if (source == cricket::CS_LOCAL) {
     const SessionDescriptionInterface* sdesc = local_description();
     RTC_DCHECK(sdesc);
-    return transport_controller()->SetLocalDescription(type,
-                                                       sdesc->description());
+    return transport_controller_s()->SetLocalDescription(type,
+                                                         sdesc->description());
   } else {
     const SessionDescriptionInterface* sdesc = remote_description();
     RTC_DCHECK(sdesc);
-    return transport_controller()->SetRemoteDescription(type,
-                                                        sdesc->description());
+    return transport_controller_s()->SetRemoteDescription(type,
+                                                          sdesc->description());
   }
 }
 
@@ -4749,7 +4768,8 @@
     }
     rtp_manager()->GetAudioTransceiver()->internal()->SetChannel(
         voice_channel, [&](const std::string& mid) {
-          return transport_controller()->GetRtpTransport(mid);
+          RTC_DCHECK_RUN_ON(network_thread());
+          return transport_controller_n()->GetRtpTransport(mid);
         });
   }
 
@@ -4763,7 +4783,8 @@
     }
     rtp_manager()->GetVideoTransceiver()->internal()->SetChannel(
         video_channel, [&](const std::string& mid) {
-          return transport_controller()->GetRtpTransport(mid);
+          RTC_DCHECK_RUN_ON(network_thread());
+          return transport_controller_n()->GetRtpTransport(mid);
         });
   }
 
diff --git a/pc/sdp_offer_answer.h b/pc/sdp_offer_answer.h
index c9c3046..67ead47 100644
--- a/pc/sdp_offer_answer.h
+++ b/pc/sdp_offer_answer.h
@@ -182,6 +182,14 @@
   rtc::scoped_refptr<StreamCollectionInterface> local_streams();
   rtc::scoped_refptr<StreamCollectionInterface> remote_streams();
 
+  bool initial_offerer() {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    if (initial_offerer_) {
+      return *initial_offerer_;
+    }
+    return false;
+  }
+
  private:
   class RemoteDescriptionOperation;
   class ImplicitCreateSessionDescriptionObserver;
@@ -215,6 +223,7 @@
       PeerConnectionDependencies& dependencies);
 
   rtc::Thread* signaling_thread() const;
+  rtc::Thread* network_thread() const;
   // Non-const versions of local_description()/remote_description(), for use
   // internally.
   SessionDescriptionInterface* mutable_local_description()
@@ -587,8 +596,14 @@
   const cricket::PortAllocator* port_allocator() const;
   RtpTransmissionManager* rtp_manager();
   const RtpTransmissionManager* rtp_manager() const;
-  JsepTransportController* transport_controller();
-  const JsepTransportController* transport_controller() const;
+  JsepTransportController* transport_controller_s()
+      RTC_RUN_ON(signaling_thread());
+  const JsepTransportController* transport_controller_s() const
+      RTC_RUN_ON(signaling_thread());
+  JsepTransportController* transport_controller_n()
+      RTC_RUN_ON(network_thread());
+  const JsepTransportController* transport_controller_n() const
+      RTC_RUN_ON(network_thread());
   // ===================================================================
   const cricket::AudioOptions& audio_options() { return audio_options_; }
   const cricket::VideoOptions& video_options() { return video_options_; }
@@ -683,6 +698,10 @@
   std::unique_ptr<webrtc::VideoBitrateAllocatorFactory>
       video_bitrate_allocator_factory_ RTC_GUARDED_BY(signaling_thread());
 
+  // Whether we are the initial offerer on the association. This
+  // determines the SSL role.
+  absl::optional<bool> initial_offerer_ RTC_GUARDED_BY(signaling_thread());
+
   rtc::WeakPtrFactory<SdpOfferAnswerHandler> weak_ptr_factory_
       RTC_GUARDED_BY(signaling_thread());
 };
diff --git a/pc/test/fake_peer_connection_base.h b/pc/test/fake_peer_connection_base.h
index 3462c8c..7d64ab8 100644
--- a/pc/test/fake_peer_connection_base.h
+++ b/pc/test/fake_peer_connection_base.h
@@ -309,7 +309,8 @@
   }
 
   CryptoOptions GetCryptoOptions() override { return CryptoOptions(); }
-  JsepTransportController* transport_controller() override { return nullptr; }
+  JsepTransportController* transport_controller_s() override { return nullptr; }
+  JsepTransportController* transport_controller_n() override { return nullptr; }
   DataChannelController* data_channel_controller() override { return nullptr; }
   cricket::PortAllocator* port_allocator() override { return nullptr; }
   StatsCollector* stats() override { return nullptr; }