blob: 521143797216bd8acb2f039d90c0a2570bbe8ff5 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "device/serial/data_receiver.h"
#include <limits>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "device/serial/async_waiter.h"
namespace device {
// Represents a receive that is not yet fulfilled.
class DataReceiver::PendingReceive {
public:
PendingReceive(DataReceiver* receiver,
const ReceiveDataCallback& callback,
const ReceiveErrorCallback& error_callback,
int32_t fatal_error_value);
// Dispatches |data| to |receive_callback_|.
void DispatchData(const void* data, uint32_t num_bytes);
// Reports |error| to |receive_error_callback_| if it is an appropriate time.
// Returns whether it dispatched |error|.
bool DispatchError(DataReceiver::PendingError* error,
uint32_t bytes_received);
// Reports |fatal_error_value_| to |receive_error_callback_|.
void DispatchFatalError();
private:
class Buffer;
// Invoked when the user is finished with the ReadOnlyBuffer provided to
// |receive_callback_|.
void Done(uint32_t num_bytes);
// The DataReceiver that owns this.
DataReceiver* receiver_;
// The callback to dispatch data.
ReceiveDataCallback receive_callback_;
// The callback to report errors.
ReceiveErrorCallback receive_error_callback_;
// The error value to report when DispatchFatalError() is called.
const int32_t fatal_error_value_;
// True if the user owns a buffer passed to |receive_callback_| as part of
// DispatchData().
bool buffer_in_use_;
};
// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
// a DataReceiver.
class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
public:
Buffer(scoped_refptr<DataReceiver> pipe,
PendingReceive* receive,
const char* buffer,
uint32_t buffer_size);
virtual ~Buffer();
// ReadOnlyBuffer overrides.
virtual const char* GetData() OVERRIDE;
virtual uint32_t GetSize() OVERRIDE;
virtual void Done(uint32_t bytes_consumed) OVERRIDE;
virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
private:
// The DataReceiver whose data pipe we are providing a view.
scoped_refptr<DataReceiver> receiver_;
// The PendingReceive to which this buffer has been created in response.
PendingReceive* pending_receive_;
const char* buffer_;
uint32_t buffer_size_;
};
// Represents an error received from the DataSource.
struct DataReceiver::PendingError {
PendingError(uint32_t offset, int32_t error)
: offset(offset), error(error), dispatched(false) {}
// The location within the data stream where the error occurred.
const uint32_t offset;
// The value of the error that occurred.
const int32_t error;
// Whether the error has been dispatched to the user.
bool dispatched;
};
DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
uint32_t buffer_size,
int32_t fatal_error_value)
: source_(source.Pass()),
fatal_error_value_(fatal_error_value),
bytes_received_(0),
shut_down_(false),
weak_factory_(this) {
MojoCreateDataPipeOptions options = {
sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
};
mojo::ScopedDataPipeProducerHandle remote_handle;
MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
DCHECK_EQ(MOJO_RESULT_OK, result);
source_->Init(remote_handle.Pass());
source_.set_client(this);
}
bool DataReceiver::Receive(const ReceiveDataCallback& callback,
const ReceiveErrorCallback& error_callback) {
DCHECK(!callback.is_null() && !error_callback.is_null());
if (pending_receive_ || shut_down_)
return false;
// When the DataSource encounters an error, it pauses transmission. When the
// user starts a new receive following notification of the error (via
// |error_callback| of the previous Receive call) of the error we can tell the
// DataSource to resume transmission of data.
if (pending_error_ && pending_error_->dispatched) {
source_->Resume();
pending_error_.reset();
}
pending_receive_.reset(
new PendingReceive(this, callback, error_callback, fatal_error_value_));
base::MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
return true;
}
DataReceiver::~DataReceiver() {
ShutDown();
}
void DataReceiver::OnError(uint32_t offset, int32_t error) {
if (shut_down_)
return;
if (pending_error_) {
// When OnError is called by the DataSource, transmission of data is
// suspended. Thus we shouldn't receive another call to OnError until we
// have fully dealt with the error and called Resume to resume transmission
// (see Receive()). Under normal operation we should never get here, but if
// we do (e.g. in the case of a hijacked service process) just shut down.
ShutDown();
return;
}
pending_error_.reset(new PendingError(offset, error));
if (pending_receive_ &&
pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
pending_receive_.reset();
waiter_.reset();
}
}
void DataReceiver::OnConnectionError() {
ShutDown();
}
void DataReceiver::Done(uint32_t bytes_consumed) {
if (shut_down_)
return;
DCHECK(pending_receive_);
MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
DCHECK_EQ(MOJO_RESULT_OK, result);
pending_receive_.reset();
bytes_received_ += bytes_consumed;
}
void DataReceiver::OnDoneWaiting(MojoResult result) {
DCHECK(pending_receive_ && !shut_down_ && waiter_);
waiter_.reset();
if (result != MOJO_RESULT_OK) {
ShutDown();
return;
}
ReceiveInternal();
}
void DataReceiver::ReceiveInternal() {
if (shut_down_)
return;
DCHECK(pending_receive_);
if (pending_error_ &&
pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
pending_receive_.reset();
waiter_.reset();
return;
}
const void* data;
uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
MojoResult result = mojo::BeginReadDataRaw(
handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
if (result == MOJO_RESULT_OK) {
if (!CheckErrorNotInReadRange(num_bytes)) {
ShutDown();
return;
}
pending_receive_->DispatchData(data, num_bytes);
return;
}
if (result == MOJO_RESULT_SHOULD_WAIT) {
waiter_.reset(new AsyncWaiter(
handle_.get(),
MOJO_HANDLE_SIGNAL_READABLE,
base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
return;
}
ShutDown();
}
bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) {
DCHECK(pending_receive_);
if (!pending_error_)
return true;
DCHECK_NE(bytes_received_, pending_error_->offset);
DCHECK_NE(num_bytes, 0u);
uint32_t potential_bytes_received = bytes_received_ + num_bytes;
// bytes_received_ can overflow so we must consider two cases:
// 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an
// equal number of times. In this case, |potential_bytes_received| must
// be in the range (|bytes_received|, |pending_error_->offset|]. Below
// this range can only occur if |bytes_received_| overflows before
// |pending_error_->offset|. Above can only occur if |bytes_received_|
// overtakes |pending_error_->offset|.
// 2. |pending_error_->offset| has overflowed once more than
// |bytes_received_|. In this case, |potential_bytes_received| must not
// be in the range (|pending_error_->offset|, |bytes_received_|].
if ((bytes_received_ < pending_error_->offset &&
(potential_bytes_received > pending_error_->offset ||
potential_bytes_received <= bytes_received_)) ||
(bytes_received_ > pending_error_->offset &&
potential_bytes_received > pending_error_->offset &&
potential_bytes_received <= bytes_received_)) {
return false;
}
return true;
}
void DataReceiver::ShutDown() {
shut_down_ = true;
if (pending_receive_)
pending_receive_->DispatchFatalError();
pending_error_.reset();
waiter_.reset();
}
DataReceiver::PendingReceive::PendingReceive(
DataReceiver* receiver,
const ReceiveDataCallback& callback,
const ReceiveErrorCallback& error_callback,
int32_t fatal_error_value)
: receiver_(receiver),
receive_callback_(callback),
receive_error_callback_(error_callback),
fatal_error_value_(fatal_error_value),
buffer_in_use_(false) {
}
void DataReceiver::PendingReceive::DispatchData(const void* data,
uint32_t num_bytes) {
DCHECK(!buffer_in_use_);
buffer_in_use_ = true;
receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>(
new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes)));
}
bool DataReceiver::PendingReceive::DispatchError(PendingError* error,
uint32_t bytes_received) {
DCHECK(!error->dispatched);
if (buffer_in_use_ || bytes_received != error->offset)
return false;
error->dispatched = true;
receive_error_callback_.Run(error->error);
return true;
}
void DataReceiver::PendingReceive::DispatchFatalError() {
receive_error_callback_.Run(fatal_error_value_);
}
void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
DCHECK(buffer_in_use_);
buffer_in_use_ = false;
receiver_->Done(bytes_consumed);
}
DataReceiver::PendingReceive::Buffer::Buffer(
scoped_refptr<DataReceiver> receiver,
PendingReceive* receive,
const char* buffer,
uint32_t buffer_size)
: receiver_(receiver),
pending_receive_(receive),
buffer_(buffer),
buffer_size_(buffer_size) {
}
DataReceiver::PendingReceive::Buffer::~Buffer() {
if (pending_receive_)
pending_receive_->Done(0);
}
const char* DataReceiver::PendingReceive::Buffer::GetData() {
return buffer_;
}
uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
return buffer_size_;
}
void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
pending_receive_->Done(bytes_consumed);
pending_receive_ = NULL;
receiver_ = NULL;
buffer_ = NULL;
buffer_size_ = 0;
}
void DataReceiver::PendingReceive::Buffer::DoneWithError(
uint32_t bytes_consumed,
int32_t error) {
Done(bytes_consumed);
}
} // namespace device