blob: 39dee0bb579628fdc8d831883cf3d26994f658ee [file] [log] [blame]
/*
* Copyright (C) 2013 Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include "modules/websockets/NewWebSocketChannelImpl.h"
#include "core/dom/ContextLifecycleObserver.h"
#include "core/dom/ScriptExecutionContext.h"
#include "core/fileapi/Blob.h"
#include "core/fileapi/FileError.h"
#include "core/fileapi/FileReaderLoader.h"
#include "core/fileapi/FileReaderLoaderClient.h"
#include "core/platform/NotImplemented.h"
#include "modules/websockets/WebSocketChannel.h"
#include "modules/websockets/WebSocketChannelClient.h"
#include "public/platform/Platform.h"
#include "public/platform/WebSocketHandle.h"
#include "public/platform/WebString.h"
#include "public/platform/WebURL.h"
#include "public/platform/WebVector.h"
#include "weborigin/SecurityOrigin.h"
#include "wtf/ArrayBuffer.h"
#include "wtf/Vector.h"
#include "wtf/text/WTFString.h"
// FIXME: We should implement Inspector notification.
// FIXME: We should add log messages.
using WebKit::WebSocketHandle;
namespace WebCore {
namespace {
struct PendingEvent {
enum Type {
DidConnectComplete,
DidReceiveTextMessage,
DidReceiveBinaryMessage,
DidReceiveError,
DidClose,
};
Type type;
Vector<char> message; // for DidReceiveTextMessage / DidReceiveBinaryMessage
int closingCode; // for DidClose
String closingReason; // for DidClose
explicit PendingEvent(Type type) : type(type), closingCode(0) { }
PendingEvent(Type, Vector<char>*);
PendingEvent(int code, const String& reason) : type(DidClose), closingCode(code), closingReason(reason) { }
};
class PendingEventProcessor : public RefCounted<PendingEventProcessor> {
public:
PendingEventProcessor() : m_isAborted(false) { }
virtual ~PendingEventProcessor() { }
void abort() { m_isAborted = true; }
void append(const PendingEvent& e) { m_events.append(e); }
void process(NewWebSocketChannelImpl*);
private:
bool m_isAborted;
Vector<PendingEvent> m_events;
};
PendingEvent::PendingEvent(Type type, Vector<char>* data)
: type(type)
, closingCode(0)
{
ASSERT(type == DidReceiveTextMessage || type == DidReceiveBinaryMessage);
message.swap(*data);
}
void PendingEventProcessor::process(NewWebSocketChannelImpl* channel)
{
RefPtr<PendingEventProcessor> protect(this);
for (size_t i = 0; i < m_events.size() && !m_isAborted; ++i) {
PendingEvent& event = m_events[i];
switch (event.type) {
case PendingEvent::DidConnectComplete:
channel->handleDidConnect();
// |this| can be detached here.
break;
case PendingEvent::DidReceiveTextMessage:
channel->handleTextMessage(&event.message);
// |this| can be detached here.
break;
case PendingEvent::DidReceiveBinaryMessage:
channel->handleBinaryMessage(&event.message);
// |this| can be detached here.
break;
case PendingEvent::DidReceiveError:
channel->handleDidReceiveMessageError();
// |this| can be detached here.
break;
case PendingEvent::DidClose:
channel->handleDidClose(event.closingCode, event.closingReason);
// |this| can be detached here.
break;
}
}
m_events.clear();
}
bool isClean(int code)
{
return code == WebSocketChannel::CloseEventCodeNormalClosure
|| (WebSocketChannel::CloseEventCodeMinimumUserDefined <= code
&& code <= WebSocketChannel::CloseEventCodeMaximumUserDefined);
}
} // namespace
class NewWebSocketChannelImpl::BlobLoader : public FileReaderLoaderClient {
public:
BlobLoader(const Blob&, NewWebSocketChannelImpl*);
virtual ~BlobLoader() { }
void cancel();
// FileReaderLoaderClient functions.
virtual void didStartLoading() OVERRIDE { }
virtual void didReceiveData() OVERRIDE { }
virtual void didFinishLoading() OVERRIDE;
virtual void didFail(FileError::ErrorCode) OVERRIDE;
private:
NewWebSocketChannelImpl* m_channel;
FileReaderLoader m_loader;
};
NewWebSocketChannelImpl::BlobLoader::BlobLoader(const Blob& blob, NewWebSocketChannelImpl* channel)
: m_channel(channel)
, m_loader(FileReaderLoader::ReadAsArrayBuffer, this)
{
m_loader.start(channel->scriptExecutionContext(), blob);
}
void NewWebSocketChannelImpl::BlobLoader::cancel()
{
m_loader.cancel();
// didFail will be called immediately.
// |this| is deleted here.
}
void NewWebSocketChannelImpl::BlobLoader::didFinishLoading()
{
m_channel->didFinishLoadingBlob(m_loader.arrayBufferResult());
// |this| is deleted here.
}
void NewWebSocketChannelImpl::BlobLoader::didFail(FileError::ErrorCode errorCode)
{
m_channel->didFailLoadingBlob(errorCode);
// |this| is deleted here.
}
class NewWebSocketChannelImpl::Resumer {
public:
explicit Resumer(NewWebSocketChannelImpl*);
~Resumer();
void append(const PendingEvent&);
void suspend();
void resumeLater();
void abort();
private:
void resumeNow(Timer<Resumer>*);
NewWebSocketChannelImpl* m_channel;
RefPtr<PendingEventProcessor> m_pendingEventProcessor;
Timer<Resumer> m_timer;
bool m_isAborted;
};
NewWebSocketChannelImpl::Resumer::Resumer(NewWebSocketChannelImpl* channel)
: m_channel(channel)
, m_pendingEventProcessor(adoptRef(new PendingEventProcessor))
, m_timer(this, &Resumer::resumeNow)
, m_isAborted(false) { }
NewWebSocketChannelImpl::Resumer::~Resumer()
{
abort();
}
void NewWebSocketChannelImpl::Resumer::append(const PendingEvent& event)
{
if (m_isAborted)
return;
m_pendingEventProcessor->append(event);
}
void NewWebSocketChannelImpl::Resumer::suspend()
{
if (m_isAborted)
return;
m_timer.stop();
m_channel->m_isSuspended = true;
}
void NewWebSocketChannelImpl::Resumer::resumeLater()
{
if (m_isAborted)
return;
if (!m_timer.isActive())
m_timer.startOneShot(0);
}
void NewWebSocketChannelImpl::Resumer::abort()
{
if (m_isAborted)
return;
m_isAborted = true;
m_timer.stop();
m_pendingEventProcessor->abort();
m_pendingEventProcessor = 0;
}
void NewWebSocketChannelImpl::Resumer::resumeNow(Timer<Resumer>*)
{
ASSERT(!m_isAborted);
m_channel->m_isSuspended = false;
ASSERT(m_channel->m_client);
m_pendingEventProcessor->process(m_channel);
// |this| can be aborted here.
// |this| can be deleted here.
}
NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
: ContextLifecycleObserver(context)
, m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle()))
, m_client(client)
, m_resumer(adoptPtr(new Resumer(this)))
, m_isSuspended(false)
, m_sendingQuota(0)
, m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMark * 2) // initial quota
, m_bufferedAmount(0)
, m_sentSizeOfTopMessage(0)
{
}
NewWebSocketChannelImpl::~NewWebSocketChannelImpl()
{
abortAsyncOperations();
}
void NewWebSocketChannelImpl::connect(const KURL& url, const String& protocol)
{
if (!m_handle)
return;
m_url = url;
Vector<String> protocols;
// Since protocol is already verified and escaped, we can simply split it.
protocol.split(", ", true, protocols);
WebKit::WebVector<WebKit::WebString> webProtocols(protocols.size());
for (size_t i = 0; i < protocols.size(); ++i) {
webProtocols[i] = protocols[i];
}
String origin = scriptExecutionContext()->securityOrigin()->toString();
m_handle->connect(url, webProtocols, origin, this);
flowControlIfNecessary();
}
String NewWebSocketChannelImpl::subprotocol()
{
return m_subprotocol;
}
String NewWebSocketChannelImpl::extensions()
{
return m_extensions;
}
WebSocketChannel::SendResult NewWebSocketChannelImpl::send(const String& message)
{
m_messages.append(Message(message));
sendInternal();
return SendSuccess;
}
WebSocketChannel::SendResult NewWebSocketChannelImpl::send(const Blob& blob)
{
m_messages.append(Message(blob));
sendInternal();
return SendSuccess;
}
WebSocketChannel::SendResult NewWebSocketChannelImpl::send(const ArrayBuffer& buffer, unsigned byteOffset, unsigned byteLength)
{
// buffer.slice copies its contents.
m_messages.append(buffer.slice(byteOffset, byteOffset + byteLength));
sendInternal();
return SendSuccess;
}
unsigned long NewWebSocketChannelImpl::bufferedAmount() const
{
return m_bufferedAmount;
}
void NewWebSocketChannelImpl::close(int code, const String& reason)
{
ASSERT(m_handle);
m_handle->close(static_cast<unsigned short>(code), reason);
}
void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
{
// m_handle and m_client can be null here.
if (m_client)
m_client->didReceiveMessageError();
if (m_isSuspended) {
m_resumer->append(PendingEvent(CloseEventCodeAbnormalClosure, reason));
} else {
handleDidClose(CloseEventCodeAbnormalClosure, reason);
// handleDidClose may delete this object.
}
}
void NewWebSocketChannelImpl::disconnect()
{
abortAsyncOperations();
if (m_handle)
m_handle->close(CloseEventCodeAbnormalClosure, "");
m_handle.clear();
m_client = 0;
}
void NewWebSocketChannelImpl::suspend()
{
m_resumer->suspend();
}
void NewWebSocketChannelImpl::resume()
{
m_resumer->resumeLater();
}
void NewWebSocketChannelImpl::handleDidConnect()
{
ASSERT(m_client);
ASSERT(!m_isSuspended);
m_client->didConnect();
}
void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
{
ASSERT(m_client);
ASSERT(!m_isSuspended);
ASSERT(messageData);
String message = String::fromUTF8(messageData->data(), messageData->size());
// For consistency with handleBinaryMessage, we clear |messageData|.
messageData->clear();
if (message.isNull()) {
failAsError("Could not decode a text frame as UTF-8.");
// failAsError may delete this object.
} else {
m_client->didReceiveMessage(message);
}
}
void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
{
ASSERT(m_client);
ASSERT(!m_isSuspended);
ASSERT(messageData);
OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
messageData->swap(*binaryData);
m_client->didReceiveBinaryData(binaryData.release());
}
void NewWebSocketChannelImpl::handleDidReceiveMessageError()
{
ASSERT(m_client);
ASSERT(!m_isSuspended);
m_client->didReceiveMessageError();
}
void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
{
ASSERT(!m_isSuspended);
m_handle.clear();
abortAsyncOperations();
if (!m_client) {
return;
}
WebSocketChannelClient* client = m_client;
m_client = 0;
WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete;
client->didClose(m_bufferedAmount, status, code, reason);
// client->didClose may delete this object.
}
NewWebSocketChannelImpl::Message::Message(const String& text)
: type(MessageTypeText)
, text(text.utf8(String::StrictConversionReplacingUnpairedSurrogatesWithFFFD)) { }
NewWebSocketChannelImpl::Message::Message(const Blob& blob)
: type(MessageTypeBlob)
, blob(Blob::create(blob.url(), blob.type(), blob.size())) { }
NewWebSocketChannelImpl::Message::Message(PassRefPtr<ArrayBuffer> arrayBuffer)
: type(MessageTypeArrayBuffer)
, arrayBuffer(arrayBuffer) { }
void NewWebSocketChannelImpl::sendInternal()
{
ASSERT(m_handle);
unsigned long bufferedAmount = m_bufferedAmount;
while (!m_messages.isEmpty() && m_sendingQuota > 0 && !m_blobLoader) {
bool final = false;
const Message& message = m_messages.first();
switch (message.type) {
case MessageTypeText: {
WebSocketHandle::MessageType type =
m_sentSizeOfTopMessage ? WebSocketHandle::MessageTypeContinuation : WebSocketHandle::MessageTypeText;
size_t size = std::min(static_cast<size_t>(m_sendingQuota), message.text.length() - m_sentSizeOfTopMessage);
final = (m_sendingQuota == size);
m_handle->send(type, message.text.data() + m_sentSizeOfTopMessage, size, final);
m_sentSizeOfTopMessage += size;
m_sendingQuota -= size;
break;
}
case MessageTypeBlob:
ASSERT(!m_blobLoader);
m_blobLoader = adoptPtr(new BlobLoader(*message.blob, this));
break;
case MessageTypeArrayBuffer: {
WebSocketHandle::MessageType type =
m_sentSizeOfTopMessage ? WebSocketHandle::MessageTypeContinuation : WebSocketHandle::MessageTypeBinary;
size_t size = std::min(static_cast<size_t>(m_sendingQuota), message.arrayBuffer->byteLength() - m_sentSizeOfTopMessage);
final = (m_sendingQuota == size);
m_handle->send(type, static_cast<const char*>(message.arrayBuffer->data()) + m_sentSizeOfTopMessage, size, final);
m_sentSizeOfTopMessage += size;
m_sendingQuota -= size;
break;
}
}
if (final) {
m_messages.removeFirst();
m_sentSizeOfTopMessage = 0;
}
}
if (m_client && m_bufferedAmount != bufferedAmount) {
m_client->didUpdateBufferedAmount(m_bufferedAmount);
}
}
void NewWebSocketChannelImpl::flowControlIfNecessary()
{
if (!m_handle || m_receivedDataSizeForFlowControl < receivedDataSizeForFlowControlHighWaterMark) {
return;
}
m_handle->flowControl(m_receivedDataSizeForFlowControl);
m_receivedDataSizeForFlowControl = 0;
}
void NewWebSocketChannelImpl::abortAsyncOperations()
{
if (m_blobLoader) {
m_blobLoader->cancel();
m_blobLoader.clear();
}
m_resumer->abort();
}
void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions)
{
ASSERT(m_handle);
ASSERT(handle == m_handle);
ASSERT(m_client);
if (!succeed) {
failAsError("Cannot connect to " + m_url.string() + ".");
// failAsError may delete this object.
return;
}
m_subprotocol = selectedProtocol;
m_extensions = extensions;
if (m_isSuspended)
m_resumer->append(PendingEvent(PendingEvent::DidConnectComplete));
else
m_client->didConnect();
}
void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketHandle::MessageType type, const char* data, size_t size, bool fin)
{
ASSERT(m_handle);
ASSERT(handle == m_handle);
ASSERT(m_client);
// Non-final frames cannot be empty.
ASSERT(fin || size);
switch (type) {
case WebSocketHandle::MessageTypeText:
ASSERT(m_receivingMessageData.isEmpty());
m_receivingMessageTypeIsText = true;
break;
case WebSocketHandle::MessageTypeBinary:
ASSERT(m_receivingMessageData.isEmpty());
m_receivingMessageTypeIsText = false;
break;
case WebSocketHandle::MessageTypeContinuation:
ASSERT(!m_receivingMessageData.isEmpty());
break;
}
m_receivingMessageData.append(data, size);
m_receivedDataSizeForFlowControl += size;
flowControlIfNecessary();
if (!fin) {
return;
}
if (m_isSuspended) {
PendingEvent::Type type =
m_receivingMessageTypeIsText ? PendingEvent::DidReceiveTextMessage : PendingEvent::DidReceiveBinaryMessage;
m_resumer->append(PendingEvent(type, &m_receivingMessageData));
return;
}
Vector<char> messageData;
messageData.swap(m_receivingMessageData);
if (m_receivingMessageTypeIsText) {
handleTextMessage(&messageData);
// handleTextMessage may delete this object.
} else {
handleBinaryMessage(&messageData);
}
}
void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short code, const WebKit::WebString& reason)
{
ASSERT(m_handle);
m_handle.clear();
// FIXME: Maybe we should notify an error to m_client for some didClose messages.
if (m_isSuspended) {
m_resumer->append(PendingEvent(code, reason));
} else {
handleDidClose(code, reason);
// handleDidClose may delete this object.
}
}
void NewWebSocketChannelImpl::didFinishLoadingBlob(PassRefPtr<ArrayBuffer> buffer)
{
m_blobLoader.clear();
ASSERT(m_handle);
// The loaded blob is always placed on m_messages[0].
ASSERT(m_messages.size() > 0 && m_messages.first().type == MessageTypeBlob);
// We replace it with the loaded blob.
m_messages.first() = Message(buffer);
sendInternal();
}
void NewWebSocketChannelImpl::didFailLoadingBlob(FileError::ErrorCode errorCode)
{
m_blobLoader.clear();
if (errorCode == FileError::ABORT_ERR) {
// The error is caused by cancel().
return;
}
// FIXME: Generate human-friendly reason message.
failAsError("Failed to load Blob: error code = " + String::number(errorCode));
// |this| can be deleted here.
}
} // namespace WebCore