blob: 4426defa934cd0362ecc2eec712730a55dbf04a9 [file] [log] [blame]
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/bindings/connector.h"
#include <stdint.h>
#include <utility>
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/synchronization/lock.h"
#include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
namespace mojo {
// Takes ownership of |message_pipe| and starts watching it right away using
// |runner|. |lock_| is only instantiated when |config| is MULTI_THREADED_SEND.
Connector::Connector(ScopedMessagePipeHandle message_pipe,
                     ConnectorConfig config,
                     scoped_refptr<base::SingleThreadTaskRunner> runner)
    : message_pipe_(std::move(message_pipe)),
      task_runner_(std::move(runner)),
      weak_factory_(this) {
  const bool needs_send_lock = (config == MULTI_THREADED_SEND);
  if (needs_send_lock) {
    lock_.emplace();
  }

  // Cached once so that callbacks and re-entrancy guards can copy it cheaply.
  weak_self_ = weak_factory_.GetWeakPtr();

  // No incoming receiver is attached yet, but the pipe is still monitored from
  // the start so that we learn when it is closed or encounters an error.
  WaitToReadMore();
}
// Tears down the watchers, unless the pipe was already given away, in which
// case destruction is permitted on any thread and nothing needs cancelling.
Connector::~Connector() {
  // |connected_| is guarded by |connected_lock_|; copy it out under the lock
  // and make the decision afterwards.
  bool still_connected = false;
  {
    base::AutoLock lock(connected_lock_);
    still_connected = connected_;
  }

  // Quick exit: the pipe is already closed, so skip the thread check.
  if (!still_connected)
    return;

  DCHECK(thread_checker_.CalledOnValidThread());
  CancelWait();
}
void Connector::CloseMessagePipe() {
// Throw away the returned message pipe.
PassMessagePipe();
}
// Stops watching the pipe and hands its handle to the caller. Afterwards the
// connector is marked as disconnected, all outstanding weak pointers are
// invalidated and the sync-watcher callback counter is cleared.
ScopedMessagePipeHandle Connector::PassMessagePipe() {
  DCHECK(thread_checker_.CalledOnValidThread());

  // Watchers must go before the handle they observe is moved away.
  CancelWait();

  // |message_pipe_| is also touched by Accept() under |lock_|.
  internal::MayAutoLock send_guard(&lock_);
  ScopedMessagePipeHandle released_pipe = std::move(message_pipe_);
  sync_handle_watcher_callback_count_ = 0;
  weak_factory_.InvalidateWeakPtrs();

  // Taken after |lock_|; keep this acquisition order.
  base::AutoLock connected_guard(connected_lock_);
  connected_ = false;
  return released_pipe;
}
// Puts the connector into the error path on request: the pipe is reset and the
// error handler is not invoked synchronously from this call.
void Connector::RaiseError() {
  DCHECK(thread_checker_.CalledOnValidThread());

  const bool force_pipe_reset = true;
  const bool force_async_handler = true;
  HandleError(force_pipe_reset, force_async_handler);
}
// Blocks until the pipe becomes readable or |deadline| expires, then reads and
// dispatches a single message. Resumes incoming processing if it was paused.
//
// Returns true only if a message was successfully read. Returns false if the
// connector is already in the error state, the wait timed out, the wait
// failed (the error handler is then run synchronously), or the read failed.
bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (error_)
    return false;

  ResumeIncomingMethodCallProcessing();

  const MojoResult wait_result =
      Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
  switch (wait_result) {
    case MOJO_RESULT_OK:
      break;
    case MOJO_RESULT_SHOULD_WAIT:
    case MOJO_RESULT_DEADLINE_EXCEEDED:
      // Nothing arrived in time; this is not an error.
      return false;
    default:
      // Callers of WaitForIncomingMessage() are expected to tolerate
      // re-entrancy, so the error handler is invoked synchronously here.
      HandleError(wait_result != MOJO_RESULT_FAILED_PRECONDITION, false);
      return false;
  }

  // ReadSingleMessage() always stores the read outcome in |read_result|; its
  // own return value is not needed because no members are touched afterwards.
  MojoResult read_result = MOJO_RESULT_OK;
  ignore_result(ReadSingleMessage(&read_result));
  return read_result == MOJO_RESULT_OK;
}
// Stops reading from the pipe until ResumeIncomingMethodCallProcessing() is
// called. Calling this while already paused has no effect.
void Connector::PauseIncomingMethodCallProcessing() {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (!paused_) {
    paused_ = true;
    // Dropping the watchers is what actually stops message delivery.
    CancelWait();
  }
}
// Undoes PauseIncomingMethodCallProcessing() by re-arming the pipe watcher.
// Calling this while not paused has no effect.
void Connector::ResumeIncomingMethodCallProcessing() {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (paused_) {
    // The flag must be cleared first: WaitToReadMore() CHECKs !paused_.
    paused_ = false;
    WaitToReadMore();
  }
}
// Writes |message| to the message pipe.
//
// Returns false if the connector is already in the error state, or if this
// particular write was rejected. Returns true if the message was written, and
// also when the write was silently skipped or dropped (no valid pipe, or the
// peer is known to be gone), so that callers keep draining incoming messages.
bool Connector::Accept(Message* message) {
  DCHECK(lock_ || thread_checker_.CalledOnValidThread());

  // |error_| is read without holding |lock_|. A concurrent change on another
  // thread is harmless: at worst we write into |message_pipe_| after an error
  // was encountered, which should be fine.
  if (error_)
    return false;

  internal::MayAutoLock send_guard(&lock_);

  const bool can_write = message_pipe_.is_valid() && !drop_writes_;
  if (!can_write)
    return true;

  const MojoResult write_result =
      WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
                      MOJO_WRITE_MESSAGE_FLAG_NONE);

  if (write_result == MOJO_RESULT_OK)
    return true;

  if (write_result == MOJO_RESULT_FAILED_PRECONDITION) {
    // The other end is gone, so further writes are pointless; remember that
    // and skip them from now on. The failure is hidden from the caller because
    // we want any backlog of incoming messages to be consumed before the pipe
    // is regarded as closed.
    drop_writes_ = true;
    return true;
  }

  if (write_result == MOJO_RESULT_BUSY) {
    // "Busy" is reported when one of the handles attached to the message is:
    //   - |message_pipe_|'s own handle;
    //   - simultaneously being used on another thread; or
    //   - in a state that prohibits transfer (e.g. a data pipe handle in the
    //     middle of a two-phase read/write, on whichever thread that is
    //     happening).
    // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
    // crbug.com/389666, etc. are resolved, this will make tests fail quickly
    // rather than hanging.)
    CHECK(false) << "Race condition or other bug detected";
    return false;
  }

  // Any other result: this specific write was rejected, presumably because of
  // bad input. The pipe itself is not necessarily in a bad state.
  return false;
}
// Opts this connector in to being woken up by sync watches on the same thread.
// The choice is remembered in |allow_woken_up_by_others_| so that
// WaitToReadMore() can re-apply it to a newly created sync watcher, and it is
// applied immediately to the current sync watcher (created here if needed).
void Connector::AllowWokenUpBySyncWatchOnSameThread() {
DCHECK(thread_checker_.CalledOnValidThread());
allow_woken_up_by_others_ = true;
// |sync_watcher_| is created lazily, so make sure it exists before use.
EnsureSyncWatcherExists();
sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
// Runs a synchronous watch on the pipe through |sync_watcher_|, passing
// |should_stop| along, and returns the watcher's result. Incoming processing
// is resumed first if it was paused. Returns false straight away if the
// connector is already in the error state.
bool Connector::SyncWatch(const bool* should_stop) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (error_)
    return false;

  ResumeIncomingMethodCallProcessing();

  // The sync watcher is created lazily.
  EnsureSyncWatcherExists();
  const bool watch_result = sync_watcher_->SyncWatch(should_stop);
  return watch_result;
}
// Records |tag| for use by watchers created later in WaitToReadMore(), and
// applies it to the current |handle_watcher_| if there is one.
void Connector::SetWatcherHeapProfilerTag(const char* tag) {
  heap_profiler_tag_ = tag;

  if (!handle_watcher_)
    return;
  handle_watcher_->set_heap_profiler_tag(tag);
}
// Callback bound in WaitToReadMore(), both for |handle_watcher_| and for the
// task posted when starting the watch fails. Forwards |result| to the
// ready-handling path shared with the sync watcher callback.
void Connector::OnWatcherHandleReady(MojoResult result) {
OnHandleReadyInternal(result);
}
// Callback bound to |sync_watcher_| in EnsureSyncWatcherExists(). Handles
// |result| like the regular watcher callback, while counting the call in
// |sync_handle_watcher_callback_count_| for the duration of the dispatch.
void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
  // Local copy taken before dispatch so that we can tell afterwards whether
  // the weak pointers were invalidated in the meantime.
  base::WeakPtr<Connector> self_guard(weak_self_);

  ++sync_handle_watcher_callback_count_;
  OnHandleReadyInternal(result);

  // By now this object might have been deleted, or PassMessagePipe() may have
  // invalidated the weak pointers and zeroed the counter. Touch nothing then.
  if (!self_guard)
    return;

  DCHECK_LT(0u, sync_handle_watcher_callback_count_);
  --sync_handle_watcher_callback_count_;
}
// Shared handler for both watcher callbacks: drains the pipe when |result| is
// MOJO_RESULT_OK, otherwise reports the error synchronously.
void Connector::OnHandleReadyInternal(MojoResult result) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (result == MOJO_RESULT_OK) {
    ReadAllAvailableMessages();
    // This object might have been deleted during dispatch, so nothing may
    // follow the call above.
    return;
  }

  // The pipe is force-reset for every failure except FAILED_PRECONDITION.
  HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
}
void Connector::WaitToReadMore() {
CHECK(!paused_);
DCHECK(!handle_watcher_);
handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_));
if (heap_profiler_tag_)
handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
MojoResult rv = handle_watcher_->Start(
message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
if (rv != MOJO_RESULT_OK) {
// If the watch failed because the handle is invalid or its conditions can
// no longer be met, we signal the error asynchronously to avoid reentry.
task_runner_->PostTask(
FROM_HERE,
base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
}
if (allow_woken_up_by_others_) {
EnsureSyncWatcherExists();
sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
}
// Reads at most one message from |message_pipe_| and, if the read succeeded,
// dispatches it to |incoming_receiver_| (when one is set).
//
// |read_result| always receives the raw result of ReadMessage().
//
// Returns true when the caller may keep going: a message was read and handled,
// or there was simply nothing to read (MOJO_RESULT_SHOULD_WAIT). Returns false
// when the weak pointer was invalidated during dispatch (in which case the
// caller must not touch any members), when the read failed, or when the
// receiver rejected the message while errors from it are enforced.
//
// Must not be called while paused.
bool Connector::ReadSingleMessage(MojoResult* read_result) {
CHECK(!paused_);
bool receiver_result = false;
// Detect if |this| was destroyed or the message pipe was closed/transferred
// during message dispatch.
base::WeakPtr<Connector> weak_self = weak_self_;
Message message;
const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
// Published before dispatch, so it is set on every return path.
*read_result = rv;
if (rv == MOJO_RESULT_OK) {
// A missing receiver counts as "not accepted".
receiver_result =
incoming_receiver_ && incoming_receiver_->Accept(&message);
}
// Nothing below may run if dispatch invalidated the weak pointer: members are
// accessed from here on.
if (!weak_self)
return false;
// An empty pipe is not an error.
if (rv == MOJO_RESULT_SHOULD_WAIT)
return true;
if (rv != MOJO_RESULT_OK) {
// The pipe is force-reset for every failure except FAILED_PRECONDITION.
HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
return false;
}
// The read itself succeeded; optionally treat a rejected message as a
// connection error.
if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
HandleError(true, false);
return false;
}
return true;
}
void Connector::ReadAllAvailableMessages() {
while (!error_) {
MojoResult rv;
if (!ReadSingleMessage(&rv)) {
// Return immediately without touching any members. |this| may have been
// destroyed.
return;
}
if (paused_)
return;
if (rv == MOJO_RESULT_SHOULD_WAIT)
break;
}
}
// Stops observing the message pipe by destroying both watchers, if present.
// WaitToReadMore() and EnsureSyncWatcherExists() create them again when
// needed.
void Connector::CancelWait() {
handle_watcher_.reset();
sync_watcher_.reset();
}
// Central error path.
//
// |force_pipe_reset|: replace |message_pipe_| with one end of a fresh dummy
// pipe instead of keeping the current handle.
// |force_async_handler|: do not run the connection error handler from within
// this call. Implied when processing is paused, and itself implies a pipe
// reset.
//
// Does nothing if an error was already recorded or there is no valid pipe.
void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
  if (error_ || !message_pipe_.is_valid())
    return;

  // While the user has paused receiving messages, the error handler must not
  // run until they start receiving again, so the asynchronous route is forced.
  if (paused_)
    force_async_handler = true;

  // The asynchronous route always goes together with a pipe reset.
  force_pipe_reset = force_pipe_reset || force_async_handler;

  // Watching stops in every case, and before |lock_| is taken.
  CancelWait();

  if (force_pipe_reset) {
    internal::MayAutoLock send_guard(&lock_);
    message_pipe_.reset();
    // Keep one end of a throwaway pipe. NOTE(review): presumably the other end
    // is closed when |dummy_pipe| goes out of scope, which is what later
    // surfaces the error through the watcher -- confirm against MessagePipe.
    MessagePipe dummy_pipe;
    message_pipe_ = std::move(dummy_pipe.handle0);
  }

  if (force_async_handler) {
    // Re-arm the watcher on the replacement pipe, unless paused; resuming will
    // do that later.
    if (!paused_)
      WaitToReadMore();
  } else {
    error_ = true;
    if (!connection_error_handler_.is_null())
      connection_error_handler_.Run();
  }
}
// Lazily creates |sync_watcher_| for the current message pipe. Does nothing if
// one already exists.
void Connector::EnsureSyncWatcherExists() {
  if (!sync_watcher_) {
    // The callback holds a raw |this| pointer. NOTE(review): this relies on
    // the watcher, which this object owns, never outliving it -- confirm.
    sync_watcher_.reset(new SyncHandleWatcher(
        message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
        base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
                   base::Unretained(this))));
  }
}
} // namespace mojo