Limits the send and receive buffer by bytes, not by packets.
The new limit is 16MB for each buffer.
Also refactors the code to handle send failure more consistently.

BUG=3429
R=juberti@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/21559005

git-svn-id: http://webrtc.googlecode.com/svn/trunk/talk@6511 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/app/webrtc/datachannel.cc b/app/webrtc/datachannel.cc
index 14caa41..af4fb24 100644
--- a/app/webrtc/datachannel.cc
+++ b/app/webrtc/datachannel.cc
@@ -35,13 +35,57 @@
 
 namespace webrtc {
 
-static size_t kMaxQueuedReceivedDataPackets = 100;
-static size_t kMaxQueuedSendDataPackets = 100;
+static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
+static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024;
 
 enum {
   MSG_CHANNELREADY,
 };
 
+DataChannel::PacketQueue::PacketQueue() : byte_count_(0) {}
+
+DataChannel::PacketQueue::~PacketQueue() {
+  Clear();
+}
+
+bool DataChannel::PacketQueue::Empty() const {
+  return packets_.empty();
+}
+
+DataBuffer* DataChannel::PacketQueue::Front() {
+  return packets_.front();
+}
+
+void DataChannel::PacketQueue::Pop() {
+  if (packets_.empty()) {
+    return;
+  }
+
+  byte_count_ -= packets_.front()->size();
+  packets_.pop_front();
+}
+
+void DataChannel::PacketQueue::Push(DataBuffer* packet) {
+  byte_count_ += packet->size();
+  packets_.push_back(packet);
+}
+
+void DataChannel::PacketQueue::Clear() {
+  while (!packets_.empty()) {
+    delete packets_.front();
+    packets_.pop_front();
+  }
+  byte_count_ = 0;
+}
+
+void DataChannel::PacketQueue::Swap(PacketQueue* other) {
+  size_t other_byte_count = other->byte_count_;
+  other->byte_count_ = byte_count_;
+  byte_count_ = other_byte_count;
+
+  other->packets_.swap(packets_);
+}
+
 talk_base::scoped_refptr<DataChannel> DataChannel::Create(
     DataChannelProviderInterface* provider,
     cricket::DataChannelType dct,
@@ -114,11 +158,7 @@
   return true;
 }
 
-DataChannel::~DataChannel() {
-  ClearQueuedReceivedData();
-  ClearQueuedSendData();
-  ClearQueuedControlData();
-}
+DataChannel::~DataChannel() {}
 
 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
   observer_ = observer;
@@ -139,13 +179,7 @@
 }
 
 uint64 DataChannel::buffered_amount() const {
-  uint64 buffered_amount = 0;
-  for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
-      it != queued_send_data_.end();
-      ++it) {
-    buffered_amount += (*it)->size();
-  }
-  return buffered_amount;
+  return queued_send_data_.byte_count();
 }
 
 void DataChannel::Close() {
@@ -163,89 +197,25 @@
   }
   // If the queue is non-empty, we're waiting for SignalReadyToSend,
   // so just add to the end of the queue and keep waiting.
-  if (!queued_send_data_.empty()) {
-    if (!QueueSendData(buffer)) {
-      if (data_channel_type_ == cricket::DCT_RTP) {
-        return false;
-      }
+  if (!queued_send_data_.Empty()) {
+    // Only SCTP DataChannel queues the outgoing data when the transport is
+    // blocked.
+    ASSERT(data_channel_type_ == cricket::DCT_SCTP);
+    if (!QueueSendDataMessage(buffer)) {
       Close();
     }
     return true;
   }
 
-  cricket::SendDataResult send_result;
-  if (!InternalSendWithoutQueueing(buffer, &send_result)) {
-    if (data_channel_type_ == cricket::DCT_RTP) {
-      return false;
-    }
-    if (send_result != cricket::SDR_BLOCK || !QueueSendData(buffer)) {
-      Close();
-    }
+  bool success = SendDataMessage(buffer);
+  if (data_channel_type_ == cricket::DCT_RTP) {
+    return success;
   }
+
+  // Always return true for SCTP DataChannel per the spec.
   return true;
 }
 
-void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
-  queued_control_data_.push(buffer);
-}
-
-bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) {
-  ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
-         was_ever_writable_ &&
-         config_.id >= 0 &&
-         !config_.negotiated);
-
-  talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
-
-  cricket::SendDataParams send_params;
-  send_params.ssrc = config_.id;
-  send_params.ordered = true;
-  send_params.type = cricket::DMT_CONTROL;
-
-  cricket::SendDataResult send_result;
-  bool retval = provider_->SendData(send_params, *buffer, &send_result);
-  if (retval) {
-    LOG(LS_INFO) << "Sent OPEN message on channel " << config_.id;
-    // Send data as ordered before we receive any mesage from the remote peer
-    // to make sure the remote peer will not receive any data before it receives
-    // the OPEN message.
-    waiting_for_open_ack_ = true;
-  } else if (send_result == cricket::SDR_BLOCK) {
-    // Link is congested.  Queue for later.
-    QueueControl(buffer.release());
-  } else {
-    LOG(LS_ERROR) << "Failed to send OPEN message with result "
-                  << send_result << " on channel " << config_.id;
-  }
-  return retval;
-}
-
-bool DataChannel::SendOpenAckMessage(const talk_base::Buffer* raw_buffer) {
-  ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
-         was_ever_writable_ &&
-         config_.id >= 0);
-
-  talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
-
-  cricket::SendDataParams send_params;
-  send_params.ssrc = config_.id;
-  send_params.ordered = config_.ordered;
-  send_params.type = cricket::DMT_CONTROL;
-
-  cricket::SendDataResult send_result;
-  bool retval = provider_->SendData(send_params, *buffer, &send_result);
-  if (retval) {
-    LOG(LS_INFO) << "Sent OPEN_ACK message on channel " << config_.id;
-  } else if (send_result == cricket::SDR_BLOCK) {
-    // Link is congested.  Queue for later.
-    QueueControl(buffer.release());
-  } else {
-    LOG(LS_ERROR) << "Failed to send OPEN_ACK message with result "
-                  << send_result << " on channel " << config_.id;
-  }
-  return retval;
-}
-
 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
   ASSERT(data_channel_type_ == cricket::DCT_RTP);
 
@@ -262,6 +232,27 @@
   DoClose();
 }
 
+void DataChannel::SetSctpSid(int sid) {
+  ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
+  if (config_.id == sid)
+    return;
+
+  config_.id = sid;
+  provider_->AddSctpDataStream(sid);
+}
+
+void DataChannel::OnTransportChannelCreated() {
+  ASSERT(data_channel_type_ == cricket::DCT_SCTP);
+  if (!connected_to_provider_) {
+    connected_to_provider_ = provider_->ConnectDataChannel(this);
+  }
+  // The sid may have been unassigned when provider_->ConnectDataChannel was
+  // done. So always add the streams even if connected_to_provider_ is true.
+  if (config_.id >= 0) {
+    provider_->AddSctpDataStream(config_.id);
+  }
+}
+
 void DataChannel::SetSendSsrc(uint32 send_ssrc) {
   ASSERT(data_channel_type_ == cricket::DCT_RTP);
   if (send_ssrc_set_) {
@@ -330,12 +321,18 @@
   if (was_ever_writable_ && observer_) {
     observer_->OnMessage(*buffer.get());
   } else {
-    if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
-      LOG(LS_ERROR)
-          << "Queued received data exceeds the max number of packets.";
-      ClearQueuedReceivedData();
+    if (queued_received_data_.byte_count() + payload.length() >
+            kMaxQueuedReceivedDataBytes) {
+      LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
+
+      queued_received_data_.Clear();
+      if (data_channel_type_ != cricket::DCT_RTP) {
+        Close();
+      }
+
+      return;
     }
-    queued_received_data_.push(buffer.release());
+    queued_received_data_.Push(buffer.release());
   }
 }
 
@@ -350,22 +347,27 @@
     was_ever_writable_ = true;
 
     if (data_channel_type_ == cricket::DCT_SCTP) {
+      talk_base::Buffer payload;
+
       if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
-        talk_base::Buffer* payload = new talk_base::Buffer;
-        WriteDataChannelOpenMessage(label_, config_, payload);
-        SendOpenMessage(payload);
+        WriteDataChannelOpenMessage(label_, config_, &payload);
+        SendControlMessage(payload);
       } else if (config_.open_handshake_role ==
-                 InternalDataChannelInit::kAcker) {
-        talk_base::Buffer* payload = new talk_base::Buffer;
-        WriteDataChannelOpenAckMessage(payload);
-        SendOpenAckMessage(payload);
+                     InternalDataChannelInit::kAcker) {
+        WriteDataChannelOpenAckMessage(&payload);
+        SendControlMessage(payload);
       }
     }
 
     UpdateState();
-    ASSERT(queued_send_data_.empty());
+    ASSERT(queued_send_data_.Empty());
   } else if (state_ == kOpen) {
-    DeliverQueuedSendData();
+    // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
+    // that the readyState is open. According to the standard, the channel
+    // should not become open before the OPEN message is sent.
+    SendQueuedControlMessages();
+
+    SendQueuedDataMessages();
   }
 }
 
@@ -389,7 +391,7 @@
         if (was_ever_writable_) {
           // TODO(jiayl): Do not transition to kOpen if we failed to send the
           // OPEN message.
-          DeliverQueuedControlData();
+          SendQueuedControlMessages();
           SetState(kOpen);
           // If we have received buffers before the channel got writable.
           // Deliver them now.
@@ -441,75 +443,27 @@
     return;
   }
 
-  while (!queued_received_data_.empty()) {
-    DataBuffer* buffer = queued_received_data_.front();
+  while (!queued_received_data_.Empty()) {
+    talk_base::scoped_ptr<DataBuffer> buffer(queued_received_data_.Front());
     observer_->OnMessage(*buffer);
-    queued_received_data_.pop();
-    delete buffer;
+    queued_received_data_.Pop();
   }
 }
 
-void DataChannel::ClearQueuedReceivedData() {
-  while (!queued_received_data_.empty()) {
-    DataBuffer* buffer = queued_received_data_.front();
-    queued_received_data_.pop();
-    delete buffer;
-  }
-}
-
-void DataChannel::DeliverQueuedSendData() {
+void DataChannel::SendQueuedDataMessages() {
   ASSERT(was_ever_writable_ && state_ == kOpen);
 
-  // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
-  // that the readyState is open. According to the standard, the channel should
-  // not become open before the OPEN message is sent.
-  DeliverQueuedControlData();
+  PacketQueue packet_buffer;
+  packet_buffer.Swap(&queued_send_data_);
 
-  while (!queued_send_data_.empty()) {
-    DataBuffer* buffer = queued_send_data_.front();
-    cricket::SendDataResult send_result;
-    if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
-      LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result "
-                      << send_result;
-      break;
-    }
-    queued_send_data_.pop_front();
-    delete buffer;
+  while (!packet_buffer.Empty()) {
+    talk_base::scoped_ptr<DataBuffer> buffer(packet_buffer.Front());
+    SendDataMessage(*buffer);
+    packet_buffer.Pop();
   }
 }
 
-void DataChannel::ClearQueuedControlData() {
-  while (!queued_control_data_.empty()) {
-    const talk_base::Buffer *buf = queued_control_data_.front();
-    queued_control_data_.pop();
-    delete buf;
-  }
-}
-
-void DataChannel::DeliverQueuedControlData() {
-  ASSERT(was_ever_writable_);
-  while (!queued_control_data_.empty()) {
-    const talk_base::Buffer* buf = queued_control_data_.front();
-    queued_control_data_.pop();
-    if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
-      SendOpenMessage(buf);
-    } else {
-      ASSERT(config_.open_handshake_role == InternalDataChannelInit::kAcker);
-      SendOpenAckMessage(buf);
-    }
-  }
-}
-
-void DataChannel::ClearQueuedSendData() {
-  while (!queued_send_data_.empty()) {
-    DataBuffer* buffer = queued_send_data_.front();
-    queued_send_data_.pop_front();
-    delete buffer;
-  }
-}
-
-bool DataChannel::InternalSendWithoutQueueing(
-    const DataBuffer& buffer, cricket::SendDataResult* send_result) {
+bool DataChannel::SendDataMessage(const DataBuffer& buffer) {
   cricket::SendDataParams send_params;
 
   if (data_channel_type_ == cricket::DCT_SCTP) {
@@ -529,34 +483,78 @@
   }
   send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
 
-  return provider_->SendData(send_params, buffer.data, send_result);
+  cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
+  bool success = provider_->SendData(send_params, buffer.data, &send_result);
+
+  if (!success && data_channel_type_ == cricket::DCT_SCTP) {
+    if (send_result != cricket::SDR_BLOCK || !QueueSendDataMessage(buffer)) {
+      LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
+                    << "send_result = " << send_result;
+      Close();
+    }
+  }
+  return success;
 }
 
-bool DataChannel::QueueSendData(const DataBuffer& buffer) {
-  if (queued_send_data_.size() >= kMaxQueuedSendDataPackets) {
+bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
+  if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) {
     LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
     return false;
   }
-  queued_send_data_.push_back(new DataBuffer(buffer));
+  queued_send_data_.Push(new DataBuffer(buffer));
   return true;
 }
 
-void DataChannel::SetSctpSid(int sid) {
-  ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
-  config_.id = sid;
-  provider_->AddSctpDataStream(sid);
+void DataChannel::SendQueuedControlMessages() {
+  ASSERT(was_ever_writable_);
+
+  PacketQueue control_packets;
+  control_packets.Swap(&queued_control_data_);
+
+  while (!control_packets.Empty()) {
+    talk_base::scoped_ptr<DataBuffer> buf(control_packets.Front());
+    SendControlMessage(buf->data);
+    control_packets.Pop();
+  }
 }
 
-void DataChannel::OnTransportChannelCreated() {
-  ASSERT(data_channel_type_ == cricket::DCT_SCTP);
-  if (!connected_to_provider_) {
-    connected_to_provider_ = provider_->ConnectDataChannel(this);
+void DataChannel::QueueControlMessage(const talk_base::Buffer& buffer) {
+  queued_control_data_.Push(new DataBuffer(buffer, true));
+}
+
+bool DataChannel::SendControlMessage(const talk_base::Buffer& buffer) {
+  bool is_open_message =
+      (config_.open_handshake_role == InternalDataChannelInit::kOpener);
+
+  ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
+         was_ever_writable_ &&
+         config_.id >= 0 &&
+         (!is_open_message || !config_.negotiated));
+
+  cricket::SendDataParams send_params;
+  send_params.ssrc = config_.id;
+  send_params.ordered = config_.ordered || is_open_message;
+  send_params.type = cricket::DMT_CONTROL;
+
+  cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
+  bool retval = provider_->SendData(send_params, buffer, &send_result);
+  if (retval) {
+    LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id;
+
+    if (is_open_message) {
+      // Send data as ordered before we receive any message from the remote peer
+      // to make sure the remote peer will not receive any data before it
+      // receives the OPEN message.
+      waiting_for_open_ack_ = true;
+    }
+  } else if (send_result == cricket::SDR_BLOCK) {
+    QueueControlMessage(buffer);
+  } else {
+    LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
+                  << " the CONTROL message, send_result = " << send_result;
+    Close();
   }
-  // The sid may have been unassigned when provider_->ConnectDataChannel was
-  // done. So always add the streams even if connected_to_provider_ is true.
-  if (config_.id >= 0) {
-    provider_->AddSctpDataStream(config_.id);
-  }
+  return retval;
 }
 
 }  // namespace webrtc
diff --git a/app/webrtc/datachannel.h b/app/webrtc/datachannel.h
index 9256e0e..0510f7e 100644
--- a/app/webrtc/datachannel.h
+++ b/app/webrtc/datachannel.h
@@ -29,7 +29,7 @@
 #define TALK_APP_WEBRTC_DATACHANNEL_H_
 
 #include <string>
-#include <queue>
+#include <deque>
 
 #include "talk/app/webrtc/datachannelinterface.h"
 #include "talk/app/webrtc/proxy.h"
@@ -149,7 +149,8 @@
 
   // The following methods are for SCTP only.
 
-  // Sets the SCTP sid and adds to transport layer if not set yet.
+  // Sets the SCTP sid and adds to transport layer if not set yet. Should only
+  // be called once.
   void SetSctpSid(int sid);
   // Called when the transport channel is created.
   void OnTransportChannelCreated();
@@ -175,23 +176,49 @@
   virtual ~DataChannel();
 
  private:
+  // A packet queue which tracks the total queued bytes. Queued packets are
+  // owned by this class.
+  class PacketQueue {
+   public:
+    PacketQueue();
+    ~PacketQueue();
+
+    size_t byte_count() const {
+      return byte_count_;
+    }
+
+    bool Empty() const;
+
+    DataBuffer* Front();
+
+    void Pop();
+
+    void Push(DataBuffer* packet);
+
+    void Clear();
+
+    void Swap(PacketQueue* other);
+
+   private:
+    std::deque<DataBuffer*> packets_;
+    size_t byte_count_;
+  };
+
   bool Init(const InternalDataChannelInit& config);
   void DoClose();
   void UpdateState();
   void SetState(DataState state);
   void DisconnectFromTransport();
-  void DeliverQueuedControlData();
-  void QueueControl(const talk_base::Buffer* buffer);
-  void ClearQueuedControlData();
+
   void DeliverQueuedReceivedData();
-  void ClearQueuedReceivedData();
-  void DeliverQueuedSendData();
-  void ClearQueuedSendData();
-  bool InternalSendWithoutQueueing(const DataBuffer& buffer,
-                                   cricket::SendDataResult* send_result);
-  bool QueueSendData(const DataBuffer& buffer);
-  bool SendOpenMessage(const talk_base::Buffer* buffer);
-  bool SendOpenAckMessage(const talk_base::Buffer* buffer);
+
+  void SendQueuedDataMessages();
+  bool SendDataMessage(const DataBuffer& buffer);
+  bool QueueSendDataMessage(const DataBuffer& buffer);
+
+  void SendQueuedControlMessages();
+  void QueueControlMessage(const talk_base::Buffer& buffer);
+  bool SendControlMessage(const talk_base::Buffer& buffer);
 
   std::string label_;
   InternalDataChannelInit config_;
@@ -208,9 +235,9 @@
   uint32 receive_ssrc_;
   // Control messages that always have to get sent out before any queued
   // data.
-  std::queue<const talk_base::Buffer*> queued_control_data_;
-  std::queue<DataBuffer*> queued_received_data_;
-  std::deque<DataBuffer*> queued_send_data_;
+  PacketQueue queued_control_data_;
+  PacketQueue queued_received_data_;
+  PacketQueue queued_send_data_;
 };
 
 class DataChannelFactory {
diff --git a/app/webrtc/datachannel_unittest.cc b/app/webrtc/datachannel_unittest.cc
index 991ae0c..6f223fe 100644
--- a/app/webrtc/datachannel_unittest.cc
+++ b/app/webrtc/datachannel_unittest.cc
@@ -174,6 +174,16 @@
             static_cast<uint32>(webrtc_data_channel_->id()));
 }
 
+TEST_F(SctpDataChannelTest, QueuedOpenMessageSent) {
+  provider_.set_send_blocked(true);
+  SetChannelReady();
+  provider_.set_send_blocked(false);
+
+  EXPECT_EQ(cricket::DMT_CONTROL, provider_.last_send_data_params().type);
+  EXPECT_EQ(provider_.last_send_data_params().ssrc,
+            static_cast<uint32>(webrtc_data_channel_->id()));
+}
+
 // Tests that the DataChannel created after transport gets ready can enter OPEN
 // state.
 TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) {
@@ -330,11 +340,17 @@
 // Tests that the DataChannel is closed if the sending buffer is full.
 TEST_F(SctpDataChannelTest, ClosedWhenSendBufferFull) {
   SetChannelReady();
-  webrtc::DataBuffer buffer("abcd");
+
+  const size_t buffer_size = 1024;
+  talk_base::Buffer buffer;
+  buffer.SetLength(buffer_size);
+  memset(buffer.data(), 0, buffer_size);
+
+  webrtc::DataBuffer packet(buffer, true);
   provider_.set_send_blocked(true);
 
-  for (size_t i = 0; i < 101; ++i) {
-    EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
+  for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
+    EXPECT_TRUE(webrtc_data_channel_->Send(packet));
   }
 
   EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
@@ -376,3 +392,21 @@
             webrtc_data_channel_->state());
 }
 
+// Tests that the DataChannel is closed if the received buffer is full.
+TEST_F(SctpDataChannelTest, ClosedWhenReceivedBufferFull) {
+  SetChannelReady();
+  const size_t buffer_size = 1024;
+  talk_base::Buffer buffer;
+  buffer.SetLength(buffer_size);
+  memset(buffer.data(), 0, buffer_size);
+
+  cricket::ReceiveDataParams params;
+  params.ssrc = 0;
+
+  // Receiving data without having an observer will overflow the buffer.
+  for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
+    webrtc_data_channel_->OnDataReceived(NULL, params, buffer);
+  }
+  EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
+            webrtc_data_channel_->state());
+}