dcsctp: Implement Round Robin scheduler

Bug: webrtc:12793
Change-Id: I19adb292443def42ee54df67c4869b980db7b7c0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219682
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34093}
diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn
index 641c8a6..2f0b27a 100644
--- a/net/dcsctp/tx/BUILD.gn
+++ b/net/dcsctp/tx/BUILD.gn
@@ -127,6 +127,7 @@
       "../public:socket",
       "../public:types",
       "../testing:data_generator",
+      "../testing:testing_macros",
       "../timer",
     ]
     absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index f2d22c8..7f91339 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -11,8 +11,7 @@
 
 #include <cstdint>
 #include <deque>
-#include <unordered_map>
-#include <unordered_set>
+#include <map>
 #include <utility>
 #include <vector>
 
@@ -22,52 +21,16 @@
 #include "net/dcsctp/packet/data.h"
 #include "net/dcsctp/public/dcsctp_message.h"
 #include "net/dcsctp/public/dcsctp_socket.h"
+#include "net/dcsctp/public/types.h"
 #include "net/dcsctp/tx/send_queue.h"
 #include "rtc_base/logging.h"
 
 namespace dcsctp {
-void RRSendQueue::Add(TimeMs now,
-                      DcSctpMessage message,
-                      const SendOptions& send_options) {
-  RTC_DCHECK(!message.payload().empty());
-  std::deque<Item>& queue =
-      IsPaused(message.stream_id()) ? paused_items_ : items_;
-  // Any limited lifetime should start counting from now - when the message
-  // has been added to the queue.
-  absl::optional<TimeMs> expires_at = absl::nullopt;
-  if (send_options.lifetime.has_value()) {
-    // `expires_at` is the time when it expires. Which is slightly larger than
-    // the message's lifetime, as the message is alive during its entire
-    // lifetime (which may be zero).
-    expires_at = now + *send_options.lifetime + DurationMs(1);
-  }
-  queue.emplace_back(std::move(message), expires_at, send_options);
-}
 
-size_t RRSendQueue::total_bytes() const {
-  // TODO(boivie): Have the current size as a member variable, so that's it not
-  // calculated for every operation.
-  return absl::c_accumulate(items_, 0,
-                            [](size_t size, const Item& item) {
-                              return size + item.remaining_size;
-                            }) +
-         absl::c_accumulate(paused_items_, 0,
-                            [](size_t size, const Item& item) {
-                              return size + item.remaining_size;
-                            });
-}
-
-bool RRSendQueue::IsFull() const {
-  return total_bytes() >= buffer_size_;
-}
-
-bool RRSendQueue::IsEmpty() const {
-  return items_.empty();
-}
-
-RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
+RRSendQueue::OutgoingStream::Item*
+RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) {
   while (!items_.empty()) {
-    RRSendQueue::Item& item = items_.front();
+    RRSendQueue::OutgoingStream::Item& item = items_.front();
     // An entire item can be discarded iff:
     // 1) It hasn't been partially sent (has been allocated a message_id).
     // 2) It has a non-negative expiry time.
@@ -75,9 +38,6 @@
     if (!item.message_id.has_value() && item.expires_at.has_value() &&
         *item.expires_at <= now) {
       // TODO(boivie): This should be reported to the client.
-      RTC_DLOG(LS_VERBOSE)
-          << log_prefix_
-          << "Message is expired before even partially sent - discarding";
       items_.pop_front();
       continue;
     }
@@ -87,35 +47,42 @@
   return nullptr;
 }
 
-absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
-                                                           size_t max_size) {
+void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
+                                      absl::optional<TimeMs> expires_at,
+                                      const SendOptions& send_options) {
+  items_.emplace_back(std::move(message), expires_at, send_options);
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
+    TimeMs now,
+    size_t max_size) {
   Item* item = GetFirstNonExpiredMessage(now);
   if (item == nullptr) {
     return absl::nullopt;
   }
 
+  // If a stream is paused, it will allow sending all partially sent messages
+  // but will not start sending new fragments of completely unsent messages.
+  if (is_paused_ && !item->message_id.has_value()) {
+    return absl::nullopt;
+  }
+
   DcSctpMessage& message = item->message;
 
-  // Don't make too small fragments as that can result in increased risk of
-  // failure to assemble a message if a small fragment is missing.
   if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
-    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Will not fragment "
-                         << item->remaining_size << " bytes into buffer of "
-                         << max_size << " bytes";
     return absl::nullopt;
   }
 
   // Allocate Message ID and SSN when the first fragment is sent.
   if (!item->message_id.has_value()) {
     MID& mid =
-        mid_by_stream_id_[{item->send_options.unordered, message.stream_id()}];
+        item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
     item->message_id = mid;
     mid = MID(*mid + 1);
   }
   if (!item->send_options.unordered && !item->ssn.has_value()) {
-    SSN& ssn = ssn_by_stream_id_[message.stream_id()];
-    item->ssn = ssn;
-    ssn = SSN(*ssn + 1);
+    item->ssn = next_ssn_;
+    next_ssn_ = SSN(*next_ssn_ + 1);
   }
 
   // Grab the next `max_size` fragment from this message and calculate flags.
@@ -157,38 +124,39 @@
                item->message.payload().size());
     RTC_DCHECK(item->remaining_size > 0);
   }
-  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of "
-                       << chunk.data.size() << " bytes (max: " << max_size
-                       << ")";
   return chunk;
 }
 
-void RRSendQueue::Discard(IsUnordered unordered,
-                          StreamID stream_id,
-                          MID message_id) {
-  // As this method will only discard partially sent messages, and as the queue
-  // is a FIFO queue, the only partially sent message would be the topmost
-  // message.
+size_t RRSendQueue::OutgoingStream::buffered_amount() const {
+  size_t bytes = 0;
+  for (const auto& item : items_) {
+    bytes += item.remaining_size;
+  }
+  return bytes;
+}
+
+void RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
+                                          MID message_id) {
   if (!items_.empty()) {
     Item& item = items_.front();
     if (item.send_options.unordered == unordered &&
-        item.message.stream_id() == stream_id && item.message_id.has_value() &&
-        *item.message_id == message_id) {
+        item.message_id.has_value() && *item.message_id == message_id) {
       items_.pop_front();
     }
   }
 }
 
-void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
-  for (StreamID stream_id : streams) {
-    paused_streams_.insert(stream_id);
-  }
+void RRSendQueue::OutgoingStream::Pause() {
+  is_paused_ = true;
 
-  // Will not discard partially sent messages - only whole messages. Partially
-  // delivered messages (at the time of receiving a Stream Reset command) will
-  // always deliver all the fragments before actually resetting the stream.
+  // A stream is pause when it's about to be reset. In this implementation,
+  // it will throw away all non-partially send messages. This is subject to
+  // change. It will however not discard any partially sent messages - only
+  // whole messages. Partially delivered messages (at the time of receiving a
+  // Stream Reset command) will always deliver all the fragments before actually
+  // resetting the stream.
   for (auto it = items_.begin(); it != items_.end();) {
-    if (IsPaused(it->message.stream_id()) && it->remaining_offset == 0) {
+    if (it->remaining_offset == 0) {
       it = items_.erase(it);
     } else {
       ++it;
@@ -196,37 +164,7 @@
   }
 }
 
-bool RRSendQueue::CanResetStreams() const {
-  for (auto& item : items_) {
-    if (IsPaused(item.message.stream_id())) {
-      return false;
-    }
-  }
-  return true;
-}
-
-void RRSendQueue::CommitResetStreams() {
-  for (StreamID stream_id : paused_streams_) {
-    ssn_by_stream_id_[stream_id] = SSN(0);
-    // https://tools.ietf.org/html/rfc8260#section-2.3.2
-    // "When an association resets the SSN using the SCTP extension defined
-    // in [RFC6525], the two counters (one for the ordered messages, one for
-    // the unordered messages) used for the MIDs MUST be reset to 0."
-    mid_by_stream_id_[{IsUnordered(false), stream_id}] = MID(0);
-    mid_by_stream_id_[{IsUnordered(true), stream_id}] = MID(0);
-  }
-  RollbackResetStreams();
-}
-
-void RRSendQueue::RollbackResetStreams() {
-  while (!paused_items_.empty()) {
-    items_.push_back(std::move(paused_items_.front()));
-    paused_items_.pop_front();
-  }
-  paused_streams_.clear();
-}
-
-void RRSendQueue::Reset() {
+void RRSendQueue::OutgoingStream::Reset() {
   if (!items_.empty()) {
     // If this message has been partially sent, reset it so that it will be
     // re-sent.
@@ -237,13 +175,141 @@
     item.ssn = absl::nullopt;
     item.current_fsn = FSN(0);
   }
-  RollbackResetStreams();
-  mid_by_stream_id_.clear();
-  ssn_by_stream_id_.clear();
+  is_paused_ = false;
+  next_ordered_mid_ = MID(0);
+  next_unordered_mid_ = MID(0);
+  next_ssn_ = SSN(0);
 }
 
-bool RRSendQueue::IsPaused(StreamID stream_id) const {
-  return paused_streams_.find(stream_id) != paused_streams_.end();
+bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
+  if (items_.empty()) {
+    return false;
+  }
+  return items_.front().message_id.has_value();
 }
 
+void RRSendQueue::Add(TimeMs now,
+                      DcSctpMessage message,
+                      const SendOptions& send_options) {
+  RTC_DCHECK(!message.payload().empty());
+  // Any limited lifetime should start counting from now - when the message
+  // has been added to the queue.
+  absl::optional<TimeMs> expires_at = absl::nullopt;
+  if (send_options.lifetime.has_value()) {
+    // `expires_at` is the time when it expires. Which is slightly larger than
+    // the message's lifetime, as the message is alive during its entire
+    // lifetime (which may be zero).
+    expires_at = now + *send_options.lifetime + DurationMs(1);
+  }
+  GetOrCreateStreamInfo(message.stream_id())
+      .Add(std::move(message), expires_at, send_options);
+}
+
+size_t RRSendQueue::total_bytes() const {
+  // TODO(boivie): Have the current size as a member variable, so that's it not
+  // calculated for every operation.
+  size_t bytes = 0;
+  for (const auto& stream : streams_) {
+    bytes += stream.second.buffered_amount();
+  }
+
+  return bytes;
+}
+
+bool RRSendQueue::IsFull() const {
+  return total_bytes() >= buffer_size_;
+}
+
+bool RRSendQueue::IsEmpty() const {
+  return total_bytes() == 0;
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(
+    std::map<StreamID, RRSendQueue::OutgoingStream>::iterator it,
+    TimeMs now,
+    size_t max_size) {
+  absl::optional<DataToSend> data = it->second.Produce(now, max_size);
+  if (data.has_value()) {
+    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of "
+                         << data->data.size() << " bytes (max: " << max_size
+                         << ")";
+
+    if (data->data.is_end) {
+      // No more fragments. Continue with the next stream next time.
+      next_stream_id_ = StreamID(*it->first + 1);
+    }
+  }
+
+  return data;
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
+                                                           size_t max_size) {
+  auto start_it = streams_.lower_bound(next_stream_id_);
+  for (auto it = start_it; it != streams_.end(); ++it) {
+    absl::optional<DataToSend> ret = Produce(it, now, max_size);
+    if (ret.has_value()) {
+      return ret;
+    }
+  }
+
+  for (auto it = streams_.begin(); it != start_it; ++it) {
+    absl::optional<DataToSend> ret = Produce(it, now, max_size);
+    if (ret.has_value()) {
+      return ret;
+    }
+  }
+  return absl::nullopt;
+}
+
+void RRSendQueue::Discard(IsUnordered unordered,
+                          StreamID stream_id,
+                          MID message_id) {
+  GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+}
+
+void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
+  for (StreamID stream_id : streams) {
+    GetOrCreateStreamInfo(stream_id).Pause();
+  }
+}
+
+bool RRSendQueue::CanResetStreams() const {
+  // Streams can be reset if those streams that are paused don't have any
+  // messages that are partially sent.
+  for (auto& stream : streams_) {
+    if (stream.second.is_paused() &&
+        stream.second.has_partially_sent_message()) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void RRSendQueue::CommitResetStreams() {
+  Reset();
+}
+
+void RRSendQueue::RollbackResetStreams() {
+  for (auto& stream_entry : streams_) {
+    stream_entry.second.Resume();
+  }
+}
+
+void RRSendQueue::Reset() {
+  for (auto& stream_entry : streams_) {
+    OutgoingStream& stream = stream_entry.second;
+    stream.Reset();
+  }
+}
+
+RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
+    StreamID stream_id) {
+  auto it = streams_.find(stream_id);
+  if (it != streams_.end()) {
+    return it->second;
+  }
+
+  return streams_.emplace(stream_id, OutgoingStream()).first->second;
+}
 }  // namespace dcsctp
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index c43dc91..abbe702 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -12,9 +12,8 @@
 
 #include <cstdint>
 #include <deque>
+#include <map>
 #include <string>
-#include <unordered_map>
-#include <unordered_set>
 #include <utility>
 
 #include "absl/algorithm/container.h"
@@ -78,44 +77,91 @@
   size_t total_bytes() const;
 
  private:
-  // An enqueued message and metadata.
-  struct Item {
-    explicit Item(DcSctpMessage msg,
-                  absl::optional<TimeMs> expires_at,
-                  const SendOptions& send_options)
-        : message(std::move(msg)),
-          expires_at(expires_at),
-          send_options(send_options),
-          remaining_offset(0),
-          remaining_size(message.payload().size()) {}
-    DcSctpMessage message;
-    absl::optional<TimeMs> expires_at;
-    SendOptions send_options;
-    // The remaining payload (offset and size) to be sent, when it has been
-    // fragmented.
-    size_t remaining_offset;
-    size_t remaining_size;
-    // If set, an allocated Message ID and SSN. Will be allocated when the first
-    // fragment is sent.
-    absl::optional<MID> message_id = absl::nullopt;
-    absl::optional<SSN> ssn = absl::nullopt;
-    // The current Fragment Sequence Number, incremented for each fragment.
-    FSN current_fsn = FSN(0);
+  // Per-stream information.
+  class OutgoingStream {
+   public:
+    // Enqueues a message to this stream.
+    void Add(DcSctpMessage message,
+             absl::optional<TimeMs> expires_at,
+             const SendOptions& send_options);
+
+    // Possibly produces a data chunk to send.
+    absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
+
+    // The amount of data enqueued on this stream.
+    size_t buffered_amount() const;
+
+    // Discards a partially sent message, see `SendQueue::Discard`.
+    void Discard(IsUnordered unordered, MID message_id);
+
+    // Pauses this stream, which is used before resetting it.
+    void Pause();
+
+    // Resumes a paused stream.
+    void Resume() { is_paused_ = false; }
+
+    bool is_paused() const { return is_paused_; }
+
+    // Resets this stream, meaning MIDs and SSNs are set to zero.
+    void Reset();
+
+    // Indicates if this stream has a partially sent message in it.
+    bool has_partially_sent_message() const;
+
+   private:
+    // An enqueued message and metadata.
+    struct Item {
+      explicit Item(DcSctpMessage msg,
+                    absl::optional<TimeMs> expires_at,
+                    const SendOptions& send_options)
+          : message(std::move(msg)),
+            expires_at(expires_at),
+            send_options(send_options),
+            remaining_offset(0),
+            remaining_size(message.payload().size()) {}
+      DcSctpMessage message;
+      absl::optional<TimeMs> expires_at;
+      SendOptions send_options;
+      // The remaining payload (offset and size) to be sent, when it has been
+      // fragmented.
+      size_t remaining_offset;
+      size_t remaining_size;
+      // If set, an allocated Message ID and SSN. Will be allocated when the
+      // first fragment is sent.
+      absl::optional<MID> message_id = absl::nullopt;
+      absl::optional<SSN> ssn = absl::nullopt;
+      // The current Fragment Sequence Number, incremented for each fragment.
+      FSN current_fsn = FSN(0);
+    };
+
+    // Returns the first non-expired message, or nullptr if there isn't one.
+    Item* GetFirstNonExpiredMessage(TimeMs now);
+
+    // Streams are pause when they are about to be reset.
+    bool is_paused_ = false;
+    // MIDs are different for unordered and ordered messages sent on a stream.
+    MID next_unordered_mid_ = MID(0);
+    MID next_ordered_mid_ = MID(0);
+
+    SSN next_ssn_ = SSN(0);
+    // Enqueued messages, and metadata.
+    std::deque<Item> items_;
   };
 
-  Item* GetFirstNonExpiredMessage(TimeMs now);
-  bool IsPaused(StreamID stream_id) const;
+  OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
+  absl::optional<DataToSend> Produce(
+      std::map<StreamID, OutgoingStream>::iterator it,
+      TimeMs now,
+      size_t max_size);
 
   const std::string log_prefix_;
   const size_t buffer_size_;
-  std::deque<Item> items_;
 
-  std::unordered_set<StreamID, StreamID::Hasher> paused_streams_;
-  std::deque<Item> paused_items_;
+  // The next stream to send chunks from.
+  StreamID next_stream_id_ = StreamID(0);
 
-  std::unordered_map<std::pair<IsUnordered, StreamID>, MID, UnorderedStreamHash>
-      mid_by_stream_id_;
-  std::unordered_map<StreamID, SSN, StreamID::Hasher> ssn_by_stream_id_;
+  // All streams, and messages added to those.
+  std::map<StreamID, OutgoingStream> streams_;
 };
 }  // namespace dcsctp
 
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
index 0f6fd2b..5e99bb4 100644
--- a/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -18,20 +18,25 @@
 #include "net/dcsctp/public/dcsctp_options.h"
 #include "net/dcsctp/public/dcsctp_socket.h"
 #include "net/dcsctp/public/types.h"
+#include "net/dcsctp/testing/testing_macros.h"
 #include "net/dcsctp/tx/send_queue.h"
 #include "rtc_base/gunit.h"
 #include "test/gmock.h"
 
 namespace dcsctp {
 namespace {
+using ::testing::SizeIs;
 
 constexpr TimeMs kNow = TimeMs(0);
 constexpr StreamID kStreamID(1);
 constexpr PPID kPPID(53);
+constexpr size_t kMaxQueueSize = 1000;
+constexpr size_t kOneFragmentPacketSize = 100;
+constexpr size_t kTwoFragmentPacketSize = 101;
 
 class RRSendQueueTest : public testing::Test {
  protected:
-  RRSendQueueTest() : buf_("log: ", 100) {}
+  RRSendQueueTest() : buf_("log: ", kMaxQueueSize) {}
 
   const DcSctpOptions options_;
   RRSendQueue buf_;
@@ -39,7 +44,7 @@
 
 TEST_F(RRSendQueueTest, EmptyBuffer) {
   EXPECT_TRUE(buf_.IsEmpty());
-  EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
+  EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
   EXPECT_FALSE(buf_.IsFull());
 }
 
@@ -48,7 +53,8 @@
 
   EXPECT_FALSE(buf_.IsEmpty());
   EXPECT_FALSE(buf_.IsFull());
-  absl::optional<SendQueue::DataToSend> chunk_opt = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_opt =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_opt.has_value());
   EXPECT_TRUE(chunk_opt->data.is_beginning);
   EXPECT_TRUE(chunk_opt->data.is_end);
@@ -76,7 +82,7 @@
   EXPECT_FALSE(chunk_end->data.is_beginning);
   EXPECT_TRUE(chunk_end->data.is_end);
 
-  EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
+  EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
 }
 
 TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
@@ -84,14 +90,16 @@
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
   buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
 
-  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_one =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_one.has_value());
   EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
   EXPECT_EQ(chunk_one->data.ppid, kPPID);
   EXPECT_TRUE(chunk_one->data.is_beginning);
   EXPECT_TRUE(chunk_one->data.is_end);
 
-  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_two =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
   EXPECT_EQ(chunk_two->data.ppid, PPID(54));
@@ -100,7 +108,7 @@
 }
 
 TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
-  std::vector<uint8_t> payload(60);
+  std::vector<uint8_t> payload(600);
   EXPECT_FALSE(buf_.IsFull());
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
   EXPECT_FALSE(buf_.IsFull());
@@ -112,14 +120,14 @@
   buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
   EXPECT_TRUE(buf_.IsFull());
 
-  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
   ASSERT_TRUE(chunk_one.has_value());
   EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
   EXPECT_EQ(chunk_one->data.ppid, kPPID);
 
   EXPECT_TRUE(buf_.IsFull());
 
-  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
   EXPECT_EQ(chunk_two->data.ppid, PPID(54));
@@ -127,7 +135,7 @@
   EXPECT_FALSE(buf_.IsFull());
   EXPECT_FALSE(buf_.IsEmpty());
 
-  absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
   ASSERT_TRUE(chunk_three.has_value());
   EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
   EXPECT_EQ(chunk_three->data.ppid, PPID(55));
@@ -171,7 +179,7 @@
   // Default is ordered
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
   absl::optional<SendQueue::DataToSend> chunk_one =
-      buf_.Produce(kNow, /*max_size=*/100);
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_one.has_value());
   EXPECT_FALSE(chunk_one->data.is_unordered);
 
@@ -180,7 +188,7 @@
   opts.unordered = IsUnordered(true);
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts);
   absl::optional<SendQueue::DataToSend> chunk_two =
-      buf_.Produce(kNow, /*max_size=*/100);
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_TRUE(chunk_two->data.is_unordered);
 }
@@ -192,7 +200,7 @@
   TimeMs now = kNow;
   buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
   now += DurationMs(1000000);
-  ASSERT_TRUE(buf_.Produce(now, 100));
+  ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
 
   SendOptions expires_2_seconds;
   expires_2_seconds.lifetime = DurationMs(2000);
@@ -200,17 +208,17 @@
   // Add and consume within lifetime
   buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
   now += DurationMs(2000);
-  ASSERT_TRUE(buf_.Produce(now, 100));
+  ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
 
   // Add and consume just outside lifetime
   buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
   now += DurationMs(2001);
-  ASSERT_FALSE(buf_.Produce(now, 100));
+  ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
 
   // A long time after expiry
   buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
   now += DurationMs(1000000);
-  ASSERT_FALSE(buf_.Produce(now, 100));
+  ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
 
   // Expire one message, but produce the second that is not expired.
   buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
@@ -221,8 +229,8 @@
   buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
   now += DurationMs(2001);
 
-  ASSERT_TRUE(buf_.Produce(now, 100));
-  ASSERT_FALSE(buf_.Produce(now, 100));
+  ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
+  ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
 }
 
 TEST_F(RRSendQueueTest, DiscardPartialPackets) {
@@ -231,28 +239,31 @@
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
   buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload));
 
-  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_one =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_one.has_value());
   EXPECT_FALSE(chunk_one->data.is_end);
   EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
   buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
                chunk_one->data.message_id);
 
-  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_two =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_FALSE(chunk_two->data.is_end);
   EXPECT_EQ(chunk_two->data.stream_id, StreamID(2));
 
-  absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_three =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_three.has_value());
   EXPECT_TRUE(chunk_three->data.is_end);
   EXPECT_EQ(chunk_three->data.stream_id, StreamID(2));
-  ASSERT_FALSE(buf_.Produce(kNow, 100));
+  ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
 
   // Calling it again shouldn't cause issues.
   buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
                chunk_one->data.message_id);
-  ASSERT_FALSE(buf_.Produce(kNow, 100));
+  ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
 }
 
 TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
@@ -292,7 +303,7 @@
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
   EXPECT_EQ(buf_.total_bytes(), payload.size());
 
-  EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
+  EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
   buf_.CommitResetStreams();
   EXPECT_EQ(buf_.total_bytes(), payload.size());
 
@@ -308,11 +319,13 @@
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
 
-  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_one =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_one.has_value());
   EXPECT_EQ(chunk_one->data.ssn, SSN(0));
 
-  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_two =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_EQ(chunk_two->data.ssn, SSN(1));
 
@@ -325,7 +338,8 @@
   EXPECT_TRUE(buf_.CanResetStreams());
   buf_.CommitResetStreams();
 
-  absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_three =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_three.has_value());
   EXPECT_EQ(chunk_three->data.ssn, SSN(0));
 }
@@ -336,11 +350,13 @@
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
 
-  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_one =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_one.has_value());
   EXPECT_EQ(chunk_one->data.ssn, SSN(0));
 
-  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_two =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_EQ(chunk_two->data.ssn, SSN(1));
 
@@ -352,10 +368,111 @@
   EXPECT_TRUE(buf_.CanResetStreams());
   buf_.RollbackResetStreams();
 
-  absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
+  absl::optional<SendQueue::DataToSend> chunk_three =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_three.has_value());
   EXPECT_EQ(chunk_three->data.ssn, SSN(2));
 }
 
+TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) {
+  std::vector<uint8_t> payload(200);
+  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
+}
+
+TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) {
+  std::vector<uint8_t> payload(kTwoFragmentPacketSize);
+  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+  EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+  EXPECT_THAT(chunk2.data.payload,
+              SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
+  EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
+  EXPECT_THAT(chunk4.data.payload,
+              SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
+}
+
+TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) {
+  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(2)));
+  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(3)));
+  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(4)));
+  buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(5)));
+  buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(6)));
+  buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(7)));
+  buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(8)));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+  EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk2.data.stream_id, StreamID(2));
+  EXPECT_THAT(chunk2.data.payload, SizeIs(3));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk3.data.stream_id, StreamID(3));
+  EXPECT_THAT(chunk3.data.payload, SizeIs(5));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk4.data.stream_id, StreamID(4));
+  EXPECT_THAT(chunk4.data.payload, SizeIs(7));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk5,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk5.data.stream_id, StreamID(1));
+  EXPECT_THAT(chunk5.data.payload, SizeIs(2));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk6,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk6.data.stream_id, StreamID(2));
+  EXPECT_THAT(chunk6.data.payload, SizeIs(4));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk7,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk7.data.stream_id, StreamID(3));
+  EXPECT_THAT(chunk7.data.payload, SizeIs(6));
+
+  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk8,
+                              buf_.Produce(kNow, kOneFragmentPacketSize));
+  EXPECT_EQ(chunk8.data.stream_id, StreamID(4));
+  EXPECT_THAT(chunk8.data.payload, SizeIs(8));
+}
 }  // namespace
 }  // namespace dcsctp