Fix issue with pacing rate after long queue times.

A recent cleanup cl (r36900) had an unintended side-effect.

If the queue-time limit is expected to be hit, we adjust the pacing
bitrate up to make sure all packets are sent within the nominal time
frame.
However after that change we stopped adjusting the pacing rate back to
normal levels when queue clears - at least not until the next BWE
update (which is fairly often - but not immediate).

This CL fixes that, and also makes sure whe properly update the
adjusted media rate on enqueu, dequeue and set rate calls.

Bug: webrtc:10809
Change-Id: If00dc35169f1a1347fea6eb44fdb2868282ed3b7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265387
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37178}
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index 98123c8..e997f95 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -93,7 +93,8 @@
       paused_(false),
       media_debt_(DataSize::Zero()),
       padding_debt_(DataSize::Zero()),
-      media_rate_(DataRate::Zero()),
+      pacing_rate_(DataRate::Zero()),
+      adjusted_media_rate_(DataRate::Zero()),
       padding_rate_(DataRate::Zero()),
       prober_(field_trials_),
       probing_send_failure_(false),
@@ -198,21 +199,22 @@
                         << " kbps, padding = " << padding_rate.kbps()
                         << " kbps.";
   }
-  media_rate_ = pacing_rate;
+  pacing_rate_ = pacing_rate;
   padding_rate_ = padding_rate;
+  MaybeUpdateMediaRateDueToLongQueue(CurrentTime());
 
-  RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" << media_rate_.kbps()
+  RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" << pacing_rate_.kbps()
                       << " padding_budget_kbps=" << padding_rate.kbps();
 }
 
 void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
-  RTC_DCHECK(media_rate_ > DataRate::Zero())
+  RTC_DCHECK(pacing_rate_ > DataRate::Zero())
       << "SetPacingRate must be called before InsertPacket.";
   RTC_CHECK(packet->packet_type());
 
   prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
 
-  Timestamp now = CurrentTime();
+  const Timestamp now = CurrentTime();
   if (packet_queue_->Empty()) {
     // If queue is empty, we need to "fast-forward" the last process time,
     // so that we don't use passed time as budget for sending the first new
@@ -228,6 +230,9 @@
   }
   packet_queue_->Push(now, std::move(packet));
   seen_first_packet_ = true;
+
+  // Queue length has increased, check if we need to change the pacing rate.
+  MaybeUpdateMediaRateDueToLongQueue(now);
 }
 
 void PacingController::SetAccountForAudioPackets(bool account_for_audio) {
@@ -249,10 +254,8 @@
 }
 
 TimeDelta PacingController::ExpectedQueueTime() const {
-  RTC_DCHECK_GT(media_rate_, DataRate::Zero());
-  return TimeDelta::Millis(
-      (QueueSizeData().bytes() * 8 * rtc::kNumMillisecsPerSec) /
-      media_rate_.bps());
+  RTC_DCHECK_GT(adjusted_media_rate_, DataRate::Zero());
+  return QueueSizeData() / adjusted_media_rate_;
 }
 
 size_t PacingController::QueueSizePackets() const {
@@ -343,11 +346,11 @@
     return last_send_time_ + kCongestedPacketInterval;
   }
 
-  if (media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) {
+  if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) {
     // If packets are allowed to be sent in a burst, the
     // debt is allowed to grow up to one packet more than what can be sent
     // during 'send_burst_period_'.
-    TimeDelta drain_time = media_debt_ / media_rate_;
+    TimeDelta drain_time = media_debt_ / adjusted_media_rate_;
     next_send_time =
         last_process_time_ +
         ((send_burst_interval_ > drain_time) ? TimeDelta::Zero() : drain_time);
@@ -355,9 +358,9 @@
     // If we _don't_ have pending packets, check how long until we have
     // bandwidth for padding packets. Both media and padding debts must
     // have been drained to do this.
-    RTC_DCHECK_GT(media_rate_, DataRate::Zero());
-    TimeDelta drain_time =
-        std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_);
+    RTC_DCHECK_GT(adjusted_media_rate_, DataRate::Zero());
+    TimeDelta drain_time = std::max(media_debt_ / adjusted_media_rate_,
+                                    padding_debt_ / padding_rate_);
 
     if (drain_time.IsZero() &&
         (!media_debt_.IsZero() || !padding_debt_.IsZero())) {
@@ -380,7 +383,7 @@
 }
 
 void PacingController::ProcessPackets() {
-  Timestamp now = CurrentTime();
+  const Timestamp now = CurrentTime();
   Timestamp target_send_time = now;
 
   if (ShouldSendKeepalive(now)) {
@@ -420,27 +423,6 @@
   TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);
 
   if (elapsed_time > TimeDelta::Zero()) {
-    DataRate target_rate = media_rate_;
-    DataSize queue_size_data = QueueSizeData();
-    if (queue_size_data > DataSize::Zero()) {
-      // Assuming equal size packets and input/output rate, the average packet
-      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
-      // time constraint shall be met. Determine bitrate needed for that.
-      packet_queue_->UpdateAverageQueueTime(now);
-      if (drain_large_queues_) {
-        TimeDelta avg_time_left =
-            std::max(TimeDelta::Millis(1),
-                     queue_time_limit_ - packet_queue_->AverageQueueTime());
-        DataRate min_rate_needed = queue_size_data / avg_time_left;
-        if (min_rate_needed > target_rate) {
-          target_rate = min_rate_needed;
-          RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
-                              << target_rate.kbps();
-        }
-      }
-    }
-
-    media_rate_ = target_rate;
     UpdateBudgetWithElapsedTime(elapsed_time);
   }
 
@@ -556,6 +538,11 @@
       prober_.ProbeSent(CurrentTime(), data_sent);
     }
   }
+
+  // Queue length has probably decreased, check if pacing rate needs to updated.
+  // Poll the time again, since we might have enqueued new fec/padding packets
+  // with a later timestamp than `now`.
+  MaybeUpdateMediaRateDueToLongQueue(CurrentTime());
 }
 
 DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
@@ -630,7 +617,7 @@
       // is not more than would be reduced to zero at the target sent time.
       // If we allow packets to be sent in a burst, packet are allowed to be
       // sent early.
-      TimeDelta flush_time = media_debt_ / media_rate_;
+      TimeDelta flush_time = media_debt_ / adjusted_media_rate_;
       if (now + flush_time > target_send_time) {
         return nullptr;
       }
@@ -656,13 +643,13 @@
 }
 
 void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
-  media_debt_ -= std::min(media_debt_, media_rate_ * delta);
+  media_debt_ -= std::min(media_debt_, adjusted_media_rate_ * delta);
   padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta);
 }
 
 void PacingController::UpdateBudgetWithSentData(DataSize size) {
   media_debt_ += size;
-  media_debt_ = std::min(media_debt_, media_rate_ * kMaxDebtInTime);
+  media_debt_ = std::min(media_debt_, adjusted_media_rate_ * kMaxDebtInTime);
   UpdatePaddingBudgetWithSentData(size);
 }
 
@@ -675,4 +662,28 @@
   queue_time_limit_ = limit;
 }
 
+void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) {
+  adjusted_media_rate_ = pacing_rate_;
+  if (!drain_large_queues_) {
+    return;
+  }
+
+  DataSize queue_size_data = QueueSizeData();
+  if (queue_size_data > DataSize::Zero()) {
+    // Assuming equal size packets and input/output rate, the average packet
+    // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
+    // time constraint shall be met. Determine bitrate needed for that.
+    packet_queue_->UpdateAverageQueueTime(now);
+    TimeDelta avg_time_left =
+        std::max(TimeDelta::Millis(1),
+                 queue_time_limit_ - packet_queue_->AverageQueueTime());
+    DataRate min_rate_needed = queue_size_data / avg_time_left;
+    if (min_rate_needed > pacing_rate_) {
+      adjusted_media_rate_ = min_rate_needed;
+      RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
+                          << pacing_rate_.kbps();
+    }
+  }
+}
+
 }  // namespace webrtc
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index b3949b6..b8cbc88 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -145,7 +145,7 @@
 
   // Sets the pacing rates. Must be called once before packets can be sent.
   void SetPacingRates(DataRate pacing_rate, DataRate padding_rate);
-  DataRate pacing_rate() const { return media_rate_; }
+  DataRate pacing_rate() const { return adjusted_media_rate_; }
 
   // Currently audio traffic is not accounted by pacer and passed through.
   // With the introduction of audio BWE audio traffic will be accounted for
@@ -217,6 +217,7 @@
   void OnPacketSent(RtpPacketMediaType packet_type,
                     DataSize packet_size,
                     Timestamp send_time);
+  void MaybeUpdateMediaRateDueToLongQueue(Timestamp now);
 
   Timestamp CurrentTime() const;
 
@@ -241,9 +242,17 @@
   mutable Timestamp last_timestamp_;
   bool paused_;
 
+  // Amount of outstanding data for media and padding.
   DataSize media_debt_;
   DataSize padding_debt_;
-  DataRate media_rate_;
+
+  // The target pacing rate, signaled via SetPacingRates().
+  DataRate pacing_rate_;
+  // The media send rate, which might adjusted from pacing_rate_, e.g. if the
+  // pacing queue is growing too long.
+  DataRate adjusted_media_rate_;
+  // The padding target rate. We aim to fill up to this rate with padding what
+  // is not already used by media.
   DataRate padding_rate_;
 
   BitrateProber prober_;
diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc
index b9ec80e..79ab5ee 100644
--- a/modules/pacing/pacing_controller_unittest.cc
+++ b/modules/pacing/pacing_controller_unittest.cc
@@ -1096,45 +1096,6 @@
             2 * PacingController::kPausedProcessInterval);
 }
 
-TEST_F(PacingControllerTest, ExpectedQueueTimeMs) {
-  uint32_t ssrc = 12346;
-  uint16_t sequence_number = 1234;
-  const size_t kNumPackets = 60;
-  const size_t kPacketSize = 1200;
-  const int32_t kMaxBitrate = kPaceMultiplier * 30000;
-  auto pacer = std::make_unique<PacingController>(&clock_, &callback_, trials_);
-  pacer->SetPacingRates(kTargetRate * kPaceMultiplier, DataRate::Zero());
-  EXPECT_TRUE(pacer->OldestPacketEnqueueTime().IsInfinite());
-
-  pacer->SetPacingRates(DataRate::BitsPerSec(30000 * kPaceMultiplier),
-                        DataRate::Zero());
-  for (size_t i = 0; i < kNumPackets; ++i) {
-    SendAndExpectPacket(pacer.get(), RtpPacketMediaType::kVideo, ssrc,
-                        sequence_number++, clock_.TimeInMilliseconds(),
-                        kPacketSize);
-  }
-
-  // Queue in ms = 1000 * (bytes in queue) *8 / (bits per second)
-  TimeDelta queue_time =
-      TimeDelta::Millis(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate);
-  EXPECT_EQ(queue_time, pacer->ExpectedQueueTime());
-
-  const Timestamp time_start = clock_.CurrentTime();
-  while (pacer->QueueSizePackets() > 0) {
-    AdvanceTimeUntil(pacer->NextSendTime());
-    pacer->ProcessPackets();
-  }
-  TimeDelta duration = clock_.CurrentTime() - time_start;
-
-  EXPECT_EQ(TimeDelta::Zero(), pacer->ExpectedQueueTime());
-
-  // Allow for aliasing, duration should be within one pack of max time limit.
-  const TimeDelta deviation =
-      duration - PacingController::kMaxExpectedQueueLength;
-  EXPECT_LT(deviation.Abs(),
-            TimeDelta::Millis(1000 * kPacketSize * 8 / kMaxBitrate));
-}
-
 TEST_F(PacingControllerTest, QueueTimeGrowsOverTime) {
   uint32_t ssrc = 12346;
   uint16_t sequence_number = 1234;
@@ -1756,7 +1717,7 @@
   }
 }
 
-TEST_F(PacingControllerTest, AccountsForAudioEnqueuTime) {
+TEST_F(PacingControllerTest, AccountsForAudioEnqueueTime) {
   const uint32_t kSsrc = 12345;
   const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
   const DataRate kPaddingDataRate = DataRate::Zero();
@@ -2063,5 +2024,45 @@
   EXPECT_EQ(number_of_bursts, 4);
 }
 
+TEST_F(PacingControllerTest, RespectsQueueTimeLimit) {
+  static constexpr DataSize kPacketSize = DataSize::Bytes(100);
+  static constexpr DataRate kNominalPacingRate = DataRate::KilobitsPerSec(200);
+  static constexpr TimeDelta kPacketPacingTime =
+      kPacketSize / kNominalPacingRate;
+  static constexpr TimeDelta kQueueTimeLimit = TimeDelta::Millis(1000);
+
+  PacingController pacer(&clock_, &callback_, trials_);
+  pacer.SetPacingRates(kNominalPacingRate, /*padding_rate=*/DataRate::Zero());
+  pacer.SetQueueTimeLimit(kQueueTimeLimit);
+
+  // Fill pacer up to queue time limit.
+  static constexpr int kNumPackets = kQueueTimeLimit / kPacketPacingTime;
+  for (int i = 0; i < kNumPackets; ++i) {
+    pacer.EnqueuePacket(video_.BuildNextPacket(kPacketSize.bytes()));
+  }
+  EXPECT_EQ(pacer.ExpectedQueueTime(), kQueueTimeLimit);
+  EXPECT_EQ(pacer.pacing_rate(), kNominalPacingRate);
+
+  // Double the amount of packets in the queue, the queue time limit should
+  // effectively double the pacing rate in response.
+  for (int i = 0; i < kNumPackets; ++i) {
+    pacer.EnqueuePacket(video_.BuildNextPacket(kPacketSize.bytes()));
+  }
+  EXPECT_EQ(pacer.ExpectedQueueTime(), kQueueTimeLimit);
+  EXPECT_EQ(pacer.pacing_rate(), 2 * kNominalPacingRate);
+
+  // Send all the packets, should take as long as the queue time limit.
+  Timestamp start_time = clock_.CurrentTime();
+  while (pacer.QueueSizePackets() > 0) {
+    AdvanceTimeUntil(pacer.NextSendTime());
+    pacer.ProcessPackets();
+  }
+  EXPECT_EQ(clock_.CurrentTime() - start_time, kQueueTimeLimit);
+
+  // We're back in a normal state - pacing rate should be back to previous
+  // levels.
+  EXPECT_EQ(pacer.pacing_rate(), kNominalPacingRate);
+}
+
 }  // namespace
 }  // namespace webrtc