| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/system/data_pipe.h" |
| |
| #include <string.h> |
| |
| #include <algorithm> |
| #include <limits> |
| |
| #include "base/logging.h" |
| #include "mojo/system/constants.h" |
| #include "mojo/system/memory.h" |
| #include "mojo/system/waiter_list.h" |
| |
| namespace mojo { |
| namespace system { |
| |
| void DataPipe::ProducerCancelAllWaiters() { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_producer_no_lock()); |
| producer_waiter_list_->CancelAllWaiters(); |
| } |
| |
| void DataPipe::ProducerClose() { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_producer_no_lock()); |
| producer_waiter_list_.reset(); |
| ProducerCloseImplNoLock(); |
| } |
| |
| MojoResult DataPipe::ProducerWriteData(const void* elements, |
| uint32_t* num_elements, |
| MojoWriteDataFlags flags) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_producer_no_lock()); |
| |
| if (producer_in_two_phase_write_) |
| return MOJO_RESULT_BUSY; |
| |
| // TODO(vtl): This implementation may write less than requested, even if room |
| // is available. Fix this. (Probably make a subclass-specific impl.) |
| void* buffer = NULL; |
| uint32_t buffer_num_elements = *num_elements; |
| MojoResult rv = ProducerBeginWriteDataImplNoLock(&buffer, |
| &buffer_num_elements, |
| flags); |
| if (rv != MOJO_RESULT_OK) |
| return rv; |
| |
| uint32_t num_elements_to_write = std::min(*num_elements, buffer_num_elements); |
| memcpy(buffer, elements, num_elements_to_write * element_size_); |
| |
| rv = ProducerEndWriteDataImplNoLock(num_elements_to_write); |
| if (rv != MOJO_RESULT_OK) |
| return rv; |
| |
| *num_elements = num_elements_to_write; |
| return MOJO_RESULT_OK; |
| } |
| |
| MojoResult DataPipe::ProducerBeginWriteData(void** buffer, |
| uint32_t* buffer_num_elements, |
| MojoWriteDataFlags flags) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_producer_no_lock()); |
| |
| if (producer_in_two_phase_write_) |
| return MOJO_RESULT_BUSY; |
| |
| MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, |
| buffer_num_elements, |
| flags); |
| if (rv != MOJO_RESULT_OK) |
| return rv; |
| |
| producer_in_two_phase_write_ = true; |
| return MOJO_RESULT_OK; |
| } |
| |
| MojoResult DataPipe::ProducerEndWriteData(uint32_t num_elements_written) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_producer_no_lock()); |
| |
| if (!producer_in_two_phase_write_) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| |
| MojoResult rv = ProducerEndWriteDataImplNoLock(num_elements_written); |
| producer_in_two_phase_write_ = false; // End two-phase write even on failure. |
| return rv; |
| } |
| |
| MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter, |
| MojoWaitFlags flags, |
| MojoResult wake_result) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_producer_no_lock()); |
| |
| if ((flags & ProducerSatisfiedFlagsNoLock())) |
| return MOJO_RESULT_ALREADY_EXISTS; |
| if (!(flags & ProducerSatisfiableFlagsNoLock())) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| |
| producer_waiter_list_->AddWaiter(waiter, flags, wake_result); |
| return MOJO_RESULT_OK; |
| } |
| |
| void DataPipe::ProducerRemoveWaiter(Waiter* waiter) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_producer_no_lock()); |
| producer_waiter_list_->RemoveWaiter(waiter); |
| } |
| |
| void DataPipe::ConsumerCancelAllWaiters() { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_consumer_no_lock()); |
| consumer_waiter_list_->CancelAllWaiters(); |
| } |
| |
| void DataPipe::ConsumerClose() { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_consumer_no_lock()); |
| consumer_waiter_list_.reset(); |
| ConsumerCloseImplNoLock(); |
| } |
| |
| MojoResult DataPipe::ConsumerReadData(void* elements, |
| uint32_t* num_elements, |
| MojoReadDataFlags flags) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if (consumer_in_two_phase_read_) |
| return MOJO_RESULT_BUSY; |
| |
| if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { |
| return ConsumerDiscardDataNoLock(num_elements, |
| (flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE)); |
| } |
| if ((flags & MOJO_READ_DATA_FLAG_QUERY)) |
| return ConsumerQueryDataNoLock(num_elements); |
| |
| // TODO(vtl): This implementation may write less than requested, even if room |
| // is available. Fix this. (Probably make a subclass-specific impl.) |
| const void* buffer = NULL; |
| uint32_t buffer_num_elements = 0; |
| MojoResult rv = ConsumerBeginReadDataImplNoLock(&buffer, |
| &buffer_num_elements, |
| flags); |
| if (rv != MOJO_RESULT_OK) |
| return rv; |
| |
| uint32_t num_elements_to_read = std::min(*num_elements, buffer_num_elements); |
| memcpy(elements, buffer, num_elements_to_read * element_size_); |
| |
| rv = ConsumerEndReadDataImplNoLock(num_elements_to_read); |
| if (rv != MOJO_RESULT_OK) |
| return rv; |
| |
| *num_elements = num_elements_to_read; |
| return MOJO_RESULT_OK; |
| } |
| |
| MojoResult DataPipe::ConsumerBeginReadData(const void** buffer, |
| uint32_t* buffer_num_elements, |
| MojoReadDataFlags flags) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if (consumer_in_two_phase_read_) |
| return MOJO_RESULT_BUSY; |
| |
| MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, |
| buffer_num_elements, |
| flags); |
| if (rv != MOJO_RESULT_OK) |
| return rv; |
| |
| consumer_in_two_phase_read_ = true; |
| return MOJO_RESULT_OK; |
| } |
| |
| MojoResult DataPipe::ConsumerEndReadData(uint32_t num_elements_read) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if (!consumer_in_two_phase_read_) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| |
| MojoResult rv = ConsumerEndReadDataImplNoLock(num_elements_read); |
| consumer_in_two_phase_read_ = false; // End two-phase read even on failure. |
| return rv; |
| } |
| |
| MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter, |
| MojoWaitFlags flags, |
| MojoResult wake_result) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_consumer_no_lock()); |
| |
| if ((flags & ConsumerSatisfiedFlagsNoLock())) |
| return MOJO_RESULT_ALREADY_EXISTS; |
| if (!(flags & ConsumerSatisfiableFlagsNoLock())) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| |
| consumer_waiter_list_->AddWaiter(waiter, flags, wake_result); |
| return MOJO_RESULT_OK; |
| } |
| |
| void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) { |
| base::AutoLock locker(lock_); |
| DCHECK(has_local_consumer_no_lock()); |
| consumer_waiter_list_->RemoveWaiter(waiter); |
| } |
| |
| DataPipe::DataPipe(bool has_local_producer, bool has_local_consumer) |
| : element_size_(0), |
| producer_waiter_list_(has_local_producer ? new WaiterList() : NULL), |
| consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL), |
| producer_in_two_phase_write_(false), |
| consumer_in_two_phase_read_(false) { |
| DCHECK(has_local_producer || has_local_consumer); |
| } |
| |
| DataPipe::~DataPipe() { |
| DCHECK(!has_local_producer_no_lock()); |
| DCHECK(!has_local_consumer_no_lock()); |
| } |
| |
| MojoResult DataPipe::Init(bool may_discard, |
| size_t element_size, |
| size_t capacity_num_elements) { |
| // No need to lock: This method is not thread-safe. |
| |
| if (element_size == 0) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| if (!capacity_num_elements) { |
| // Set the capacity to the default (rounded down by element size, but always |
| // at least one element). |
| capacity_num_elements = |
| std::max(static_cast<size_t>(1), |
| kDefaultDataPipeCapacityBytes / element_size); |
| } |
| if (capacity_num_elements > |
| std::numeric_limits<uint32_t>::max() / element_size) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| if (capacity_num_elements * element_size > kMaxDataPipeCapacityBytes) |
| return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| |
| may_discard_ = may_discard; |
| element_size_ = element_size; |
| capacity_num_elements_ = capacity_num_elements; |
| return MOJO_RESULT_OK; |
| } |
| |
| void DataPipe::AwakeProducerWaitersForStateChangeNoLock() { |
| lock_.AssertAcquired(); |
| if (!has_local_producer_no_lock()) |
| return; |
| producer_waiter_list_->AwakeWaitersForStateChange( |
| ProducerSatisfiedFlagsNoLock(), ProducerSatisfiableFlagsNoLock()); |
| } |
| |
| void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() { |
| lock_.AssertAcquired(); |
| if (!has_local_consumer_no_lock()) |
| return; |
| consumer_waiter_list_->AwakeWaitersForStateChange( |
| ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock()); |
| } |
| |
| } // namespace system |
| } // namespace mojo |