Full Cast Standalone Sender (Mirroring)

This patch adds all the necessary sender-side logic for talking to the
agent on a Cast Receiver, to LAUNCH the Mirroring App, and to estabilsh
a VirtualConnection for Mirroring App message routing.

It heavily modifies the role of LoopingFileCastAgent: This class now
becomes a "single-use workflow" that walks through all the steps needed
for establishing streaming and tearing it down.

With this patch, the Standalone Sender can now be used with the
Standalone Receiver once again.

Bug: b/162542369
Change-Id: Ib7d990995b5a036bd0299806a42d4a92f89d9209
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/2586061
Commit-Queue: Yuri Wiitala <miu@chromium.org>
Reviewed-by: Jordan Bayles <jophba@chromium.org>
diff --git a/cast/standalone_sender/looping_file_cast_agent.cc b/cast/standalone_sender/looping_file_cast_agent.cc
index 6f563e2..1a90cbb 100644
--- a/cast/standalone_sender/looping_file_cast_agent.cc
+++ b/cast/standalone_sender/looping_file_cast_agent.cc
@@ -8,10 +8,14 @@
 #include <utility>
 #include <vector>
 
+#include "cast/common/channel/message_util.h"
 #include "cast/standalone_sender/looping_file_sender.h"
 #include "cast/streaming/capture_recommendations.h"
 #include "cast/streaming/constants.h"
 #include "cast/streaming/offer_messages.h"
+#include "json/value.h"
+#include "platform/api/tls_connection_factory.h"
+#include "util/stringprintf.h"
 #include "util/trace_logging.h"
 
 namespace openscreen {
@@ -20,25 +24,68 @@
 
 using DeviceMediaPolicy = SenderSocketFactory::DeviceMediaPolicy;
 
-}  // namespace
+// TODO(miu): These string constants appear in a few places and should be
+// de-duped to a common location.
+constexpr char kMirroringAppId[] = "0F5096E8";
+constexpr char kMirroringAudioOnlyAppId[] = "85CDB22F";
 
-LoopingFileCastAgent::LoopingFileCastAgent(TaskRunner* task_runner)
-    : task_runner_(task_runner) {
-  message_port_ =
-      MakeSerialDelete<CastSocketMessagePort>(task_runner_, &router_);
-  socket_factory_ =
-      MakeSerialDelete<SenderSocketFactory>(task_runner_, this, task_runner_);
-  connection_factory_ = SerialDeletePtr<TlsConnectionFactory>(
-      task_runner_,
-      TlsConnectionFactory::CreateFactory(socket_factory_.get(), task_runner_)
-          .release());
-  socket_factory_->set_factory(connection_factory_.get());
+// Parses the given string as a JSON object. If the parse fails, an empty object
+// is returned.
+//
+// TODO(miu): De-dupe this code (same as in cast/receiver/application_agent.cc)!
+Json::Value ParseAsObject(absl::string_view value) {
+  ErrorOr<Json::Value> parsed = json::Parse(value);
+  if (parsed.is_value() && parsed.value().isObject()) {
+    return std::move(parsed.value());
+  }
+  return Json::Value(Json::objectValue);
 }
 
-LoopingFileCastAgent::~LoopingFileCastAgent() = default;
+// Returns true if the 'type' field in |object| has the given |type|.
+//
+// TODO(miu): De-dupe this code (same as in cast/receiver/application_agent.cc)!
+bool HasType(const Json::Value& object, CastMessageType type) {
+  OSP_DCHECK(object.isObject());
+  const Json::Value& value =
+      object.get(kMessageKeyType, Json::Value::nullSingleton());
+  return value.isString() && value.asString() == CastMessageTypeToString(type);
+}
+
+// Returns the string found in object[field] if possible; otherwise, returns
+// |fallback|. The fallback string is returned if |object| is not an object or
+// the |field| key does not reference a string within the object.
+std::string ExtractStringFieldValue(const Json::Value& object,
+                                    const char* field,
+                                    std::string fallback = {}) {
+  if (object.isObject() && object[field].isString()) {
+    return object[field].asString();
+  }
+  return fallback;
+}
+
+}  // namespace
+
+LoopingFileCastAgent::LoopingFileCastAgent(TaskRunner* task_runner,
+                                           ShutdownCallback shutdown_callback)
+    : task_runner_(task_runner),
+      shutdown_callback_(std::move(shutdown_callback)),
+      connection_handler_(&router_, this),
+      socket_factory_(this, task_runner_),
+      connection_factory_(
+          TlsConnectionFactory::CreateFactory(&socket_factory_, task_runner_)),
+      message_port_(&router_) {
+  router_.AddHandlerForLocalId(kPlatformSenderId, this);
+  socket_factory_.set_factory(connection_factory_.get());
+}
+
+LoopingFileCastAgent::~LoopingFileCastAgent() {
+  Shutdown();
+}
 
 void LoopingFileCastAgent::Connect(ConnectionSettings settings) {
   TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
+
+  OSP_DCHECK(!connection_settings_);
   connection_settings_ = std::move(settings);
   const auto policy = connection_settings_->should_include_video
                           ? DeviceMediaPolicy::kIncludesVideo
@@ -46,19 +93,8 @@
 
   task_runner_->PostTask([this, policy] {
     wake_lock_ = ScopedWakeLock::Create(task_runner_);
-    socket_factory_->Connect(connection_settings_->receiver_endpoint, policy,
-                             &router_);
-  });
-}
-
-void LoopingFileCastAgent::Stop() {
-  task_runner_->PostTask([this] {
-    StopCurrentSession();
-
-    connection_factory_.reset();
-    connection_settings_.reset();
-    socket_factory_.reset();
-    wake_lock_.reset();
+    socket_factory_.Connect(connection_settings_->receiver_endpoint, policy,
+                            &router_);
   });
 }
 
@@ -66,32 +102,192 @@
                                        const IPEndpoint& endpoint,
                                        std::unique_ptr<CastSocket> socket) {
   TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
-  if (current_session_) {
+
+  if (message_port_.GetSocketId() != ToCastSocketId(nullptr)) {
     OSP_LOG_WARN << "Already connected, dropping peer at: " << endpoint;
     return;
   }
-
-  OSP_LOG_INFO << "Received connection from peer at: " << endpoint;
-  message_port_->SetSocket(socket->GetWeakPtr());
+  message_port_.SetSocket(socket->GetWeakPtr());
   router_.TakeSocket(this, std::move(socket));
-  CreateAndStartSession();
+
+  OSP_LOG_INFO << "Launching Mirroring App on the Cast Receiver...";
+  static constexpr char kLaunchMessageTemplate[] =
+      R"({"type":"LAUNCH", "requestId":%d, "appId":"%s"})";
+  router_.Send(VirtualConnection{kPlatformSenderId, kPlatformReceiverId,
+                                 message_port_.GetSocketId()},
+               MakeSimpleUTF8Message(
+                   kReceiverNamespace,
+                   StringPrintf(kLaunchMessageTemplate, next_request_id_++,
+                                GetMirroringAppId())));
 }
 
 void LoopingFileCastAgent::OnError(SenderSocketFactory* factory,
                                    const IPEndpoint& endpoint,
                                    Error error) {
   OSP_LOG_ERROR << "Cast agent received socket factory error: " << error;
-  StopCurrentSession();
+  Shutdown();
 }
 
 void LoopingFileCastAgent::OnClose(CastSocket* cast_socket) {
   OSP_VLOG << "Cast agent socket closed.";
-  StopCurrentSession();
+  Shutdown();
 }
 
 void LoopingFileCastAgent::OnError(CastSocket* socket, Error error) {
   OSP_LOG_ERROR << "Cast agent received socket error: " << error;
-  StopCurrentSession();
+  Shutdown();
+}
+
+bool LoopingFileCastAgent::IsConnectionAllowed(
+    const VirtualConnection& virtual_conn) const {
+  return true;
+}
+
+void LoopingFileCastAgent::OnMessage(VirtualConnectionRouter* router,
+                                     CastSocket* socket,
+                                     ::cast::channel::CastMessage message) {
+  if (message_port_.GetSocketId() == ToCastSocketId(socket) &&
+      !message_port_.client_sender_id().empty() &&
+      message_port_.client_sender_id() == message.destination_id()) {
+    OSP_DCHECK(message_port_.client_sender_id() != kPlatformSenderId);
+    message_port_.OnMessage(router, socket, std::move(message));
+    return;
+  }
+
+  if (message.destination_id() != kPlatformSenderId &&
+      message.destination_id() != kBroadcastId) {
+    return;  // Message not for us.
+  }
+
+  if (message.namespace_() == kReceiverNamespace &&
+      message_port_.GetSocketId() == ToCastSocketId(socket)) {
+    const Json::Value payload = ParseAsObject(message.payload_utf8());
+    if (HasType(payload, CastMessageType::kReceiverStatus)) {
+      HandleReceiverStatus(payload);
+    } else if (HasType(payload, CastMessageType::kLaunchError)) {
+      OSP_LOG_ERROR
+          << "Failed to launch the Cast Mirroring App on the Receiver! Reason: "
+          << ExtractStringFieldValue(payload, kMessageKeyReason, "UNKNOWN");
+      Shutdown();
+    } else if (HasType(payload, CastMessageType::kInvalidRequest)) {
+      OSP_LOG_ERROR << "Cast Receiver thinks our request is invalid: "
+                    << ExtractStringFieldValue(payload, kMessageKeyReason,
+                                               "UNKNOWN");
+    }
+  }
+}
+
+const char* LoopingFileCastAgent::GetMirroringAppId() const {
+  if (connection_settings_ && !connection_settings_->should_include_video) {
+    return kMirroringAudioOnlyAppId;
+  }
+  return kMirroringAppId;
+}
+
+void LoopingFileCastAgent::HandleReceiverStatus(const Json::Value& status) {
+  const Json::Value& details =
+      (status[kMessageKeyStatus].isObject() &&
+       status[kMessageKeyStatus][kMessageKeyApplications].isArray())
+          ? status[kMessageKeyStatus][kMessageKeyApplications][0]
+          : Json::Value();
+
+  const std::string& running_app_id =
+      ExtractStringFieldValue(details, kMessageKeyAppId);
+  if (running_app_id != GetMirroringAppId()) {
+    // The mirroring app is not running. If it was just stopped, Shutdown() will
+    // tear everything down. If it has been stopped already, Shutdown() is a
+    // no-op.
+    Shutdown();
+    return;
+  }
+
+  const std::string& session_id =
+      ExtractStringFieldValue(details, kMessageKeySessionId);
+  if (session_id.empty()) {
+    OSP_LOG_ERROR
+        << "Cannot continue: Cast Receiver did not provide a session ID for "
+           "the Mirroring App running on it.";
+    Shutdown();
+    return;
+  }
+  if (app_session_id_ != session_id) {
+    if (app_session_id_.empty()) {
+      app_session_id_ = session_id;
+    } else {
+      OSP_LOG_ERROR << "Cannot continue: Different Mirroring App session is "
+                       "now running on the Cast Receiver.";
+      Shutdown();
+      return;
+    }
+  }
+
+  if (remote_connection_) {
+    // The mirroring app is running and this LoopingFileCastAgent is already
+    // streaming to it (or is awaiting message routing to be established). There
+    // are no additional actions to be taken in response to this extra
+    // RECEIVER_STATUS message.
+    return;
+  }
+
+  const std::string& message_destination_id =
+      ExtractStringFieldValue(details, kMessageKeyTransportId);
+  if (message_destination_id.empty()) {
+    OSP_LOG_ERROR
+        << "Cannot continue: Cast Receiver did not provide a transport ID for "
+           "routing messages to the Mirroring App running on it.";
+    Shutdown();
+    return;
+  }
+
+  remote_connection_.emplace(
+      VirtualConnection{MakeUniqueSessionId("streaming_sender"),
+                        message_destination_id, message_port_.GetSocketId()});
+  OSP_LOG_INFO << "Starting-up message routing to the Cast Receiver's "
+                  "Mirroring App (sessionId="
+               << app_session_id_ << ")...";
+  connection_handler_.OpenRemoteConnection(
+      *remote_connection_,
+      [this](bool success) { OnRemoteMessagingOpened(success); });
+}
+
+void LoopingFileCastAgent::OnRemoteMessagingOpened(bool success) {
+  if (!remote_connection_) {
+    return;  // Shutdown() was called in the meantime.
+  }
+
+  if (success) {
+    OSP_LOG_INFO << "Starting streaming session...";
+    CreateAndStartSession();
+  } else {
+    OSP_LOG_INFO << "Failed to establish messaging to the Cast Receiver's "
+                    "Mirroring App. Perhaps another Cast Sender is using it?";
+    Shutdown();
+  }
+}
+
+void LoopingFileCastAgent::CreateAndStartSession() {
+  TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
+
+  environment_ =
+      std::make_unique<Environment>(&Clock::now, task_runner_, IPEndpoint{});
+  OSP_DCHECK(remote_connection_.has_value());
+  current_session_ = std::make_unique<SenderSession>(
+      connection_settings_->receiver_endpoint.address, this, environment_.get(),
+      &message_port_, remote_connection_->local_id,
+      remote_connection_->peer_id);
+  OSP_DCHECK(!message_port_.client_sender_id().empty());
+
+  AudioCaptureConfig audio_config;
+  VideoCaptureConfig video_config;
+  // Use default display resolution of 1080P.
+  video_config.resolutions.emplace_back(DisplayResolution{});
+
+  OSP_VLOG << "Starting session negotiation.";
+  const Error negotiation_error =
+      current_session_->Negotiate({audio_config}, {video_config});
+  if (!negotiation_error.ok()) {
+    OSP_LOG_ERROR << "Failed to negotiate a session: " << negotiation_error;
+  }
 }
 
 void LoopingFileCastAgent::OnNegotiated(
@@ -110,39 +306,54 @@
       connection_settings_->max_bitrate);
 }
 
-// Currently, we just kill the session if an error is encountered.
 void LoopingFileCastAgent::OnError(const SenderSession* session, Error error) {
-  OSP_LOG_ERROR << "Cast agent received sender session error: " << error;
-  StopCurrentSession();
+  OSP_LOG_ERROR << "SenderSession fatal error: " << error;
+  Shutdown();
 }
 
-void LoopingFileCastAgent::CreateAndStartSession() {
+void LoopingFileCastAgent::Shutdown() {
   TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
-  environment_ =
-      std::make_unique<Environment>(&Clock::now, task_runner_, IPEndpoint{});
-  current_session_ = std::make_unique<SenderSession>(
-      connection_settings_->receiver_endpoint.address, this, environment_.get(),
-      message_port_.get());
 
-  AudioCaptureConfig audio_config;
-  VideoCaptureConfig video_config;
-  // Use default display resolution of 1080P.
-  video_config.resolutions.emplace_back(DisplayResolution{});
-
-  OSP_VLOG << "Starting session negotiation.";
-  const Error negotiation_error =
-      current_session_->Negotiate({audio_config}, {video_config});
-  if (!negotiation_error.ok()) {
-    OSP_LOG_ERROR << "Failed to negotiate a session: " << negotiation_error;
-  }
-}
-
-void LoopingFileCastAgent::StopCurrentSession() {
-  current_session_.reset();
-  environment_.reset();
   file_sender_.reset();
-  router_.CloseSocket(message_port_->GetSocketId());
-  message_port_->SetSocket(nullptr);
+  if (current_session_) {
+    OSP_LOG_INFO << "Stopping mirroring session...";
+    current_session_.reset();
+  }
+  OSP_DCHECK(message_port_.client_sender_id().empty());
+  environment_.reset();
+
+  if (remote_connection_) {
+    const VirtualConnection connection = *remote_connection_;
+    // Reset |remote_connection_| because ConnectionNamespaceHandler may
+    // call-back into OnRemoteMessagingOpened().
+    remote_connection_.reset();
+    connection_handler_.CloseRemoteConnection(connection);
+  }
+
+  if (!app_session_id_.empty()) {
+    OSP_LOG_INFO << "Stopping the Cast Receiver's Mirroring App...";
+    static constexpr char kStopMessageTemplate[] =
+        R"({"type":"STOP", "requestId":%d, "sessionId":"%s"})";
+    std::string stop_json = StringPrintf(
+        kStopMessageTemplate, next_request_id_++, app_session_id_.c_str());
+    router_.Send(
+        VirtualConnection{kPlatformSenderId, kPlatformReceiverId,
+                          message_port_.GetSocketId()},
+        MakeSimpleUTF8Message(kReceiverNamespace, std::move(stop_json)));
+    app_session_id_.clear();
+  }
+
+  if (message_port_.GetSocketId() != ToCastSocketId(nullptr)) {
+    router_.CloseSocket(message_port_.GetSocketId());
+    message_port_.SetSocket({});
+  }
+
+  wake_lock_.reset();
+
+  if (shutdown_callback_) {
+    const ShutdownCallback callback = std::move(shutdown_callback_);
+    callback();
+  }
 }
 
 }  // namespace cast
diff --git a/cast/standalone_sender/looping_file_cast_agent.h b/cast/standalone_sender/looping_file_cast_agent.h
index eae7a8b..74ca7f3 100644
--- a/cast/standalone_sender/looping_file_cast_agent.h
+++ b/cast/standalone_sender/looping_file_cast_agent.h
@@ -7,12 +7,15 @@
 
 #include <openssl/x509.h>
 
+#include <functional>
 #include <memory>
 #include <string>
 #include <vector>
 
 #include "absl/types/optional.h"
+#include "cast/common/channel/cast_message_handler.h"
 #include "cast/common/channel/cast_socket_message_port.h"
+#include "cast/common/channel/connection_namespace_handler.h"
 #include "cast/common/channel/virtual_connection_router.h"
 #include "cast/common/public/cast_socket.h"
 #include "cast/sender/public/sender_socket_factory.h"
@@ -25,23 +28,58 @@
 #include "platform/base/interface_info.h"
 #include "platform/impl/task_runner.h"
 
+namespace Json {
+class Value;
+}
+
 namespace openscreen {
 namespace cast {
 
-// This class manages sender connections, starting with listening over TLS for
-// connection attempts, constructing SenderSessions when OFFER messages are
-// received, and linking Senders to the output decoder and SDL visualizer.
+// A single-use sender-side Cast Agent that manages the workflow for a mirroring
+// session, casting the content from a local file indefinitely. After being
+// constructed and having its Connect() method called, the LoopingFileCastAgent
+// steps through the following workflow:
+//
+//   1. Waits for a CastSocket representing a successful connection to a remote
+//      Cast Receiver's agent.
+//   2. Sends a LAUNCH request to the Cast Receiver to start its Mirroring App.
+//   3. Waits for a RECEIVER_STATUS message from the Receiver indicating launch
+//      success, or a LAUNCH_ERROR.
+//   4. Once launched, message routing (i.e., a VirtualConnection) is requested,
+//      for messaging between the SenderSession (locally) and the remote
+//      Mirroring App.
+//   5. Once message routing is established, the local SenderSession is created
+//      and begins the mirroring-specific OFFER/ANSWER messaging to negotiate
+//      the streaming parameters.
+//   6. Streaming commences.
+//
+// If at any point an error occurs, the LoopingFileCastAgent executes a clean
+// shut-down (both locally, and with the remote Cast Receiver), and then invokes
+// the ShutdownCallback that was passed to the constructor.
+//
+// Normal shutdown happens when either:
+//
+//   1. Receiver-side, the Mirroring App is shut down. This will cause the
+//      ShutdownCallback passed to the constructor to be invoked.
+//   2. This LoopingFileCastAgent is destroyed (automatic shutdown is part of
+//      the destruction procedure).
 class LoopingFileCastAgent final
     : public SenderSocketFactory::Client,
       public VirtualConnectionRouter::SocketErrorHandler,
+      public ConnectionNamespaceHandler::VirtualConnectionPolicy,
+      public CastMessageHandler,
       public SenderSession::Client {
  public:
-  explicit LoopingFileCastAgent(TaskRunner* task_runner);
+  using ShutdownCallback = std::function<void()>;
+
+  // |shutdown_callback| is invoked after normal shutdown, whether initiated
+  // sender- or receiver-side; or, for any fatal error.
+  LoopingFileCastAgent(TaskRunner* task_runner,
+                       ShutdownCallback shutdown_callback);
   ~LoopingFileCastAgent();
 
   struct ConnectionSettings {
-    // The endpoint of the receiver we wish to connect to. Eventually this
-    // will come from discovery, instead of an endpoint here.
+    // The endpoint of the receiver we wish to connect to.
     IPEndpoint receiver_endpoint;
 
     // The path to the file that we want to play.
@@ -59,9 +97,12 @@
     bool use_android_rtp_hack = true;
   };
 
+  // Connect to a Cast Receiver, and start the workflow to establish a
+  // mirroring/streaming session. Destroy the LoopingFileCastAgent to shutdown
+  // and disconnect.
   void Connect(ConnectionSettings settings);
-  void Stop();
 
+ private:
   // SenderSocketFactory::Client overrides.
   void OnConnected(SenderSocketFactory* factory,
                    const IPEndpoint& endpoint,
@@ -74,6 +115,33 @@
   void OnClose(CastSocket* cast_socket) override;
   void OnError(CastSocket* socket, Error error) override;
 
+  // ConnectionNamespaceHandler::VirtualConnectionPolicy overrides.
+  bool IsConnectionAllowed(
+      const VirtualConnection& virtual_conn) const override;
+
+  // CastMessageHandler overrides.
+  void OnMessage(VirtualConnectionRouter* router,
+                 CastSocket* socket,
+                 ::cast::channel::CastMessage message) override;
+
+  // Returns the Cast application ID for either A/V mirroring or audio-only
+  // mirroring, as configured by the ConnectionSettings.
+  const char* GetMirroringAppId() const;
+
+  // Called by OnMessage() to determine whether the Cast Receiver has launched
+  // or unlaunched the Mirroring App. If the former, a VirtualConnection is
+  // requested. Otherwise, the workflow is aborted and Shutdown() is called.
+  void HandleReceiverStatus(const Json::Value& status);
+
+  // Called by the |connection_handler_| after message routing to the Cast
+  // Receiver's Mirroring App has been established (if |success| is true).
+  void OnRemoteMessagingOpened(bool success);
+
+  // Once we have a connection to the receiver we need to create and start
+  // a sender session. This method results in the OFFER/ANSWER exchange
+  // being completed and a session should be started.
+  void CreateAndStartSession();
+
   // SenderSession::Client overrides.
   void OnNegotiated(const SenderSession* session,
                     SenderSession::ConfiguredSenders senders,
@@ -81,32 +149,38 @@
                         capture_recommendations) override;
   void OnError(const SenderSession* session, Error error) override;
 
- private:
-  // Once we have a connection to the receiver we need to create and start
-  // a sender session. This method results in the OFFER/ANSWER exchange
-  // being completed and a session should be started.
-  void CreateAndStartSession();
-
-  // Helper for stopping the current session. This is useful for when we don't
-  // want to completely stop (e.g. an issue with a specific Sender) but need
-  // to terminate the current connection.
-  void StopCurrentSession();
+  // Helper for stopping the current session, and/or unwinding a remote
+  // connection request (pre-session). This ensures LoopingFileCastAgent is in a
+  // terminal shutdown state.
+  void Shutdown();
 
   // Member variables set as part of construction.
   TaskRunner* const task_runner_;
+  ShutdownCallback shutdown_callback_;
   VirtualConnectionRouter router_;
-  SerialDeletePtr<CastSocketMessagePort> message_port_;
-  SerialDeletePtr<SenderSocketFactory> socket_factory_;
-  SerialDeletePtr<TlsConnectionFactory> connection_factory_;
+  ConnectionNamespaceHandler connection_handler_;
+  SenderSocketFactory socket_factory_;
+  std::unique_ptr<TlsConnectionFactory> connection_factory_;
+  CastSocketMessagePort message_port_;
 
-  // Member variables set as part of starting up.
-  std::unique_ptr<Environment> environment_;
+  // Counter for distinguishing request messages sent to the Cast Receiver.
+  int next_request_id_ = 1;
+
+  // Initialized by Connect().
   absl::optional<ConnectionSettings> connection_settings_;
   SerialDeletePtr<ScopedWakeLock> wake_lock_;
 
-  // Member variables set as part of a sender connection.
-  // NOTE: currently we only support a single sender connection and a
-  // single streaming session.
+  // If non-empty, this is the sessionId associated with the Cast Receiver
+  // application that this LoopingFileCastAgent launched.
+  std::string app_session_id_;
+
+  // This is set once LoopingFileCastAgent has requested to start messaging to
+  // the mirroring app on a Cast Receiver.
+  absl::optional<VirtualConnection> remote_connection_;
+
+  // Member variables set while a streaming to the mirroring app on a Cast
+  // Receiver.
+  std::unique_ptr<Environment> environment_;
   std::unique_ptr<SenderSession> current_session_;
   std::unique_ptr<LoopingFileSender> file_sender_;
 };
diff --git a/cast/standalone_sender/main.cc b/cast/standalone_sender/main.cc
index f99fa3e..3a0dca6 100644
--- a/cast/standalone_sender/main.cc
+++ b/cast/standalone_sender/main.cc
@@ -198,9 +198,12 @@
     }
   }
 
-  std::unique_ptr<LoopingFileCastAgent> cast_agent;
+  // |cast_agent| must be constructed and destroyed from a Task run by the
+  // TaskRunner.
+  LoopingFileCastAgent* cast_agent = nullptr;
   task_runner->PostTask([&] {
-    cast_agent = std::make_unique<LoopingFileCastAgent>(task_runner);
+    cast_agent = new LoopingFileCastAgent(
+        task_runner, [&] { task_runner->RequestStopSoon(); });
     cast_agent->Connect({remote_endpoint, path, max_bitrate,
                          true /* should_include_video */,
                          use_android_rtp_hack});
@@ -210,6 +213,16 @@
   // SIGTERM are signaled.
   task_runner->RunUntilSignaled();
 
+  // Spin the TaskRunner to destroy the |cast_agent| and execute any lingering
+  // destruction/shutdown tasks.
+  OSP_LOG_INFO << "Shutting down...";
+  task_runner->PostTask([&] {
+    delete cast_agent;
+    task_runner->RequestStopSoon();
+  });
+  task_runner->RunUntilStopped();
+  OSP_LOG_INFO << "Bye!";
+
   PlatformClientPosix::ShutDown();
   return 0;
 }
diff --git a/cast/streaming/sender_session.cc b/cast/streaming/sender_session.cc
index e3ad33a..fbeef4f 100644
--- a/cast/streaming/sender_session.cc
+++ b/cast/streaming/sender_session.cc
@@ -14,7 +14,6 @@
 
 #include "absl/strings/match.h"
 #include "absl/strings/numbers.h"
-#include "cast/common/channel/message_util.h"
 #include "cast/streaming/capture_recommendations.h"
 #include "cast/streaming/environment.h"
 #include "cast/streaming/message_fields.h"
@@ -137,8 +136,6 @@
                      IsValidVideoCaptureConfig);
 }
 
-static constexpr char kPlaceholderReceiverSenderId[] = "receiver-12345";
-
 }  // namespace
 
 SenderSession::Client::~Client() = default;
@@ -146,14 +143,16 @@
 SenderSession::SenderSession(IPAddress remote_address,
                              Client* const client,
                              Environment* environment,
-                             MessagePort* message_port)
+                             MessagePort* message_port,
+                             std::string message_source_id,
+                             std::string message_destination_id)
     : remote_address_(remote_address),
       client_(client),
       environment_(environment),
       messager_(
           message_port,
-          MakeUniqueSessionId("sender"),
-          kPlaceholderReceiverSenderId,
+          std::move(message_source_id),
+          std::move(message_destination_id),
           [this](Error error) {
             OSP_DLOG_WARN << "SenderSession message port error: " << error;
             client_->OnError(this, error);
@@ -181,9 +180,6 @@
   current_negotiation_ = std::unique_ptr<Negotiation>(new Negotiation{
       offer, std::move(audio_configs), std::move(video_configs)});
 
-  // Currently we don't have a way to discover the ID of the receiver we
-  // are connected to, since we have to send the first message.
-  // TODO(jophba): migrate to discovered receiver ID when available.
   return messager_.SendRequest(
       SenderMessage{SenderMessage::Type::kOffer, ++current_sequence_number_,
                     true, std::move(offer)},
diff --git a/cast/streaming/sender_session.h b/cast/streaming/sender_session.h
index 83ef2c3..f5659f8 100644
--- a/cast/streaming/sender_session.h
+++ b/cast/streaming/sender_session.h
@@ -75,10 +75,17 @@
   // message port persist for at least the lifetime of the SenderSession. If
   // one of these classes needs to be reset, a new SenderSession should be
   // created.
+  //
+  // |message_source_id| and |message_destination_id| are the local and remote
+  // ID, respectively, to use when sending or receiving control messages (e.g.,
+  // OFFERs or ANSWERs) over the |message_port|. |message_port|'s SetClient()
+  // method will be called.
   SenderSession(IPAddress remote_address,
                 Client* const client,
                 Environment* environment,
-                MessagePort* message_port);
+                MessagePort* message_port,
+                std::string message_source_id,
+                std::string message_destination_id);
   SenderSession(const SenderSession&) = delete;
   SenderSession(SenderSession&&) noexcept = delete;
   SenderSession& operator=(const SenderSession&) = delete;
@@ -123,9 +130,6 @@
   // Spawn a set of configured senders from the currently stored negotiation.
   ConfiguredSenders SpawnSenders(const Answer& answer);
 
-  // The sender ID of the Receiver for this session.
-  std::string receiver_sender_id_;
-
   // The remote address of the receiver we are communicating with. Used
   // for both TLS and UDP traffic.
   const IPAddress remote_address_;
diff --git a/cast/streaming/sender_session_unittest.cc b/cast/streaming/sender_session_unittest.cc
index c593dc1..4b8441d 100644
--- a/cast/streaming/sender_session_unittest.cc
+++ b/cast/streaming/sender_session_unittest.cc
@@ -153,9 +153,9 @@
   void SetUp() {
     message_port_ = std::make_unique<SimpleMessagePort>("receiver-12345");
     environment_ = MakeEnvironment();
-    session_ = std::make_unique<SenderSession>(IPAddress::kV4LoopbackAddress(),
-                                               &client_, environment_.get(),
-                                               message_port_.get());
+    session_ = std::make_unique<SenderSession>(
+        IPAddress::kV4LoopbackAddress(), &client_, environment_.get(),
+        message_port_.get(), "sender-12345", "receiver-12345");
   }
 
   std::string NegotiateOfferAndConstructAnswer() {
diff --git a/cast/streaming/session_messager.cc b/cast/streaming/session_messager.cc
index 4581456..31e634d 100644
--- a/cast/streaming/session_messager.cc
+++ b/cast/streaming/session_messager.cc
@@ -76,7 +76,7 @@
                                              TaskRunner* task_runner)
     : SessionMessager(message_port, std::move(source_id), std::move(cb)),
       task_runner_(task_runner),
-      receiver_id_(receiver_id) {}
+      receiver_id_(std::move(receiver_id)) {}
 
 void SenderSessionMessager::SetHandler(ReceiverMessage::Type type,
                                        ReplyCallback cb) {
@@ -125,8 +125,8 @@
                                       const std::string& message_namespace,
                                       const std::string& message) {
   if (source_id != receiver_id_) {
-    OSP_DLOG_WARN << "Received message from unknown/incorrect sender, expected "
-                     "id \""
+    OSP_DLOG_WARN << "Received message from unknown/incorrect Cast Receiver, "
+                     "expected id \""
                   << receiver_id_ << "\", got \"" << source_id << "\"";
     return;
   }