| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/system/channel.h" |
| |
| #include "base/basictypes.h" |
| #include "base/bind.h" |
| #include "base/compiler_specific.h" |
| #include "base/logging.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/strings/stringprintf.h" |
| |
| namespace mojo { |
| namespace system { |
| |
| COMPILE_ASSERT(Channel::kBootstrapEndpointId != |
| MessageInTransit::kInvalidEndpointId, |
| kBootstrapEndpointId_is_invalid); |
| |
| STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId |
| Channel::kBootstrapEndpointId; |
| |
| Channel::EndpointInfo::EndpointInfo() { |
| } |
| |
| Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, |
| unsigned port) |
| : message_pipe(message_pipe), |
| port(port) { |
| } |
| |
| Channel::EndpointInfo::~EndpointInfo() { |
| } |
| |
| Channel::Channel() |
| : next_local_id_(kBootstrapEndpointId) { |
| } |
| |
| bool Channel::Init(const PlatformChannelHandle& handle) { |
| DCHECK(creation_thread_checker_.CalledOnValidThread()); |
| |
| // No need to take |lock_|, since this must be called before this object |
| // becomes thread-safe. |
| DCHECK(!raw_channel_.get()); |
| |
| raw_channel_.reset( |
| RawChannel::Create(handle, this, base::MessageLoop::current())); |
| if (!raw_channel_->Init()) { |
| raw_channel_.reset(); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| void Channel::Shutdown() { |
| DCHECK(creation_thread_checker_.CalledOnValidThread()); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(raw_channel_.get()); |
| raw_channel_->Shutdown(); |
| raw_channel_.reset(); |
| |
| // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that |
| // it's empty? |
| } |
| |
| MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( |
| scoped_refptr<MessagePipe> message_pipe, unsigned port) { |
| MessageInTransit::EndpointId local_id; |
| { |
| base::AutoLock locker(lock_); |
| |
| while (next_local_id_ == MessageInTransit::kInvalidEndpointId || |
| local_id_to_endpoint_info_map_.find(next_local_id_) != |
| local_id_to_endpoint_info_map_.end()) |
| next_local_id_++; |
| |
| local_id = next_local_id_; |
| next_local_id_++; |
| |
| // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid |
| // some expensive reference count increment/decrements.) Once this is done, |
| // we should be able to delete |EndpointInfo|'s default constructor. |
| local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); |
| } |
| |
| message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id); |
| return local_id; |
| } |
| |
| void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, |
| MessageInTransit::EndpointId remote_id) { |
| EndpointInfo endpoint_info; |
| { |
| base::AutoLock locker(lock_); |
| |
| IdToEndpointInfoMap::const_iterator it = |
| local_id_to_endpoint_info_map_.find(local_id); |
| CHECK(it != local_id_to_endpoint_info_map_.end()); |
| endpoint_info = it->second; |
| } |
| |
| endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); |
| } |
| |
| bool Channel::WriteMessage(MessageInTransit* message) { |
| base::AutoLock locker(lock_); |
| if (!raw_channel_.get()) { |
| // TODO(vtl): I think this is probably not an error condition, but I should |
| // think about it (and the shutdown sequence) more carefully. |
| LOG(INFO) << "WriteMessage() after shutdown"; |
| return false; |
| } |
| |
| return raw_channel_->WriteMessage(message); |
| } |
| |
| void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) { |
| DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
| |
| base::AutoLock locker_(lock_); |
| local_id_to_endpoint_info_map_.erase(local_id); |
| } |
| |
| Channel::~Channel() { |
| // The channel should have been shut down first. |
| DCHECK(!raw_channel_.get()); |
| } |
| |
| void Channel::OnReadMessage(const MessageInTransit& message) { |
| switch (message.type()) { |
| case MessageInTransit::kTypeMessagePipeEndpoint: |
| case MessageInTransit::kTypeMessagePipe: |
| OnReadMessageForDownstream(message); |
| break; |
| case MessageInTransit::TYPE_CHANNEL: |
| OnReadMessageForChannel(message); |
| break; |
| default: |
| HandleRemoteError(base::StringPrintf( |
| "Received message of invalid type %u", |
| static_cast<unsigned>(message.type()))); |
| break; |
| } |
| } |
| |
| void Channel::OnFatalError(FatalError fatal_error) { |
| // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead. |
| NOTIMPLEMENTED(); |
| } |
| |
| void Channel::OnReadMessageForDownstream(const MessageInTransit& message) { |
| DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint || |
| message.type() == MessageInTransit::kTypeMessagePipe); |
| |
| MessageInTransit::EndpointId local_id = message.destination_id(); |
| if (local_id == MessageInTransit::kInvalidEndpointId) { |
| HandleRemoteError("Received message with no destination ID"); |
| return; |
| } |
| |
| EndpointInfo endpoint_info; |
| { |
| base::AutoLock locker(lock_); |
| |
| // Since we own |raw_channel_|, and this method and |Shutdown()| should only |
| // be called from the creation thread, |raw_channel_| should never be null |
| // here. |
| DCHECK(raw_channel_.get()); |
| |
| IdToEndpointInfoMap::const_iterator it = |
| local_id_to_endpoint_info_map_.find(local_id); |
| if (it == local_id_to_endpoint_info_map_.end()) { |
| HandleRemoteError(base::StringPrintf( |
| "Received a message for nonexistent local destination ID %u", |
| static_cast<unsigned>(local_id))); |
| return; |
| } |
| endpoint_info = it->second; |
| } |
| |
| // We need to duplicate the message, because |EnqueueMessage()| will take |
| // ownership of it. |
| MessageInTransit* own_message = MessageInTransit::Create( |
| message.type(), message.subtype(), message.data(), message.data_size()); |
| if (endpoint_info.message_pipe->EnqueueMessage( |
| MessagePipe::GetPeerPort(endpoint_info.port), own_message, NULL) != |
| MOJO_RESULT_OK) { |
| HandleLocalError(base::StringPrintf( |
| "Failed to enqueue message to local destination ID %u", |
| static_cast<unsigned>(local_id))); |
| return; |
| } |
| } |
| |
| void Channel::OnReadMessageForChannel(const MessageInTransit& message) { |
| // TODO(vtl): Currently no channel-only messages yet. |
| HandleRemoteError("Received invalid channel message"); |
| NOTREACHED(); |
| } |
| |
| void Channel::HandleRemoteError(const base::StringPiece& error_message) { |
| // TODO(vtl): Is this how we really want to handle this? |
| LOG(INFO) << error_message; |
| } |
| |
| void Channel::HandleLocalError(const base::StringPiece& error_message) { |
| // TODO(vtl): Is this how we really want to handle this? |
| LOG(FATAL) << error_message; |
| } |
| |
| } // namespace system |
| } // namespace mojo |