blob: 4e551cd51af0642647eeae10bc68ade6e0c8b086 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.client.HttpClient.Connector;
import org.eclipse.jetty.client.security.Authentication;
import org.eclipse.jetty.client.security.SecurityListener;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.http.HttpHeaders;
import org.eclipse.jetty.http.HttpMethods;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.PathMap;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
/**
* @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
*/
public class HttpDestination implements Dumpable
{
private static final Logger LOG = Log.getLogger(HttpDestination.class);
private final List<HttpExchange> _exchanges = new LinkedList<HttpExchange>();
private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
private final BlockingQueue<Object> _reservedConnections = new ArrayBlockingQueue<Object>(10, true);
private final List<AbstractHttpConnection> _idleConnections = new ArrayList<AbstractHttpConnection>();
private final HttpClient _client;
private final Address _address;
private final boolean _ssl;
private final SslContextFactory _sslContextFactory;
private final ByteArrayBuffer _hostHeader;
private volatile int _maxConnections;
private volatile int _maxQueueSize;
private int _pendingConnections = 0;
private int _pendingReservedConnections = 0;
private volatile Address _proxy;
private Authentication _proxyAuthentication;
private PathMap _authorizations;
private List<HttpCookie> _cookies;
HttpDestination(HttpClient client, Address address, boolean ssl, SslContextFactory sslContextFactory)
{
_client = client;
_address = address;
_ssl = ssl;
_sslContextFactory = sslContextFactory;
_maxConnections = _client.getMaxConnectionsPerAddress();
_maxQueueSize = _client.getMaxQueueSizePerAddress();
String addressString = address.getHost();
if (address.getPort() != (_ssl ? 443 : 80))
addressString += ":" + address.getPort();
_hostHeader = new ByteArrayBuffer(addressString);
}
public HttpClient getHttpClient()
{
return _client;
}
public Address getAddress()
{
return _address;
}
public boolean isSecure()
{
return _ssl;
}
public SslContextFactory getSslContextFactory()
{
return _sslContextFactory;
}
public Buffer getHostHeader()
{
return _hostHeader;
}
public int getMaxConnections()
{
return _maxConnections;
}
public void setMaxConnections(int maxConnections)
{
this._maxConnections = maxConnections;
}
public int getMaxQueueSize()
{
return _maxQueueSize;
}
public void setMaxQueueSize(int maxQueueSize)
{
this._maxQueueSize = maxQueueSize;
}
public int getConnections()
{
synchronized (this)
{
return _connections.size();
}
}
public int getIdleConnections()
{
synchronized (this)
{
return _idleConnections.size();
}
}
public void addAuthorization(String pathSpec, Authentication authorization)
{
synchronized (this)
{
if (_authorizations == null)
_authorizations = new PathMap();
_authorizations.put(pathSpec, authorization);
}
// TODO query and remove methods
}
public void addCookie(HttpCookie cookie)
{
synchronized (this)
{
if (_cookies == null)
_cookies = new ArrayList<HttpCookie>();
_cookies.add(cookie);
}
// TODO query, remove and age methods
}
/**
* Get a connection. We either get an idle connection if one is available, or
* we make a new connection, if we have not yet reached maxConnections. If we
* have reached maxConnections, we wait until the number reduces.
*
* @param timeout max time prepared to block waiting to be able to get a connection
* @return a HttpConnection for this destination
* @throws IOException if an I/O error occurs
*/
private AbstractHttpConnection getConnection(long timeout) throws IOException
{
AbstractHttpConnection connection = null;
while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
{
boolean startConnection = false;
synchronized (this)
{
int totalConnections = _connections.size() + _pendingConnections;
if (totalConnections < _maxConnections)
{
_pendingReservedConnections++;
startConnection = true;
}
}
if (startConnection)
{
startNewConnection();
try
{
Object o = _reservedConnections.take();
if (o instanceof AbstractHttpConnection)
{
connection = (AbstractHttpConnection)o;
}
else
throw (IOException)o;
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
else
{
try
{
Thread.currentThread();
Thread.sleep(200);
timeout -= 200;
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
}
return connection;
}
public AbstractHttpConnection reserveConnection(long timeout) throws IOException
{
AbstractHttpConnection connection = getConnection(timeout);
if (connection != null)
connection.setReserved(true);
return connection;
}
public AbstractHttpConnection getIdleConnection() throws IOException
{
AbstractHttpConnection connection = null;
while (true)
{
synchronized (this)
{
if (connection != null)
{
_connections.remove(connection);
connection.close();
connection = null;
}
if (_idleConnections.size() > 0)
connection = _idleConnections.remove(_idleConnections.size() - 1);
}
if (connection == null)
{
return null;
}
// Check if the connection was idle,
// but it expired just a moment ago
if (connection.cancelIdleTimeout())
{
return connection;
}
}
}
protected void startNewConnection()
{
try
{
synchronized (this)
{
_pendingConnections++;
}
final Connector connector = _client._connector;
if (connector != null)
connector.startConnection(this);
}
catch (Exception e)
{
LOG.debug(e);
onConnectionFailed(e);
}
}
public void onConnectionFailed(Throwable throwable)
{
Throwable connect_failure = null;
boolean startConnection = false;
synchronized (this)
{
_pendingConnections--;
if (_pendingReservedConnections > 0)
{
connect_failure = throwable;
_pendingReservedConnections--;
}
else if (_exchanges.size() > 0)
{
HttpExchange ex = _exchanges.remove(0);
if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
ex.getEventListener().onConnectionFailed(throwable);
// Since an existing connection had failed, we need to create a
// connection if the queue is not empty and client is running.
if (!_exchanges.isEmpty() && _client.isStarted())
startConnection = true;
}
}
if (startConnection)
startNewConnection();
if (connect_failure != null)
{
try
{
_reservedConnections.put(connect_failure);
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
}
public void onException(Throwable throwable)
{
synchronized (this)
{
_pendingConnections--;
if (_exchanges.size() > 0)
{
HttpExchange ex = _exchanges.remove(0);
if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
ex.getEventListener().onException(throwable);
}
}
}
public void onNewConnection(final AbstractHttpConnection connection) throws IOException
{
Connection reservedConnection = null;
synchronized (this)
{
_pendingConnections--;
_connections.add(connection);
if (_pendingReservedConnections > 0)
{
reservedConnection = connection;
_pendingReservedConnections--;
}
else
{
// Establish the tunnel if needed
EndPoint endPoint = connection.getEndPoint();
if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
{
SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint);
connect.setAddress(getProxy());
LOG.debug("Establishing tunnel to {} via {}", getAddress(), getProxy());
send(connection, connect);
}
else
{
// Another connection stole the exchange that caused the creation of this connection ?
if (_exchanges.size() == 0)
{
LOG.debug("No exchanges for new connection {}", connection);
connection.setIdleTimeout();
_idleConnections.add(connection);
}
else
{
HttpExchange exchange = _exchanges.remove(0);
send(connection, exchange);
}
}
}
}
if (reservedConnection != null)
{
try
{
_reservedConnections.put(reservedConnection);
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
}
public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
{
if (connection.isReserved())
connection.setReserved(false);
if (close)
{
try
{
connection.close();
}
catch (IOException e)
{
LOG.ignore(e);
}
}
if (!_client.isStarted())
return;
if (!close && connection.getEndPoint().isOpen())
{
synchronized (this)
{
if (_exchanges.size() == 0)
{
connection.setIdleTimeout();
_idleConnections.add(connection);
}
else
{
HttpExchange ex = _exchanges.remove(0);
send(connection, ex);
}
this.notifyAll();
}
}
else
{
boolean startConnection = false;
synchronized (this)
{
_connections.remove(connection);
if (!_exchanges.isEmpty())
startConnection = true;
}
if (startConnection)
startNewConnection();
}
}
public void returnIdleConnection(AbstractHttpConnection connection)
{
// TODO work out the real idle time;
long idleForMs = connection.getEndPoint() != null ? connection.getEndPoint().getMaxIdleTime() : -1;
connection.onIdleExpired(idleForMs);
boolean startConnection = false;
synchronized (this)
{
_idleConnections.remove(connection);
_connections.remove(connection);
if (!_exchanges.isEmpty() && _client.isStarted())
startConnection = true;
}
if (startConnection)
startNewConnection();
}
public void send(HttpExchange ex) throws IOException
{
ex.setStatus(HttpExchange.STATUS_WAITING_FOR_CONNECTION);
LinkedList<String> listeners = _client.getRegisteredListeners();
if (listeners != null)
{
// Add registered listeners, fail if we can't load them
for (int i = listeners.size(); i > 0; --i)
{
String listenerClass = listeners.get(i - 1);
try
{
Class<?> listener = Class.forName(listenerClass);
Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
ex.setEventListener(elistener);
}
catch (final Exception e)
{
throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass)
{
{
initCause(e);
}
};
}
}
}
// Security is supported by default and should be the first consulted
if (_client.hasRealms())
{
ex.setEventListener(new SecurityListener(this, ex));
}
doSend(ex);
}
public void resend(HttpExchange ex) throws IOException
{
ex.getEventListener().onRetry();
ex.reset();
doSend(ex);
}
protected void doSend(HttpExchange ex) throws IOException
{
// add cookies
// TODO handle max-age etc.
if (_cookies != null)
{
StringBuilder buf = null;
for (HttpCookie cookie : _cookies)
{
if (buf == null)
buf = new StringBuilder();
else
buf.append("; ");
buf.append(cookie.getName()); // TODO quotes
buf.append("=");
buf.append(cookie.getValue()); // TODO quotes
}
if (buf != null)
ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
}
// Add any known authorizations
if (_authorizations != null)
{
Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
if (auth != null)
(auth).setCredentials(ex);
}
// Schedule the timeout here, before we queue the exchange
// so that we count also the queue time in the timeout
ex.scheduleTimeout(this);
AbstractHttpConnection connection = getIdleConnection();
if (connection != null)
{
send(connection, ex);
}
else
{
boolean startConnection = false;
synchronized (this)
{
if (_exchanges.size() == _maxQueueSize)
throw new RejectedExecutionException("Queue full for address " + _address);
_exchanges.add(ex);
if (_connections.size() + _pendingConnections < _maxConnections)
startConnection = true;
}
if (startConnection)
startNewConnection();
}
}
protected void exchangeExpired(HttpExchange exchange)
{
// The exchange may expire while waiting in the
// destination queue, make sure it is removed
synchronized (this)
{
_exchanges.remove(exchange);
}
}
protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
{
synchronized (this)
{
// If server closes the connection, put the exchange back
// to the exchange queue and recycle the connection
if (!connection.send(exchange))
{
if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
_exchanges.add(0, exchange);
returnIdleConnection(connection);
}
}
}
@Override
public synchronized String toString()
{
return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n", hashCode(), _address.getHost(), _address.getPort(), _connections.size(), _maxConnections, _idleConnections.size(), _exchanges.size(), _maxQueueSize);
}
public synchronized String toDetailString()
{
StringBuilder b = new StringBuilder();
b.append(toString());
b.append('\n');
synchronized (this)
{
for (AbstractHttpConnection connection : _connections)
{
b.append(connection.toDetailString());
if (_idleConnections.contains(connection))
b.append(" IDLE");
b.append('\n');
}
}
b.append("--");
b.append('\n');
return b.toString();
}
public void setProxy(Address proxy)
{
_proxy = proxy;
}
public Address getProxy()
{
return _proxy;
}
public Authentication getProxyAuthentication()
{
return _proxyAuthentication;
}
public void setProxyAuthentication(Authentication authentication)
{
_proxyAuthentication = authentication;
}
public boolean isProxied()
{
return _proxy != null;
}
public void close() throws IOException
{
synchronized (this)
{
for (AbstractHttpConnection connection : _connections)
{
connection.close();
}
}
}
public String dump()
{
return AggregateLifeCycle.dump(this);
}
public void dump(Appendable out, String indent) throws IOException
{
synchronized (this)
{
out.append(String.valueOf(this));
out.append("idle=");
out.append(String.valueOf(_idleConnections.size()));
out.append(" pending=");
out.append(String.valueOf(_pendingConnections));
out.append("\n");
AggregateLifeCycle.dump(out, indent, _connections);
}
}
private class ConnectExchange extends ContentExchange
{
private final SelectConnector.UpgradableEndPoint proxyEndPoint;
public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint)
{
this.proxyEndPoint = proxyEndPoint;
setMethod(HttpMethods.CONNECT);
String serverHostAndPort = serverAddress.toString();
setRequestURI(serverHostAndPort);
addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
}
@Override
protected void onResponseComplete() throws IOException
{
int responseStatus = getResponseStatus();
if (responseStatus == HttpStatus.OK_200)
{
proxyEndPoint.upgrade();
}
else if (responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
{
onExpire();
}
else
{
onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() + ":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus));
}
}
@Override
protected void onConnectionFailed(Throwable x)
{
HttpDestination.this.onConnectionFailed(x);
}
@Override
protected void onException(Throwable x)
{
HttpExchange exchange = null;
synchronized (HttpDestination.this)
{
if (!_exchanges.isEmpty())
exchange = _exchanges.remove(0);
}
if (exchange != null && exchange.setStatus(STATUS_EXCEPTED))
exchange.getEventListener().onException(x);
}
@Override
protected void onExpire()
{
HttpExchange exchange = null;
synchronized (HttpDestination.this)
{
if (!_exchanges.isEmpty())
exchange = _exchanges.remove(0);
}
if (exchange != null && exchange.setStatus(STATUS_EXPIRED))
exchange.getEventListener().onExpire();
}
}
}