| // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "net/quic/reliable_quic_stream.h" |
| |
| #include "net/quic/quic_session.h" |
| #include "net/quic/quic_spdy_decompressor.h" |
| #include "net/spdy/write_blocked_list.h" |
| |
| using base::StringPiece; |
| using std::min; |
| |
| namespace net { |
| |
| #define ENDPOINT (is_server_ ? "Server: " : " Client: ") |
| |
| namespace { |
| |
| struct iovec MakeIovec(StringPiece data) { |
| struct iovec iov = {const_cast<char*>(data.data()), |
| static_cast<size_t>(data.size())}; |
| return iov; |
| } |
| |
| } // namespace |
| |
| ReliableQuicStream::ReliableQuicStream(QuicStreamId id, |
| QuicSession* session) |
| : sequencer_(this), |
| id_(id), |
| session_(session), |
| stream_bytes_read_(0), |
| stream_bytes_written_(0), |
| stream_error_(QUIC_STREAM_NO_ERROR), |
| connection_error_(QUIC_NO_ERROR), |
| read_side_closed_(false), |
| write_side_closed_(false), |
| fin_buffered_(false), |
| fin_sent_(false), |
| is_server_(session_->is_server()) { |
| } |
| |
| ReliableQuicStream::~ReliableQuicStream() { |
| } |
| |
| bool ReliableQuicStream::WillAcceptStreamFrame( |
| const QuicStreamFrame& frame) const { |
| if (read_side_closed_) { |
| return true; |
| } |
| if (frame.stream_id != id_) { |
| LOG(ERROR) << "Error!"; |
| return false; |
| } |
| return sequencer_.WillAcceptStreamFrame(frame); |
| } |
| |
| bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { |
| DCHECK_EQ(frame.stream_id, id_); |
| if (read_side_closed_) { |
| DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; |
| // We don't want to be reading: blackhole the data. |
| return true; |
| } |
| // Note: This count include duplicate data received. |
| stream_bytes_read_ += frame.data.TotalBufferSize(); |
| |
| bool accepted = sequencer_.OnStreamFrame(frame); |
| |
| return accepted; |
| } |
| |
| void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { |
| stream_error_ = error; |
| CloseWriteSide(); |
| CloseReadSide(); |
| } |
| |
| void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, |
| bool from_peer) { |
| if (read_side_closed_ && write_side_closed_) { |
| return; |
| } |
| if (error != QUIC_NO_ERROR) { |
| stream_error_ = QUIC_STREAM_CONNECTION_ERROR; |
| connection_error_ = error; |
| } |
| |
| CloseWriteSide(); |
| CloseReadSide(); |
| } |
| |
| void ReliableQuicStream::OnFinRead() { |
| DCHECK(sequencer_.IsClosed()); |
| CloseReadSide(); |
| } |
| |
| void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { |
| DCHECK_NE(QUIC_STREAM_NO_ERROR, error); |
| stream_error_ = error; |
| // Sending a RstStream results in calling CloseStream. |
| session()->SendRstStream(id(), error); |
| } |
| |
| void ReliableQuicStream::CloseConnection(QuicErrorCode error) { |
| session()->connection()->SendConnectionClose(error); |
| } |
| |
| void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, |
| const string& details) { |
| session()->connection()->SendConnectionCloseWithDetails(error, details); |
| } |
| |
| QuicVersion ReliableQuicStream::version() { |
| return session()->connection()->version(); |
| } |
| |
| void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { |
| DCHECK(data.size() > 0 || fin); |
| DCHECK(!fin_buffered_); |
| |
| QuicConsumedData consumed_data(0, false); |
| fin_buffered_ = fin; |
| |
| if (queued_data_.empty()) { |
| struct iovec iov(MakeIovec(data)); |
| consumed_data = WritevData(&iov, 1, fin, NULL); |
| DCHECK_LE(consumed_data.bytes_consumed, data.length()); |
| } |
| |
| // If there's unconsumed data or an unconsumed fin, queue it. |
| if (consumed_data.bytes_consumed < data.length() || |
| (fin && !consumed_data.fin_consumed)) { |
| queued_data_.push_back( |
| string(data.data() + consumed_data.bytes_consumed, |
| data.length() - consumed_data.bytes_consumed)); |
| } |
| } |
| |
| void ReliableQuicStream::OnCanWrite() { |
| bool fin = false; |
| while (!queued_data_.empty()) { |
| const string& data = queued_data_.front(); |
| if (queued_data_.size() == 1 && fin_buffered_) { |
| fin = true; |
| } |
| struct iovec iov(MakeIovec(data)); |
| QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL); |
| if (consumed_data.bytes_consumed == data.size() && |
| fin == consumed_data.fin_consumed) { |
| queued_data_.pop_front(); |
| } else { |
| queued_data_.front().erase(0, consumed_data.bytes_consumed); |
| break; |
| } |
| } |
| } |
| |
| QuicConsumedData ReliableQuicStream::WritevData( |
| const struct iovec* iov, |
| int iov_count, |
| bool fin, |
| QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { |
| if (write_side_closed_) { |
| DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; |
| return QuicConsumedData(0, false); |
| } |
| |
| size_t write_length = 0u; |
| for (int i = 0; i < iov_count; ++i) { |
| write_length += iov[i].iov_len; |
| } |
| QuicConsumedData consumed_data = session()->WritevData( |
| id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate); |
| stream_bytes_written_ += consumed_data.bytes_consumed; |
| if (consumed_data.bytes_consumed == write_length) { |
| if (fin && consumed_data.fin_consumed) { |
| fin_sent_ = true; |
| CloseWriteSide(); |
| } else if (fin && !consumed_data.fin_consumed) { |
| session_->MarkWriteBlocked(id(), EffectivePriority()); |
| } |
| } else { |
| session_->MarkWriteBlocked(id(), EffectivePriority()); |
| } |
| return consumed_data; |
| } |
| |
| void ReliableQuicStream::CloseReadSide() { |
| if (read_side_closed_) { |
| return; |
| } |
| DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); |
| |
| read_side_closed_ = true; |
| if (write_side_closed_) { |
| DVLOG(1) << ENDPOINT << "Closing stream: " << id(); |
| session_->CloseStream(id()); |
| } |
| } |
| |
| void ReliableQuicStream::CloseWriteSide() { |
| if (write_side_closed_) { |
| return; |
| } |
| DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); |
| |
| write_side_closed_ = true; |
| if (read_side_closed_) { |
| DVLOG(1) << ENDPOINT << "Closing stream: " << id(); |
| session_->CloseStream(id()); |
| } |
| } |
| |
| bool ReliableQuicStream::HasBufferedData() { |
| return !queued_data_.empty(); |
| } |
| |
| void ReliableQuicStream::OnClose() { |
| CloseReadSide(); |
| CloseWriteSide(); |
| } |
| |
| } // namespace net |