| /* |
| * Copyright (c) 2011 jMonkeyEngine |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are |
| * met: |
| * |
| * * Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * |
| * * Redistributions in binary form must reproduce the above copyright |
| * notice, this list of conditions and the following disclaimer in the |
| * documentation and/or other materials provided with the distribution. |
| * |
| * * Neither the name of 'jMonkeyEngine' nor the names of its contributors |
| * may be used to endorse or promote products derived from this software |
| * without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR |
| * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| package com.jme3.network.kernel.tcp; |
| |
| import com.jme3.network.Filter; |
| import com.jme3.network.kernel.*; |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.*; |
| import java.nio.channels.spi.SelectorProvider; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| |
| /** |
| * A Kernel implementation based on NIO selectors. |
| * |
| * @version $Revision: 8944 $ |
| * @author Paul Speed |
| */ |
| public class SelectorKernel extends AbstractKernel |
| { |
| static Logger log = Logger.getLogger(SelectorKernel.class.getName()); |
| |
| private InetSocketAddress address; |
| private SelectorThread thread; |
| |
| private Map<Long,NioEndpoint> endpoints = new ConcurrentHashMap<Long,NioEndpoint>(); |
| |
| public SelectorKernel( InetAddress host, int port ) |
| { |
| this( new InetSocketAddress(host, port) ); |
| } |
| |
| public SelectorKernel( int port ) throws IOException |
| { |
| this( new InetSocketAddress(port) ); |
| } |
| |
| public SelectorKernel( InetSocketAddress address ) |
| { |
| this.address = address; |
| } |
| |
| protected SelectorThread createSelectorThread() |
| { |
| return new SelectorThread(); |
| } |
| |
| public void initialize() |
| { |
| if( thread != null ) |
| throw new IllegalStateException( "Kernel already initialized." ); |
| |
| thread = createSelectorThread(); |
| |
| try { |
| thread.connect(); |
| thread.start(); |
| } catch( IOException e ) { |
| throw new KernelException( "Error hosting:" + address, e ); |
| } |
| } |
| |
| public void terminate() throws InterruptedException |
| { |
| if( thread == null ) |
| throw new IllegalStateException( "Kernel not initialized." ); |
| |
| try { |
| thread.close(); |
| thread = null; |
| } catch( IOException e ) { |
| throw new KernelException( "Error closing host connection:" + address, e ); |
| } |
| } |
| |
| public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, |
| boolean copy ) |
| { |
| if( !reliable ) |
| throw new UnsupportedOperationException( "Unreliable send not supported by this kernel." ); |
| |
| if( copy ) { |
| // Copy the data just once |
| byte[] temp = new byte[data.remaining()]; |
| System.arraycopy(data.array(), data.position(), temp, 0, data.remaining()); |
| data = ByteBuffer.wrap(temp); |
| } |
| |
| // Hand it to all of the endpoints that match our routing |
| for( NioEndpoint p : endpoints.values() ) { |
| // Does it match the filter? |
| if( filter != null && !filter.apply(p) ) |
| continue; |
| |
| // Give it the data... but let each endpoint track their |
| // own completion over the shared array of bytes by |
| // duplicating it |
| p.send( data.duplicate(), false, false ); |
| } |
| |
| // Wake up the selector so it can reinitialize its |
| // state accordingly. |
| wakeupSelector(); |
| } |
| |
| protected NioEndpoint addEndpoint( SocketChannel c ) |
| { |
| // Note: we purposely do NOT put the key in the endpoint. |
| // SelectionKeys are dangerous outside the selector thread |
| // and this is safer. |
| NioEndpoint p = new NioEndpoint( this, nextEndpointId(), c ); |
| |
| endpoints.put( p.getId(), p ); |
| |
| // Enqueue an endpoint event for the listeners |
| addEvent( EndpointEvent.createAdd( this, p ) ); |
| |
| return p; |
| } |
| |
| protected void removeEndpoint( NioEndpoint p, SocketChannel c ) |
| { |
| endpoints.remove( p.getId() ); |
| log.log( Level.FINE, "Endpoints size:{0}", endpoints.size() ); |
| |
| // Enqueue an endpoint event for the listeners |
| addEvent( EndpointEvent.createRemove( this, p ) ); |
| |
| // If there are no pending messages then add one so that the |
| // kernel-user knows to wake up if it is only listening for |
| // envelopes. |
| if( !hasEnvelopes() ) { |
| // Note: this is not really a race condition. At worst, our |
| // event has already been handled by now and it does no harm |
| // to check again. |
| addEnvelope( EVENTS_PENDING ); |
| } |
| } |
| |
| /** |
| * Called by the endpoints when they need to be closed. |
| */ |
| protected void closeEndpoint( NioEndpoint p ) throws IOException |
| { |
| //log.log( Level.INFO, "Closing endpoint:{0}.", p ); |
| |
| thread.cancel(p); |
| } |
| |
| /** |
| * Used internally by the endpoints to wakeup the selector |
| * when they have data to send. |
| */ |
| protected void wakeupSelector() |
| { |
| thread.wakeupSelector(); |
| } |
| |
| protected void newData( NioEndpoint p, SocketChannel c, ByteBuffer shared, int size ) |
| { |
| // Note: if ever desirable, it would be possible to accumulate |
| // data per source channel and only 'finalize' it when |
| // asked for more envelopes then were ready. I just don't |
| // think it will be an issue in practice. The busier the |
| // server, the more the buffers will fill before we get to them. |
| // And if the server isn't busy, who cares if we chop things up |
| // smaller... the network is still likely to deliver things in |
| // bulk anyway. |
| |
| // Must copy the shared data before we use it |
| byte[] dataCopy = new byte[size]; |
| System.arraycopy(shared.array(), 0, dataCopy, 0, size); |
| |
| Envelope env = new Envelope( p, dataCopy, true ); |
| addEnvelope( env ); |
| } |
| |
| /** |
| * This class is purposely tucked neatly away because |
| * messing with the selector from other threads for any |
| * reason is very bad. This is the safest architecture. |
| */ |
| protected class SelectorThread extends Thread |
| { |
| private ServerSocketChannel serverChannel; |
| private Selector selector; |
| private AtomicBoolean go = new AtomicBoolean(true); |
| private ByteBuffer working = ByteBuffer.allocate( 8192 ); |
| |
| /** |
| * Because we want to keep the keys to ourselves, we'll do |
| * the endpoint -> key mapping internally. |
| */ |
| private Map<NioEndpoint,SelectionKey> endpointKeys = new ConcurrentHashMap<NioEndpoint,SelectionKey>(); |
| |
| public SelectorThread() |
| { |
| setName( "Selector@" + address ); |
| setDaemon(true); |
| } |
| |
| public void connect() throws IOException |
| { |
| // Create a new selector |
| this.selector = SelectorProvider.provider().openSelector(); |
| |
| // Create a new non-blocking server socket channel |
| this.serverChannel = ServerSocketChannel.open(); |
| serverChannel.configureBlocking(false); |
| |
| // Bind the server socket to the specified address and port |
| serverChannel.socket().bind(address); |
| |
| // Register the server socket channel, indicating an interest in |
| // accepting new connections |
| serverChannel.register(selector, SelectionKey.OP_ACCEPT); |
| |
| log.log( Level.INFO, "Hosting TCP connection:{0}.", address ); |
| } |
| |
| public void close() throws IOException, InterruptedException |
| { |
| // Set the thread to stop |
| go.set(false); |
| |
| // Make sure the channel is closed |
| serverChannel.close(); |
| |
| // Force the selector to stop blocking |
| wakeupSelector(); |
| |
| // And wait for it |
| join(); |
| } |
| |
| protected void wakeupSelector() |
| { |
| selector.wakeup(); |
| } |
| |
| protected void setupSelectorOptions() |
| { |
| // For now, selection keys will either be in OP_READ |
| // or OP_WRITE. So while we are writing a buffer, we |
| // will not be reading. This is way simpler and less |
| // error prone... it can always be changed when everything |
| // else works if we are looking to micro-optimize. |
| |
| // Setup options based on the current state of |
| // the endpoints. This could potentially be more |
| // efficiently done as change requests... or simply |
| // keeping a thread-safe set of endpoints with pending |
| // writes. For most cases, it shouldn't matter. |
| for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) { |
| if( e.getKey().hasPending() ) { |
| e.getValue().interestOps(SelectionKey.OP_WRITE); |
| } |
| } |
| } |
| |
| protected void accept( SelectionKey key ) throws IOException |
| { |
| // Would only get accepts on a server channel |
| ServerSocketChannel serverChan = (ServerSocketChannel)key.channel(); |
| |
| // Setup the connection to be non-blocking |
| SocketChannel remoteChan = serverChan.accept(); |
| remoteChan.configureBlocking(false); |
| |
| // And disable Nagle's buffering algorithm... we want |
| // data to go when we put it there. |
| Socket sock = remoteChan.socket(); |
| sock.setTcpNoDelay(true); |
| |
| // Let the selector know we're interested in reading |
| // data from the channel |
| SelectionKey endKey = remoteChan.register( selector, SelectionKey.OP_READ ); |
| |
| // And now create a new endpoint |
| NioEndpoint p = addEndpoint( remoteChan ); |
| endKey.attach(p); |
| endpointKeys.put(p, endKey); |
| } |
| |
| protected void cancel( NioEndpoint p ) throws IOException |
| { |
| SelectionKey key = endpointKeys.remove(p); |
| if( key == null ) { |
| //log.log( Level.INFO, "Endpoint already closed:{0}.", p ); |
| return; // already closed it |
| } |
| log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() ); |
| |
| log.log( Level.INFO, "Closing endpoint:{0}.", p ); |
| SocketChannel c = (SocketChannel)key.channel(); |
| |
| // Note: key.cancel() is specifically thread safe. One of |
| // the few things one can do with a key from another |
| // thread. |
| key.cancel(); |
| c.close(); |
| removeEndpoint( p, c ); |
| } |
| |
| protected void cancel( SelectionKey key, SocketChannel c ) throws IOException |
| { |
| NioEndpoint p = (NioEndpoint)key.attachment(); |
| log.log( Level.INFO, "Closing channel endpoint:{0}.", p ); |
| Object o = endpointKeys.remove(p); |
| |
| log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() ); |
| |
| key.cancel(); |
| c.close(); |
| removeEndpoint( p, c ); |
| } |
| |
| protected void read( SelectionKey key ) throws IOException |
| { |
| NioEndpoint p = (NioEndpoint)key.attachment(); |
| SocketChannel c = (SocketChannel)key.channel(); |
| working.clear(); |
| |
| int size; |
| try { |
| size = c.read(working); |
| } catch( IOException e ) { |
| // The remove end forcibly closed the connection... |
| // close out our end and cancel the key |
| cancel( key, c ); |
| return; |
| } |
| |
| if( size == -1 ) { |
| // The remote end shut down cleanly... |
| // close out our end and cancel the key |
| cancel( key, c ); |
| return; |
| } |
| |
| newData( p, c, working, size ); |
| } |
| |
| protected void write( SelectionKey key ) throws IOException |
| { |
| NioEndpoint p = (NioEndpoint)key.attachment(); |
| SocketChannel c = (SocketChannel)key.channel(); |
| |
| // We will send what we can and move on. |
| ByteBuffer current = p.peekPending(); |
| if( current == NioEndpoint.CLOSE_MARKER ) { |
| // This connection wants to be closed now |
| closeEndpoint(p); |
| |
| // Nothing more to do |
| return; |
| } |
| |
| c.write( current ); |
| |
| // If we wrote all of that packet then we need to remove it |
| if( current.remaining() == 0 ) { |
| p.removePending(); |
| } |
| |
| // If we happened to empty the pending queue then let's read |
| // again. |
| if( !p.hasPending() ) { |
| key.interestOps( SelectionKey.OP_READ ); |
| } |
| } |
| |
| protected void select() throws IOException |
| { |
| selector.select(); |
| |
| for( Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext(); ) { |
| SelectionKey key = i.next(); |
| i.remove(); |
| |
| if( !key.isValid() ) |
| { |
| // When does this happen? |
| log.log( Level.INFO, "Key is not valid:{0}.", key ); |
| continue; |
| } |
| |
| try { |
| if( key.isAcceptable() ) |
| accept(key); |
| else if( key.isWritable() ) |
| write(key); |
| else if( key.isReadable() ) |
| read(key); |
| } catch( IOException e ) { |
| if( !go.get() ) |
| return; // error likely due to shutting down |
| reportError( e ); |
| |
| // And at this level, errors likely mean the key is now |
| // dead and it doesn't hurt to kick them anyway. If we |
| // find IOExceptions that are not fatal, this can be |
| // readdressed |
| cancel( key, (SocketChannel)key.channel() ); |
| } |
| } |
| } |
| |
| public void run() |
| { |
| log.log( Level.INFO, "Kernel started for connection:{0}.", address ); |
| |
| // An atomic is safest and costs almost nothing |
| while( go.get() ) { |
| // Setup any queued option changes |
| setupSelectorOptions(); |
| |
| // Check for available keys and process them |
| try { |
| select(); |
| } catch( ClosedSelectorException e ) { |
| if( !go.get() ) |
| return; // it's because we're shutting down |
| throw new KernelException( "Premature selector closing", e ); |
| } catch( IOException e ) { |
| if( !go.get() ) |
| return; // error likely due to shutting down |
| reportError( e ); |
| } |
| } |
| } |
| } |
| } |