Allow pacer to boost bitrate in order to meet time constraints.

Currently we limit the enocder so that frames aren't encoded if the
expected pacer queue is longer than 2s. However, if the queue is full
and the bitrate suddenly drops (or there is a large overshoot), the
queue time can be long than the limit.

This CL allows the pacer to temporarily boost the pacing bitrate over
the 2s window.

BUG=

Review URL: https://codereview.webrtc.org/1412293003

Cr-Commit-Position: refs/heads/master@{#10729}
diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc
index 3c299f8..e38405a 100644
--- a/webrtc/modules/pacing/paced_sender.cc
+++ b/webrtc/modules/pacing/paced_sender.cc
@@ -10,12 +10,11 @@
 
 #include "webrtc/modules/pacing/paced_sender.h"
 
-#include <assert.h>
-
 #include <map>
 #include <queue>
 #include <set>
 
+#include "webrtc/base/checks.h"
 #include "webrtc/modules/include/module_common_types.h"
 #include "webrtc/modules/pacing/bitrate_prober.h"
 #include "webrtc/system_wrappers/include/clock.h"
@@ -86,7 +85,11 @@
 // Class encapsulating a priority queue with some extensions.
 class PacketQueue {
  public:
-  PacketQueue() : bytes_(0) {}
+  explicit PacketQueue(Clock* clock)
+      : bytes_(0),
+        clock_(clock),
+        queue_time_sum_(0),
+        time_last_updated_(clock_->TimeInMilliseconds()) {}
   virtual ~PacketQueue() {}
 
   void Push(const Packet& packet) {
@@ -114,6 +117,7 @@
   void FinalizePop(const Packet& packet) {
     RemoveFromDupeSet(packet);
     bytes_ -= packet.bytes;
+    queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms);
     packet_list_.erase(packet.this_it);
   }
 
@@ -123,13 +127,22 @@
 
   uint64_t SizeInBytes() const { return bytes_; }
 
-  int64_t OldestEnqueueTime() const {
-    std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin();
+  int64_t OldestEnqueueTimeMs() const {
+    auto it = packet_list_.rbegin();
     if (it == packet_list_.rend())
       return 0;
     return it->enqueue_time_ms;
   }
 
+  int64_t AverageQueueTimeMs() {
+    int64_t now = clock_->TimeInMilliseconds();
+    RTC_DCHECK_GE(now, time_last_updated_);
+    int64_t delta = now - time_last_updated_;
+    queue_time_sum_ += delta * prio_queue_.size();
+    time_last_updated_ = now;
+    return queue_time_sum_ / prio_queue_.size();
+  }
+
  private:
   // Try to add a packet to the set of ssrc/seqno identifiers currently in the
   // queue. Return true if inserted, false if this is a duplicate.
@@ -147,7 +160,7 @@
 
   void RemoveFromDupeSet(const Packet& packet) {
     SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
-    assert(it != dupe_map_.end());
+    RTC_DCHECK(it != dupe_map_.end());
     it->second.erase(packet.sequence_number);
     if (it->second.empty()) {
       dupe_map_.erase(it);
@@ -165,6 +178,9 @@
   // Map<ssrc, set<seq_no> >, for checking duplicates.
   typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
   SsrcSeqNoMap dupe_map_;
+  Clock* const clock_;
+  int64_t queue_time_sum_;
+  int64_t time_last_updated_;
 };
 
 class IntervalBudget {
@@ -209,6 +225,7 @@
 };
 }  // namespace paced_sender
 
+const int64_t PacedSender::kMaxQueueLengthMs = 2000;
 const float PacedSender::kDefaultPaceMultiplier = 2.5f;
 
 PacedSender::PacedSender(Clock* clock,
@@ -225,8 +242,9 @@
       padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
       prober_(new BitrateProber()),
       bitrate_bps_(1000 * bitrate_kbps),
+      max_bitrate_kbps_(max_bitrate_kbps),
       time_last_update_us_(clock->TimeInMicroseconds()),
-      packets_(new paced_sender::PacketQueue()),
+      packets_(new paced_sender::PacketQueue(clock)),
       packet_counter_(0) {
   UpdateBytesPerInterval(kMinPacketLimitMs);
 }
@@ -244,7 +262,7 @@
 }
 
 void PacedSender::SetProbingEnabled(bool enabled) {
-  assert(packet_counter_ == 0);
+  RTC_CHECK_EQ(0u, packet_counter_);
   probing_enabled_ = enabled;
 }
 
@@ -252,9 +270,12 @@
                                 int max_bitrate_kbps,
                                 int min_bitrate_kbps) {
   CriticalSectionScoped cs(critsect_.get());
-  media_budget_->set_target_rate_kbps(max_bitrate_kbps);
+  // Don't set media bitrate here as it may be boosted in order to meet max
+  // queue time constraint. Just update max_bitrate_kbps_ and let media_budget_
+  // be updated in Process().
   padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
   bitrate_bps_ = 1000 * bitrate_kbps;
+  max_bitrate_kbps_ = max_bitrate_kbps;
 }
 
 void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
@@ -265,14 +286,12 @@
                                bool retransmission) {
   CriticalSectionScoped cs(critsect_.get());
 
-  if (probing_enabled_ && !prober_->IsProbing()) {
+  if (probing_enabled_ && !prober_->IsProbing())
     prober_->SetEnabled(true);
-  }
   prober_->MaybeInitializeProbe(bitrate_bps_);
 
-  if (capture_time_ms < 0) {
+  if (capture_time_ms < 0)
     capture_time_ms = clock_->TimeInMilliseconds();
-  }
 
   packets_->Push(paced_sender::Packet(
       priority, ssrc, sequence_number, capture_time_ms,
@@ -281,9 +300,8 @@
 
 int64_t PacedSender::ExpectedQueueTimeMs() const {
   CriticalSectionScoped cs(critsect_.get());
-  int target_rate = media_budget_->target_rate_kbps();
-  assert(target_rate > 0);
-  return static_cast<int64_t>(packets_->SizeInBytes() * 8 / target_rate);
+  RTC_DCHECK_GT(max_bitrate_kbps_, 0);
+  return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_);
 }
 
 size_t PacedSender::QueueSizePackets() const {
@@ -294,7 +312,7 @@
 int64_t PacedSender::QueueInMs() const {
   CriticalSectionScoped cs(critsect_.get());
 
-  int64_t oldest_packet = packets_->OldestEnqueueTime();
+  int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
   if (oldest_packet == 0)
     return 0;
 
@@ -305,9 +323,8 @@
   CriticalSectionScoped cs(critsect_.get());
   if (prober_->IsProbing()) {
     int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
-    if (ret >= 0) {
+    if (ret >= 0)
       return ret;
-    }
   }
   int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
   int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
@@ -321,14 +338,29 @@
   time_last_update_us_ = now_us;
   if (paused_)
     return 0;
+  int target_bitrate_kbps = max_bitrate_kbps_;
   if (elapsed_time_ms > 0) {
+    size_t queue_size_bytes = packets_->SizeInBytes();
+    if (queue_size_bytes > 0) {
+      // Assuming equal size packets and input/output rate, the average packet
+      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
+      // time constraint shall be met. Determine bitrate needed for that.
+      int64_t avg_time_left_ms = std::max<int64_t>(
+          1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs());
+      int min_bitrate_needed_kbps =
+          static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
+      if (min_bitrate_needed_kbps > target_bitrate_kbps)
+        target_bitrate_kbps = min_bitrate_needed_kbps;
+    }
+
+    media_budget_->set_target_rate_kbps(target_bitrate_kbps);
+
     int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
     UpdateBytesPerInterval(delta_time_ms);
   }
   while (!packets_->Empty()) {
-    if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) {
+    if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing())
       return 0;
-    }
 
     // Since we need to release the lock in order to send, we first pop the
     // element from the priority queue but keep it in storage, so that we can
@@ -337,9 +369,8 @@
     if (SendPacket(packet)) {
       // Send succeeded, remove it from the queue.
       packets_->FinalizePop(packet);
-      if (prober_->IsProbing()) {
+      if (prober_->IsProbing())
         return 0;
-      }
     } else {
       // Send failed, put it back into the queue.
       packets_->CancelPop(packet);
@@ -351,10 +382,11 @@
     return 0;
 
   size_t padding_needed;
-  if (prober_->IsProbing())
+  if (prober_->IsProbing()) {
     padding_needed = prober_->RecommendedPacketSize();
-  else
+  } else {
     padding_needed = padding_budget_->bytes_remaining();
+  }
 
   if (padding_needed > 0)
     SendPadding(static_cast<size_t>(padding_needed));
diff --git a/webrtc/modules/pacing/paced_sender.h b/webrtc/modules/pacing/paced_sender.h
index fa9f59a..d1e5ce3 100644
--- a/webrtc/modules/pacing/paced_sender.h
+++ b/webrtc/modules/pacing/paced_sender.h
@@ -51,7 +51,11 @@
     virtual ~Callback() {}
   };
 
-  static const int64_t kDefaultMaxQueueLengthMs = 2000;
+  // Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than
+  // this value, the packet producers should wait (eg drop frames rather than
+  // encoding them). Bitrate sent may temporarily exceed target set by
+  // UpdateBitrate() so that this limit will be upheld.
+  static const int64_t kMaxQueueLengthMs;
   // Pace in kbits/s until we receive first estimate.
   static const int kDefaultInitialPaceKbps = 2000;
   // Pacing-rate relative to our target send rate.
@@ -142,7 +146,10 @@
       GUARDED_BY(critsect_);
 
   rtc::scoped_ptr<BitrateProber> prober_ GUARDED_BY(critsect_);
+  // Actual configured bitrates (media_budget_ may temporarily be higher in
+  // order to meet pace time constraint).
   int bitrate_bps_ GUARDED_BY(critsect_);
+  int max_bitrate_kbps_ GUARDED_BY(critsect_);
 
   int64_t time_last_update_us_ GUARDED_BY(critsect_);
 
diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc
index 78a49cd..1a2936d 100644
--- a/webrtc/modules/pacing/paced_sender_unittest.cc
+++ b/webrtc/modules/pacing/paced_sender_unittest.cc
@@ -560,6 +560,9 @@
   EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms, false))
       .Times(3)
       .WillRepeatedly(Return(true));
+  EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms, false))
+      .Times(1)
+      .WillRepeatedly(Return(true));
   send_bucket_->Resume();
 
   EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
@@ -567,13 +570,6 @@
   EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
   EXPECT_EQ(0, send_bucket_->Process());
 
-  EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms, false))
-      .Times(1)
-      .WillRepeatedly(Return(true));
-  EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
-  clock_.AdvanceTimeMilliseconds(5);
-  EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
-  EXPECT_EQ(0, send_bucket_->Process());
   EXPECT_EQ(0, send_bucket_->QueueInMs());
 }
 
@@ -664,10 +660,9 @@
 
   EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs());
 
-  // Allow for aliasing, duration should be in [expected(n - 1), expected(n)].
-  EXPECT_LE(duration, queue_in_ms);
-  EXPECT_GE(duration,
-            queue_in_ms - static_cast<int64_t>(kPacketSize * 8 / kMaxBitrate));
+  // Allow for aliasing, duration should be within one pack of max time limit.
+  EXPECT_NEAR(duration, PacedSender::kMaxQueueLengthMs,
+              static_cast<int64_t>(kPacketSize * 8 / kMaxBitrate));
 }
 
 TEST_F(PacedSenderTest, QueueTimeGrowsOverTime) {
diff --git a/webrtc/video_engine/vie_encoder.cc b/webrtc/video_engine/vie_encoder.cc
index 94f5fda..9eb4a3e 100644
--- a/webrtc/video_engine/vie_encoder.cc
+++ b/webrtc/video_engine/vie_encoder.cc
@@ -327,7 +327,7 @@
                static_cast<int>(target_delay_ms_ * kEncoderPausePacerMargin),
                kMinPacingDelayMs);
   }
-  if (pacer_->ExpectedQueueTimeMs() > PacedSender::kDefaultMaxQueueLengthMs) {
+  if (pacer_->ExpectedQueueTimeMs() > PacedSender::kMaxQueueLengthMs) {
     // Too much data in pacer queue, drop frame.
     return true;
   }