Fix pacer to accept duplicate sequence numbers on different SSRCs.

BUG=3550
R=stefan@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/17889004

git-svn-id: http://webrtc.googlecode.com/svn/trunk/webrtc@6610 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/modules/pacing/include/paced_sender.h b/modules/pacing/include/paced_sender.h
index 41bbbd6..55497db 100644
--- a/modules/pacing/include/paced_sender.h
+++ b/modules/pacing/include/paced_sender.h
@@ -16,6 +16,7 @@
 
 #include "webrtc/modules/interface/module.h"
 #include "webrtc/system_wrappers/interface/scoped_ptr.h"
+#include "webrtc/system_wrappers/interface/thread_annotations.h"
 #include "webrtc/system_wrappers/interface/tick_util.h"
 #include "webrtc/typedefs.h"
 
@@ -113,41 +114,50 @@
  private:
   // Return true if next packet in line should be transmitted.
   // Return packet list that contains the next packet.
-  bool ShouldSendNextPacket(paced_sender::PacketList** packet_list);
+  bool ShouldSendNextPacket(paced_sender::PacketList** packet_list)
+      EXCLUSIVE_LOCKS_REQUIRED(critsect_);
 
   // Local helper function to GetNextPacket.
-  paced_sender::Packet GetNextPacketFromList(paced_sender::PacketList* packets);
+  paced_sender::Packet GetNextPacketFromList(paced_sender::PacketList* packets)
+      EXCLUSIVE_LOCKS_REQUIRED(critsect_);
 
-  bool SendPacketFromList(paced_sender::PacketList* packet_list);
+  bool SendPacketFromList(paced_sender::PacketList* packet_list)
+      EXCLUSIVE_LOCKS_REQUIRED(critsect_);
 
   // Updates the number of bytes that can be sent for the next time interval.
-  void UpdateBytesPerInterval(uint32_t delta_time_in_ms);
+  void UpdateBytesPerInterval(uint32_t delta_time_in_ms)
+      EXCLUSIVE_LOCKS_REQUIRED(critsect_);
 
   // Updates the buffers with the number of bytes that we sent.
-  void UpdateMediaBytesSent(int num_bytes);
+  void UpdateMediaBytesSent(int num_bytes) EXCLUSIVE_LOCKS_REQUIRED(critsect_);
 
-  Clock* clock_;
-  Callback* callback_;
-  bool enabled_;
-  bool paused_;
-  int max_queue_length_ms_;
+  Clock* const clock_;
+  Callback* const callback_;
+
   scoped_ptr<CriticalSectionWrapper> critsect_;
+  bool enabled_ GUARDED_BY(critsect_);
+  bool paused_ GUARDED_BY(critsect_);
+  int max_queue_length_ms_ GUARDED_BY(critsect_);
   // This is the media budget, keeping track of how many bits of media
   // we can pace out during the current interval.
-  scoped_ptr<paced_sender::IntervalBudget> media_budget_;
+  scoped_ptr<paced_sender::IntervalBudget> media_budget_ GUARDED_BY(critsect_);
   // This is the padding budget, keeping track of how many bits of padding we're
   // allowed to send out during the current interval. This budget will be
   // utilized when there's no media to send.
-  scoped_ptr<paced_sender::IntervalBudget> padding_budget_;
+  scoped_ptr<paced_sender::IntervalBudget> padding_budget_
+      GUARDED_BY(critsect_);
 
-  TickTime time_last_update_;
-  TickTime time_last_send_;
-  int64_t capture_time_ms_last_queued_;
-  int64_t capture_time_ms_last_sent_;
+  TickTime time_last_update_ GUARDED_BY(critsect_);
+  TickTime time_last_send_ GUARDED_BY(critsect_);
+  int64_t capture_time_ms_last_queued_ GUARDED_BY(critsect_);
+  int64_t capture_time_ms_last_sent_ GUARDED_BY(critsect_);
 
-  scoped_ptr<paced_sender::PacketList> high_priority_packets_;
-  scoped_ptr<paced_sender::PacketList> normal_priority_packets_;
-  scoped_ptr<paced_sender::PacketList> low_priority_packets_;
+  scoped_ptr<paced_sender::PacketList> high_priority_packets_
+      GUARDED_BY(critsect_);
+  scoped_ptr<paced_sender::PacketList> normal_priority_packets_
+      GUARDED_BY(critsect_);
+  scoped_ptr<paced_sender::PacketList> low_priority_packets_
+      GUARDED_BY(critsect_);
 };
 }  // namespace webrtc
 #endif  // WEBRTC_MODULES_PACED_SENDER_H_
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index 5aab4a0..323cafe 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -12,6 +12,9 @@
 
 #include <assert.h>
 
+#include <map>
+#include <set>
+
 #include "webrtc/modules/interface/module_common_types.h"
 #include "webrtc/system_wrappers/interface/clock.h"
 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
@@ -36,21 +39,24 @@
 
 namespace paced_sender {
 struct Packet {
-  Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms,
-         int64_t enqueue_time_ms, int length_in_bytes, bool retransmission)
-      : ssrc_(ssrc),
-        sequence_number_(seq_number),
-        capture_time_ms_(capture_time_ms),
-        enqueue_time_ms_(enqueue_time_ms),
-        bytes_(length_in_bytes),
-        retransmission_(retransmission) {
-  }
-  uint32_t ssrc_;
-  uint16_t sequence_number_;
-  int64_t capture_time_ms_;
-  int64_t enqueue_time_ms_;
-  int bytes_;
-  bool retransmission_;
+  Packet(uint32_t ssrc,
+         uint16_t seq_number,
+         int64_t capture_time_ms,
+         int64_t enqueue_time_ms,
+         int length_in_bytes,
+         bool retransmission)
+      : ssrc(ssrc),
+        sequence_number(seq_number),
+        capture_time_ms(capture_time_ms),
+        enqueue_time_ms(enqueue_time_ms),
+        bytes(length_in_bytes),
+        retransmission(retransmission) {}
+  uint32_t ssrc;
+  uint16_t sequence_number;
+  int64_t capture_time_ms;
+  int64_t enqueue_time_ms;
+  int bytes;
+  bool retransmission;
 };
 
 // STL list style class which prevents duplicates in the list.
@@ -68,23 +74,24 @@
 
   void pop_front() {
     Packet& packet = packet_list_.front();
-    uint16_t sequence_number = packet.sequence_number_;
+    uint16_t sequence_number = packet.sequence_number;
+    uint32_t ssrc = packet.ssrc;
     packet_list_.pop_front();
-    sequence_number_set_.erase(sequence_number);
+    sequence_number_set_[ssrc].erase(sequence_number);
   }
 
   void push_back(const Packet& packet) {
-    if (sequence_number_set_.find(packet.sequence_number_) ==
-        sequence_number_set_.end()) {
+    if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) ==
+        sequence_number_set_[packet.ssrc].end()) {
       // Don't insert duplicates.
       packet_list_.push_back(packet);
-      sequence_number_set_.insert(packet.sequence_number_);
+      sequence_number_set_[packet.ssrc].insert(packet.sequence_number);
     }
   }
 
  private:
   std::list<Packet> packet_list_;
-  std::set<uint16_t> sequence_number_set_;
+  std::map<uint32_t, std::set<uint16_t> > sequence_number_set_;
 };
 
 class IntervalBudget {
@@ -129,10 +136,10 @@
                          int min_bitrate_kbps)
     : clock_(clock),
       callback_(callback),
+      critsect_(CriticalSectionWrapper::CreateCriticalSection()),
       enabled_(true),
       paused_(false),
       max_queue_length_ms_(kDefaultMaxQueueLengthMs),
-      critsect_(CriticalSectionWrapper::CreateCriticalSection()),
       media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
       padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
       time_last_update_(TickTime::Now()),
@@ -222,19 +229,19 @@
   int64_t now_ms = clock_->TimeInMilliseconds();
   int64_t oldest_packet_enqueue_time = now_ms;
   if (!high_priority_packets_->empty()) {
-    oldest_packet_enqueue_time = std::min(
-        oldest_packet_enqueue_time,
-        high_priority_packets_->front().enqueue_time_ms_);
+    oldest_packet_enqueue_time =
+        std::min(oldest_packet_enqueue_time,
+                 high_priority_packets_->front().enqueue_time_ms);
   }
   if (!normal_priority_packets_->empty()) {
-    oldest_packet_enqueue_time = std::min(
-        oldest_packet_enqueue_time,
-        normal_priority_packets_->front().enqueue_time_ms_);
+    oldest_packet_enqueue_time =
+        std::min(oldest_packet_enqueue_time,
+                 normal_priority_packets_->front().enqueue_time_ms);
   }
   if (!low_priority_packets_->empty()) {
-    oldest_packet_enqueue_time = std::min(
-        oldest_packet_enqueue_time,
-        low_priority_packets_->front().enqueue_time_ms_);
+    oldest_packet_enqueue_time =
+        std::min(oldest_packet_enqueue_time,
+                 low_priority_packets_->front().enqueue_time_ms);
   }
   return now_ms - oldest_packet_enqueue_time;
 }
@@ -291,10 +298,10 @@
   paced_sender::Packet packet = GetNextPacketFromList(packet_list);
   critsect_->Leave();
 
-  const bool success = callback_->TimeToSendPacket(packet.ssrc_,
-                                                   packet.sequence_number_,
-                                                   packet.capture_time_ms_,
-                                                   packet.retransmission_);
+  const bool success = callback_->TimeToSendPacket(packet.ssrc,
+                                                   packet.sequence_number,
+                                                   packet.capture_time_ms,
+                                                   packet.retransmission);
   critsect_->Enter();
   // If packet cannot be sent then keep it in packet list and exit early.
   // There's no need to send more packets.
@@ -302,15 +309,15 @@
     return false;
   }
   packet_list->pop_front();
-  const bool last_packet = packet_list->empty() ||
-      packet_list->front().capture_time_ms_ > packet.capture_time_ms_;
+  const bool last_packet =
+      packet_list->empty() ||
+      packet_list->front().capture_time_ms > packet.capture_time_ms;
   if (packet_list != high_priority_packets_.get()) {
-    if (packet.capture_time_ms_ > capture_time_ms_last_sent_) {
-      capture_time_ms_last_sent_ = packet.capture_time_ms_;
-    } else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ &&
+    if (packet.capture_time_ms > capture_time_ms_last_sent_) {
+      capture_time_ms_last_sent_ = packet.capture_time_ms;
+    } else if (packet.capture_time_ms == capture_time_ms_last_sent_ &&
                last_packet) {
-      TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend",
-          packet.capture_time_ms_);
+      TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms);
     }
   }
   return true;
@@ -344,12 +351,13 @@
       int64_t high_priority_capture_time = -1;
       if (!high_priority_packets_->empty()) {
         high_priority_capture_time =
-            high_priority_packets_->front().capture_time_ms_;
+            high_priority_packets_->front().capture_time_ms;
         *packet_list = high_priority_packets_.get();
       }
       if (!normal_priority_packets_->empty() &&
-          (high_priority_capture_time == -1 || high_priority_capture_time >
-          normal_priority_packets_->front().capture_time_ms_)) {
+          (high_priority_capture_time == -1 ||
+           high_priority_capture_time >
+               normal_priority_packets_->front().capture_time_ms)) {
         *packet_list = normal_priority_packets_.get();
       }
       if (*packet_list)
@@ -375,7 +383,7 @@
 paced_sender::Packet PacedSender::GetNextPacketFromList(
     paced_sender::PacketList* packets) {
   paced_sender::Packet packet = packets->front();
-  UpdateMediaBytesSent(packet.bytes_);
+  UpdateMediaBytesSent(packet.bytes);
   return packet;
 }
 
diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc
index 39608b3..5518855 100644
--- a/modules/pacing/paced_sender_unittest.cc
+++ b/modules/pacing/paced_sender_unittest.cc
@@ -213,6 +213,30 @@
   send_bucket_->Process();
 }
 
+TEST_F(PacedSenderTest, CanQueuePacketsWithSameSequenceNumberOnDifferentSsrcs) {
+  uint32_t ssrc = 12345;
+  uint16_t sequence_number = 1234;
+
+  SendAndExpectPacket(PacedSender::kNormalPriority,
+                      ssrc,
+                      sequence_number,
+                      clock_.TimeInMilliseconds(),
+                      250,
+                      false);
+
+  // Expect packet on second ssrc to be queued and sent as well.
+  SendAndExpectPacket(PacedSender::kNormalPriority,
+                      ssrc + 1,
+                      sequence_number,
+                      clock_.TimeInMilliseconds(),
+                      250,
+                      false);
+
+  clock_.AdvanceTimeMilliseconds(1000);
+  TickTime::AdvanceFakeClock(1000);
+  send_bucket_->Process();
+}
+
 TEST_F(PacedSenderTest, Padding) {
   uint32_t ssrc = 12345;
   uint16_t sequence_number = 1234;