dcsctp: Implement Round Robin scheduler
Bug: webrtc:12793
Change-Id: I19adb292443def42ee54df67c4869b980db7b7c0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219682
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34093}
diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn
index 641c8a6..2f0b27a 100644
--- a/net/dcsctp/tx/BUILD.gn
+++ b/net/dcsctp/tx/BUILD.gn
@@ -127,6 +127,7 @@
"../public:socket",
"../public:types",
"../testing:data_generator",
+ "../testing:testing_macros",
"../timer",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index f2d22c8..7f91339 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -11,8 +11,7 @@
#include <cstdint>
#include <deque>
-#include <unordered_map>
-#include <unordered_set>
+#include <map>
#include <utility>
#include <vector>
@@ -22,52 +21,16 @@
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
+#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/logging.h"
namespace dcsctp {
-void RRSendQueue::Add(TimeMs now,
- DcSctpMessage message,
- const SendOptions& send_options) {
- RTC_DCHECK(!message.payload().empty());
- std::deque<Item>& queue =
- IsPaused(message.stream_id()) ? paused_items_ : items_;
- // Any limited lifetime should start counting from now - when the message
- // has been added to the queue.
- absl::optional<TimeMs> expires_at = absl::nullopt;
- if (send_options.lifetime.has_value()) {
- // `expires_at` is the time when it expires. Which is slightly larger than
- // the message's lifetime, as the message is alive during its entire
- // lifetime (which may be zero).
- expires_at = now + *send_options.lifetime + DurationMs(1);
- }
- queue.emplace_back(std::move(message), expires_at, send_options);
-}
-size_t RRSendQueue::total_bytes() const {
- // TODO(boivie): Have the current size as a member variable, so that's it not
- // calculated for every operation.
- return absl::c_accumulate(items_, 0,
- [](size_t size, const Item& item) {
- return size + item.remaining_size;
- }) +
- absl::c_accumulate(paused_items_, 0,
- [](size_t size, const Item& item) {
- return size + item.remaining_size;
- });
-}
-
-bool RRSendQueue::IsFull() const {
- return total_bytes() >= buffer_size_;
-}
-
-bool RRSendQueue::IsEmpty() const {
- return items_.empty();
-}
-
-RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
+RRSendQueue::OutgoingStream::Item*
+RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) {
while (!items_.empty()) {
- RRSendQueue::Item& item = items_.front();
+ RRSendQueue::OutgoingStream::Item& item = items_.front();
// An entire item can be discarded iff:
// 1) It hasn't been partially sent (has been allocated a message_id).
// 2) It has a non-negative expiry time.
@@ -75,9 +38,6 @@
if (!item.message_id.has_value() && item.expires_at.has_value() &&
*item.expires_at <= now) {
// TODO(boivie): This should be reported to the client.
- RTC_DLOG(LS_VERBOSE)
- << log_prefix_
- << "Message is expired before even partially sent - discarding";
items_.pop_front();
continue;
}
@@ -87,35 +47,42 @@
return nullptr;
}
-absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
- size_t max_size) {
+void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
+ absl::optional<TimeMs> expires_at,
+ const SendOptions& send_options) {
+ items_.emplace_back(std::move(message), expires_at, send_options);
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
+ TimeMs now,
+ size_t max_size) {
Item* item = GetFirstNonExpiredMessage(now);
if (item == nullptr) {
return absl::nullopt;
}
+ // If a stream is paused, it will allow sending all partially sent messages
+ // but will not start sending new fragments of completely unsent messages.
+ if (is_paused_ && !item->message_id.has_value()) {
+ return absl::nullopt;
+ }
+
DcSctpMessage& message = item->message;
- // Don't make too small fragments as that can result in increased risk of
- // failure to assemble a message if a small fragment is missing.
if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
- RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Will not fragment "
- << item->remaining_size << " bytes into buffer of "
- << max_size << " bytes";
return absl::nullopt;
}
// Allocate Message ID and SSN when the first fragment is sent.
if (!item->message_id.has_value()) {
MID& mid =
- mid_by_stream_id_[{item->send_options.unordered, message.stream_id()}];
+ item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
item->message_id = mid;
mid = MID(*mid + 1);
}
if (!item->send_options.unordered && !item->ssn.has_value()) {
- SSN& ssn = ssn_by_stream_id_[message.stream_id()];
- item->ssn = ssn;
- ssn = SSN(*ssn + 1);
+ item->ssn = next_ssn_;
+ next_ssn_ = SSN(*next_ssn_ + 1);
}
// Grab the next `max_size` fragment from this message and calculate flags.
@@ -157,38 +124,39 @@
item->message.payload().size());
RTC_DCHECK(item->remaining_size > 0);
}
- RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of "
- << chunk.data.size() << " bytes (max: " << max_size
- << ")";
return chunk;
}
-void RRSendQueue::Discard(IsUnordered unordered,
- StreamID stream_id,
- MID message_id) {
- // As this method will only discard partially sent messages, and as the queue
- // is a FIFO queue, the only partially sent message would be the topmost
- // message.
+size_t RRSendQueue::OutgoingStream::buffered_amount() const {
+ size_t bytes = 0;
+ for (const auto& item : items_) {
+ bytes += item.remaining_size;
+ }
+ return bytes;
+}
+
+void RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
+ MID message_id) {
if (!items_.empty()) {
Item& item = items_.front();
if (item.send_options.unordered == unordered &&
- item.message.stream_id() == stream_id && item.message_id.has_value() &&
- *item.message_id == message_id) {
+ item.message_id.has_value() && *item.message_id == message_id) {
items_.pop_front();
}
}
}
-void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
- for (StreamID stream_id : streams) {
- paused_streams_.insert(stream_id);
- }
+void RRSendQueue::OutgoingStream::Pause() {
+ is_paused_ = true;
- // Will not discard partially sent messages - only whole messages. Partially
- // delivered messages (at the time of receiving a Stream Reset command) will
- // always deliver all the fragments before actually resetting the stream.
+ // A stream is pause when it's about to be reset. In this implementation,
+ // it will throw away all non-partially send messages. This is subject to
+ // change. It will however not discard any partially sent messages - only
+ // whole messages. Partially delivered messages (at the time of receiving a
+ // Stream Reset command) will always deliver all the fragments before actually
+ // resetting the stream.
for (auto it = items_.begin(); it != items_.end();) {
- if (IsPaused(it->message.stream_id()) && it->remaining_offset == 0) {
+ if (it->remaining_offset == 0) {
it = items_.erase(it);
} else {
++it;
@@ -196,37 +164,7 @@
}
}
-bool RRSendQueue::CanResetStreams() const {
- for (auto& item : items_) {
- if (IsPaused(item.message.stream_id())) {
- return false;
- }
- }
- return true;
-}
-
-void RRSendQueue::CommitResetStreams() {
- for (StreamID stream_id : paused_streams_) {
- ssn_by_stream_id_[stream_id] = SSN(0);
- // https://tools.ietf.org/html/rfc8260#section-2.3.2
- // "When an association resets the SSN using the SCTP extension defined
- // in [RFC6525], the two counters (one for the ordered messages, one for
- // the unordered messages) used for the MIDs MUST be reset to 0."
- mid_by_stream_id_[{IsUnordered(false), stream_id}] = MID(0);
- mid_by_stream_id_[{IsUnordered(true), stream_id}] = MID(0);
- }
- RollbackResetStreams();
-}
-
-void RRSendQueue::RollbackResetStreams() {
- while (!paused_items_.empty()) {
- items_.push_back(std::move(paused_items_.front()));
- paused_items_.pop_front();
- }
- paused_streams_.clear();
-}
-
-void RRSendQueue::Reset() {
+void RRSendQueue::OutgoingStream::Reset() {
if (!items_.empty()) {
// If this message has been partially sent, reset it so that it will be
// re-sent.
@@ -237,13 +175,141 @@
item.ssn = absl::nullopt;
item.current_fsn = FSN(0);
}
- RollbackResetStreams();
- mid_by_stream_id_.clear();
- ssn_by_stream_id_.clear();
+ is_paused_ = false;
+ next_ordered_mid_ = MID(0);
+ next_unordered_mid_ = MID(0);
+ next_ssn_ = SSN(0);
}
-bool RRSendQueue::IsPaused(StreamID stream_id) const {
- return paused_streams_.find(stream_id) != paused_streams_.end();
+bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
+ if (items_.empty()) {
+ return false;
+ }
+ return items_.front().message_id.has_value();
}
+void RRSendQueue::Add(TimeMs now,
+ DcSctpMessage message,
+ const SendOptions& send_options) {
+ RTC_DCHECK(!message.payload().empty());
+ // Any limited lifetime should start counting from now - when the message
+ // has been added to the queue.
+ absl::optional<TimeMs> expires_at = absl::nullopt;
+ if (send_options.lifetime.has_value()) {
+ // `expires_at` is the time when it expires. Which is slightly larger than
+ // the message's lifetime, as the message is alive during its entire
+ // lifetime (which may be zero).
+ expires_at = now + *send_options.lifetime + DurationMs(1);
+ }
+ GetOrCreateStreamInfo(message.stream_id())
+ .Add(std::move(message), expires_at, send_options);
+}
+
+size_t RRSendQueue::total_bytes() const {
+ // TODO(boivie): Have the current size as a member variable, so that's it not
+ // calculated for every operation.
+ size_t bytes = 0;
+ for (const auto& stream : streams_) {
+ bytes += stream.second.buffered_amount();
+ }
+
+ return bytes;
+}
+
+bool RRSendQueue::IsFull() const {
+ return total_bytes() >= buffer_size_;
+}
+
+bool RRSendQueue::IsEmpty() const {
+ return total_bytes() == 0;
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(
+ std::map<StreamID, RRSendQueue::OutgoingStream>::iterator it,
+ TimeMs now,
+ size_t max_size) {
+ absl::optional<DataToSend> data = it->second.Produce(now, max_size);
+ if (data.has_value()) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of "
+ << data->data.size() << " bytes (max: " << max_size
+ << ")";
+
+ if (data->data.is_end) {
+ // No more fragments. Continue with the next stream next time.
+ next_stream_id_ = StreamID(*it->first + 1);
+ }
+ }
+
+ return data;
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
+ size_t max_size) {
+ auto start_it = streams_.lower_bound(next_stream_id_);
+ for (auto it = start_it; it != streams_.end(); ++it) {
+ absl::optional<DataToSend> ret = Produce(it, now, max_size);
+ if (ret.has_value()) {
+ return ret;
+ }
+ }
+
+ for (auto it = streams_.begin(); it != start_it; ++it) {
+ absl::optional<DataToSend> ret = Produce(it, now, max_size);
+ if (ret.has_value()) {
+ return ret;
+ }
+ }
+ return absl::nullopt;
+}
+
+void RRSendQueue::Discard(IsUnordered unordered,
+ StreamID stream_id,
+ MID message_id) {
+ GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+}
+
+void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
+ for (StreamID stream_id : streams) {
+ GetOrCreateStreamInfo(stream_id).Pause();
+ }
+}
+
+bool RRSendQueue::CanResetStreams() const {
+ // Streams can be reset if those streams that are paused don't have any
+ // messages that are partially sent.
+ for (auto& stream : streams_) {
+ if (stream.second.is_paused() &&
+ stream.second.has_partially_sent_message()) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void RRSendQueue::CommitResetStreams() {
+ Reset();
+}
+
+void RRSendQueue::RollbackResetStreams() {
+ for (auto& stream_entry : streams_) {
+ stream_entry.second.Resume();
+ }
+}
+
+void RRSendQueue::Reset() {
+ for (auto& stream_entry : streams_) {
+ OutgoingStream& stream = stream_entry.second;
+ stream.Reset();
+ }
+}
+
+RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
+ StreamID stream_id) {
+ auto it = streams_.find(stream_id);
+ if (it != streams_.end()) {
+ return it->second;
+ }
+
+ return streams_.emplace(stream_id, OutgoingStream()).first->second;
+}
} // namespace dcsctp
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index c43dc91..abbe702 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -12,9 +12,8 @@
#include <cstdint>
#include <deque>
+#include <map>
#include <string>
-#include <unordered_map>
-#include <unordered_set>
#include <utility>
#include "absl/algorithm/container.h"
@@ -78,44 +77,91 @@
size_t total_bytes() const;
private:
- // An enqueued message and metadata.
- struct Item {
- explicit Item(DcSctpMessage msg,
- absl::optional<TimeMs> expires_at,
- const SendOptions& send_options)
- : message(std::move(msg)),
- expires_at(expires_at),
- send_options(send_options),
- remaining_offset(0),
- remaining_size(message.payload().size()) {}
- DcSctpMessage message;
- absl::optional<TimeMs> expires_at;
- SendOptions send_options;
- // The remaining payload (offset and size) to be sent, when it has been
- // fragmented.
- size_t remaining_offset;
- size_t remaining_size;
- // If set, an allocated Message ID and SSN. Will be allocated when the first
- // fragment is sent.
- absl::optional<MID> message_id = absl::nullopt;
- absl::optional<SSN> ssn = absl::nullopt;
- // The current Fragment Sequence Number, incremented for each fragment.
- FSN current_fsn = FSN(0);
+ // Per-stream information.
+ class OutgoingStream {
+ public:
+ // Enqueues a message to this stream.
+ void Add(DcSctpMessage message,
+ absl::optional<TimeMs> expires_at,
+ const SendOptions& send_options);
+
+ // Possibly produces a data chunk to send.
+ absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
+
+ // The amount of data enqueued on this stream.
+ size_t buffered_amount() const;
+
+ // Discards a partially sent message, see `SendQueue::Discard`.
+ void Discard(IsUnordered unordered, MID message_id);
+
+ // Pauses this stream, which is used before resetting it.
+ void Pause();
+
+ // Resumes a paused stream.
+ void Resume() { is_paused_ = false; }
+
+ bool is_paused() const { return is_paused_; }
+
+ // Resets this stream, meaning MIDs and SSNs are set to zero.
+ void Reset();
+
+ // Indicates if this stream has a partially sent message in it.
+ bool has_partially_sent_message() const;
+
+ private:
+ // An enqueued message and metadata.
+ struct Item {
+ explicit Item(DcSctpMessage msg,
+ absl::optional<TimeMs> expires_at,
+ const SendOptions& send_options)
+ : message(std::move(msg)),
+ expires_at(expires_at),
+ send_options(send_options),
+ remaining_offset(0),
+ remaining_size(message.payload().size()) {}
+ DcSctpMessage message;
+ absl::optional<TimeMs> expires_at;
+ SendOptions send_options;
+ // The remaining payload (offset and size) to be sent, when it has been
+ // fragmented.
+ size_t remaining_offset;
+ size_t remaining_size;
+ // If set, an allocated Message ID and SSN. Will be allocated when the
+ // first fragment is sent.
+ absl::optional<MID> message_id = absl::nullopt;
+ absl::optional<SSN> ssn = absl::nullopt;
+ // The current Fragment Sequence Number, incremented for each fragment.
+ FSN current_fsn = FSN(0);
+ };
+
+ // Returns the first non-expired message, or nullptr if there isn't one.
+ Item* GetFirstNonExpiredMessage(TimeMs now);
+
+ // Streams are pause when they are about to be reset.
+ bool is_paused_ = false;
+ // MIDs are different for unordered and ordered messages sent on a stream.
+ MID next_unordered_mid_ = MID(0);
+ MID next_ordered_mid_ = MID(0);
+
+ SSN next_ssn_ = SSN(0);
+ // Enqueued messages, and metadata.
+ std::deque<Item> items_;
};
- Item* GetFirstNonExpiredMessage(TimeMs now);
- bool IsPaused(StreamID stream_id) const;
+ OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
+ absl::optional<DataToSend> Produce(
+ std::map<StreamID, OutgoingStream>::iterator it,
+ TimeMs now,
+ size_t max_size);
const std::string log_prefix_;
const size_t buffer_size_;
- std::deque<Item> items_;
- std::unordered_set<StreamID, StreamID::Hasher> paused_streams_;
- std::deque<Item> paused_items_;
+ // The next stream to send chunks from.
+ StreamID next_stream_id_ = StreamID(0);
- std::unordered_map<std::pair<IsUnordered, StreamID>, MID, UnorderedStreamHash>
- mid_by_stream_id_;
- std::unordered_map<StreamID, SSN, StreamID::Hasher> ssn_by_stream_id_;
+ // All streams, and messages added to those.
+ std::map<StreamID, OutgoingStream> streams_;
};
} // namespace dcsctp
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
index 0f6fd2b..5e99bb4 100644
--- a/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -18,20 +18,25 @@
#include "net/dcsctp/public/dcsctp_options.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
+#include "net/dcsctp/testing/testing_macros.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/gunit.h"
#include "test/gmock.h"
namespace dcsctp {
namespace {
+using ::testing::SizeIs;
constexpr TimeMs kNow = TimeMs(0);
constexpr StreamID kStreamID(1);
constexpr PPID kPPID(53);
+constexpr size_t kMaxQueueSize = 1000;
+constexpr size_t kOneFragmentPacketSize = 100;
+constexpr size_t kTwoFragmentPacketSize = 101;
class RRSendQueueTest : public testing::Test {
protected:
- RRSendQueueTest() : buf_("log: ", 100) {}
+ RRSendQueueTest() : buf_("log: ", kMaxQueueSize) {}
const DcSctpOptions options_;
RRSendQueue buf_;
@@ -39,7 +44,7 @@
TEST_F(RRSendQueueTest, EmptyBuffer) {
EXPECT_TRUE(buf_.IsEmpty());
- EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
EXPECT_FALSE(buf_.IsFull());
}
@@ -48,7 +53,8 @@
EXPECT_FALSE(buf_.IsEmpty());
EXPECT_FALSE(buf_.IsFull());
- absl::optional<SendQueue::DataToSend> chunk_opt = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_opt =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_opt.has_value());
EXPECT_TRUE(chunk_opt->data.is_beginning);
EXPECT_TRUE(chunk_opt->data.is_end);
@@ -76,7 +82,7 @@
EXPECT_FALSE(chunk_end->data.is_beginning);
EXPECT_TRUE(chunk_end->data.is_end);
- EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
}
TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
@@ -84,14 +90,16 @@
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
- absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
EXPECT_EQ(chunk_one->data.ppid, kPPID);
EXPECT_TRUE(chunk_one->data.is_beginning);
EXPECT_TRUE(chunk_one->data.is_end);
- absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_two.has_value());
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
@@ -100,7 +108,7 @@
}
TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
- std::vector<uint8_t> payload(60);
+ std::vector<uint8_t> payload(600);
EXPECT_FALSE(buf_.IsFull());
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
EXPECT_FALSE(buf_.IsFull());
@@ -112,14 +120,14 @@
buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
EXPECT_TRUE(buf_.IsFull());
- absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
EXPECT_EQ(chunk_one->data.ppid, kPPID);
EXPECT_TRUE(buf_.IsFull());
- absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
ASSERT_TRUE(chunk_two.has_value());
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
@@ -127,7 +135,7 @@
EXPECT_FALSE(buf_.IsFull());
EXPECT_FALSE(buf_.IsEmpty());
- absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
ASSERT_TRUE(chunk_three.has_value());
EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
EXPECT_EQ(chunk_three->data.ppid, PPID(55));
@@ -171,7 +179,7 @@
// Default is ordered
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
absl::optional<SendQueue::DataToSend> chunk_one =
- buf_.Produce(kNow, /*max_size=*/100);
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_FALSE(chunk_one->data.is_unordered);
@@ -180,7 +188,7 @@
opts.unordered = IsUnordered(true);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts);
absl::optional<SendQueue::DataToSend> chunk_two =
- buf_.Produce(kNow, /*max_size=*/100);
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_two.has_value());
EXPECT_TRUE(chunk_two->data.is_unordered);
}
@@ -192,7 +200,7 @@
TimeMs now = kNow;
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
now += DurationMs(1000000);
- ASSERT_TRUE(buf_.Produce(now, 100));
+ ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
SendOptions expires_2_seconds;
expires_2_seconds.lifetime = DurationMs(2000);
@@ -200,17 +208,17 @@
// Add and consume within lifetime
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
now += DurationMs(2000);
- ASSERT_TRUE(buf_.Produce(now, 100));
+ ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
// Add and consume just outside lifetime
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
now += DurationMs(2001);
- ASSERT_FALSE(buf_.Produce(now, 100));
+ ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
// A long time after expiry
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
now += DurationMs(1000000);
- ASSERT_FALSE(buf_.Produce(now, 100));
+ ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
// Expire one message, but produce the second that is not expired.
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
@@ -221,8 +229,8 @@
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
now += DurationMs(2001);
- ASSERT_TRUE(buf_.Produce(now, 100));
- ASSERT_FALSE(buf_.Produce(now, 100));
+ ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
+ ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
}
TEST_F(RRSendQueueTest, DiscardPartialPackets) {
@@ -231,28 +239,31 @@
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload));
- absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_FALSE(chunk_one->data.is_end);
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
chunk_one->data.message_id);
- absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_two.has_value());
EXPECT_FALSE(chunk_two->data.is_end);
EXPECT_EQ(chunk_two->data.stream_id, StreamID(2));
- absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_three.has_value());
EXPECT_TRUE(chunk_three->data.is_end);
EXPECT_EQ(chunk_three->data.stream_id, StreamID(2));
- ASSERT_FALSE(buf_.Produce(kNow, 100));
+ ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
// Calling it again shouldn't cause issues.
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
chunk_one->data.message_id);
- ASSERT_FALSE(buf_.Produce(kNow, 100));
+ ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
}
TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
@@ -292,7 +303,7 @@
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
EXPECT_EQ(buf_.total_bytes(), payload.size());
- EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
buf_.CommitResetStreams();
EXPECT_EQ(buf_.total_bytes(), payload.size());
@@ -308,11 +319,13 @@
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
- absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.ssn, SSN(0));
- absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_two.has_value());
EXPECT_EQ(chunk_two->data.ssn, SSN(1));
@@ -325,7 +338,8 @@
EXPECT_TRUE(buf_.CanResetStreams());
buf_.CommitResetStreams();
- absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_three.has_value());
EXPECT_EQ(chunk_three->data.ssn, SSN(0));
}
@@ -336,11 +350,13 @@
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
- absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.ssn, SSN(0));
- absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_two.has_value());
EXPECT_EQ(chunk_two->data.ssn, SSN(1));
@@ -352,10 +368,111 @@
EXPECT_TRUE(buf_.CanResetStreams());
buf_.RollbackResetStreams();
- absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_three.has_value());
EXPECT_EQ(chunk_three->data.ssn, SSN(2));
}
+TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) {
+ std::vector<uint8_t> payload(200);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
+}
+
+TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) {
+ std::vector<uint8_t> payload(kTwoFragmentPacketSize);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload,
+ SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk4.data.payload,
+ SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
+}
+
+TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) {
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(2)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(3)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(4)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(5)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(6)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(7)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(8)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(3));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(3));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(5));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(4));
+ EXPECT_THAT(chunk4.data.payload, SizeIs(7));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk5,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk5.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk5.data.payload, SizeIs(2));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk6,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk6.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk6.data.payload, SizeIs(4));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk7,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk7.data.stream_id, StreamID(3));
+ EXPECT_THAT(chunk7.data.payload, SizeIs(6));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk8,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk8.data.stream_id, StreamID(4));
+ EXPECT_THAT(chunk8.data.payload, SizeIs(8));
+}
} // namespace
} // namespace dcsctp