// blob: 9d16f5d3417fd277b9491e38b13fb7a6a5cef2aa [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io.nio;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Locale;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task;
/* ------------------------------------------------------------ */
/**
* An Endpoint that can be scheduled by {@link SelectorManager}.
*/
public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
{
/** Logger for this endpoint, registered under the package name. */
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
/**
 * True when the <code>os.name</code> system property contains "win"; consulted by
 * {@link #close()} to decide whether the selection key is cancelled before the channel is closed.
 */
private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
/** The select set passed to the constructor; receives key changes and timeout tasks. */
private final SelectorManager.SelectSet _selectSet;
/** The manager obtained from the select set; used to dispatch tasks and report connection upgrades. */
private final SelectorManager _manager;
/** The current selection key, or null when no usable key is held. */
private SelectionKey _key;
/** Task handed to the manager by {@link #dispatch()}; it simply runs {@link #handle()}. */
private final Runnable _handler = new Runnable()
{
public void run() { handle(); }
};
/** The desired value for {@link SelectionKey#interestOps()} */
private int _interestOps;
/**
* The connection instance is the handler for any IO activity on the endpoint.
* There is a different type of connection for HTTP, AJP, WebSocket and
* ProxyConnect. The connection may change for an SCEP as it is upgraded
* from HTTP to proxy connect or websocket.
*/
private volatile AsyncConnection _connection;
/** A dispatch was requested but could not be started (idle expiry in progress, or the manager refused it). */
private static final int STATE_NEEDS_DISPATCH=-1;
/** No handler task is currently dispatched for this endpoint. */
private static final int STATE_UNDISPATCHED=0;
/** The handler task has been dispatched to the manager. */
private static final int STATE_DISPATCHED=1;
/** {@link #asyncDispatch()} was called while dispatched; {@link #undispatch()} will return false once. */
private static final int STATE_ASYNC=2;
/** One of the STATE_* constants; read and written while synchronized on this endpoint. */
private int _state;
/** True while {@link #onIdleExpired(long)} is calling the connection; dispatches are deferred meanwhile. */
private boolean _onIdle;
/** true if the last write operation succeeded and wrote all offered bytes */
private volatile boolean _writable = true;
/** True if a thread is blocked in {@link #blockReadable(long)} */
private boolean _readBlocked;
/** True if a thread is blocked in {@link #blockWritable(long)} */
private boolean _writeBlocked;
/** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
private boolean _open;
/** Time (from System.currentTimeMillis()) of the last recorded activity; compared against the max idle time. */
private volatile long _idleTimestamp;
/** True if {@link #checkIdleTimestamp(long)} should test this endpoint for idle expiry. */
private volatile boolean _checkIdle;
/** If true, an interrupt of a blocking read/write is rethrown as an InterruptedIOException. */
private boolean _interruptable;
/** Set by {@link #handle()} once the input shutdown has been reported to the connection. */
private boolean _ishut;
/* ------------------------------------------------------------ */
/**
 * Creates an endpoint for a channel that is selected by the given select set.
 * The endpoint starts undispatched and open, with idle checking enabled.
 *
 * @param channel the socket channel wrapped by this endpoint
 * @param selectSet the select set that schedules this endpoint; its manager is used for dispatching
 * @param key the selection key initially associated with the channel
 * @param maxIdleTime the maximum idle time, passed to the super class
 * @throws IOException if the super class constructor fails
 */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
    throws IOException
{
    super(channel, maxIdleTime);

    // Collaborators first, then the initial state flags.
    _manager = selectSet.getManager();
    _selectSet = selectSet;
    _key = key;

    _open = true;
    _onIdle = false;
    _state = STATE_UNDISPATCHED;

    setCheckForIdle(true);
}
/* ------------------------------------------------------------ */
/**
 * @return the selection key currently held by this endpoint (may be null),
 * read while holding the endpoint lock
 */
public SelectionKey getSelectionKey()
{
    final SelectionKey current;
    synchronized (this)
    {
        current = _key;
    }
    return current;
}
/* ------------------------------------------------------------ */
/**
 * @return the selector manager obtained from the select set at construction
 */
public SelectorManager getSelectManager()
{
    return this._manager;
}
/* ------------------------------------------------------------ */
/**
 * @return the connection currently handling IO for this endpoint, or null if none has been set
 */
public Connection getConnection()
{
    return this._connection;
}
/* ------------------------------------------------------------ */
/**
 * Replaces the connection that handles this endpoint. When a previous
 * connection existed and differs from the new one, the manager is told
 * that the endpoint has been upgraded.
 *
 * @param connection the new connection; must be an {@link AsyncConnection}
 */
public void setConnection(Connection connection)
{
    final Connection previous = _connection;
    _connection = (AsyncConnection)connection;

    // Nothing to report on first assignment or when the same instance is set again.
    if (previous == null || previous == _connection)
        return;

    _manager.endPointUpgraded(this, previous);
}
/* ------------------------------------------------------------ */
/**
 * @return the timestamp, in milliseconds, at which activity was last recorded for this endpoint
 */
public long getIdleTimestamp()
{
    return this._idleTimestamp;
}
/* ------------------------------------------------------------ */
/**
 * Called by selectSet to schedule handling.
 * <p>
 * Under the endpoint lock this method either wakes threads blocked in
 * {@link #blockReadable(long)} / {@link #blockWritable(long)}, or dispatches the
 * handler, adjusting the key's interest ops along the way. Statement order matters:
 * the key is inspected before its interest ops are cleared.
 */
public void schedule()
{
synchronized (this)
{
// If there is no key, then do nothing
// (other than releasing any blocked threads so they can re-check the endpoint).
if (_key == null || !_key.isValid())
{
_readBlocked=false;
_writeBlocked=false;
this.notifyAll();
return;
}
// If there are threads blocked reading or writing
if (_readBlocked || _writeBlocked)
{
// assert _dispatched;
// Only clear the flag whose operation is actually ready on the key.
if (_readBlocked && _key.isReadable())
_readBlocked=false;
if (_writeBlocked && _key.isWritable())
_writeBlocked=false;
// wake them up is as good as a dispatched.
this.notifyAll();
// we are not interested in further selecting
_key.interestOps(0);
// With no handler dispatched, recompute the interest ops immediately.
if (_state<STATE_DISPATCHED)
updateKey();
return;
}
// Remove writeable op
// (only when the key is both ready for and interested in OP_WRITE).
if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
{
// Remove writeable op
_interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
_key.interestOps(_interestOps);
_writable = true; // Once writable is in ops, only removed with dispatch.
}
// If dispatched, then deregister interest
if (_state>=STATE_DISPATCHED)
_key.interestOps(0);
else
{
// other wise do the dispatch
dispatch();
// If the dispatch took effect, clear interest now unless the manager defers that.
if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
{
_key.interestOps(0);
}
}
}
}
/* ------------------------------------------------------------ */
/**
 * Requests handling of this endpoint from outside the selector.
 * If no handler is running, one is dispatched; if a handler is already
 * running, the state is marked async so that the running handler loops
 * once more instead of undispatching.
 */
public void asyncDispatch()
{
    synchronized (this)
    {
        if (_state == STATE_NEEDS_DISPATCH || _state == STATE_UNDISPATCHED)
        {
            dispatch();
        }
        else if (_state == STATE_DISPATCHED || _state == STATE_ASYNC)
        {
            _state = STATE_ASYNC;
        }
    }
}
/* ------------------------------------------------------------ */
/**
 * Dispatches the handler task to the manager if the endpoint is not
 * already dispatched. While an idle expiry callback is in progress, or
 * if the manager refuses the task, the endpoint is left in the
 * needs-dispatch state instead.
 */
public void dispatch()
{
    synchronized (this)
    {
        // Already dispatched (or async): nothing to do.
        if (_state > STATE_UNDISPATCHED)
            return;

        // Defer until onIdleExpired has finished; it re-dispatches afterwards.
        if (_onIdle)
        {
            _state = STATE_NEEDS_DISPATCH;
            return;
        }

        _state = STATE_DISPATCHED;
        if (_manager.dispatch(_handler))
            return;

        // The manager refused the task: remember that a dispatch is still owed.
        _state = STATE_NEEDS_DISPATCH;
        LOG.warn("Dispatched Failed! "+this+" to "+_manager);
        updateKey();
    }
}
/* ------------------------------------------------------------ */
/**
 * Called by the handling thread when it has finished a pass over the endpoint.
 * If an async dispatch was requested in the meantime the endpoint stays
 * dispatched; otherwise it becomes undispatched and the key is updated.
 *
 * @return true if the endpoint is now undispatched; false if the calling
 * thread must continue handling the endpoint
 */
protected boolean undispatch()
{
    synchronized (this)
    {
        if (_state == STATE_ASYNC)
        {
            _state = STATE_DISPATCHED;
            return false;
        }

        _state = STATE_UNDISPATCHED;
        updateKey();
        return true;
    }
}
/* ------------------------------------------------------------ */
/**
 * Cancels a timeout task via the select set of this endpoint.
 *
 * @param task the task to cancel
 */
public void cancelTimeout(Task task)
{
    final SelectSet owner = getSelectSet();
    owner.cancelTimeout(task);
}
/* ------------------------------------------------------------ */
/**
 * Schedules a timeout task via the select set of this endpoint.
 *
 * @param task the task to schedule
 * @param timeoutMs the timeout in milliseconds, passed through to the select set
 */
public void scheduleTimeout(Task task, long timeoutMs)
{
    final SelectSet owner = getSelectSet();
    owner.scheduleTimeout(task, timeoutMs);
}
/* ------------------------------------------------------------ */
/**
 * Enables or disables idle checking. Enabling also resets the idle
 * timestamp to the current time, before the flag is raised.
 *
 * @param check true to enable idle checking, false to disable it
 */
public void setCheckForIdle(boolean check)
{
    if (check)
        _idleTimestamp = System.currentTimeMillis();
    _checkIdle = check;
}
/* ------------------------------------------------------------ */
/**
 * @return true if this endpoint is currently subject to idle checking
 */
public boolean isCheckForIdle()
{
    return this._checkIdle;
}
/* ------------------------------------------------------------ */
/**
 * Records activity on the endpoint by moving the idle timestamp to the current time.
 */
protected void notIdle()
{
    this._idleTimestamp = System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
/**
 * Checks whether the endpoint has been idle for longer than the max idle
 * time and, if so, dispatches a task that calls {@link #onIdleExpired(long)}.
 * <p>
 * Idle checking is suspended while that task is pending and re-enabled by
 * the task when it completes. If the manager refuses (or fails) to dispatch
 * the task, idle checking is re-enabled here, without touching the idle
 * timestamp, so that the expiry is attempted again on a later check instead
 * of being disabled for good.
 *
 * @param now the current time in milliseconds, compared with the idle timestamp
 */
public void checkIdleTimestamp(long now)
{
    if (!isCheckForIdle() || _maxIdleTime<=0)
        return;

    final long idleForMs=now-_idleTimestamp;
    if (idleForMs<=_maxIdleTime)
        return;

    // Don't idle out again until onIdleExpired task completes.
    setCheckForIdle(false);

    boolean dispatched=false;
    try
    {
        dispatched=_manager.dispatch(new Runnable()
        {
            public void run()
            {
                try
                {
                    onIdleExpired(idleForMs);
                }
                finally
                {
                    setCheckForIdle(true);
                }
            }
        });
    }
    finally
    {
        if (!dispatched)
        {
            // The task will never run, so nothing else would turn idle checking
            // back on. Restore the flag directly: going through setCheckForIdle(true)
            // would also reset the timestamp and grant a full extra idle period.
            _checkIdle=true;
            LOG.debug("Idle expiry dispatch failed {}",this);
        }
    }
}
/* ------------------------------------------------------------ */
/**
 * Notifies the connection that the endpoint has been idle for too long.
 * While the connection callback runs, new dispatches are deferred; any
 * dispatch that was requested in the meantime is performed afterwards.
 *
 * @param idleForMs how long the endpoint has been idle, in milliseconds
 */
public void onIdleExpired(long idleForMs)
{
    synchronized (this)
    {
        _onIdle = true;
    }

    try
    {
        _connection.onIdleExpired(idleForMs);
    }
    finally
    {
        synchronized (this)
        {
            _onIdle = false;
            // A dispatch attempted during the callback was parked; run it now.
            if (STATE_NEEDS_DISPATCH == _state)
                dispatch();
        }
    }
}
/* ------------------------------------------------------------ */
/**
 * Fills the buffer from the channel via the super class and records
 * activity when at least one byte was read.
 *
 * @param buffer the buffer to fill
 * @return the value returned by the super class fill
 * @throws IOException if the super class fill fails
 */
@Override
public int fill(Buffer buffer) throws IOException
{
    final int filled = super.fill(buffer);
    if (filled > 0)
    {
        notIdle();
    }
    return filled;
}
/* ------------------------------------------------------------ */
/**
 * Flushes the given buffers via the super class and tracks writability.
 * If nothing was written although content was offered, the endpoint is
 * marked not writable and, when undispatched, the key is updated. If
 * something was written, the endpoint is marked writable and activity is recorded.
 *
 * @param header the header buffer, may be null
 * @param buffer the content buffer, may be null
 * @param trailer the trailer buffer, may be null
 * @return the value returned by the super class flush
 * @throws IOException if the super class flush fails
 */
@Override
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
    final int written = super.flush(header, buffer, trailer);

    if (written > 0)
    {
        _writable = true;
        notIdle();
        return written;
    }

    if (written == 0)
    {
        // Was there something to write that could not be written?
        final boolean pending =
            (header != null && header.hasContent())
            || (buffer != null && buffer.hasContent())
            || (trailer != null && trailer.hasContent());
        if (pending)
        {
            synchronized (this)
            {
                _writable = false;
                if (_state < STATE_DISPATCHED)
                    updateKey();
            }
        }
    }
    return written;
}
/* ------------------------------------------------------------ */
/**
 * Flushes a single buffer via the super class and tracks writability.
 * If nothing was written although the buffer had content, the endpoint is
 * marked not writable and, when undispatched, the key is updated. If
 * something was written, the endpoint is marked writable and activity is recorded.
 *
 * @param buffer the buffer to flush, may be null
 * @return the value returned by the super class flush
 * @throws IOException if the super class flush fails
 */
@Override
public int flush(Buffer buffer) throws IOException
{
    final int written = super.flush(buffer);

    if (written > 0)
    {
        _writable = true;
        notIdle();
        return written;
    }

    // Content was offered but none of it could be written.
    if (written == 0 && buffer != null && buffer.hasContent())
    {
        synchronized (this)
        {
            _writable = false;
            if (_state < STATE_DISPATCHED)
                updateKey();
        }
    }
    return written;
}
/* ------------------------------------------------------------ */
/**
 * Allows thread to block waiting for further events.
 * <p>
 * Blocks the calling thread until {@link #schedule()} clears the read-blocked
 * flag, the input is shut down, or the timeout elapses. Idle checking is
 * forced on while blocked and restored to its previous setting afterwards.
 *
 * @param timeoutMs the maximum time to block in milliseconds; if not positive,
 * the thread waits in 10 second slices until woken
 * @return false if the timeout elapsed while still read-blocked, true otherwise
 * @throws EofException if the input is already shut down on entry
 * @throws InterruptedIOException if the thread is interrupted while the
 * endpoint is in interruptable mode
 * @throws IOException declared by the overridden method
 */
@Override
public boolean blockReadable(long timeoutMs) throws IOException
{
    synchronized (this)
    {
        if (isInputShutdown())
            throw new EofException();

        long now=_selectSet.getNow();
        long end=now+timeoutMs;
        boolean check=isCheckForIdle();
        setCheckForIdle(true);
        try
        {
            _readBlocked=true;
            while (!isInputShutdown() && _readBlocked)
            {
                try
                {
                    // Make sure read interest is registered before waiting.
                    updateKey();
                    this.wait(timeoutMs>0?(end-now):10000);
                }
                catch (final InterruptedException e)
                {
                    LOG.warn(e);
                    if (_interruptable)
                    {
                        // A plain exception with its cause set; an anonymous subclass
                        // would capture a reference to this endpoint.
                        InterruptedIOException interrupted=new InterruptedIOException();
                        interrupted.initCause(e);
                        throw interrupted;
                    }
                    // Otherwise the interrupt is treated as spurious and waiting continues.
                }
                finally
                {
                    now=_selectSet.getNow();
                }

                if (_readBlocked && timeoutMs>0 && now>=end)
                    return false;
            }
        }
        finally
        {
            _readBlocked=false;
            setCheckForIdle(check);
        }
    }
    return true;
}
/* ------------------------------------------------------------ */
/**
 * Allows thread to block waiting for further events.
 * <p>
 * Blocks the calling thread until {@link #schedule()} clears the write-blocked
 * flag, the output is shut down, or the timeout elapses. Idle checking is
 * forced on while blocked and restored to its previous setting afterwards.
 *
 * @param timeoutMs the maximum time to block in milliseconds; if not positive,
 * the thread waits in 10 second slices until woken
 * @return false if the timeout elapsed while still write-blocked, true otherwise
 * @throws EofException if the output is already shut down on entry
 * @throws InterruptedIOException if the thread is interrupted while the
 * endpoint is in interruptable mode
 * @throws IOException declared by the overridden method
 */
@Override
public boolean blockWritable(long timeoutMs) throws IOException
{
    synchronized (this)
    {
        if (isOutputShutdown())
            throw new EofException();

        long now=_selectSet.getNow();
        long end=now+timeoutMs;
        boolean check=isCheckForIdle();
        setCheckForIdle(true);
        try
        {
            _writeBlocked=true;
            while (_writeBlocked && !isOutputShutdown())
            {
                try
                {
                    // Make sure write interest is registered before waiting.
                    updateKey();
                    this.wait(timeoutMs>0?(end-now):10000);
                }
                catch (final InterruptedException e)
                {
                    LOG.warn(e);
                    if (_interruptable)
                    {
                        // A plain exception with its cause set; an anonymous subclass
                        // would capture a reference to this endpoint.
                        InterruptedIOException interrupted=new InterruptedIOException();
                        interrupted.initCause(e);
                        throw interrupted;
                    }
                    // Otherwise the interrupt is treated as spurious and waiting continues.
                }
                finally
                {
                    now=_selectSet.getNow();
                }

                if (_writeBlocked && timeoutMs>0 && now>=end)
                    return false;
            }
        }
        finally
        {
            _writeBlocked=false;
            setCheckForIdle(check);
        }
    }
    return true;
}
/* ------------------------------------------------------------ */
/**
 * Sets the interruptable mode of the endpoint.
 * When false (the default), an interrupt received during a blocking
 * operation is treated as spurious and the operation keeps waiting.
 * When true, such an interrupt makes the blocking operation throw an
 * InterruptedIOException.
 *
 * @param interupable true if blocking operations should be interruptable
 */
public void setInterruptable(boolean interupable)
{
    synchronized (this)
    {
        this._interruptable = interupable;
    }
}
/* ------------------------------------------------------------ */
/**
 * @return true if interrupts of blocking operations are rethrown as InterruptedIOException
 */
public boolean isInterruptable()
{
    return this._interruptable;
}
/* ------------------------------------------------------------ */
/**
 * Marks the endpoint as not writable and updates the key, so that write
 * interest can be registered. A debug message is logged if the endpoint
 * was still flagged writable when this was called.
 *
 * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
 */
public void scheduleWrite()
{
    final boolean wasWritable = _writable;
    if (wasWritable)
    {
        LOG.debug("Required scheduleWrite {}",this);
    }

    _writable = false;
    updateKey();
}
/* ------------------------------------------------------------ */
/**
 * @return true unless the last flush wrote nothing despite pending content,
 * or a write has been scheduled and not yet become possible
 */
public boolean isWritable()
{
    return this._writable;
}
/* ------------------------------------------------------------ */
/**
 * @return always false; this endpoint does not track progress
 */
public boolean hasProgressed()
{
    final boolean progressed = false;
    return progressed;
}
/* ------------------------------------------------------------ */
/**
* Updates selection key. Adds operations types to the selection key as needed. No operations
* are removed as this is only done during dispatch. This method records the new key and
* schedules a call to doUpdateKey to do the keyChange
* <p>
* The desired interest ops are computed under the endpoint lock; the change is queued on
* the select set (and the selector woken) only after the lock has been released, and only
* if the desired ops differ from what the key currently reports.
*/
private void updateKey()
{
final boolean changed;
synchronized (this)
{
// -1 means "no usable key / channel closed"; it never equals a real ops mask.
int current_ops=-1;
if (getChannel().isOpen())
{
// Read interest: a thread is blocked reading, or nobody is handling and the connection is not suspended.
boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
// Write interest: a thread is blocked writing, or nobody is handling and the last write was incomplete.
boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
// Never ask for an operation on a direction of the socket that is already shut down.
_interestOps =
((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
| ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
try
{
current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
}
catch(Exception e)
{
// The key could not be queried: drop it; current_ops stays -1.
_key=null;
LOG.ignore(e);
}
}
// When the channel is closed _interestOps is left untouched and compared against -1.
changed=_interestOps!=current_ops;
}
if(changed)
{
_selectSet.addChange(this);
_selectSet.wakeup();
}
}
/* ------------------------------------------------------------ */
/**
* Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
* <p>
* With an open channel: applies the desired interest ops to the key, registering the
* channel with the selector first if no valid key is held. With a closed channel:
* cancels the key and destroys the endpoint in the select set (once).
*/
void doUpdateKey()
{
synchronized (this)
{
if (getChannel().isOpen())
{
if (_interestOps>0)
{
if (_key==null || !_key.isValid())
{
SelectableChannel sc = (SelectableChannel)getChannel();
if (sc.isRegistered())
{
// Still registered although our key is unusable: queue another update rather than registering again.
updateKey();
}
else
{
try
{
_key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
}
catch (Exception e)
{
// Registration failed: give up on this endpoint.
LOG.ignore(e);
if (_key!=null && _key.isValid())
{
_key.cancel();
}
if (_open)
{
_selectSet.destroyEndPoint(this);
}
_open=false;
_key = null;
}
}
}
else
{
// A valid key is held: just apply the desired ops.
_key.interestOps(_interestOps);
}
}
else
{
// No interest wanted: clear the ops on a valid key, or forget an unusable one.
if (_key!=null && _key.isValid())
_key.interestOps(0);
else
_key=null;
}
}
else
{
// Channel is closed: cancel the key and destroy the endpoint if that has not happened yet.
if (_key!=null && _key.isValid())
_key.cancel();
if (_open)
{
_open=false;
_selectSet.destroyEndPoint(this);
}
_key = null;
}
}
}
/* ------------------------------------------------------------ */
/**
* Handles the endpoint on a dispatched thread (run by the handler task).
* <p>
* Repeatedly calls the connection's handle method, following connection
* replacements, until {@link #undispatch()} reports that the endpoint is no
* longer dispatched. IO failures close the endpoint; a newly detected input
* shutdown is reported to the connection once.
*/
protected void handle()
{
boolean dispatched=true;
try
{
while(dispatched)
{
try
{
while(true)
{
final AsyncConnection next = (AsyncConnection)_connection.handle();
if (next!=_connection)
{
// The connection replaced itself: install the new one, tell the manager, and handle again.
LOG.debug("{} replaced {}",next,_connection);
Connection old=_connection;
_connection=next;
_manager.endPointUpgraded(this,old);
continue;
}
break;
}
}
catch (ClosedChannelException e)
{
// Channel already closed: nothing more to do than ignore.
LOG.ignore(e);
}
catch (EofException e)
{
LOG.debug("EOF", e);
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
catch (IOException e)
{
LOG.warn(e.toString());
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
catch (Throwable e)
{
LOG.warn("handle failed", e);
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
finally
{
// Report an input shutdown to the connection the first time it is seen on a still-open endpoint.
if (!_ishut && isInputShutdown() && isOpen())
{
_ishut=true;
try
{
_connection.onInputShutdown();
}
catch(Throwable x)
{
LOG.warn("onInputShutdown failed", x);
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
finally
{
updateKey();
}
}
// undispatch() returns false when an async dispatch was requested, in which case we loop again.
dispatched=!undispatch();
}
}
}
finally
{
// Only reached with dispatched==true if the loop above was left abnormally:
// keep undispatching until the endpoint is really undispatched.
if (dispatched)
{
dispatched=!undispatch();
while (dispatched)
{
LOG.warn("SCEP.run() finally DISPATCHED");
dispatched=!undispatch();
}
}
}
}
/* ------------------------------------------------------------ */
/**
 * Closes the endpoint. Any IOException from the super class close is
 * ignored, and the selection key is always updated afterwards.
 *
 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#close()
 */
@Override
public void close() throws IOException
{
    // On unix systems there is a JVM issue that if you cancel before closing, it can
    // cause the selector to block waiting for a channel to close and that channel can
    // block waiting for the remote end. But on windows, if you don't cancel before a
    // close, then the selector can block anyway!
    // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
    if (WORK_AROUND_JVM_BUG_6346658)
    {
        try
        {
            // Unsynchronized snapshot of the key, cancelled on a best-effort basis.
            final SelectionKey current = _key;
            if (current != null)
            {
                current.cancel();
            }
        }
        catch (Throwable failure)
        {
            LOG.ignore(failure);
        }
    }

    try
    {
        super.close();
    }
    catch (IOException failure)
    {
        LOG.ignore(failure);
    }
    finally
    {
        updateKey();
    }
}
/* ------------------------------------------------------------ */
/**
 * Returns a best-effort, unsynchronized description of the endpoint:
 * local and remote addresses, dispatch state, open/shutdown flags,
 * blocked flags, writability, desired interest ops, a summary of the
 * key ("r"/"w" for ready ops, "!" for an invalid key, "-" for no key)
 * and the connection.
 */
@Override
public String toString()
{
    // Do NOT use synchronized (this)
    // because it's very easy to deadlock when debugging is enabled.
    // We do a best effort to print the right toString() and that's it.
    SelectionKey key = _key;
    String keyString;
    if (key == null)
    {
        keyString = "-";
    }
    else
    {
        try
        {
            if (key.isValid())
            {
                StringBuilder ready = new StringBuilder(2);
                if (key.isReadable())
                    ready.append('r');
                if (key.isWritable())
                    ready.append('w');
                keyString = ready.toString();
            }
            else
            {
                keyString = "!";
            }
        }
        catch (java.nio.channels.CancelledKeyException e)
        {
            // The key was cancelled between isValid() and the ready-op checks
            // (no lock is held here); report it as invalid rather than fail.
            keyString = "!";
        }
    }

    // The l(...) slot takes the local address and the r(...) slot the remote one.
    return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
        hashCode(),
        _socket.getLocalSocketAddress(),
        _socket.getRemoteSocketAddress(),
        _state,
        isOpen(),
        isInputShutdown(),
        isOutputShutdown(),
        _readBlocked,
        _writeBlocked,
        _writable,
        _interestOps,
        keyString,
        _connection);
}
/* ------------------------------------------------------------ */
/**
 * @return the select set that this endpoint was created with
 */
public SelectSet getSelectSet()
{
    return this._selectSet;
}
/* ------------------------------------------------------------ */
/**
 * Records the max idle time without setting the socket SoTimeout;
 * only the field is updated and the super class method is not called.
 *
 * @param timeMs the max idle time in milliseconds
 * @throws IOException never thrown here; declared by the overridden method
 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
 */
@Override
public void setMaxIdleTime(int timeMs) throws IOException
{
    this._maxIdleTime = timeMs;
}
}