blob: 20cbde85be9ffbb906c3841b3dea9c30362be725 [file] [log] [blame]
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/cast/transport/pacing/paced_sender.h"
#include "base/big_endian.h"
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
namespace media {
namespace cast {
namespace transport {
namespace {
static const int64 kPacingIntervalMs = 10;
// Each frame will be split into no more than kPacingMaxBurstsPerFrame
// bursts of packets.
static const size_t kPacingMaxBurstsPerFrame = 3;
static const size_t kTargetBurstSize = 10;
static const size_t kMaxBurstSize = 20;
static const size_t kMaxDedupeWindowMs = 500;
} // namespace
// static
PacketKey PacedPacketSender::MakePacketKey(const base::TimeTicks& ticks,
uint32 ssrc,
uint16 packet_id) {
return std::make_pair(ticks, std::make_pair(ssrc, packet_id));
}
PacedSender::PacedSender(
base::TickClock* clock,
LoggingImpl* logging,
PacketSender* transport,
const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner)
: clock_(clock),
logging_(logging),
transport_(transport),
transport_task_runner_(transport_task_runner),
audio_ssrc_(0),
video_ssrc_(0),
max_burst_size_(kTargetBurstSize),
next_max_burst_size_(kTargetBurstSize),
next_next_max_burst_size_(kTargetBurstSize),
current_burst_size_(0),
state_(State_Unblocked),
weak_factory_(this) {
}
PacedSender::~PacedSender() {}
void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc) {
audio_ssrc_ = audio_ssrc;
}
void PacedSender::RegisterVideoSsrc(uint32 video_ssrc) {
video_ssrc_ = video_ssrc;
}
bool PacedSender::SendPackets(const SendPacketVector& packets) {
if (packets.empty()) {
return true;
}
for (size_t i = 0; i < packets.size(); i++) {
packet_list_[packets[i].first] =
make_pair(PacketType_Normal, packets[i].second);
}
if (state_ == State_Unblocked) {
SendStoredPackets();
}
return true;
}
bool PacedSender::ResendPackets(const SendPacketVector& packets,
base::TimeDelta dedupe_window) {
if (packets.empty()) {
return true;
}
base::TimeTicks now = clock_->NowTicks();
for (size_t i = 0; i < packets.size(); i++) {
std::map<PacketKey, base::TimeTicks>::const_iterator j =
sent_time_.find(packets[i].first);
if (j != sent_time_.end() && now - j->second < dedupe_window) {
LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED);
continue;
}
packet_list_[packets[i].first] =
make_pair(PacketType_Resend, packets[i].second);
}
if (state_ == State_Unblocked) {
SendStoredPackets();
}
return true;
}
bool PacedSender::SendRtcpPacket(uint32 ssrc, PacketRef packet) {
if (state_ == State_TransportBlocked) {
packet_list_[PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc, 0)] =
make_pair(PacketType_RTCP, packet);
} else {
// We pass the RTCP packets straight through.
if (!transport_->SendPacket(
packet,
base::Bind(&PacedSender::SendStoredPackets,
weak_factory_.GetWeakPtr()))) {
state_ = State_TransportBlocked;
}
}
return true;
}
void PacedSender::CancelSendingPacket(const PacketKey& packet_key) {
packet_list_.erase(packet_key);
}
PacketRef PacedSender::GetNextPacket(PacketType* packet_type,
PacketKey* packet_key) {
std::map<PacketKey, std::pair<PacketType, PacketRef> >::iterator i;
i = packet_list_.begin();
DCHECK(i != packet_list_.end());
*packet_type = i->second.first;
*packet_key = i->first;
PacketRef ret = i->second.second;
packet_list_.erase(i);
return ret;
}
bool PacedSender::empty() const {
return packet_list_.empty();
}
size_t PacedSender::size() const {
return packet_list_.size();
}
// This function can be called from three places:
// 1. User called one of the Send* functions and we were in an unblocked state.
// 2. state_ == State_TransportBlocked and the transport is calling us to
// let us know that it's ok to send again.
// 3. state_ == State_BurstFull and there are still packets to send. In this
// case we called PostDelayedTask on this function to start a new burst.
void PacedSender::SendStoredPackets() {
State previous_state = state_;
state_ = State_Unblocked;
if (empty()) {
return;
}
base::TimeTicks now = clock_->NowTicks();
// I don't actually trust that PostDelayTask(x - now) will mean that
// now >= x when the call happens, so check if the previous state was
// State_BurstFull too.
if (now >= burst_end_ || previous_state == State_BurstFull) {
// Start a new burst.
current_burst_size_ = 0;
burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs);
// The goal here is to try to send out the queued packets over the next
// three bursts, while trying to keep the burst size below 10 if possible.
// We have some evidence that sending more than 12 packets in a row doesn't
// work very well, but we don't actually know why yet. Sending out packets
// sooner is better than sending out packets later as that gives us more
// time to re-send them if needed. So if we have less than 30 packets, just
// send 10 at a time. If we have less than 60 packets, send n / 3 at a time.
// if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s
// which is more bandwidth than the cast library should need, and sending
// out more data per second is unlikely to be helpful.
size_t max_burst_size = std::min(
kMaxBurstSize,
std::max(kTargetBurstSize, size() / kPacingMaxBurstsPerFrame));
// If the queue is long, issue a warning. Try to limit the number of
// warnings issued by only issuing the warning when the burst size
// grows. Otherwise we might get 100 warnings per second.
if (max_burst_size > next_next_max_burst_size_ && size() > 100) {
LOG(WARNING) << "Packet queue is very long:" << size();
}
max_burst_size_ = std::max(next_max_burst_size_, max_burst_size);
next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size);
next_next_max_burst_size_ = max_burst_size;
}
base::Closure cb = base::Bind(&PacedSender::SendStoredPackets,
weak_factory_.GetWeakPtr());
while (!empty()) {
if (current_burst_size_ >= max_burst_size_) {
transport_task_runner_->PostDelayedTask(FROM_HERE,
cb,
burst_end_ - now);
state_ = State_BurstFull;
return;
}
PacketType packet_type;
PacketKey packet_key;
PacketRef packet = GetNextPacket(&packet_type, &packet_key);
sent_time_[packet_key] = now;
sent_time_buffer_[packet_key] = now;
switch (packet_type) {
case PacketType_Resend:
LogPacketEvent(packet->data, PACKET_RETRANSMITTED);
break;
case PacketType_Normal:
LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK);
break;
case PacketType_RTCP:
break;
}
if (!transport_->SendPacket(packet, cb)) {
state_ = State_TransportBlocked;
return;
}
current_burst_size_++;
}
// Keep ~0.5 seconds of data (1000 packets)
if (sent_time_buffer_.size() >=
kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs) {
sent_time_.swap(sent_time_buffer_);
sent_time_buffer_.clear();
}
DCHECK_LE(sent_time_buffer_.size(),
kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs);
DCHECK_LE(sent_time_.size(),
2 * kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs);
state_ = State_Unblocked;
}
void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent event) {
// Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see
// if the packet is audio or video.
DCHECK_GE(packet.size(), 12u);
base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[8]), 4);
uint32 ssrc;
bool success = reader.ReadU32(&ssrc);
DCHECK(success);
bool is_audio;
if (ssrc == audio_ssrc_) {
is_audio = true;
} else if (ssrc == video_ssrc_) {
is_audio = false;
} else {
DVLOG(3) << "Got unknown ssrc " << ssrc << " when logging packet event";
return;
}
EventMediaType media_type = is_audio ? AUDIO_EVENT : VIDEO_EVENT;
logging_->InsertSinglePacketEvent(clock_->NowTicks(), event, media_type,
packet);
}
} // namespace transport
} // namespace cast
} // namespace media