blob: 3827786b7df261c2caca47e79c57f70db6446acb [file] [log] [blame]
/*
* Copyright (c) 2011 jMonkeyEngine
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of 'jMonkeyEngine' nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.jme3.network.base;
import com.jme3.network.ErrorListener;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.kernel.Connector;
import com.jme3.network.kernel.ConnectorException;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Wraps a single Connector and forwards new messages
* to the supplied message dispatcher. This is used
* by DefaultClient to manage its connector objects.
* This is only responsible for message reading and provides
* no support for buffering writes.
*
* <p>This adapter assumes a simple protocol where two
* bytes define a (short) object size with the object data
* to follow. Note: this limits the size of serialized
* objects to 32676 bytes... even though, for example,
* datagram packets can hold twice that. :P</p>
*
* @version $Revision: 8944 $
* @author Paul Speed
*/
public class ConnectorAdapter extends Thread
{
private static final int OUTBOUND_BACKLOG = 16000;
private Connector connector;
private MessageListener<Object> dispatcher;
private ErrorListener<Object> errorHandler;
private AtomicBoolean go = new AtomicBoolean(true);
private BlockingQueue<ByteBuffer> outbound;
// Writes messages out on a background thread
private WriterThread writer;
// Marks the messages as reliable or not if they came
// through this connector.
private boolean reliable;
public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher,
ErrorListener<Object> errorHandler, boolean reliable )
{
super( String.valueOf(connector) );
this.connector = connector;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.reliable = reliable;
setDaemon(true);
// The backlog makes sure that the outbound channel blocks once
// a certain backlog level is reached. It is set high so that it
// is only reached in the worst cases... which are usually things like
// raw throughput tests. Technically, a saturated TCP channel could
// back up quite a bit if the buffers are full and the socket has
// stalled but 16,000 messages is still a big backlog.
outbound = new ArrayBlockingQueue<ByteBuffer>(OUTBOUND_BACKLOG);
// Note: this technically adds a potential deadlock case
// with the above code where there wasn't one before. For example,
// if a TCP outbound queue fills to capacity and a client sends
// in such a way that they block TCP message handling then if the HostedConnection
// on the server is similarly blocked then the TCP network buffers may
// all get full and no outbound messages move and we forever block
// on the queue.
// However, in practice this can't really happen... or at least it's
// the sign of other really bad things.
// First, currently the server-side outbound queues are all unbounded and
// so won't ever block the handling of messages if the outbound channel is full.
// Second, there would have to be a huge amount of data backlog for this
// to ever occur anyway.
// Third, it's a sign of a really poor architecture if 16,000 messages
// can go out in a way that blocks reads.
writer = new WriterThread();
writer.start();
}
public void close()
{
go.set(false);
// Kill the writer service
writer.shutdown();
if( connector.isConnected() )
{
// Kill the connector
connector.close();
}
}
protected void dispatch( Message m )
{
dispatcher.messageReceived( null, m );
}
public void write( ByteBuffer data )
{
try {
outbound.put( data );
} catch( InterruptedException e ) {
throw new RuntimeException( "Interrupted while waiting for queue to drain", e );
}
}
protected void handleError( Exception e )
{
if( !go.get() )
return;
errorHandler.handleError( this, e );
}
public void run()
{
MessageProtocol protocol = new MessageProtocol();
try {
while( go.get() ) {
ByteBuffer buffer = connector.read();
if( buffer == null ) {
if( go.get() ) {
throw new ConnectorException( "Connector closed." );
} else {
// Just dump out because a null buffer is expected
// from a closed/closing connector
break;
}
}
protocol.addBuffer( buffer );
Message m = null;
while( (m = protocol.getMessage()) != null ) {
m.setReliable( reliable );
dispatch( m );
}
}
} catch( Exception e ) {
handleError( e );
}
}
protected class WriterThread extends Thread
{
public WriterThread()
{
super( String.valueOf(connector) + "-writer" );
}
public void shutdown()
{
interrupt();
}
private void write( ByteBuffer data )
{
try {
connector.write(data);
} catch( Exception e ) {
handleError( e );
}
}
public void run()
{
while( go.get() ) {
try {
ByteBuffer data = outbound.take();
write(data);
} catch( InterruptedException e ) {
if( !go.get() )
return;
throw new RuntimeException( "Interrupted waiting for data", e );
}
}
}
}
}