/*
 * Copyright (c) 2011 jMonkeyEngine
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 *
 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
 *   may be used to endorse or promote products derived from this software
 *   without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package com.jme3.network.base;

import com.jme3.network.Filter;
import com.jme3.network.HostedConnection;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.kernel.Endpoint;
import com.jme3.network.kernel.EndpointEvent;
import com.jme3.network.kernel.Envelope;
import com.jme3.network.kernel.Kernel;
import com.jme3.network.message.ClientRegistrationMessage;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *  Wraps a single Kernel and runs a daemon thread that reads
 *  envelopes from it, decodes them into messages, and forwards
 *  those messages to the supplied message dispatcher.  Endpoint
 *  removal events are forwarded to the owning DefaultServer.  This
 *  is used by DefaultServer to manage its kernel objects.
 *
 *  <p>This adapter assumes a simple protocol where two
 *  bytes define a (short) object size with the object data
 *  to follow.  Note: this limits the size of serialized
 *  objects to what a signed short can express (32767 bytes)...
 *  even though, for example, datagram packets can hold twice
 *  that. :P</p>
 *
 *  @version   $Revision: 8944 $
 *  @author    Paul Speed
 */
public class KernelAdapter extends Thread
{
    static Logger log = Logger.getLogger(KernelAdapter.class.getName());
    
    private final DefaultServer server; // this is unfortunate
    private final Kernel kernel;
    private final MessageListener<HostedConnection> messageDispatcher;

    // Loop flag for run(); cleared by close() to stop the thread.
    private final AtomicBoolean go = new AtomicBoolean(true);
 
    // Keeps track of the in-progress messages that are received
    // on reliable connections
    private final Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>();
    
    // Marks the messages as reliable or not if they came
    // through this connector.
    private final boolean reliable;
    
    /**
     *  Creates a daemon adapter thread named after the kernel's
     *  string representation.  The thread is not started here.
     *
     *  @param server the server that owns the connections and receives
     *                registration and connection-closed callbacks
     *  @param kernel the kernel to read envelopes and events from
     *  @param messageDispatcher receives every decoded message that comes
     *                from an endpoint with a known connection
     *  @param reliable the reliable flag stamped on every message that
     *                arrives through this adapter; also enables
     *                per-endpoint buffering of partial messages
     */
    public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher,
                          boolean reliable )
    {
        super( String.valueOf(kernel) );
        this.server = server;
        this.kernel = kernel;
        this.messageDispatcher = messageDispatcher;
        this.reliable = reliable;
        setDaemon(true);
    }

    /**
     *  Returns the kernel wrapped by this adapter.
     */
    public Kernel getKernel()
    {
        return kernel;
    }

    /**
     *  Initializes the wrapped kernel.
     */
    public void initialize()
    {
        kernel.initialize();
    }
 
    /**
     *  Passes the broadcast straight through to the wrapped kernel.
     */
    public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, 
                           boolean copy )
    {
        kernel.broadcast( filter, data, reliable, copy );
    }                           
 
    /**
     *  Signals the run loop to stop and terminates the wrapped kernel.
     *  This method does not wait for the adapter thread to finish.
     */
    public void close() throws InterruptedException
    {
        go.set(false);
        
        // Kill the kernel
        kernel.terminate();
    }

    /**
     *  Logs an error raised while processing the specified endpoint
     *  and then closes that endpoint.
     *
     *  @param p the endpoint being processed when the error occurred
     *  @param context the envelope, event, or message being processed
     *  @param e the error itself
     */
    protected void reportError( Endpoint p, Object context, Exception e )
    {
        // Should really be queued up so the outer thread can
        // retrieve them.  For now we'll just log it.  FIXME
        log.log( Level.SEVERE, "Unhandled error, endpoint:" + p + ", context:" + context, e );
        
        // In lieu of other options, at least close the endpoint.
        // This method is called from catch blocks so it must not
        // throw itself just because there is no endpoint to close.
        if( p != null ) {
            p.close();
        }
    }                                                      

    /**
     *  Looks up the server's connection for the specified endpoint.
     */
    protected HostedConnection getConnection( Endpoint p )
    {
        return server.getConnection(p);
    }
 
    /**
     *  Discards any partial message data held for the endpoint
     *  and then notifies the server that the endpoint is gone.
     */
    protected void connectionClosed( Endpoint p )
    {
        // Remove any message buffer we've been accumulating 
        // on behalf of this endpoint
        messageBuffers.remove(p);

        log.log( Level.FINE, "Buffers size:{0}", messageBuffers.size() );
    
        server.connectionClosed(p);
    }
 
    /**
     *  Delivers a single decoded message.  Client registration messages
     *  are handed to the server directly; everything else goes to the
     *  message dispatcher if the endpoint has a connection.  Messages
     *  from endpoints without a connection are dropped, with a warning
     *  if this adapter is reliable.  Errors raised while looking up the
     *  connection or dispatching are passed to reportError().
     *
     *  <p>Note on threading for those writing their own server 
     *  or adapter implementations.  The rule that a single connection be 
     *  processed by only one thread at a time is more about ensuring that
     *  the messages are delivered in the order that they are received
     *  than for any user-code safety.  99% of the time the user code should
     *  be writing for multithreaded access anyway.</p>
     *
     *  <p>The issue with the messages is that if an implementation is
     *  using a general thread pool then it would be possible for a 
     *  naive implementation to have one thread grab an Envelope from
     *  connection 1's and another grab the next Envelope.  Since an Envelope
     *  may contain several messages, delivering the second thread's messages
     *  before or during the first's would be really confusing and hard
     *  to code for in user code.</p>
     *
     *  <p>And that's why this note is here.  DefaultServer does a rudimentary
     *  per-connection locking but it couldn't possibly guard against
     *  out of order Envelope processing.</p>    
     */
    protected void dispatch( Endpoint p, Message m )
    {
        // Because this class is the only one with the information
        // to do it... we need to pull off the registration message
        // here.
        if( m instanceof ClientRegistrationMessage ) {
            server.registerClient( this, p, (ClientRegistrationMessage)m );
            return;           
        }                
 
        try {           
            HostedConnection source = getConnection(p);
            if( source == null ) {
                if( reliable ) {
                    // If it's a reliable connection then it's slightly more
                    // concerning but this can happen all the time for a UDP endpoint.
                    log.log( Level.WARNING, "Recieved message from unconnected endpoint:" + p + "  message:" + m );
                }                    
                return; 
            }
            messageDispatcher.messageReceived( source, m );
        } catch( Exception e ) {
            reportError(p, m, e);
        }
    }

    /**
     *  Returns the protocol buffer used to decode data from the
     *  specified endpoint.  Reliable adapters keep one buffer per 
     *  endpoint so that partial messages can be completed by later 
     *  envelopes; unreliable adapters get a fresh buffer every call.
     */
    protected MessageProtocol getMessageBuffer( Endpoint p )
    {
        if( !reliable ) {
            // Since UDP comes in packets and they aren't split
            // up, there is no reason to buffer.  In fact, there would
            // be a down side because there is no way for us to reliably
            // clean these up later since we'd create another one for 
            // any random UDP packet that comes to the port.
            return new MessageProtocol();
        } else {
            // See if we already have one
            // NOTE(review): an envelope processed after connectionClosed()
            // has removed the endpoint's buffer would create a new entry
            // here that nothing removes again -- confirm the kernel never
            // delivers envelopes after the REMOVE event.
            MessageProtocol result = messageBuffers.get(p);
            if( result == null ) {
                result = new MessageProtocol();
                messageBuffers.put(p, result);
            }
            return result;
        }
    }

    /**
     *  Feeds the envelope's data into the source endpoint's protocol
     *  buffer and dispatches every message that the buffer can produce.
     *
     *  @throws RuntimeException if this adapter is unreliable and the
     *          protocol buffer reports a count of zero for the envelope
     */
    protected void createAndDispatch( Envelope env )
    {
        MessageProtocol protocol = getMessageBuffer(env.getSource()); 
    
        byte[] data = env.getData();
        ByteBuffer buffer = ByteBuffer.wrap(data);

        int count = protocol.addBuffer( buffer );
        if( count == 0 ) {
            // This can happen if there was only a partial message
            // received.  However, this should never happen for unreliable
            // connections.
            if( !reliable ) {
                // Log some additional information about the packet.
                int len = Math.min( 10, data.length );
                StringBuilder sb = new StringBuilder();
                for( int i = 0; i < len; i++ ) {
                    // Mask to the unsigned value; otherwise bytes >= 0x80
                    // are sign-extended and print as ffffffXX.
                    sb.append( '[' ).append( Integer.toHexString(data[i] & 0xff) ).append( ']' ); 
                }
                log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb );         
                throw new RuntimeException( "Envelope contained incomplete data:" + env );
            }                
        }            
        
        // Should be complete... and maybe we should check but we don't
        Message m = null;
        while( (m = protocol.getMessage()) != null ) {
            m.setReliable(reliable);
            dispatch( env.getSource(), m );
        }
    } 

    /**
     *  Handles a kernel endpoint event.  Only REMOVE events are
     *  acted on; they are forwarded to connectionClosed().
     */
    protected void createAndDispatch( EndpointEvent event )
    {
        // Only need to tell the server about disconnects 
        if( event.getType() == EndpointEvent.Type.REMOVE ) {            
            connectionClosed( event.getEndpoint() );
        }            
    }
 
    /**
     *  Drains all endpoint events currently queued in the kernel.
     *  An error on one event is reported through reportError() and
     *  does not stop the remaining events from being processed.
     */
    protected void flushEvents()
    {
        EndpointEvent event;
        while( (event = kernel.nextEvent()) != null ) {
            try {
                createAndDispatch( event );
            } catch( Exception e ) {
                reportError(event.getEndpoint(), event, e);        
            }
        }
    }
 
    /**
     *  Main loop of the adapter thread.  Alternates between draining
     *  endpoint events and reading/dispatching envelopes until close()
     *  clears the go flag.
     *
     *  @throws RuntimeException if the thread is interrupted while
     *          the adapter has not been closed
     */
    @Override
    public void run()
    {
        while( go.get() ) {
        
            try {           
                // Check for pending events
                flushEvents();
 
                // Grab the next envelope
                Envelope e = kernel.read();
                if( e == Kernel.EVENTS_PENDING )
                    continue; // We'll catch it up above
 
                // Check for pending events that might have
                // come in while we were blocking.  This is usually
                // when the connection add events come through
                flushEvents();
            
                try {
                    createAndDispatch( e );
                } catch( Exception ex ) {
                    reportError(e.getSource(), e, ex);        
                }
                        
            } catch( InterruptedException ex ) {
                // An interrupt after close() is the normal way out;
                // any other interrupt is unexpected.
                if( !go.get() )
                    return;
                throw new RuntimeException( "Unexpected interruption", ex );
            }
        }
    }
        
}


