blob: 538fbfa1a9b4dfdb6ce9ed417b9713e1601f4454 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io.nio;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout;
import org.eclipse.jetty.util.thread.Timeout.Task;
/* ------------------------------------------------------------ */
/**
* The Selector Manager manages and number of SelectSets to allow
* NIO scheduling to scale to large numbers of connections.
* <p>
*/
public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
{
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
private int _maxIdleTime;
private int _lowResourcesMaxIdleTime;
private long _lowResourcesConnections;
private SelectSet[] _selectSet;
private int _selectSets=1;
private volatile int _set=0;
private boolean _deferringInterestedOps0=true;
private int _selectorPriorityDelta=0;
/* ------------------------------------------------------------ */
/**
* @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
* @see #setLowResourcesMaxIdleTime(long)
*/
public void setMaxIdleTime(long maxIdleTime)
{
_maxIdleTime=(int)maxIdleTime;
}
/* ------------------------------------------------------------ */
/**
* @param selectSets number of select sets to create
*/
public void setSelectSets(int selectSets)
{
long lrc = _lowResourcesConnections * _selectSets;
_selectSets=selectSets;
_lowResourcesConnections=lrc/_selectSets;
}
/* ------------------------------------------------------------ */
/**
* @return the max idle time
*/
public long getMaxIdleTime()
{
return _maxIdleTime;
}
/* ------------------------------------------------------------ */
/**
* @return the number of select sets in use
*/
public int getSelectSets()
{
return _selectSets;
}
/* ------------------------------------------------------------ */
/**
* @param i
* @return The select set
*/
public SelectSet getSelectSet(int i)
{
return _selectSet[i];
}
/* ------------------------------------------------------------ */
/** Register a channel
* @param channel
* @param att Attached Object
*/
public void register(SocketChannel channel, Object att)
{
// The ++ increment here is not atomic, but it does not matter.
// so long as the value changes sometimes, then connections will
// be distributed over the available sets.
int s=_set++;
if (s<0)
s=-s;
s=s%_selectSets;
SelectSet[] sets=_selectSet;
if (sets!=null)
{
SelectSet set=sets[s];
set.addChange(channel,att);
set.wakeup();
}
}
/* ------------------------------------------------------------ */
/** Register a channel
* @param channel
*/
public void register(SocketChannel channel)
{
// The ++ increment here is not atomic, but it does not matter.
// so long as the value changes sometimes, then connections will
// be distributed over the available sets.
int s=_set++;
if (s<0)
s=-s;
s=s%_selectSets;
SelectSet[] sets=_selectSet;
if (sets!=null)
{
SelectSet set=sets[s];
set.addChange(channel);
set.wakeup();
}
}
/* ------------------------------------------------------------ */
/** Register a {@link ServerSocketChannel}
* @param acceptChannel
*/
public void register(ServerSocketChannel acceptChannel)
{
int s=_set++;
if (s<0)
s=-s;
s=s%_selectSets;
SelectSet set=_selectSet[s];
set.addChange(acceptChannel);
set.wakeup();
}
/* ------------------------------------------------------------ */
/**
* @return delta The value to add to the selector thread priority.
*/
public int getSelectorPriorityDelta()
{
return _selectorPriorityDelta;
}
/* ------------------------------------------------------------ */
/** Set the selector thread priorty delta.
* @param delta The value to add to the selector thread priority.
*/
public void setSelectorPriorityDelta(int delta)
{
_selectorPriorityDelta=delta;
}
/* ------------------------------------------------------------ */
/**
* @return the lowResourcesConnections
*/
public long getLowResourcesConnections()
{
return _lowResourcesConnections*_selectSets;
}
/* ------------------------------------------------------------ */
/**
* Set the number of connections, which if exceeded places this manager in low resources state.
* This is not an exact measure as the connection count is averaged over the select sets.
* @param lowResourcesConnections the number of connections
* @see #setLowResourcesMaxIdleTime(long)
*/
public void setLowResourcesConnections(long lowResourcesConnections)
{
_lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
}
/* ------------------------------------------------------------ */
/**
* @return the lowResourcesMaxIdleTime
*/
public long getLowResourcesMaxIdleTime()
{
return _lowResourcesMaxIdleTime;
}
/* ------------------------------------------------------------ */
/**
* @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
* @see #setMaxIdleTime(long)
*/
public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
{
_lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
}
/* ------------------------------------------------------------------------------- */
public abstract boolean dispatch(Runnable task);
/* ------------------------------------------------------------ */
/* (non-Javadoc)
* @see org.eclipse.component.AbstractLifeCycle#doStart()
*/
@Override
protected void doStart() throws Exception
{
_selectSet = new SelectSet[_selectSets];
for (int i=0;i<_selectSet.length;i++)
_selectSet[i]= new SelectSet(i);
super.doStart();
// start a thread to Select
for (int i=0;i<getSelectSets();i++)
{
final int id=i;
boolean selecting=dispatch(new Runnable()
{
public void run()
{
String name=Thread.currentThread().getName();
int priority=Thread.currentThread().getPriority();
try
{
SelectSet[] sets=_selectSet;
if (sets==null)
return;
SelectSet set=sets[id];
Thread.currentThread().setName(name+" Selector"+id);
if (getSelectorPriorityDelta()!=0)
Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
LOG.debug("Starting {} on {}",Thread.currentThread(),this);
while (isRunning())
{
try
{
set.doSelect();
}
catch(IOException e)
{
LOG.ignore(e);
}
catch(Exception e)
{
LOG.warn(e);
}
}
}
finally
{
LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
Thread.currentThread().setName(name);
if (getSelectorPriorityDelta()!=0)
Thread.currentThread().setPriority(priority);
}
}
});
if (!selecting)
throw new IllegalStateException("!Selecting");
}
}
/* ------------------------------------------------------------------------------- */
@Override
protected void doStop() throws Exception
{
SelectSet[] sets= _selectSet;
_selectSet=null;
if (sets!=null)
{
for (SelectSet set : sets)
{
if (set!=null)
set.stop();
}
}
super.doStop();
}
/* ------------------------------------------------------------ */
/**
* @param endpoint
*/
protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
/* ------------------------------------------------------------ */
/**
* @param endpoint
*/
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
/* ------------------------------------------------------------ */
protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
/* ------------------------------------------------------------------------------- */
public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
/* ------------------------------------------------------------ */
/**
* Create a new end point
* @param channel
* @param selectSet
* @param sKey the selection key
* @return the new endpoint {@link SelectChannelEndPoint}
* @throws IOException
*/
protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
/* ------------------------------------------------------------------------------- */
protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
{
LOG.warn(ex+","+channel+","+attachment);
LOG.debug(ex);
}
/* ------------------------------------------------------------ */
public String dump()
{
return AggregateLifeCycle.dump(this);
}
/* ------------------------------------------------------------ */
public void dump(Appendable out, String indent) throws IOException
{
AggregateLifeCycle.dumpObject(out,this);
AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
}
/* ------------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------------- */
public class SelectSet implements Dumpable
{
private final int _setID;
private final Timeout _timeout;
private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
private volatile Selector _selector;
private volatile Thread _selecting;
private int _busySelects;
private long _monitorNext;
private boolean _pausing;
private boolean _paused;
private volatile long _idleTick;
private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
/* ------------------------------------------------------------ */
SelectSet(int acceptorID) throws Exception
{
_setID=acceptorID;
_idleTick = System.currentTimeMillis();
_timeout = new Timeout(this);
_timeout.setDuration(0L);
// create a selector;
_selector = Selector.open();
_monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
}
/* ------------------------------------------------------------ */
public void addChange(Object change)
{
_changes.add(change);
}
/* ------------------------------------------------------------ */
public void addChange(SelectableChannel channel, Object att)
{
if (att==null)
addChange(channel);
else if (att instanceof EndPoint)
addChange(att);
else
addChange(new ChannelAndAttachment(channel,att));
}
/* ------------------------------------------------------------ */
/**
* Select and dispatch tasks found from changes and the selector.
*
* @throws IOException
*/
public void doSelect() throws IOException
{
try
{
_selecting=Thread.currentThread();
final Selector selector=_selector;
// Stopped concurrently ?
if (selector == null)
return;
// Make any key changes required
Object change;
int changes=_changes.size();
while (changes-->0 && (change=_changes.poll())!=null)
{
Channel ch=null;
SelectionKey key=null;
try
{
if (change instanceof EndPoint)
{
// Update the operations for a key.
SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
ch=endpoint.getChannel();
endpoint.doUpdateKey();
}
else if (change instanceof ChannelAndAttachment)
{
// finish accepting/connecting this connection
final ChannelAndAttachment asc = (ChannelAndAttachment)change;
final SelectableChannel channel=asc._channel;
ch=channel;
final Object att = asc._attachment;
if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
{
key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else if (channel.isOpen())
{
key = channel.register(selector,SelectionKey.OP_CONNECT,att);
}
}
else if (change instanceof SocketChannel)
{
// Newly registered channel
final SocketChannel channel=(SocketChannel)change;
ch=channel;
key = channel.register(selector,SelectionKey.OP_READ,null);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else if (change instanceof ChangeTask)
{
((Runnable)change).run();
}
else if (change instanceof Runnable)
{
dispatch((Runnable)change);
}
else
throw new IllegalArgumentException(change.toString());
}
catch (CancelledKeyException e)
{
LOG.ignore(e);
}
catch (Throwable e)
{
if (isRunning())
LOG.warn(e);
else
LOG.debug(e);
try
{
if (ch!=null)
ch.close();
}
catch(IOException e2)
{
LOG.debug(e2);
}
}
}
// Do and instant select to see if any connections can be handled.
int selected=selector.selectNow();
long now=System.currentTimeMillis();
// if no immediate things to do
if (selected==0 && selector.selectedKeys().isEmpty())
{
// If we are in pausing mode
if (_pausing)
{
try
{
Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
now=System.currentTimeMillis();
}
// workout how long to wait in select
_timeout.setNow(now);
long to_next_timeout=_timeout.getTimeToNext();
long wait = _changes.size()==0?__IDLE_TICK:0L;
if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
wait = to_next_timeout;
// If we should wait with a select
if (wait>0)
{
long before=now;
selector.select(wait);
now = System.currentTimeMillis();
_timeout.setNow(now);
// If we are monitoring for busy selector
// and this select did not wait more than 1ms
if (__MONITOR_PERIOD>0 && now-before <=1)
{
// count this as a busy select and if there have been too many this monitor cycle
if (++_busySelects>__MAX_SELECTS)
{
// Start injecting pauses
_pausing=true;
// if this is the first pause
if (!_paused)
{
// Log and dump some status
_paused=true;
LOG.warn("Selector {} is too busy, pausing!",this);
}
}
}
}
}
// have we been destroyed while sleeping
if (_selector==null || !selector.isOpen())
return;
// Look for things to do
for (SelectionKey key: selector.selectedKeys())
{
SocketChannel channel=null;
try
{
if (!key.isValid())
{
key.cancel();
SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
if (endpoint != null)
endpoint.doUpdateKey();
continue;
}
Object att = key.attachment();
if (att instanceof SelectChannelEndPoint)
{
if (key.isReadable()||key.isWritable())
((SelectChannelEndPoint)att).schedule();
}
else if (key.isConnectable())
{
// Complete a connection of a registered channel
channel = (SocketChannel)key.channel();
boolean connected=false;
try
{
connected=channel.finishConnect();
}
catch(Exception e)
{
connectionFailed(channel,e,att);
}
finally
{
if (connected)
{
key.interestOps(SelectionKey.OP_READ);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else
{
key.cancel();
channel.close();
}
}
}
else
{
// Wrap readable registered channel in an endpoint
channel = (SocketChannel)key.channel();
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
if (key.isReadable())
endpoint.schedule();
}
key = null;
}
catch (CancelledKeyException e)
{
LOG.ignore(e);
}
catch (Exception e)
{
if (isRunning())
LOG.warn(e);
else
LOG.ignore(e);
try
{
if (channel!=null)
channel.close();
}
catch(IOException e2)
{
LOG.debug(e2);
}
if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
key.cancel();
}
}
// Everything always handled
selector.selectedKeys().clear();
now=System.currentTimeMillis();
_timeout.setNow(now);
Task task = _timeout.expired();
while (task!=null)
{
if (task instanceof Runnable)
dispatch((Runnable)task);
task = _timeout.expired();
}
// Idle tick
if (now-_idleTick>__IDLE_TICK)
{
_idleTick=now;
final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
:now;
dispatch(new Runnable()
{
public void run()
{
for (SelectChannelEndPoint endp:_endPoints.keySet())
{
endp.checkIdleTimestamp(idle_now);
}
}
public String toString() {return "Idle-"+super.toString();}
});
}
// Reset busy select monitor counts
if (__MONITOR_PERIOD>0 && now>_monitorNext)
{
_busySelects=0;
_pausing=false;
_monitorNext=now+__MONITOR_PERIOD;
}
}
catch (ClosedSelectorException e)
{
if (isRunning())
LOG.warn(e);
else
LOG.ignore(e);
}
catch (CancelledKeyException e)
{
LOG.ignore(e);
}
finally
{
_selecting=null;
}
}
/* ------------------------------------------------------------ */
private void renewSelector()
{
try
{
synchronized (this)
{
Selector selector=_selector;
if (selector==null)
return;
final Selector new_selector = Selector.open();
for (SelectionKey k: selector.keys())
{
if (!k.isValid() || k.interestOps()==0)
continue;
final SelectableChannel channel = k.channel();
final Object attachment = k.attachment();
if (attachment==null)
addChange(channel);
else
addChange(channel,attachment);
}
_selector.close();
_selector=new_selector;
}
}
catch(IOException e)
{
throw new RuntimeException("recreating selector",e);
}
}
/* ------------------------------------------------------------ */
public SelectorManager getManager()
{
return SelectorManager.this;
}
/* ------------------------------------------------------------ */
public long getNow()
{
return _timeout.getNow();
}
/* ------------------------------------------------------------ */
/**
* @param task The task to timeout. If it implements Runnable, then
* expired will be called from a dispatched thread.
*
* @param timeoutMs
*/
public void scheduleTimeout(Timeout.Task task, long timeoutMs)
{
if (!(task instanceof Runnable))
throw new IllegalArgumentException("!Runnable");
_timeout.schedule(task, timeoutMs);
}
/* ------------------------------------------------------------ */
public void cancelTimeout(Timeout.Task task)
{
task.cancel();
}
/* ------------------------------------------------------------ */
public void wakeup()
{
try
{
Selector selector = _selector;
if (selector!=null)
selector.wakeup();
}
catch(Exception e)
{
addChange(new ChangeTask()
{
public void run()
{
renewSelector();
}
});
renewSelector();
}
}
/* ------------------------------------------------------------ */
private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
{
SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
LOG.debug("created {}",endp);
endPointOpened(endp);
_endPoints.put(endp,this);
return endp;
}
/* ------------------------------------------------------------ */
public void destroyEndPoint(SelectChannelEndPoint endp)
{
LOG.debug("destroyEndPoint {}",endp);
_endPoints.remove(endp);
endPointClosed(endp);
}
/* ------------------------------------------------------------ */
Selector getSelector()
{
return _selector;
}
/* ------------------------------------------------------------ */
void stop() throws Exception
{
// Spin for a while waiting for selector to complete
// to avoid unneccessary closed channel exceptions
try
{
for (int i=0;i<100 && _selecting!=null;i++)
{
wakeup();
Thread.sleep(10);
}
}
catch(Exception e)
{
LOG.ignore(e);
}
// close endpoints and selector
synchronized (this)
{
Selector selector=_selector;
for (SelectionKey key:selector.keys())
{
if (key==null)
continue;
Object att=key.attachment();
if (att instanceof EndPoint)
{
EndPoint endpoint = (EndPoint)att;
try
{
endpoint.close();
}
catch(IOException e)
{
LOG.ignore(e);
}
}
}
_timeout.cancelAll();
try
{
selector=_selector;
if (selector != null)
selector.close();
}
catch (IOException e)
{
LOG.ignore(e);
}
_selector=null;
}
}
/* ------------------------------------------------------------ */
public String dump()
{
return AggregateLifeCycle.dump(this);
}
/* ------------------------------------------------------------ */
public void dump(Appendable out, String indent) throws IOException
{
out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
Thread selecting = _selecting;
Object where = "not selecting";
StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
if (trace!=null)
{
for (StackTraceElement t:trace)
if (t.getClassName().startsWith("org.eclipse.jetty."))
{
where=t;
break;
}
}
Selector selector=_selector;
if (selector!=null)
{
final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
dump.add(where);
final CountDownLatch latch = new CountDownLatch(1);
addChange(new ChangeTask()
{
public void run()
{
dumpKeyState(dump);
latch.countDown();
}
});
try
{
latch.await(5,TimeUnit.SECONDS);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
AggregateLifeCycle.dump(out,indent,dump);
}
}
/* ------------------------------------------------------------ */
public void dumpKeyState(List<Object> dumpto)
{
Selector selector=_selector;
Set<SelectionKey> keys = selector.keys();
dumpto.add(selector + " keys=" + keys.size());
for (SelectionKey key: keys)
{
if (key.isValid())
dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
else
dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
}
}
/* ------------------------------------------------------------ */
public String toString()
{
Selector selector=_selector;
return String.format("%s keys=%d selected=%d",
super.toString(),
selector != null && selector.isOpen() ? selector.keys().size() : -1,
selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
}
}
/* ------------------------------------------------------------ */
private static class ChannelAndAttachment
{
final SelectableChannel _channel;
final Object _attachment;
public ChannelAndAttachment(SelectableChannel channel, Object attachment)
{
super();
_channel = channel;
_attachment = attachment;
}
}
/* ------------------------------------------------------------ */
public boolean isDeferringInterestedOps0()
{
return _deferringInterestedOps0;
}
/* ------------------------------------------------------------ */
public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
{
_deferringInterestedOps0 = deferringInterestedOps0;
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private interface ChangeTask extends Runnable
{}
}