blob: 4a701a5ac33e4a53b8c069df01af6489aace7600 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket;
import java.io.EOFException;
import java.io.IOException;
import java.net.ProtocolException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SslConnection;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
/* ------------------------------------------------------------ */
/**
* <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances
* (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
* <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
* <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
* so it's lifecycle must be controlled externally.
*
* @see WebSocketClient
*/
public class WebSocketClientFactory extends AggregateLifeCycle
{
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
private final SslContextFactory _sslContextFactory = new SslContextFactory();
private final ThreadPool _threadPool;
private final WebSocketClientSelector _selector;
private MaskGen _maskGen;
private WebSocketBuffers _buffers;
/* ------------------------------------------------------------ */
/**
* <p>Creates a WebSocketClientFactory with the default configuration.</p>
*/
public WebSocketClientFactory()
{
this(null);
}
/* ------------------------------------------------------------ */
/**
* <p>Creates a WebSocketClientFactory with the given ThreadPool and the default configuration.</p>
*
* @param threadPool the ThreadPool instance to use
*/
public WebSocketClientFactory(ThreadPool threadPool)
{
this(threadPool, new RandomMaskGen());
}
/* ------------------------------------------------------------ */
/**
* <p>Creates a WebSocketClientFactory with the given ThreadPool and the given MaskGen.</p>
*
* @param threadPool the ThreadPool instance to use
* @param maskGen the MaskGen instance to use
*/
public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen)
{
this(threadPool, maskGen, 8192);
}
/* ------------------------------------------------------------ */
/**
* <p>Creates a WebSocketClientFactory with the specified configuration.</p>
*
* @param threadPool the ThreadPool instance to use
* @param maskGen the mask generator to use
* @param bufferSize the read buffer size
*/
public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen, int bufferSize)
{
if (threadPool == null)
threadPool = new QueuedThreadPool();
_threadPool = threadPool;
addBean(_threadPool);
_buffers = new WebSocketBuffers(bufferSize);
addBean(_buffers);
_maskGen = maskGen;
addBean(_maskGen);
_selector = new WebSocketClientSelector();
addBean(_selector);
addBean(_sslContextFactory);
}
/* ------------------------------------------------------------ */
/**
* @return the SslContextFactory used to configure SSL parameters
*/
public SslContextFactory getSslContextFactory()
{
return _sslContextFactory;
}
/* ------------------------------------------------------------ */
/**
* Get the selectorManager. Used to configure the manager.
*
* @return The {@link SelectorManager} instance.
*/
public SelectorManager getSelectorManager()
{
return _selector;
}
/* ------------------------------------------------------------ */
/**
* Get the ThreadPool.
* Used to set/query the thread pool configuration.
*
* @return The {@link ThreadPool}
*/
public ThreadPool getThreadPool()
{
return _threadPool;
}
/* ------------------------------------------------------------ */
/**
* @return the shared mask generator, or null if no shared mask generator is used
* @see WebSocketClient#getMaskGen()
*/
public MaskGen getMaskGen()
{
return _maskGen;
}
/* ------------------------------------------------------------ */
/**
* @param maskGen the shared mask generator, or null if no shared mask generator is used
* @see WebSocketClient#setMaskGen(MaskGen)
*/
public void setMaskGen(MaskGen maskGen)
{
if (isRunning())
throw new IllegalStateException(getState());
removeBean(_maskGen);
_maskGen = maskGen;
addBean(maskGen);
}
/* ------------------------------------------------------------ */
/**
* @param bufferSize the read buffer size
* @see #getBufferSize()
*/
public void setBufferSize(int bufferSize)
{
if (isRunning())
throw new IllegalStateException(getState());
removeBean(_buffers);
_buffers = new WebSocketBuffers(bufferSize);
addBean(_buffers);
}
/* ------------------------------------------------------------ */
/**
* @return the read buffer size
*/
public int getBufferSize()
{
return _buffers.getBufferSize();
}
@Override
protected void doStop() throws Exception
{
closeConnections();
super.doStop();
}
/* ------------------------------------------------------------ */
/**
* <p>Creates and returns a new instance of a {@link WebSocketClient}, configured with this
* WebSocketClientFactory instance.</p>
*
* @return a new {@link WebSocketClient} instance
*/
public WebSocketClient newWebSocketClient()
{
return new WebSocketClient(this);
}
protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
{
SSLEngine sslEngine;
if (channel != null)
{
String peerHost = channel.socket().getInetAddress().getHostAddress();
int peerPort = channel.socket().getPort();
sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
}
else
{
sslEngine = _sslContextFactory.newSslEngine();
}
sslEngine.setUseClientMode(true);
sslEngine.beginHandshake();
return sslEngine;
}
protected boolean addConnection(WebSocketConnection connection)
{
return isRunning() && connections.add(connection);
}
protected boolean removeConnection(WebSocketConnection connection)
{
return connections.remove(connection);
}
protected void closeConnections()
{
for (WebSocketConnection connection : connections)
connection.shutdown();
}
/* ------------------------------------------------------------ */
/**
* WebSocket Client Selector Manager
*/
class WebSocketClientSelector extends SelectorManager
{
@Override
public boolean dispatch(Runnable task)
{
return _threadPool.dispatch(task);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
{
WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
int maxIdleTime = holder.getMaxIdleTime();
if (maxIdleTime < 0)
maxIdleTime = (int)getMaxIdleTime();
SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
AsyncEndPoint endPoint = result;
// Detect if it is SSL, and wrap the connection if so
if ("wss".equals(holder.getURI().getScheme()))
{
SSLEngine sslEngine = newSslEngine(channel);
SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
endPoint.setConnection(sslConnection);
endPoint = sslConnection.getSslEndPoint();
}
AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
endPoint.setConnection(connection);
return result;
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
{
WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
return new HandshakeConnection(endpoint, holder);
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
// TODO expose on outer class ??
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
{
endpoint.getConnection().onClose();
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (!(attachment instanceof WebSocketClient.WebSocketFuture))
super.connectionFailed(channel, ex, attachment);
else
{
__log.debug(ex);
WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
future.handshakeFailed(ex);
}
}
}
/* ------------------------------------------------------------ */
/**
* Handshake Connection.
* Handles the connection until the handshake succeeds or fails.
*/
class HandshakeConnection extends AbstractConnection implements AsyncConnection
{
private final AsyncEndPoint _endp;
private final WebSocketClient.WebSocketFuture _future;
private final String _key;
private final HttpParser _parser;
private String _accept;
private String _error;
private ByteArrayBuffer _handshake;
public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
{
super(endpoint, System.currentTimeMillis());
_endp = endpoint;
_future = future;
byte[] bytes = new byte[16];
new Random().nextBytes(bytes);
_key = new String(B64Code.encode(bytes));
Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
_parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
{
@Override
public void startResponse(Buffer version, int status, Buffer reason) throws IOException
{
if (status != 101)
{
_error = "Bad response status " + status + " " + reason;
_endp.close();
}
}
@Override
public void parsedHeader(Buffer name, Buffer value) throws IOException
{
if (__ACCEPT.equals(name))
_accept = value.toString();
}
@Override // TODO simone says shouldn't be needed
public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
{
if (_error == null)
_error = "Bad response: " + method + " " + url + " " + version;
_endp.close();
}
@Override // TODO simone says shouldn't be needed
public void content(Buffer ref) throws IOException
{
if (_error == null)
_error = "Bad response. " + ref.length() + "B of content?";
_endp.close();
}
});
}
private boolean handshake()
{
if (_handshake==null)
{
String path = _future.getURI().getPath();
if (path == null || path.length() == 0)
path = "/";
if (_future.getURI().getRawQuery() != null)
path += "?" + _future.getURI().getRawQuery();
String origin = _future.getOrigin();
StringBuilder request = new StringBuilder(512);
request.append("GET ").append(path).append(" HTTP/1.1\r\n")
.append("Host: ").append(_future.getURI().getHost()).append(":")
.append(_future.getURI().getPort()).append("\r\n")
.append("Upgrade: websocket\r\n")
.append("Connection: Upgrade\r\n")
.append("Sec-WebSocket-Key: ")
.append(_key).append("\r\n");
if (origin != null)
request.append("Origin: ").append(origin).append("\r\n");
request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
if (_future.getProtocol() != null)
request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
Map<String, String> cookies = _future.getCookies();
if (cookies != null && cookies.size() > 0)
{
for (String cookie : cookies.keySet())
request.append("Cookie: ")
.append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
.append("=")
.append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
.append("\r\n");
}
request.append("\r\n");
_handshake=new ByteArrayBuffer(request.toString(), false);
}
// TODO extensions
try
{
int len = _handshake.length();
int flushed = _endp.flush(_handshake);
if (flushed<0)
throw new IOException("incomplete handshake");
}
catch (IOException e)
{
_future.handshakeFailed(e);
}
return _handshake.length()==0;
}
public Connection handle() throws IOException
{
while (_endp.isOpen() && !_parser.isComplete())
{
if (_handshake==null || _handshake.length()>0)
if (!handshake())
return this;
if (!_parser.parseAvailable())
{
if (_endp.isInputShutdown())
_future.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
}
}
if (_error == null)
{
if (_accept == null)
{
_error = "No Sec-WebSocket-Accept";
}
else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
{
_error = "Bad Sec-WebSocket-Accept";
}
else
{
WebSocketConnection connection = newWebSocketConnection();
Buffer header = _parser.getHeaderBuffer();
if (header.hasContent())
connection.fillBuffersFrom(header);
_buffers.returnBuffer(header);
_future.onConnection(connection);
return connection;
}
}
_endp.close();
return this;
}
private WebSocketConnection newWebSocketConnection() throws IOException
{
__log.debug("newWebSocketConnection()");
return new WebSocketClientConnection(
_future._client.getFactory(),
_future.getWebSocket(),
_endp,
_buffers,
System.currentTimeMillis(),
_future.getMaxIdleTime(),
_future.getProtocol(),
null,
WebSocketConnectionRFC6455.VERSION,
_future.getMaskGen());
}
public void onInputShutdown() throws IOException
{
_endp.close();
}
public boolean isIdle()
{
return false;
}
public boolean isSuspended()
{
return false;
}
public void onClose()
{
if (_error != null)
_future.handshakeFailed(new ProtocolException(_error));
else
_future.handshakeFailed(new EOFException());
}
}
private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
{
private final WebSocketClientFactory factory;
public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, MaskGen maskGen) throws IOException
{
super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
this.factory = factory;
}
@Override
public void onClose()
{
super.onClose();
factory.removeConnection(this);
}
}
}