| // Copyright 2014 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/services/public/cpp/network/udp_socket_wrapper.h" |
| |
| #include <assert.h> |
| |
| #include "mojo/public/cpp/environment/logging.h" |
| |
| namespace mojo { |
namespace {

// Receive-queue capacity used by the UDPSocketWrapper constructor that does
// not take an explicit |receive_queue_slots| argument.
const uint32_t kDefaultReceiveQueueSlots = 32u;

}  // namespace
| |
| UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler( |
| UDPSocketWrapper* delegate) |
| : delegate_(delegate) { |
| } |
| |
| UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {} |
| |
| void UDPSocketWrapper::NegotiateCallbackHandler::Run( |
| uint32_t actual_size) const { |
| delegate_->OnNegotiateMaxPendingSendRequestsCompleted(actual_size); |
| } |
| |
| UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler( |
| UDPSocketWrapper* delegate, |
| const ErrorCallback& forward_callback) |
| : delegate_(delegate), |
| forward_callback_(forward_callback) { |
| } |
| |
| UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {} |
| |
| void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result) const { |
| delegate_->OnSendToCompleted(result.Pass(), forward_callback_); |
| } |
| |
| UDPSocketWrapper::ReceivedData::ReceivedData() {} |
| UDPSocketWrapper::ReceivedData::~ReceivedData() {} |
| |
| UDPSocketWrapper::SendRequest::SendRequest() {} |
| UDPSocketWrapper::SendRequest::~SendRequest() {} |
| |
| UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket) |
| : socket_(socket.Pass()), |
| max_receive_queue_size_(kDefaultReceiveQueueSlots), |
| max_pending_sends_(1), |
| current_pending_sends_(0) { |
| Initialize(0); |
| } |
| |
| UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket, |
| uint32_t receive_queue_slots, |
| uint32_t requested_max_pending_sends) |
| : socket_(socket.Pass()), |
| max_receive_queue_size_(receive_queue_slots), |
| max_pending_sends_(1), |
| current_pending_sends_(0) { |
| Initialize(requested_max_pending_sends); |
| } |
| |
| UDPSocketWrapper::~UDPSocketWrapper() { |
| while (!receive_queue_.empty()) { |
| delete receive_queue_.front(); |
| receive_queue_.pop(); |
| } |
| while (!send_requests_.empty()) { |
| delete send_requests_.front(); |
| send_requests_.pop(); |
| } |
| } |
| |
| void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback& callback) { |
| socket_->AllowAddressReuse(callback); |
| } |
| |
| void UDPSocketWrapper::Bind( |
| NetAddressPtr addr, |
| const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) { |
| socket_->Bind(addr.Pass(), callback); |
| } |
| |
| void UDPSocketWrapper::SetSendBufferSize(uint32_t size, |
| const ErrorCallback& callback) { |
| socket_->SetSendBufferSize(size, callback); |
| } |
| |
| void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size, |
| const ErrorCallback& callback) { |
| socket_->SetReceiveBufferSize(size, callback); |
| } |
| |
| bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback& callback) { |
| if (receive_queue_.empty()) { |
| receive_requests_.push(callback); |
| return false; |
| } |
| |
| ReceivedData* data = receive_queue_.front(); |
| receive_queue_.pop(); |
| socket_->ReceiveMore(1); |
| callback.Run(data->result.Pass(), data->src_addr.Pass(), data->data.Pass()); |
| delete data; |
| return true; |
| } |
| |
| void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr, |
| Array<uint8_t> data, |
| const ErrorCallback& callback) { |
| if (current_pending_sends_ >= max_pending_sends_) { |
| SendRequest* request = new SendRequest(); |
| request->dest_addr = dest_addr.Pass(); |
| request->data = data.Pass(); |
| request->callback = callback; |
| send_requests_.push(request); |
| return; |
| } |
| |
| MOJO_DCHECK(send_requests_.empty()); |
| current_pending_sends_++; |
| socket_->SendTo(dest_addr.Pass(), data.Pass(), |
| ErrorCallback(static_cast<typename ErrorCallback::Runnable*>( |
| new SendCallbackHandler(this, callback)))); |
| } |
| |
| void UDPSocketWrapper::OnReceived(NetworkErrorPtr result, |
| NetAddressPtr src_addr, |
| Array<uint8_t> data) { |
| if (!receive_requests_.empty()) { |
| // The cache should be empty if there are user requests waiting for data. |
| MOJO_DCHECK(receive_queue_.empty()); |
| |
| socket_->ReceiveMore(1); |
| |
| ReceiveCallback callback = receive_requests_.front(); |
| receive_requests_.pop(); |
| |
| callback.Run(result.Pass(), src_addr.Pass(), data.Pass()); |
| return; |
| } |
| |
| MOJO_DCHECK(receive_queue_.size() < max_receive_queue_size_); |
| ReceivedData* received_data = new ReceivedData(); |
| received_data->result = result.Pass(); |
| received_data->src_addr = src_addr.Pass(); |
| received_data->data = data.Pass(); |
| receive_queue_.push(received_data); |
| } |
| |
| void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends) { |
| socket_.set_client(this); |
| socket_->NegotiateMaxPendingSendRequests( |
| requested_max_pending_sends, |
| Callback<void(uint32_t)>( |
| static_cast<typename Callback<void(uint32_t)>::Runnable*>( |
| new NegotiateCallbackHandler(this)))); |
| socket_->ReceiveMore(max_receive_queue_size_); |
| } |
| |
| void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted( |
| uint32_t actual_size) { |
| MOJO_DCHECK(max_pending_sends_ == 1); |
| |
| if (actual_size == 0) { |
| assert(false); |
| return; |
| } |
| |
| max_pending_sends_ = actual_size; |
| |
| while (ProcessNextSendRequest()); |
| } |
| |
| void UDPSocketWrapper::OnSendToCompleted( |
| NetworkErrorPtr result, |
| const ErrorCallback& forward_callback) { |
| current_pending_sends_--; |
| ProcessNextSendRequest(); |
| |
| forward_callback.Run(result.Pass()); |
| } |
| |
| bool UDPSocketWrapper::ProcessNextSendRequest() { |
| if (current_pending_sends_ >= max_pending_sends_ || send_requests_.empty()) |
| return false; |
| |
| SendRequest* request = send_requests_.front(); |
| send_requests_.pop(); |
| |
| current_pending_sends_++; |
| |
| socket_->SendTo( |
| request->dest_addr.Pass(), request->data.Pass(), |
| ErrorCallback(static_cast<typename ErrorCallback::Runnable*>( |
| new SendCallbackHandler(this, request->callback)))); |
| |
| delete request; |
| |
| return true; |
| } |
| |
| } // namespace mojo |