// blob: 81d2627c7f6c0ae1dd5f9119367a82ba8b07bd6a [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/message_loop/message_loop.h"
#include "base/rand_util.h"
#include "chrome/browser/devtools/device/android_device_manager.h"
#include "content/public/browser/browser_thread.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/server/web_socket.h"
#include "net/socket/stream_socket.h"
using content::BrowserThread;
using net::WebSocket;
namespace {
// Size, in bytes, of the net::IOBuffer allocated for socket reads; it is also
// the maximum number of bytes requested from the socket per Read() call.
const int kBufferSize = 16 * 1024;
// AndroidWebSocket implementation that talks to a socket obtained through
// Device::HttpUpgrade(). Connect/Disconnect/SendFrame DCHECK that they run on
// the UI thread and forward socket work to |device_message_loop_|; the
// *OnHandlerThread methods, OnBytesRead and SendPendingRequests DCHECK that
// they run on that loop. Delegate notifications go through OnSocketOpened,
// OnFrameRead and OnSocketClosed.
class WebSocketImpl : public AndroidDeviceManager::AndroidWebSocket {
 public:
  typedef AndroidDeviceManager::Device Device;

  // Stores the arguments; no connection is attempted until Connect().
  WebSocketImpl(scoped_refptr<base::MessageLoopProxy> device_message_loop,
                scoped_refptr<Device> device,
                const std::string& socket_name,
                const std::string& url,
                Delegate* delegate);

  // AndroidWebSocket overrides.
  virtual void Connect() OVERRIDE;
  virtual void Disconnect() OVERRIDE;
  virtual void SendFrame(const std::string& message) OVERRIDE;
  virtual void ClearDelegate() OVERRIDE;

 private:
  friend class base::RefCountedThreadSafe<AndroidWebSocket>;

  virtual ~WebSocketImpl();

  // Completion callback handed to Device::HttpUpgrade().
  void Connected(int result, net::StreamSocket* socket);

  // Socket I/O; these run on |device_message_loop_|.
  void StartListeningOnHandlerThread();
  void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result);
  void SendFrameOnHandlerThread(const std::string& message);
  void SendPendingRequests(int result);
  void DisconnectOnHandlerThread(bool closed_by_device);

  // Forward the corresponding event to |delegate_| when it is non-NULL.
  void OnSocketOpened();
  void OnFrameRead(const std::string& message);
  void OnSocketClosed(bool closed_by_device);

  scoped_refptr<base::MessageLoopProxy> device_message_loop_;
  scoped_refptr<Device> device_;
  std::string socket_name_;
  std::string url_;

  // Set by Connected(); released by DisconnectOnHandlerThread().
  scoped_ptr<net::StreamSocket> socket_;

  // Raw pointer; reset to NULL by ClearDelegate().
  Delegate* delegate_;

  // Bytes received from the socket that have not yet been decoded into frames.
  std::string response_buffer_;

  // Encoded frames that have not yet been written to the socket.
  std::string request_buffer_;
};
// Records the message loop, device, socket name, URL and delegate for later
// use. |socket_| stays empty until Connected() receives a socket.
WebSocketImpl::WebSocketImpl(
    scoped_refptr<base::MessageLoopProxy> device_message_loop,
    scoped_refptr<Device> device,
    const std::string& socket_name,
    const std::string& url,
    Delegate* delegate)
    : device_message_loop_(device_message_loop),
      device_(device),
      socket_name_(socket_name),
      url_(url),
      delegate_(delegate) {
}
// Asks the device to upgrade |socket_name_| / |url_| and routes the outcome to
// Connected(). Must be called on the UI thread.
void WebSocketImpl::Connect() {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  device_->HttpUpgrade(socket_name_,
                       url_,
                       base::Bind(&WebSocketImpl::Connected, this));
}
void WebSocketImpl::Disconnect() {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
device_message_loop_->PostTask(
FROM_HERE,
base::Bind(&WebSocketImpl::DisconnectOnHandlerThread, this, false));
}
void WebSocketImpl::SendFrame(const std::string& message) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
device_message_loop_->PostTask(
FROM_HERE,
base::Bind(&WebSocketImpl::SendFrameOnHandlerThread, this, message));
}
// Drops the delegate pointer; OnSocketOpened/OnFrameRead/OnSocketClosed become
// no-ops afterwards.
void WebSocketImpl::ClearDelegate() {
  delegate_ = NULL;
}
void WebSocketImpl::SendFrameOnHandlerThread(const std::string& message) {
DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());
int mask = base::RandInt(0, 0x7FFFFFFF);
std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask);
request_buffer_ += encoded_frame;
if (request_buffer_.length() == encoded_frame.length())
SendPendingRequests(0);
}
// DCHECKs that destruction happens on the UI thread.
WebSocketImpl::~WebSocketImpl() {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
}
void WebSocketImpl::Connected(int result, net::StreamSocket* socket) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
if (result != net::OK || socket == NULL) {
OnSocketClosed(true);
return;
}
socket_.reset(socket);
device_message_loop_->PostTask(
FROM_HERE,
base::Bind(&WebSocketImpl::StartListeningOnHandlerThread, this));
OnSocketOpened();
}
void WebSocketImpl::StartListeningOnHandlerThread() {
DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());
scoped_refptr<net::IOBuffer> response_buffer =
new net::IOBuffer(kBufferSize);
int result = socket_->Read(
response_buffer.get(),
kBufferSize,
base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer));
if (result != net::ERR_IO_PENDING)
OnBytesRead(response_buffer, result);
}
// Handles the outcome of a socket Read(): appends the received bytes to
// |response_buffer_|, decodes every complete frame and posts each one to the
// UI thread via OnFrameRead(), then issues the next Read().
//
// |response_buffer| is the IOBuffer the read was issued with; it is reused for
// the following read. |result| is the number of bytes read; a value <= 0
// triggers a disconnect reported as closed by the device, as does a frame
// that decodes to FRAME_ERROR or FRAME_CLOSE.
//
// Runs on the device message loop.
void WebSocketImpl::OnBytesRead(
    scoped_refptr<net::IOBuffer> response_buffer, int result) {
  DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());

  // Iterate rather than recurse: every Read() that completes synchronously is
  // processed in the next loop pass, so a long run of immediately available
  // data cannot grow the stack.
  for (;;) {
    if (!socket_)
      return;

    if (result <= 0) {
      DisconnectOnHandlerThread(true);
      return;
    }

    // Append straight from the IOBuffer; no intermediate std::string needed.
    response_buffer_.append(response_buffer->data(), result);

    int bytes_consumed = 0;
    std::string output;
    WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17(
        response_buffer_, false, &bytes_consumed, &output);
    while (parse_result == WebSocket::FRAME_OK) {
      // Drop the decoded frame's bytes in place.
      response_buffer_.erase(0, bytes_consumed);
      BrowserThread::PostTask(
          BrowserThread::UI, FROM_HERE,
          base::Bind(&WebSocketImpl::OnFrameRead, this, output));
      parse_result = WebSocket::DecodeFrameHybi17(
          response_buffer_, false, &bytes_consumed, &output);
    }

    if (parse_result == WebSocket::FRAME_ERROR ||
        parse_result == WebSocket::FRAME_CLOSE) {
      DisconnectOnHandlerThread(true);
      return;
    }

    result = socket_->Read(
        response_buffer.get(),
        kBufferSize,
        base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer));
    // The bound callback re-enters this method once the read completes.
    if (result == net::ERR_IO_PENDING)
      return;
  }
}
// Writes the contents of |request_buffer_| to |socket_| until the buffer is
// empty or a write is left pending.
//
// |result| is the outcome of the previous Write(): the number of bytes that
// were written (0 when called to start a fresh write), or a negative value on
// failure, which triggers a disconnect reported as closed by the device.
//
// Runs on the device message loop.
void WebSocketImpl::SendPendingRequests(int result) {
  DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());

  // Iterate rather than recurse: every Write() that completes synchronously is
  // handled in the next loop pass, so draining a large buffer in many small
  // synchronous writes cannot grow the stack.
  for (;;) {
    if (!socket_)
      return;

    if (result < 0) {
      DisconnectOnHandlerThread(true);
      return;
    }

    // Discard the bytes the previous write consumed, in place.
    request_buffer_.erase(0, result);
    if (request_buffer_.empty())
      return;

    scoped_refptr<net::StringIOBuffer> buffer =
        new net::StringIOBuffer(request_buffer_);
    result = socket_->Write(
        buffer.get(),
        buffer->size(),
        base::Bind(&WebSocketImpl::SendPendingRequests, this));
    // The bound callback re-enters this method once the write completes.
    if (result == net::ERR_IO_PENDING)
      return;
  }
}
void WebSocketImpl::DisconnectOnHandlerThread(bool closed_by_device) {
DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());
if (!socket_)
return;
// Wipe out socket_ first since Disconnect can re-enter this method.
scoped_ptr<net::StreamSocket> socket(socket_.release());
socket->Disconnect();
BrowserThread::PostTask(BrowserThread::UI, FROM_HERE,
base::Bind(&WebSocketImpl::OnSocketClosed, this, closed_by_device));
}
// Tells the delegate, if one is still set, that the socket has been opened.
void WebSocketImpl::OnSocketOpened() {
  if (!delegate_)
    return;
  delegate_->OnSocketOpened();
}
// Passes a decoded frame payload to the delegate, if one is still set.
void WebSocketImpl::OnFrameRead(const std::string& message) {
  if (!delegate_)
    return;
  delegate_->OnFrameRead(message);
}
// Tells the delegate, if one is still set, that the socket has been closed,
// forwarding |closed_by_device| unchanged.
void WebSocketImpl::OnSocketClosed(bool closed_by_device) {
  if (!delegate_)
    return;
  delegate_->OnSocketClosed(closed_by_device);
}
} // namespace
// Creates a WebSocketImpl bound to this device and its message loop. The
// returned socket is not connected; the caller has to invoke Connect().
scoped_refptr<AndroidDeviceManager::AndroidWebSocket>
AndroidDeviceManager::Device::CreateWebSocket(
    const std::string& socket,
    const std::string& url,
    AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) {
  scoped_refptr<AndroidDeviceManager::AndroidWebSocket> web_socket(
      new WebSocketImpl(device_message_loop_, this, socket, url, delegate));
  return web_socket;
}