blob: 876a5e5557fffc93b79bbba928ed2aef2a115d34 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "device/serial/data_source_sender.h"
#include <limits>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "device/serial/async_waiter.h"
namespace device {
// Represents a send that is not yet fulfilled: a single request for data made
// by the owning DataSourceSender, tracked until its result is reported back.
class DataSourceSender::PendingSend {
public:
// |sender| is the DataSourceSender that creates and owns this object.
// |callback| is run with a WritableBuffer each time GetData() is called.
PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
// Asks |callback_| to fill |data| with up to |num_bytes| bytes by handing it
// a WritableBuffer that wraps that region. Following this, one of Done() and
// DoneWithError() will be called with the result.
void GetData(void* data, uint32_t num_bytes);
private:
class Buffer;
// Reports a successful write of |bytes_written| bytes to |sender_|.
void Done(uint32_t bytes_written);
// Reports a partially successful or an unsuccessful write to |sender_|:
// |bytes_written| bytes were written before |error| occurred.
void DoneWithError(uint32_t bytes_written, int32_t error);
// The DataSourceSender that owns this. Not owned by this object.
DataSourceSender* sender_;
// The callback to call to get data.
ReadyCallback callback_;
// True from the moment GetData() passes a buffer to |callback_| until Done()
// or DoneWithError() is called for that buffer.
bool buffer_in_use_;
};
// A WritableBuffer implementation that provides a view of a region of the data
// pipe owned by a DataSourceSender. The result of the write is forwarded to
// the PendingSend that created this buffer.
class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
public:
// |sender| is held by reference count for as long as the result has not been
// reported. |send| receives the result. |buffer| and |buffer_size| describe
// the writable region exposed through GetData() and GetSize().
Buffer(scoped_refptr<DataSourceSender> sender,
PendingSend* send,
char* buffer,
uint32_t buffer_size);
// Reports a successful write of zero bytes if neither Done() nor
// DoneWithError() has been called on this buffer.
virtual ~Buffer();
// WritableBuffer overrides.
virtual char* GetData() OVERRIDE;
virtual uint32_t GetSize() OVERRIDE;
virtual void Done(uint32_t bytes_written) OVERRIDE;
virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE;
private:
// The DataSourceSender whose data pipe this buffer provides a view of. Reset
// to NULL once Done() or DoneWithError() has been called.
scoped_refptr<DataSourceSender> sender_;
// The PendingSend in response to which this buffer was created. Not owned.
PendingSend* pending_send_;
// Start of the writable region; NULL after the result has been reported.
char* buffer_;
// Size of the writable region in bytes; 0 after the result has been reported.
uint32_t buffer_size_;
};
// Stores the two callbacks and starts with nothing sent and not shut down.
// Both callbacks must be non-null.
DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
                                   const ErrorCallback& error_callback)
    : ready_callback_(ready_callback),
      error_callback_(error_callback),
      bytes_sent_(0),
      shut_down_(false) {
  DCHECK(!ready_callback.is_null());
  DCHECK(!error_callback.is_null());
}
// Marks this sender as shut down, drops any active waiter and releases both
// callbacks.
void DataSourceSender::ShutDown() {
  // Set the flag first so that everything below runs in the shut-down state.
  shut_down_ = true;

  waiter_.reset();
  ready_callback_.Reset();
  error_callback_.Reset();
}
// ShutDown() must have been called before the last reference is released.
DataSourceSender::~DataSourceSender() {
  DCHECK(shut_down_);
}
// Takes ownership of the data pipe producer |handle|, creates the first
// PendingSend and starts waiting for the pipe to become writable.
void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
  // None of these should ever occur: |handle_| is only valid and
  // |pending_send_| is only set after Init() has been called, and an invalid
  // |handle| from the client is unrecoverable.
  const bool already_initialized = pending_send_ || handle_.is_valid();
  const bool unusable = !handle.is_valid() || shut_down_;
  if (already_initialized || unusable) {
    DispatchFatalError();
    return;
  }

  handle_ = handle.Pass();
  pending_send_.reset(new PendingSend(this, ready_callback_));
  StartWaiting();
}
void DataSourceSender::Resume() {
if (pending_send_ || !handle_.is_valid()) {
DispatchFatalError();
return;
}
pending_send_.reset(new PendingSend(this, ready_callback_));
StartWaiting();
}
// A connection error is treated as fatal.
void DataSourceSender::OnConnectionError() {
  DispatchFatalError();
}
// Creates an AsyncWaiter on |handle_| for the writable signal; OnDoneWaiting()
// is bound as its callback. Requires a pending send and no existing waiter.
void DataSourceSender::StartWaiting() {
  DCHECK(pending_send_);
  DCHECK(!waiter_);

  waiter_.reset(new AsyncWaiter(
      handle_.get(),
      MOJO_HANDLE_SIGNAL_WRITABLE,
      base::Bind(&DataSourceSender::OnDoneWaiting, this)));
}
// Callback for the waiter created in StartWaiting(). On success, begins a
// write on the data pipe and hands the resulting region to |pending_send_|.
// Any failure is reported as a fatal error.
void DataSourceSender::OnDoneWaiting(MojoResult result) {
  DCHECK(pending_send_);
  DCHECK(!shut_down_);
  DCHECK(waiter_);

  // The waiter has served its purpose; a new one is created for the next send.
  waiter_.reset();
  if (result != MOJO_RESULT_OK) {
    DispatchFatalError();
    return;
  }

  void* pipe_region = NULL;
  // In/out parameter: starts at the maximum value and is afterwards used as
  // the size of |pipe_region|.
  uint32_t region_size = std::numeric_limits<uint32_t>::max();
  const MojoResult begin_result = mojo::BeginWriteDataRaw(
      handle_.get(), &pipe_region, &region_size, MOJO_WRITE_DATA_FLAG_NONE);
  if (begin_result != MOJO_RESULT_OK) {
    DispatchFatalError();
    return;
  }

  pending_send_->GetData(pipe_region, region_size);
}
// Completes the current write with |bytes_written| bytes and, unless this
// sender has been shut down, waits for the pipe to become writable again.
void DataSourceSender::Done(uint32_t bytes_written) {
  DoneInternal(bytes_written);

  // DoneInternal() may itself have triggered a shutdown via a fatal error.
  if (shut_down_)
    return;
  StartWaiting();
}
// Completes the current write with |bytes_written| bytes, discards the pending
// send and, unless shut down, reports |error| to the client together with the
// total number of bytes sent so far.
void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
  DoneInternal(bytes_written);
  pending_send_.reset();

  // StartWaiting() is deliberately not called here, so no further data is sent
  // until Resume() is called.
  if (shut_down_)
    return;
  client()->OnError(bytes_sent_, error);
}
// Shared tail of Done() and DoneWithError(): adds |bytes_written| to the running
// total and ends the write on the data pipe. Does nothing once shut down; a
// failure to end the write is a fatal error.
void DataSourceSender::DoneInternal(uint32_t bytes_written) {
  DCHECK(pending_send_);
  if (shut_down_)
    return;

  bytes_sent_ += bytes_written;
  const MojoResult end_result =
      mojo::EndWriteDataRaw(handle_.get(), bytes_written);
  if (end_result != MOJO_RESULT_OK)
    DispatchFatalError();
}
// Runs the error callback and then shuts down. Has no effect if this sender is
// already shut down.
void DataSourceSender::DispatchFatalError() {
  if (!shut_down_) {
    // Run the callback before ShutDown(), which resets it.
    error_callback_.Run();
    ShutDown();
  }
}
// Records the owning |sender| and the |callback| used to request data. No
// buffer is in use initially.
DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
                                           const ReadyCallback& callback)
    : sender_(sender),
      callback_(callback),
      buffer_in_use_(false) {
}
void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
DCHECK(!buffer_in_use_);
buffer_in_use_ = true;
callback_.Run(scoped_ptr<WritableBuffer>(
new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
}
// Marks the outstanding buffer as released and forwards the successful write of
// |bytes_written| bytes to the owning sender.
void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
  DCHECK(buffer_in_use_);

  // Clear the flag before forwarding the result to the sender.
  buffer_in_use_ = false;
  sender_->Done(bytes_written);
}
// Marks the outstanding buffer as released and forwards the failed write
// (|bytes_written| bytes written, then |error|) to the owning sender.
void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
                                                  int32_t error) {
  DCHECK(buffer_in_use_);

  // Clear the flag first: the sender's DoneWithError() resets its
  // |pending_send_|, so no member may be touched after the call below.
  buffer_in_use_ = false;
  sender_->DoneWithError(bytes_written, error);
}
// Stores a reference to |sender|, the PendingSend |send| to report to, and the
// writable region described by |buffer| and |buffer_size|.
DataSourceSender::PendingSend::Buffer::Buffer(
    scoped_refptr<DataSourceSender> sender,
    PendingSend* send,
    char* buffer,
    uint32_t buffer_size)
    : sender_(sender),
      pending_send_(send),
      buffer_(buffer),
      buffer_size_(buffer_size) {
}
// A buffer destroyed without having reported a result counts as a successful
// write of zero bytes.
DataSourceSender::PendingSend::Buffer::~Buffer() {
  // |sender_| is cleared by Done() and DoneWithError(), so a null value means
  // the result has already been reported.
  if (!sender_.get())
    return;
  pending_send_->Done(0);
}
// Returns the start of the writable region, or NULL once the result has been
// reported.
char* DataSourceSender::PendingSend::Buffer::GetData() {
  return buffer_;
}
// Returns the size of the writable region in bytes, or 0 once the result has
// been reported.
uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
  return buffer_size_;
}
// Reports a successful write of |bytes_written| bytes to the PendingSend and
// then detaches this buffer so that the destructor reports nothing further.
void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
  DCHECK(sender_);

  // Report while the reference to |sender_| is still held.
  pending_send_->Done(bytes_written);

  // Detach: release the sender and forget the region.
  sender_ = NULL;
  pending_send_ = NULL;
  buffer_ = NULL;
  buffer_size_ = 0;
}
void DataSourceSender::PendingSend::Buffer::DoneWithError(
uint32_t bytes_written,
int32_t error) {
DCHECK(sender_);
pending_send_->DoneWithError(bytes_written, error);
sender_ = NULL;
pending_send_ = NULL;
buffer_ = NULL;
buffer_size_ = 0;
}
} // namespace device