// blob: fdc23caaff385c144b1f05ed7b25a5dabf4b0a4f [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "device/serial/data_receiver.h"
#include <limits>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
namespace device {
// Represents a receive that is not yet fulfilled. It bundles the two
// user-supplied callbacks of one Receive() call together with the state needed
// to know whether the user currently holds a buffer for it.
class DataReceiver::PendingReceive {
 public:
  // |receiver| is the DataReceiver that created this object. |callback|
  // receives data, |error_callback| receives errors, and |fatal_error_value|
  // is the value reported by DispatchFatalError().
  PendingReceive(DataReceiver* receiver,
                 const ReceiveDataCallback& callback,
                 const ReceiveErrorCallback& error_callback,
                 int32_t fatal_error_value);

  // Dispatches |data| to |receive_callback_| (or, for an error frame, to
  // |receive_error_callback_|). Returns whether this PendingReceive is
  // finished by this call.
  bool DispatchDataFrame(DataReceiver::DataFrame* data);

  // Reports |fatal_error_value_| to |receive_error_callback_|.
  void DispatchFatalError();

  // Whether the user currently holds a buffer handed out by
  // DispatchDataFrame(). Const-qualified because it only reads state.
  bool buffer_in_use() const { return buffer_in_use_; }

 private:
  class Buffer;

  // Invoked when the user is finished with the ReadOnlyBuffer provided to
  // |receive_callback_|. |num_bytes| is the number of bytes the user consumed.
  void Done(uint32_t num_bytes);

  // The DataReceiver that owns this.
  DataReceiver* receiver_;

  // The callback to dispatch data.
  ReceiveDataCallback receive_callback_;

  // The callback to report errors.
  ReceiveErrorCallback receive_error_callback_;

  // The error value to report when DispatchFatalError() is called.
  const int32_t fatal_error_value_;

  // True if the user owns a buffer passed to |receive_callback_| as part of
  // DispatchDataFrame().
  bool buffer_in_use_;
};
// A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
// DataReceiver. The view does not own the bytes it points at; it holds a
// reference to the DataReceiver instead.
class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
 public:
  // |receiver| is the DataReceiver whose data is being viewed, |receive| is
  // the PendingReceive to notify when the user is done, and
  // |buffer|/|buffer_size| describe the bytes exposed to the user. The first
  // parameter is named |receiver| to match the out-of-line definition and the
  // |receiver_| member.
  Buffer(scoped_refptr<DataReceiver> receiver,
         PendingReceive* receive,
         const char* buffer,
         uint32_t buffer_size);

  // Reports zero bytes consumed if the user never called Done().
  ~Buffer() override;

  // ReadOnlyBuffer overrides.
  const char* GetData() override;
  uint32_t GetSize() override;
  void Done(uint32_t bytes_consumed) override;
  void DoneWithError(uint32_t bytes_consumed, int32_t error) override;

 private:
  // The DataReceiver of whose buffer we are providing a view.
  scoped_refptr<DataReceiver> receiver_;

  // The PendingReceive in response to which this buffer was created. Set to
  // NULL once Done() has been called.
  PendingReceive* pending_receive_;

  // Start of the viewed bytes; NULL once Done() has been called.
  const char* buffer_;

  // Number of viewed bytes; 0 once Done() has been called.
  uint32_t buffer_size_;
};
// One entry in the DataReceiver's queue of incoming items: either a chunk of
// bytes received from the DataSource or an error it reported.
struct DataReceiver::DataFrame {
  // Builds a data frame that takes over the contents of |bytes|. Reading
  // starts at offset 0.
  explicit DataFrame(mojo::Array<uint8_t> bytes)
      : is_error(false),
        data(bytes.Pass()),
        offset(0),
        error(0),
        dispatched(false) {}

  // Builds an error frame carrying |error_code|. |data| is left
  // default-constructed.
  explicit DataFrame(int32_t error_code)
      : is_error(true),
        offset(0),
        error(error_code),
        dispatched(false) {}

  // True when this frame carries an error rather than data.
  bool is_error;

  // The bytes received from the DataSource.
  mojo::Array<uint8_t> data;

  // Position within |data| where the next read should start.
  uint32_t offset;

  // The error value; 0 for data frames.
  const int32_t error;

  // Set once the error has been handed to the user.
  bool dispatched;
};
// Takes over |source|, registers this object as both its client and its error
// handler, and then initializes it with |buffer_size|. |fatal_error_value| is
// stored and later handed to every PendingReceive as the value to report from
// DispatchFatalError().
// NOTE(review): the exact meaning of |buffer_size| is defined by
// serial::DataSource::Init(), which is not visible here - confirm there.
DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
uint32_t buffer_size,
int32_t fatal_error_value)
: source_(source.Pass()),
fatal_error_value_(fatal_error_value),
shut_down_(false),
weak_factory_(this) {
source_.set_client(this);
source_.set_error_handler(this);
source_->Init(buffer_size);
}
bool DataReceiver::Receive(const ReceiveDataCallback& callback,
const ReceiveErrorCallback& error_callback) {
DCHECK(!callback.is_null() && !error_callback.is_null());
if (pending_receive_ || shut_down_)
return false;
// When the DataSource encounters an error, it pauses transmission. When the
// user starts a new receive following notification of the error (via
// |error_callback| of the previous Receive call) of the error we can tell the
// DataSource to resume transmission of data.
if (!pending_data_frames_.empty() && pending_data_frames_.front()->is_error &&
pending_data_frames_.front()->dispatched) {
source_->Resume();
pending_data_frames_.pop();
}
pending_receive_.reset(
new PendingReceive(this, callback, error_callback, fatal_error_value_));
ReceiveInternal();
return true;
}
// Shuts the receiver down on destruction; ShutDown() reports the fatal error
// value to any receive that is still outstanding.
DataReceiver::~DataReceiver() {
ShutDown();
}
void DataReceiver::OnError(int32_t error) {
if (shut_down_)
return;
pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(error)));
if (pending_receive_)
ReceiveInternal();
}
// Queues |data| for delivery and attempts delivery if a receive is
// outstanding.
void DataReceiver::OnData(mojo::Array<uint8_t> data) {
  // Mirror OnError(): once shut down, Receive() refuses new receives and
  // ReceiveInternal() returns immediately, so a frame queued here could never
  // be delivered and would only accumulate in memory.
  if (shut_down_)
    return;

  pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
  if (pending_receive_)
    ReceiveInternal();
}
// Shuts the receiver down when the connection reports an error.
// NOTE(review): presumably invoked through the error handler registered on
// |source_| in the constructor - confirm against the mojo ErrorHandler
// interface.
void DataReceiver::OnConnectionError() {
ShutDown();
}
// Called when the user returns the buffer for the frame at the head of the
// queue, having consumed |bytes_consumed| bytes of it. Advances the read
// offset, and once the whole frame has been consumed reports its size to the
// DataSource and drops it. In all cases the outstanding receive is completed.
// Does nothing after shutdown.
void DataReceiver::Done(uint32_t bytes_consumed) {
  if (shut_down_)
    return;

  DCHECK(pending_receive_);
  DataFrame& pending_data = *pending_data_frames_.front();
  const uint32_t frame_size = static_cast<uint32_t>(pending_data.data.size());
  DCHECK_LE(pending_data.offset, frame_size);
  const uint32_t remaining = frame_size - pending_data.offset;

  // |bytes_consumed| comes straight from the user of the buffer. Claiming more
  // than was offered is a caller bug (hence the DCHECK), but where DCHECKs are
  // compiled out it would push |offset| past the end of the frame: the frame
  // would then never be popped and the next dispatch would compute an
  // underflowed length. Clamp so the offset can never leave the frame.
  DCHECK_LE(bytes_consumed, remaining);
  if (bytes_consumed > remaining)
    bytes_consumed = remaining;

  pending_data.offset += bytes_consumed;
  if (pending_data.offset == frame_size) {
    source_->ReportBytesReceived(frame_size);
    pending_data_frames_.pop();
  }
  pending_receive_.reset();
}
void DataReceiver::ReceiveInternal() {
if (shut_down_)
return;
DCHECK(pending_receive_);
if (pending_receive_->buffer_in_use())
return;
if (!pending_data_frames_.empty() &&
pending_receive_->DispatchDataFrame(pending_data_frames_.front().get())) {
pending_receive_.reset();
}
}
// Marks the receiver as shut down and reports the fatal error value to the
// outstanding receive, if there is one.
void DataReceiver::ShutDown() {
  // ShutDown() is reached from both OnConnectionError() and the destructor,
  // and |pending_receive_| is not cleared here. Without this guard a
  // connection error followed by destruction would run the same receive's
  // error callback twice. No new receive can start once |shut_down_| is set
  // (Receive() returns false), so nothing is missed by returning early.
  if (shut_down_)
    return;

  shut_down_ = true;
  if (pending_receive_)
    pending_receive_->DispatchFatalError();
}
// Stores the owning |receiver|, copies of the two user callbacks and the
// |fatal_error_value| to report from DispatchFatalError(). No buffer is in use
// initially.
DataReceiver::PendingReceive::PendingReceive(
DataReceiver* receiver,
const ReceiveDataCallback& callback,
const ReceiveErrorCallback& error_callback,
int32_t fatal_error_value)
: receiver_(receiver),
receive_callback_(callback),
receive_error_callback_(error_callback),
fatal_error_value_(fatal_error_value),
buffer_in_use_(false) {
}
// Hands |data| to the user by posting a task to the current message loop.
//
// For an error frame, marks it as dispatched, posts |receive_error_callback_|
// with the error value and returns true: this receive is finished.
//
// For a data frame, posts |receive_callback_| with a Buffer that views the
// unread part of the frame and returns false: the receive stays outstanding
// until the user returns the buffer.
bool DataReceiver::PendingReceive::DispatchDataFrame(
    DataReceiver::DataFrame* data) {
  DCHECK(!buffer_in_use_);
  DCHECK(!data->dispatched);
  if (data->is_error) {
    data->dispatched = true;
    base::MessageLoop::current()->PostTask(
        FROM_HERE, base::Bind(receive_error_callback_, data->error));
    return true;
  }

  // Work out how many unread bytes the frame holds without letting the
  // subtraction underflow, and only take the address of the first element
  // when there is one: indexing element 0 of an empty array is out of range.
  // An empty frame is exposed as a NULL, zero-length view.
  const size_t total_size = data->data.size();
  uint32_t unread_size = 0;
  const char* unread_start = NULL;
  if (data->offset < total_size) {
    unread_size = static_cast<uint32_t>(total_size - data->offset);
    unread_start =
        reinterpret_cast<const char*>(&data->data[0]) + data->offset;
  }

  buffer_in_use_ = true;
  base::MessageLoop::current()->PostTask(
      FROM_HERE,
      base::Bind(receive_callback_,
                 base::Passed(scoped_ptr<ReadOnlyBuffer>(new Buffer(
                     receiver_, this, unread_start, unread_size)))));
  return false;
}
// Runs |receive_error_callback_| with |fatal_error_value_|. Unlike
// DispatchDataFrame(), the callback is run directly rather than posted as a
// task.
void DataReceiver::PendingReceive::DispatchFatalError() {
receive_error_callback_.Run(fatal_error_value_);
}
// Records that the user has returned the buffer and forwards |bytes_consumed|
// to the owning DataReceiver.
void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
DCHECK(buffer_in_use_);
buffer_in_use_ = false;
// Keep this as the last statement: DataReceiver::Done() resets the receiver's
// |pending_receive_|, which appears to be this very object, so |this| may
// already be destroyed when the call returns.
receiver_->Done(bytes_consumed);
}
// Creates a view of |buffer_size| bytes starting at |buffer|. Holds a
// reference to |receiver| and a raw pointer to |receive|, the PendingReceive
// that is notified when the user is done with the view.
DataReceiver::PendingReceive::Buffer::Buffer(
scoped_refptr<DataReceiver> receiver,
PendingReceive* receive,
const char* buffer,
uint32_t buffer_size)
: receiver_(receiver),
pending_receive_(receive),
buffer_(buffer),
buffer_size_(buffer_size) {
}
// Done() clears |pending_receive_|, so a pointer that is still set here means
// the user dropped the buffer without calling Done(). That case is reported as
// zero bytes consumed.
DataReceiver::PendingReceive::Buffer::~Buffer() {
  if (!pending_receive_)
    return;
  pending_receive_->Done(0);
}
// Returns the start of the viewed bytes, or NULL once Done() has been called.
const char* DataReceiver::PendingReceive::Buffer::GetData() {
return buffer_;
}
// Returns the number of viewed bytes, or 0 once Done() has been called.
uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
return buffer_size_;
}
// Returns the buffer to the receiver, reporting that |bytes_consumed| bytes
// were used, and turns this object into an empty, detached view so that the
// destructor does not report completion a second time.
void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
  // Detach the view first; none of this state is needed for the notification.
  PendingReceive* const finished_receive = pending_receive_;
  pending_receive_ = NULL;
  buffer_ = NULL;
  buffer_size_ = 0;

  finished_receive->Done(bytes_consumed);

  // Drop the reference to the DataReceiver only after it has been notified.
  receiver_ = NULL;
}
// Behaves exactly like Done(|bytes_consumed|). |error| is not used by this
// implementation.
void DataReceiver::PendingReceive::Buffer::DoneWithError(
uint32_t bytes_consumed,
int32_t error) {
Done(bytes_consumed);
}
} // namespace device