Add collection of EmulatedNetworkNode stats to stats collector

Bug: b/240540204
Change-Id: I9c2c2c35d0c3b6a99205e24d8b367fa7dab5d917
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/283760
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38694}
diff --git a/test/network/BUILD.gn b/test/network/BUILD.gn
index 71cf2d7..379f604 100644
--- a/test/network/BUILD.gn
+++ b/test/network/BUILD.gn
@@ -76,6 +76,7 @@
     "../../rtc_base:threading",
     "../../rtc_base/memory:always_valid_pointer",
     "../../rtc_base/synchronization:mutex",
+    "../../rtc_base/system:no_unique_address",
     "../../rtc_base/task_utils:repeating_task",
     "../../system_wrappers",
     "../../test:scoped_key_value_config",
diff --git a/test/network/network_emulation.h b/test/network/network_emulation.h
index 7a08495..dffabaf 100644
--- a/test/network/network_emulation.h
+++ b/test/network/network_emulation.h
@@ -33,6 +33,7 @@
 #include "rtc_base/network_constants.h"
 #include "rtc_base/socket_address.h"
 #include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/system/no_unique_address.h"
 #include "rtc_base/task_queue_for_test.h"
 #include "rtc_base/task_utils/repeating_task.h"
 #include "rtc_base/thread_annotations.h"
diff --git a/test/pc/e2e/BUILD.gn b/test/pc/e2e/BUILD.gn
index e73c0aa..2b9a69a 100644
--- a/test/pc/e2e/BUILD.gn
+++ b/test/pc/e2e/BUILD.gn
@@ -34,6 +34,7 @@
         ":peer_connection_e2e_smoke_test",
         ":peer_connection_quality_test_metric_names_test",
         ":peer_connection_quality_test_test",
+        ":stats_based_network_quality_metrics_reporter_test",
         ":stats_poller_test",
       ]
     }
@@ -333,6 +334,32 @@
       ]
     }
 
+    rtc_library("stats_based_network_quality_metrics_reporter_test") {
+      testonly = true
+      sources = [ "stats_based_network_quality_metrics_reporter_test.cc" ]
+      deps = [
+        ":metric_metadata_keys",
+        ":peerconnection_quality_test",
+        ":stats_based_network_quality_metrics_reporter",
+        "../..:test_support",
+        "../../../api:array_view",
+        "../../../api:create_network_emulation_manager",
+        "../../../api:create_peer_connection_quality_test_frame_generator",
+        "../../../api:network_emulation_manager_api",
+        "../../../api:peer_connection_quality_test_fixture_api",
+        "../../../api/test/metrics:metrics_logger",
+        "../../../api/test/metrics:stdout_metrics_exporter",
+        "../../../api/test/pclf:media_configuration",
+        "../../../api/test/pclf:media_quality_test_params",
+        "../../../api/test/pclf:peer_configurer",
+        "../../../api/units:time_delta",
+      ]
+      absl_deps = [
+        "//third_party/abseil-cpp/absl/strings",
+        "//third_party/abseil-cpp/absl/types:optional",
+      ]
+    }
+
     rtc_library("peer_connection_quality_test_test") {
       testonly = true
       sources = [ "peer_connection_quality_test_test.cc" ]
@@ -470,6 +497,7 @@
       "../../../api:peer_connection_quality_test_fixture_api",
       "../../../api:rtc_stats_api",
       "../../../api:scoped_refptr",
+      "../../../api:sequence_checker",
       "../../../api/numerics",
       "../../../api/test/metrics:metric",
       "../../../api/test/metrics:metrics_logger",
@@ -483,6 +511,7 @@
       "../../../rtc_base:rtc_event",
       "../../../rtc_base:stringutils",
       "../../../rtc_base/synchronization:mutex",
+      "../../../rtc_base/system:no_unique_address",
       "../../../system_wrappers:field_trial",
     ]
     absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
diff --git a/test/pc/e2e/stats_based_network_quality_metrics_reporter.cc b/test/pc/e2e/stats_based_network_quality_metrics_reporter.cc
index d4c2f93..155c81a 100644
--- a/test/pc/e2e/stats_based_network_quality_metrics_reporter.cc
+++ b/test/pc/e2e/stats_based_network_quality_metrics_reporter.cc
@@ -22,6 +22,7 @@
 #include "absl/strings/string_view.h"
 #include "api/array_view.h"
 #include "api/scoped_refptr.h"
+#include "api/sequence_checker.h"
 #include "api/stats/rtc_stats.h"
 #include "api/stats/rtcstats_objects.h"
 #include "api/test/metrics/metric.h"
@@ -34,6 +35,7 @@
 #include "rtc_base/ip_address.h"
 #include "rtc_base/strings/string_builder.h"
 #include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/system/no_unique_address.h"
 #include "system_wrappers/include/field_trial.h"
 #include "test/pc/e2e/metric_metadata_keys.h"
 
@@ -44,6 +46,9 @@
 using ::webrtc::test::ImprovementDirection;
 using ::webrtc::test::Unit;
 
+using NetworkLayerStats =
+    StatsBasedNetworkQualityMetricsReporter::NetworkLayerStats;
+
 constexpr TimeDelta kStatsWaitTimeout = TimeDelta::Seconds(1);
 
 // Field trial which controls whether to report standard-compliant bytes
@@ -78,6 +83,83 @@
   return out;
 }
 
+// Accumulates emulated network stats being executed on the network thread.
+// When all stats are collected stores it in thread safe variable.
+class EmulatedNetworkStatsAccumulator {
+ public:
+  // `expected_stats_count` - the number of calls to
+  // AddEndpointStats/AddUplinkStats/AddDownlinkStats the accumulator is going
+  // to wait. If called more than expected, the program will crash.
+  explicit EmulatedNetworkStatsAccumulator(size_t expected_stats_count)
+      : not_collected_stats_count_(expected_stats_count) {
+    RTC_DCHECK_GE(not_collected_stats_count_, 0);
+    if (not_collected_stats_count_ == 0) {
+      all_stats_collected_.Set();
+    }
+    sequence_checker_.Detach();
+  }
+
+  // Has to be executed on network thread.
+  void AddEndpointStats(std::string peer_name, EmulatedNetworkStats stats) {
+    RTC_DCHECK_RUN_ON(&sequence_checker_);
+    n_stats_[peer_name].endpoints_stats = std::move(stats);
+    DecrementNotCollectedStatsCount();
+  }
+
+  // Has to be executed on network thread.
+  void AddUplinkStats(std::string peer_name, EmulatedNetworkNodeStats stats) {
+    RTC_DCHECK_RUN_ON(&sequence_checker_);
+    n_stats_[peer_name].uplink_stats = std::move(stats);
+    DecrementNotCollectedStatsCount();
+  }
+
+  // Has to be executed on network thread.
+  void AddDownlinkStats(std::string peer_name, EmulatedNetworkNodeStats stats) {
+    RTC_DCHECK_RUN_ON(&sequence_checker_);
+    n_stats_[peer_name].downlink_stats = std::move(stats);
+    DecrementNotCollectedStatsCount();
+  }
+
+  // Can be executed on any thread.
+  // Returns true if count down was completed and false if timeout elapsed
+  // before.
+  bool Wait(TimeDelta timeout) { return all_stats_collected_.Wait(timeout); }
+
+  // Can be called once. Returns all collected stats by moving underlying
+  // object.
+  std::map<std::string, NetworkLayerStats> ReleaseStats() {
+    RTC_DCHECK(!stats_released_);
+    stats_released_ = true;
+    MutexLock lock(&mutex_);
+    return std::move(stats_);
+  }
+
+ private:
+  void DecrementNotCollectedStatsCount() {
+    RTC_DCHECK_RUN_ON(&sequence_checker_);
+    RTC_CHECK_GT(not_collected_stats_count_, 0)
+        << "All stats are already collected";
+    not_collected_stats_count_--;
+    if (not_collected_stats_count_ == 0) {
+      MutexLock lock(&mutex_);
+      stats_ = std::move(n_stats_);
+      all_stats_collected_.Set();
+    }
+  }
+
+  RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_;
+  size_t not_collected_stats_count_ RTC_GUARDED_BY(sequence_checker_);
+  // Collected on the network thread. Moved into `stats_` after all stats are
+  // collected.
+  std::map<std::string, NetworkLayerStats> n_stats_
+      RTC_GUARDED_BY(sequence_checker_);
+
+  rtc::Event all_stats_collected_;
+  Mutex mutex_;
+  std::map<std::string, NetworkLayerStats> stats_ RTC_GUARDED_BY(mutex_);
+  bool stats_released_ = false;
+};
+
 }  // namespace
 
 StatsBasedNetworkQualityMetricsReporter::
@@ -113,11 +195,15 @@
 
 void StatsBasedNetworkQualityMetricsReporter::NetworkLayerStatsCollector::
     AddPeer(absl::string_view peer_name,
-            std::vector<EmulatedEndpoint*> endpoints) {
+            std::vector<EmulatedEndpoint*> endpoints,
+            std::vector<EmulatedNetworkNode*> uplink,
+            std::vector<EmulatedNetworkNode*> downlink) {
   MutexLock lock(&mutex_);
   // When new peer is added not in the constructor, don't check if it has empty
   // stats, because their endpoint could be used for traffic before.
   peer_endpoints_.emplace(peer_name, std::move(endpoints));
+  peer_uplinks_.emplace(peer_name, std::move(uplink));
+  peer_downlinks_.emplace(peer_name, std::move(downlink));
   for (const EmulatedEndpoint* const endpoint : endpoints) {
     RTC_CHECK(ip_to_peer_.find(endpoint->GetPeerLocalAddress()) ==
               ip_to_peer_.end())
@@ -126,19 +212,43 @@
   }
 }
 
-std::map<std::string,
-         StatsBasedNetworkQualityMetricsReporter::NetworkLayerStats>
+std::map<std::string, NetworkLayerStats>
 StatsBasedNetworkQualityMetricsReporter::NetworkLayerStatsCollector::
     GetStats() {
   MutexLock lock(&mutex_);
-  std::map<std::string, NetworkLayerStats> peer_to_stats;
+  EmulatedNetworkStatsAccumulator stats_accumulator(
+      peer_endpoints_.size() + peer_uplinks_.size() + peer_downlinks_.size());
+  for (const auto& entry : peer_endpoints_) {
+    network_emulation_->GetStats(
+        entry.second, [&stats_accumulator,
+                       peer = entry.first](EmulatedNetworkStats s) mutable {
+          stats_accumulator.AddEndpointStats(std::move(peer), std::move(s));
+        });
+  }
+  for (const auto& entry : peer_uplinks_) {
+    network_emulation_->GetStats(
+        entry.second, [&stats_accumulator,
+                       peer = entry.first](EmulatedNetworkNodeStats s) mutable {
+          stats_accumulator.AddUplinkStats(std::move(peer), std::move(s));
+        });
+  }
+  for (const auto& entry : peer_downlinks_) {
+    network_emulation_->GetStats(
+        entry.second, [&stats_accumulator,
+                       peer = entry.first](EmulatedNetworkNodeStats s) mutable {
+          stats_accumulator.AddDownlinkStats(std::move(peer), std::move(s));
+        });
+  }
+  bool stats_collected = stats_accumulator.Wait(kStatsWaitTimeout);
+  RTC_CHECK(stats_collected);
+  std::map<std::string, NetworkLayerStats> peer_to_stats =
+      stats_accumulator.ReleaseStats();
   std::map<std::string, std::vector<std::string>> sender_to_receivers;
   for (const auto& entry : peer_endpoints_) {
-    NetworkLayerStats stats;
-    stats.stats = PopulateStats(entry.second, network_emulation_);
     const std::string& peer_name = entry.first;
+    const NetworkLayerStats& stats = peer_to_stats[peer_name];
     for (const auto& income_stats_entry :
-         stats.stats.incoming_stats_per_source) {
+         stats.endpoints_stats.incoming_stats_per_source) {
       const rtc::IPAddress& source_ip = income_stats_entry.first;
       auto it = ip_to_peer_.find(source_ip);
       if (it == ip_to_peer_.end()) {
@@ -147,7 +257,6 @@
       }
       sender_to_receivers[it->second].push_back(peer_name);
     }
-    peer_to_stats.emplace(peer_name, std::move(stats));
   }
   for (auto& entry : peer_to_stats) {
     const std::vector<std::string>& receivers =
@@ -161,7 +270,17 @@
 void StatsBasedNetworkQualityMetricsReporter::AddPeer(
     absl::string_view peer_name,
     std::vector<EmulatedEndpoint*> endpoints) {
-  collector_.AddPeer(peer_name, std::move(endpoints));
+  collector_.AddPeer(peer_name, std::move(endpoints), /*uplink=*/{},
+                     /*downlink=*/{});
+}
+
+void StatsBasedNetworkQualityMetricsReporter::AddPeer(
+    absl::string_view peer_name,
+    std::vector<EmulatedEndpoint*> endpoints,
+    std::vector<EmulatedNetworkNode*> uplink,
+    std::vector<EmulatedNetworkNode*> downlink) {
+  collector_.AddPeer(peer_name, std::move(endpoints), std::move(uplink),
+                     std::move(downlink));
 }
 
 void StatsBasedNetworkQualityMetricsReporter::Start(
@@ -255,12 +374,12 @@
       {MetricMetadataKey::kPeerMetadataKey, pc_label}};
   metrics_logger_->LogSingleValueMetric(
       "bytes_discarded_no_receiver", GetTestCaseName(pc_label),
-      network_layer_stats.stats.overall_incoming_stats
+      network_layer_stats.endpoints_stats.overall_incoming_stats
           .bytes_discarded_no_receiver.bytes(),
       Unit::kBytes, ImprovementDirection::kNeitherIsBetter, metric_metadata);
   metrics_logger_->LogSingleValueMetric(
       "packets_discarded_no_receiver", GetTestCaseName(pc_label),
-      network_layer_stats.stats.overall_incoming_stats
+      network_layer_stats.endpoints_stats.overall_incoming_stats
           .packets_discarded_no_receiver,
       Unit::kUnitless, ImprovementDirection::kNeitherIsBetter, metric_metadata);
 
@@ -312,55 +431,60 @@
     const std::string& peer_name,
     const NetworkLayerStats& stats) const {
   DataRate average_send_rate =
-      stats.stats.overall_outgoing_stats.packets_sent >= 2
-          ? stats.stats.overall_outgoing_stats.AverageSendRate()
+      stats.endpoints_stats.overall_outgoing_stats.packets_sent >= 2
+          ? stats.endpoints_stats.overall_outgoing_stats.AverageSendRate()
           : DataRate::Zero();
   DataRate average_receive_rate =
-      stats.stats.overall_incoming_stats.packets_received >= 2
-          ? stats.stats.overall_incoming_stats.AverageReceiveRate()
+      stats.endpoints_stats.overall_incoming_stats.packets_received >= 2
+          ? stats.endpoints_stats.overall_incoming_stats.AverageReceiveRate()
           : DataRate::Zero();
   std::map<std::string, std::string> metric_metadata{
       {MetricMetadataKey::kPeerMetadataKey, peer_name}};
   rtc::StringBuilder log;
   log << "Raw network layer statistic for [" << peer_name << "]:\n"
       << "Local IPs:\n";
-  for (size_t i = 0; i < stats.stats.local_addresses.size(); ++i) {
-    log << "  " << stats.stats.local_addresses[i].ToString() << "\n";
+  for (size_t i = 0; i < stats.endpoints_stats.local_addresses.size(); ++i) {
+    log << "  " << stats.endpoints_stats.local_addresses[i].ToString() << "\n";
   }
-  if (!stats.stats.overall_outgoing_stats.sent_packets_size.IsEmpty()) {
-    metrics_logger_->LogMetric(
-        "sent_packets_size", GetTestCaseName(peer_name),
-        stats.stats.overall_outgoing_stats.sent_packets_size, Unit::kBytes,
-        ImprovementDirection::kNeitherIsBetter, metric_metadata);
-  }
-  if (!stats.stats.overall_incoming_stats.received_packets_size.IsEmpty()) {
-    metrics_logger_->LogMetric(
-        "received_packets_size", GetTestCaseName(peer_name),
-        stats.stats.overall_incoming_stats.received_packets_size, Unit::kBytes,
-        ImprovementDirection::kNeitherIsBetter, metric_metadata);
-  }
-  if (!stats.stats.overall_incoming_stats.packets_discarded_no_receiver_size
+  if (!stats.endpoints_stats.overall_outgoing_stats.sent_packets_size
            .IsEmpty()) {
     metrics_logger_->LogMetric(
-        "packets_discarded_no_receiver_size", GetTestCaseName(peer_name),
-        stats.stats.overall_incoming_stats.packets_discarded_no_receiver_size,
+        "sent_packets_size", GetTestCaseName(peer_name),
+        stats.endpoints_stats.overall_outgoing_stats.sent_packets_size,
         Unit::kBytes, ImprovementDirection::kNeitherIsBetter, metric_metadata);
   }
-  if (!stats.stats.sent_packets_queue_wait_time_us.IsEmpty()) {
+  if (!stats.endpoints_stats.overall_incoming_stats.received_packets_size
+           .IsEmpty()) {
+    metrics_logger_->LogMetric(
+        "received_packets_size", GetTestCaseName(peer_name),
+        stats.endpoints_stats.overall_incoming_stats.received_packets_size,
+        Unit::kBytes, ImprovementDirection::kNeitherIsBetter, metric_metadata);
+  }
+  if (!stats.endpoints_stats.overall_incoming_stats
+           .packets_discarded_no_receiver_size.IsEmpty()) {
+    metrics_logger_->LogMetric(
+        "packets_discarded_no_receiver_size", GetTestCaseName(peer_name),
+        stats.endpoints_stats.overall_incoming_stats
+            .packets_discarded_no_receiver_size,
+        Unit::kBytes, ImprovementDirection::kNeitherIsBetter, metric_metadata);
+  }
+  if (!stats.endpoints_stats.sent_packets_queue_wait_time_us.IsEmpty()) {
     metrics_logger_->LogMetric(
         "sent_packets_queue_wait_time_us", GetTestCaseName(peer_name),
-        stats.stats.sent_packets_queue_wait_time_us, Unit::kUnitless,
+        stats.endpoints_stats.sent_packets_queue_wait_time_us, Unit::kUnitless,
         ImprovementDirection::kNeitherIsBetter, metric_metadata);
   }
 
   log << "Send statistic:\n"
-      << "  packets: " << stats.stats.overall_outgoing_stats.packets_sent
-      << " bytes: " << stats.stats.overall_outgoing_stats.bytes_sent.bytes()
+      << "  packets: "
+      << stats.endpoints_stats.overall_outgoing_stats.packets_sent << " bytes: "
+      << stats.endpoints_stats.overall_outgoing_stats.bytes_sent.bytes()
       << " avg_rate (bytes/sec): " << average_send_rate.bytes_per_sec()
       << " avg_rate (bps): " << average_send_rate.bps() << "\n"
       << "Send statistic per destination:\n";
 
-  for (const auto& entry : stats.stats.outgoing_stats_per_destination) {
+  for (const auto& entry :
+       stats.endpoints_stats.outgoing_stats_per_destination) {
     DataRate source_average_send_rate = entry.second.packets_sent >= 2
                                             ? entry.second.AverageSendRate()
                                             : DataRate::Zero();
@@ -378,14 +502,38 @@
     }
   }
 
+  if (!stats.uplink_stats.packet_transport_time.IsEmpty()) {
+    log << "[Debug stats] packet_transport_time=("
+        << stats.uplink_stats.packet_transport_time.GetAverage() << ", "
+        << stats.uplink_stats.packet_transport_time.GetStandardDeviation()
+        << ")\n";
+    metrics_logger_->LogMetric(
+        "uplink_packet_transport_time", GetTestCaseName(peer_name),
+        stats.uplink_stats.packet_transport_time, Unit::kMilliseconds,
+        ImprovementDirection::kNeitherIsBetter, metric_metadata);
+  }
+  if (!stats.uplink_stats.size_to_packet_transport_time.IsEmpty()) {
+    log << "[Debug stats] size_to_packet_transport_time=("
+        << stats.uplink_stats.size_to_packet_transport_time.GetAverage() << ", "
+        << stats.uplink_stats.size_to_packet_transport_time
+               .GetStandardDeviation()
+        << ")\n";
+    metrics_logger_->LogMetric(
+        "uplink_size_to_packet_transport_time", GetTestCaseName(peer_name),
+        stats.uplink_stats.size_to_packet_transport_time, Unit::kUnitless,
+        ImprovementDirection::kNeitherIsBetter, metric_metadata);
+  }
+
   log << "Receive statistic:\n"
-      << "  packets: " << stats.stats.overall_incoming_stats.packets_received
-      << " bytes: " << stats.stats.overall_incoming_stats.bytes_received.bytes()
+      << "  packets: "
+      << stats.endpoints_stats.overall_incoming_stats.packets_received
+      << " bytes: "
+      << stats.endpoints_stats.overall_incoming_stats.bytes_received.bytes()
       << " avg_rate (bytes/sec): " << average_receive_rate.bytes_per_sec()
       << " avg_rate (bps): " << average_receive_rate.bps() << "\n"
       << "Receive statistic per source:\n";
 
-  for (const auto& entry : stats.stats.incoming_stats_per_source) {
+  for (const auto& entry : stats.endpoints_stats.incoming_stats_per_source) {
     DataRate source_average_receive_rate =
         entry.second.packets_received >= 2 ? entry.second.AverageReceiveRate()
                                            : DataRate::Zero();
@@ -410,6 +558,28 @@
           ImprovementDirection::kNeitherIsBetter, metric_metadata);
     }
   }
+  if (!stats.downlink_stats.packet_transport_time.IsEmpty()) {
+    log << "[Debug stats] packet_transport_time=("
+        << stats.downlink_stats.packet_transport_time.GetAverage() << ", "
+        << stats.downlink_stats.packet_transport_time.GetStandardDeviation()
+        << ")\n";
+    metrics_logger_->LogMetric(
+        "downlink_packet_transport_time", GetTestCaseName(peer_name),
+        stats.downlink_stats.packet_transport_time, Unit::kMilliseconds,
+        ImprovementDirection::kNeitherIsBetter, metric_metadata);
+  }
+  if (!stats.downlink_stats.size_to_packet_transport_time.IsEmpty()) {
+    log << "[Debug stats] size_to_packet_transport_time=("
+        << stats.downlink_stats.size_to_packet_transport_time.GetAverage()
+        << ", "
+        << stats.downlink_stats.size_to_packet_transport_time
+               .GetStandardDeviation()
+        << ")\n";
+    metrics_logger_->LogMetric(
+        "downlink_size_to_packet_transport_time", GetTestCaseName(peer_name),
+        stats.downlink_stats.size_to_packet_transport_time, Unit::kUnitless,
+        ImprovementDirection::kNeitherIsBetter, metric_metadata);
+  }
 
   RTC_LOG(LS_INFO) << log.str();
 }
diff --git a/test/pc/e2e/stats_based_network_quality_metrics_reporter.h b/test/pc/e2e/stats_based_network_quality_metrics_reporter.h
index 8516c40..60daf40 100644
--- a/test/pc/e2e/stats_based_network_quality_metrics_reporter.h
+++ b/test/pc/e2e/stats_based_network_quality_metrics_reporter.h
@@ -37,6 +37,14 @@
 class StatsBasedNetworkQualityMetricsReporter
     : public PeerConnectionE2EQualityTestFixture::QualityMetricsReporter {
  public:
+  // Emulated network layer stats for single peer.
+  struct NetworkLayerStats {
+    EmulatedNetworkStats endpoints_stats;
+    EmulatedNetworkNodeStats uplink_stats;
+    EmulatedNetworkNodeStats downlink_stats;
+    std::set<std::string> receivers;
+  };
+
   // `networks` map peer name to network to report network layer stability stats
   // and to log network layer metrics.
   StatsBasedNetworkQualityMetricsReporter(
@@ -47,6 +55,10 @@
 
   void AddPeer(absl::string_view peer_name,
                std::vector<EmulatedEndpoint*> endpoints);
+  void AddPeer(absl::string_view peer_name,
+               std::vector<EmulatedEndpoint*> endpoints,
+               std::vector<EmulatedNetworkNode*> uplink,
+               std::vector<EmulatedNetworkNode*> downlink);
 
   // Network stats must be empty when this method will be invoked.
   void Start(absl::string_view test_case_name,
@@ -71,11 +83,6 @@
     int64_t packets_sent = 0;
   };
 
-  struct NetworkLayerStats {
-    EmulatedNetworkStats stats;
-    std::set<std::string> receivers;
-  };
-
   class NetworkLayerStatsCollector {
    public:
     NetworkLayerStatsCollector(
@@ -85,7 +92,9 @@
     void Start();
 
     void AddPeer(absl::string_view peer_name,
-                 std::vector<EmulatedEndpoint*> endpoints);
+                 std::vector<EmulatedEndpoint*> endpoints,
+                 std::vector<EmulatedNetworkNode*> uplink,
+                 std::vector<EmulatedNetworkNode*> downlink);
 
     std::map<std::string, NetworkLayerStats> GetStats();
 
@@ -93,6 +102,10 @@
     Mutex mutex_;
     std::map<std::string, std::vector<EmulatedEndpoint*>> peer_endpoints_
         RTC_GUARDED_BY(mutex_);
+    std::map<std::string, std::vector<EmulatedNetworkNode*>> peer_uplinks_
+        RTC_GUARDED_BY(mutex_);
+    std::map<std::string, std::vector<EmulatedNetworkNode*>> peer_downlinks_
+        RTC_GUARDED_BY(mutex_);
     std::map<rtc::IPAddress, std::string> ip_to_peer_ RTC_GUARDED_BY(mutex_);
     NetworkEmulationManager* const network_emulation_;
   };
diff --git a/test/pc/e2e/stats_based_network_quality_metrics_reporter_test.cc b/test/pc/e2e/stats_based_network_quality_metrics_reporter_test.cc
new file mode 100644
index 0000000..be55149
--- /dev/null
+++ b/test/pc/e2e/stats_based_network_quality_metrics_reporter_test.cc
@@ -0,0 +1,150 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/pc/e2e/stats_based_network_quality_metrics_reporter.h"
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "api/test/create_network_emulation_manager.h"
+#include "api/test/create_peer_connection_quality_test_frame_generator.h"
+#include "api/test/metrics/metrics_logger.h"
+#include "api/test/metrics/stdout_metrics_exporter.h"
+#include "api/test/network_emulation_manager.h"
+#include "api/test/pclf/media_configuration.h"
+#include "api/test/pclf/media_quality_test_params.h"
+#include "api/test/pclf/peer_configurer.h"
+#include "api/test/peerconnection_quality_test_fixture.h"
+#include "api/units/time_delta.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+#include "test/pc/e2e/metric_metadata_keys.h"
+#include "test/pc/e2e/peer_connection_quality_test.h"
+
+namespace webrtc {
+namespace webrtc_pc_e2e {
+namespace {
+
+using ::testing::UnorderedElementsAre;
+
+using ::webrtc::test::DefaultMetricsLogger;
+using ::webrtc::test::ImprovementDirection;
+using ::webrtc::test::Metric;
+using ::webrtc::test::Unit;
+using ::webrtc::webrtc_pc_e2e::PeerConfigurer;
+
+// Adds a peer with some audio and video (the client should not care about
+// details about audio and video configs).
+void AddDefaultAudioVideoPeer(
+    absl::string_view peer_name,
+    absl::string_view audio_stream_label,
+    absl::string_view video_stream_label,
+    const PeerNetworkDependencies& network_dependencies,
+    PeerConnectionE2EQualityTestFixture& fixture) {
+  AudioConfig audio{std::string(audio_stream_label)};
+  audio.sync_group = std::string(peer_name);
+  VideoConfig video(std::string(video_stream_label), 320, 180, 15);
+  video.sync_group = std::string(peer_name);
+  auto peer = std::make_unique<PeerConfigurer>(network_dependencies);
+  peer->SetName(peer_name);
+  peer->SetAudioConfig(std::move(audio));
+  peer->AddVideoConfig(std::move(video));
+  peer->SetVideoCodecs({VideoCodecConfig(cricket::kVp8CodecName)});
+  fixture.AddPeer(std::move(peer));
+}
+
+absl::optional<Metric> FindMeetricByName(absl::string_view name,
+                                         rtc::ArrayView<const Metric> metrics) {
+  for (const Metric& metric : metrics) {
+    if (metric.name == name) {
+      return metric;
+    }
+  }
+  return absl::nullopt;
+}
+
+TEST(StatsBasedNetworkQualityMetricsReporterTest, DebugStatsAreCollected) {
+  std::unique_ptr<NetworkEmulationManager> network_emulation =
+      CreateNetworkEmulationManager(TimeMode::kSimulated,
+                                    EmulatedNetworkStatsGatheringMode::kDebug);
+  DefaultMetricsLogger metrics_logger(
+      network_emulation->time_controller()->GetClock());
+  PeerConnectionE2EQualityTest fixture(
+      "test_case", *network_emulation->time_controller(),
+      /*audio_quality_analyzer=*/nullptr, /*video_quality_analyzer=*/nullptr,
+      &metrics_logger);
+
+  EmulatedEndpoint* alice_endpoint =
+      network_emulation->CreateEndpoint(EmulatedEndpointConfig());
+  EmulatedEndpoint* bob_endpoint =
+      network_emulation->CreateEndpoint(EmulatedEndpointConfig());
+
+  EmulatedNetworkNode* alice_link = network_emulation->CreateEmulatedNode(
+      BuiltInNetworkBehaviorConfig{.link_capacity_kbps = 500});
+  network_emulation->CreateRoute(alice_endpoint, {alice_link}, bob_endpoint);
+  EmulatedNetworkNode* bob_link = network_emulation->CreateEmulatedNode(
+      BuiltInNetworkBehaviorConfig{.link_capacity_kbps = 500});
+  network_emulation->CreateRoute(bob_endpoint, {bob_link}, alice_endpoint);
+
+  EmulatedNetworkManagerInterface* alice_network =
+      network_emulation->CreateEmulatedNetworkManagerInterface(
+          {alice_endpoint});
+  EmulatedNetworkManagerInterface* bob_network =
+      network_emulation->CreateEmulatedNetworkManagerInterface({bob_endpoint});
+
+  AddDefaultAudioVideoPeer("alice", "alice_audio", "alice_video",
+                           alice_network->network_dependencies(), fixture);
+  AddDefaultAudioVideoPeer("bob", "bob_audio", "bob_video",
+                           bob_network->network_dependencies(), fixture);
+
+  auto network_stats_reporter =
+      std::make_unique<StatsBasedNetworkQualityMetricsReporter>(
+          /*peer_endpoints=*/std::map<std::string,
+                                      std::vector<EmulatedEndpoint*>>{},
+          network_emulation.get(), &metrics_logger);
+  network_stats_reporter->AddPeer("alice", alice_network->endpoints(),
+                                  /*uplink=*/{alice_link},
+                                  /*downlink=*/{bob_link});
+  network_stats_reporter->AddPeer("bob", bob_network->endpoints(),
+                                  /*uplink=*/{bob_link},
+                                  /*downlink=*/{alice_link});
+  fixture.AddQualityMetricsReporter(std::move(network_stats_reporter));
+
+  fixture.Run(RunParams(TimeDelta::Seconds(4)));
+
+  std::vector<Metric> metrics = metrics_logger.GetCollectedMetrics();
+  absl::optional<Metric> uplink_packet_transport_time =
+      FindMeetricByName("uplink_packet_transport_time", metrics);
+  ASSERT_TRUE(uplink_packet_transport_time.has_value());
+  ASSERT_FALSE(uplink_packet_transport_time->time_series.samples.empty());
+  absl::optional<Metric> uplink_size_to_packet_transport_time =
+      FindMeetricByName("uplink_size_to_packet_transport_time", metrics);
+  ASSERT_TRUE(uplink_size_to_packet_transport_time.has_value());
+  ASSERT_FALSE(
+      uplink_size_to_packet_transport_time->time_series.samples.empty());
+  absl::optional<Metric> downlink_packet_transport_time =
+      FindMeetricByName("downlink_packet_transport_time", metrics);
+  ASSERT_TRUE(downlink_packet_transport_time.has_value());
+  ASSERT_FALSE(downlink_packet_transport_time->time_series.samples.empty());
+  absl::optional<Metric> downlink_size_to_packet_transport_time =
+      FindMeetricByName("downlink_size_to_packet_transport_time", metrics);
+  ASSERT_TRUE(downlink_size_to_packet_transport_time.has_value());
+  ASSERT_FALSE(
+      downlink_size_to_packet_transport_time->time_series.samples.empty());
+}
+
+}  // namespace
+}  // namespace webrtc_pc_e2e
+}  // namespace webrtc