blob: ad70878e4121604f83f5077506ad4645c92feb3f [file] [log] [blame]
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "nacl_io/socket/udp_node.h"
#include <errno.h>
#include <string.h>
#include <algorithm>
#include "nacl_io/log.h"
#include "nacl_io/pepper_interface.h"
#include "nacl_io/socket/packet.h"
#include "nacl_io/socket/udp_event_emitter.h"
#include "nacl_io/stream/stream_fs.h"
namespace {
const size_t kMaxPacketSize = 65536;
const size_t kDefaultFifoSize = kMaxPacketSize * 8;
}
namespace nacl_io {
class UdpWork : public StreamFs::Work {
public:
explicit UdpWork(const ScopedUdpEventEmitter& emitter)
: StreamFs::Work(emitter->stream()->stream()),
emitter_(emitter),
packet_(NULL) {}
~UdpWork() { delete packet_; }
UDPSocketInterface* UDPInterface() {
return filesystem()->ppapi()->GetUDPSocketInterface();
}
protected:
ScopedUdpEventEmitter emitter_;
Packet* packet_;
};
class UdpSendWork : public UdpWork {
public:
explicit UdpSendWork(const ScopedUdpEventEmitter& emitter,
const ScopedSocketNode& node)
: UdpWork(emitter), node_(node) {}
virtual bool Start(int32_t val) {
AUTO_LOCK(emitter_->GetLock());
// Does the stream exist, and can it send?
if (!node_->TestStreamFlags(SSF_CAN_SEND))
return false;
// Check if we are already sending.
if (node_->TestStreamFlags(SSF_SENDING))
return false;
packet_ = emitter_->ReadTXPacket_Locked();
if (NULL == packet_)
return false;
int err = UDPInterface()->SendTo(node_->socket_resource(),
packet_->buffer(),
packet_->len(),
packet_->addr(),
filesystem()->GetRunCompletion(this));
if (err != PP_OK_COMPLETIONPENDING) {
// Anything else, we should assume the socket has gone bad.
node_->SetError_Locked(err);
return false;
}
node_->SetStreamFlags(SSF_SENDING);
return true;
}
virtual void Run(int32_t length_error) {
AUTO_LOCK(emitter_->GetLock());
if (length_error < 0) {
node_->SetError_Locked(length_error);
return;
}
// If we did send, then Q more work.
node_->ClearStreamFlags(SSF_SENDING);
node_->QueueOutput();
}
private:
// We assume that transmits will always complete. If the upstream
// actually back pressures, enough to prevent the Send callback
// from triggering, this resource may never go away.
ScopedSocketNode node_;
};
class UdpRecvWork : public UdpWork {
public:
explicit UdpRecvWork(const ScopedUdpEventEmitter& emitter)
: UdpWork(emitter) {
}
virtual bool Start(int32_t val) {
AUTO_LOCK(emitter_->GetLock());
UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
// Does the stream exist, and can it recv?
if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
return false;
// Check if we are already receiving.
if (stream->TestStreamFlags(SSF_RECVING))
return false;
stream->SetStreamFlags(SSF_RECVING);
int err = UDPInterface()->RecvFrom(stream->socket_resource(),
data_,
kMaxPacketSize,
&addr_,
filesystem()->GetRunCompletion(this));
if (err != PP_OK_COMPLETIONPENDING) {
stream->SetError_Locked(err);
return false;
}
return true;
}
virtual void Run(int32_t length_error) {
AUTO_LOCK(emitter_->GetLock());
UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
if (NULL == stream)
return;
// On successful receive we queue more input
if (length_error > 0) {
Packet* packet = new Packet(filesystem()->ppapi());
packet->Copy(data_, length_error, addr_);
filesystem()->ppapi()->ReleaseResource(addr_);
emitter_->WriteRXPacket_Locked(packet);
stream->ClearStreamFlags(SSF_RECVING);
stream->QueueInput();
} else {
stream->SetError_Locked(length_error);
}
}
private:
char data_[kMaxPacketSize];
PP_Resource addr_;
};
UdpNode::UdpNode(Filesystem* filesystem)
: SocketNode(filesystem),
emitter_(new UdpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)) {
emitter_->AttachStream(this);
}
void UdpNode::Destroy() {
emitter_->DetachStream();
SocketNode::Destroy();
}
UdpEventEmitter* UdpNode::GetEventEmitter() {
return emitter_.get();
}
Error UdpNode::Init(int open_flags) {
Error err = SocketNode::Init(open_flags);
if (err != 0)
return err;
if (UDPInterface() == NULL) {
LOG_ERROR("Got NULL interface: UDP");
return EACCES;
}
socket_resource_ =
UDPInterface()->Create(filesystem_->ppapi()->GetInstance());
if (0 == socket_resource_) {
LOG_ERROR("Unable to create UDP resource.");
return EACCES;
}
return 0;
}
void UdpNode::QueueInput() {
UdpRecvWork* work = new UdpRecvWork(emitter_);
stream()->EnqueueWork(work);
}
void UdpNode::QueueOutput() {
if (!TestStreamFlags(SSF_CAN_SEND))
return;
if (TestStreamFlags(SSF_SENDING))
return;
UdpSendWork* work = new UdpSendWork(emitter_, ScopedSocketNode(this));
stream()->EnqueueWork(work);
}
Error UdpNode::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == socket_resource_)
return EBADF;
/* Only bind once. */
if (IsBound())
return EINVAL;
PP_Resource out_addr = SockAddrToResource(addr, len);
if (0 == out_addr)
return EINVAL;
int err =
UDPInterface()->Bind(socket_resource_, out_addr, PP_BlockUntilComplete());
filesystem_->ppapi()->ReleaseResource(out_addr);
if (err != 0)
return PPErrorToErrno(err);
// Get the address that was actually bound (in case addr was 0.0.0.0:0).
out_addr = UDPInterface()->GetBoundAddress(socket_resource_);
if (out_addr == 0)
return EINVAL;
// Now that we are bound, we can start sending and receiving.
SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
QueueInput();
local_addr_ = out_addr;
return 0;
}
Error UdpNode::Connect(const HandleAttr& attr,
const struct sockaddr* addr,
socklen_t len) {
if (0 == socket_resource_)
return EBADF;
/* Connect for UDP is the default dest, it's legal to change it. */
if (remote_addr_ != 0) {
filesystem_->ppapi()->ReleaseResource(remote_addr_);
remote_addr_ = 0;
}
remote_addr_ = SockAddrToResource(addr, len);
if (0 == remote_addr_)
return EINVAL;
return 0;
}
Error UdpNode::Recv_Locked(void* buf,
size_t len,
PP_Resource* out_addr,
int* out_len) {
Packet* packet = emitter_->ReadRXPacket_Locked();
*out_len = 0;
*out_addr = 0;
if (packet) {
int capped_len = static_cast<int32_t>(std::min<int>(len, packet->len()));
memcpy(buf, packet->buffer(), capped_len);
if (packet->addr() != 0) {
filesystem_->ppapi()->AddRefResource(packet->addr());
*out_addr = packet->addr();
}
*out_len = capped_len;
delete packet;
return 0;
}
// Should never happen, Recv_Locked should not be called
// unless already in a POLLIN state.
return EBADF;
}
Error UdpNode::Send_Locked(const void* buf,
size_t len,
PP_Resource addr,
int* out_len) {
if (!IsBound()) {
// Pepper requires a socket to be bound before it can send.
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = 0;
memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
Error err =
Bind(reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
if (err != 0)
return err;
}
*out_len = 0;
int capped_len = static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
Packet* packet = new Packet(filesystem_->ppapi());
packet->Copy(buf, capped_len, addr);
emitter_->WriteTXPacket_Locked(packet);
*out_len = capped_len;
return 0;
}
} // namespace nacl_io