| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/system/message_pipe.h" |
| |
| #include "base/logging.h" |
| #include "mojo/system/channel.h" |
| #include "mojo/system/dispatcher.h" |
| #include "mojo/system/local_message_pipe_endpoint.h" |
| #include "mojo/system/message_in_transit.h" |
| #include "mojo/system/message_pipe_endpoint.h" |
| #include "mojo/system/proxy_message_pipe_endpoint.h" |
| |
| namespace mojo { |
| namespace system { |
| |
| MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0, |
| scoped_ptr<MessagePipeEndpoint> endpoint_1) { |
| endpoints_[0].reset(endpoint_0.release()); |
| endpoints_[1].reset(endpoint_1.release()); |
| } |
| |
| MessagePipe::MessagePipe() { |
| endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| } |
| |
| // static |
| unsigned MessagePipe::GetPeerPort(unsigned port) { |
| DCHECK(port == 0 || port == 1); |
| return port ^ 1; |
| } |
| |
| void MessagePipe::CancelAllWaiters(unsigned port) { |
| DCHECK(port == 0 || port == 1); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(endpoints_[port].get()); |
| endpoints_[port]->CancelAllWaiters(); |
| } |
| |
| void MessagePipe::Close(unsigned port) { |
| DCHECK(port == 0 || port == 1); |
| |
| unsigned destination_port = GetPeerPort(port); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(endpoints_[port].get()); |
| |
| endpoints_[port]->Close(); |
| bool should_destroy_destination = endpoints_[destination_port].get() ? |
| !endpoints_[destination_port]->OnPeerClose() : false; |
| |
| endpoints_[port].reset(); |
| if (should_destroy_destination) { |
| endpoints_[destination_port]->Close(); |
| endpoints_[destination_port].reset(); |
| } |
| } |
| |
| // TODO(vtl): Support sending handles. |
| // TODO(vtl): Handle flags. |
| MojoResult MessagePipe::WriteMessage( |
| unsigned port, |
| const void* bytes, uint32_t num_bytes, |
| const std::vector<Dispatcher*>* dispatchers, |
| MojoWriteMessageFlags flags) { |
| DCHECK(port == 0 || port == 1); |
| return EnqueueMessage( |
| GetPeerPort(port), |
| MessageInTransit::Create( |
| MessageInTransit::kTypeMessagePipeEndpoint, |
| MessageInTransit::kSubtypeMessagePipeEndpointData, |
| bytes, num_bytes), |
| dispatchers); |
| } |
| |
| MojoResult MessagePipe::ReadMessage( |
| unsigned port, |
| void* bytes, uint32_t* num_bytes, |
| std::vector<scoped_refptr<Dispatcher> >* dispatchers, |
| uint32_t* num_dispatchers, |
| MojoReadMessageFlags flags) { |
| DCHECK(port == 0 || port == 1); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(endpoints_[port].get()); |
| |
| return endpoints_[port]->ReadMessage(bytes, num_bytes, |
| dispatchers, num_dispatchers, |
| flags); |
| } |
| |
| MojoResult MessagePipe::AddWaiter(unsigned port, |
| Waiter* waiter, |
| MojoWaitFlags flags, |
| MojoResult wake_result) { |
| DCHECK(port == 0 || port == 1); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(endpoints_[port].get()); |
| |
| return endpoints_[port]->AddWaiter(waiter, flags, wake_result); |
| } |
| |
| void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { |
| DCHECK(port == 0 || port == 1); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(endpoints_[port].get()); |
| |
| endpoints_[port]->RemoveWaiter(waiter); |
| } |
| |
| MojoResult MessagePipe::EnqueueMessage( |
| unsigned port, |
| MessageInTransit* message, |
| const std::vector<Dispatcher*>* dispatchers) { |
| DCHECK(port == 0 || port == 1); |
| DCHECK(message); |
| |
| if (message->type() == MessageInTransit::kTypeMessagePipe) { |
| DCHECK(!dispatchers); |
| return HandleControlMessage(port, message); |
| } |
| |
| DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(endpoints_[GetPeerPort(port)].get()); |
| |
| // The destination port need not be open, unlike the source port. |
| if (!endpoints_[port].get()) { |
| message->Destroy(); |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| } |
| |
| MojoResult result = endpoints_[port]->CanEnqueueMessage(message, dispatchers); |
| if (result != MOJO_RESULT_OK) { |
| message->Destroy(); |
| return result; |
| } |
| |
| if (dispatchers) { |
| DCHECK(!dispatchers->empty()); |
| |
| std::vector<scoped_refptr<Dispatcher> > replacement_dispatchers; |
| for (size_t i = 0; i < dispatchers->size(); i++) { |
| replacement_dispatchers.push_back( |
| (*dispatchers)[i]->CreateEquivalentDispatcherAndCloseNoLock()); |
| } |
| |
| endpoints_[port]->EnqueueMessage(message, &replacement_dispatchers); |
| } else { |
| endpoints_[port]->EnqueueMessage(message, NULL); |
| } |
| |
| return MOJO_RESULT_OK; |
| } |
| |
| void MessagePipe::Attach(unsigned port, |
| scoped_refptr<Channel> channel, |
| MessageInTransit::EndpointId local_id) { |
| DCHECK(port == 0 || port == 1); |
| DCHECK(channel.get()); |
| DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(endpoints_[port].get()); |
| |
| endpoints_[port]->Attach(channel, local_id); |
| } |
| |
| void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { |
| DCHECK(port == 0 || port == 1); |
| DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(endpoints_[port].get()); |
| |
| if (!endpoints_[port]->Run(remote_id)) { |
| endpoints_[port]->Close(); |
| endpoints_[port].reset(); |
| } |
| } |
| |
| MessagePipe::~MessagePipe() { |
| // Owned by the dispatchers. The owning dispatchers should only release us via |
| // their |Close()| method, which should inform us of being closed via our |
| // |Close()|. Thus these should already be null. |
| DCHECK(!endpoints_[0].get()); |
| DCHECK(!endpoints_[1].get()); |
| } |
| |
| MojoResult MessagePipe::HandleControlMessage(unsigned port, |
| MessageInTransit* message) { |
| DCHECK(port == 0 || port == 1); |
| DCHECK(message); |
| DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe); |
| |
| MojoResult rv = MOJO_RESULT_OK; |
| switch (message->subtype()) { |
| case MessageInTransit::kSubtypeMessagePipePeerClosed: { |
| unsigned source_port = GetPeerPort(port); |
| |
| base::AutoLock locker(lock_); |
| DCHECK(endpoints_[source_port].get()); |
| |
| endpoints_[source_port]->Close(); |
| if (endpoints_[port].get()) |
| endpoints_[port]->OnPeerClose(); |
| |
| endpoints_[source_port].reset(); |
| break; |
| } |
| default: |
| LOG(WARNING) << "Unrecognized MessagePipe control message subtype " |
| << message->subtype(); |
| rv = MOJO_RESULT_UNKNOWN; |
| break; |
| } |
| |
| message->Destroy(); |
| return rv; |
| } |
| |
| } // namespace system |
| } // namespace mojo |