/*
 * Copyright (c) 2011 jMonkeyEngine
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 *
 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
 *   may be used to endorse or promote products derived from this software
 *   without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package com.jme3.network.kernel.udp;

import com.jme3.network.Filter;
import com.jme3.network.kernel.*;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *  A Kernel implementation using UDP packets.
 *
 *  @version   $Revision: 8944 $
 *  @author    Paul Speed
 */
public class UdpKernel extends AbstractKernel
{
    static Logger log = Logger.getLogger(UdpKernel.class.getName());

    private InetSocketAddress address;
    private HostThread thread;

    private ExecutorService writer;
    
    // The nature of UDP means that even through a firewall,
    // a user would have to have a unique address+port since UDP
    // can't really be NAT'ed.
    private Map<SocketAddress,UdpEndpoint> socketEndpoints = new ConcurrentHashMap<SocketAddress,UdpEndpoint>();

    public UdpKernel( InetAddress host, int port )
    {
        this( new InetSocketAddress(host, port) );
    }

    public UdpKernel( int port ) throws IOException
    {
        this( new InetSocketAddress(port) );
    }

    public UdpKernel( InetSocketAddress address )
    {
        this.address = address;
    }

    protected HostThread createHostThread()
    {
        return new HostThread();
    }

    public void initialize()
    {
        if( thread != null )
            throw new IllegalStateException( "Kernel already initialized." );

        writer = Executors.newFixedThreadPool(2, new NamedThreadFactory(toString() + "-writer"));
        
        thread = createHostThread();

        try {
            thread.connect();
            thread.start();
        } catch( IOException e ) {
            throw new KernelException( "Error hosting:" + address, e );
        }
    }

    public void terminate() throws InterruptedException
    {
        if( thread == null )
            throw new IllegalStateException( "Kernel not initialized." );

        try {
            thread.close();
            writer.shutdown();
            thread = null;
        } catch( IOException e ) {
            throw new KernelException( "Error closing host connection:" + address, e );
        }
    }

    /**
     *  Dispatches the data to all endpoints managed by the
     *  kernel.  'routing' is currently ignored.
     */
    public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
                           boolean copy )
    {
        if( reliable )
            throw new UnsupportedOperationException( "Reliable send not supported by this kernel." );

        if( copy ) {
            // Copy the data just once
            byte[] temp = new byte[data.remaining()];
            System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
            data = ByteBuffer.wrap(temp);
        }

        // Hand it to all of the endpoints that match our routing
        for( UdpEndpoint p : socketEndpoints.values() ) {
            // Does it match the filter?
            if( filter != null && !filter.apply(p) )
                continue;
    
            // Send the data
            p.send( data );
        }
    }

    protected Endpoint getEndpoint( SocketAddress address, boolean create )
    {
        UdpEndpoint p = socketEndpoints.get(address);
        if( p == null && create ) {
            p = new UdpEndpoint( this, nextEndpointId(), address, thread.getSocket() );
            socketEndpoints.put( address, p );

            // Add an event for it.
            addEvent( EndpointEvent.createAdd( this, p ) );
        }
        return p;
    }

    /**
     *  Called by the endpoints when they need to be closed.
     */
    protected void closeEndpoint( UdpEndpoint p ) throws IOException
    {
        // Just book-keeping to do here.
        if( socketEndpoints.remove( p.getRemoteAddress() ) == null )
            return;

        log.log( Level.INFO, "Closing endpoint:{0}.", p );            
        log.log( Level.FINE, "Socket endpoints size:{0}", socketEndpoints.size() );

        addEvent( EndpointEvent.createRemove( this, p ) );
        
        // If there are no pending messages then add one so that the
        // kernel-user knows to wake up if it is only listening for
        // envelopes.
        if( !hasEnvelopes() ) {
            // Note: this is not really a race condition.  At worst, our
            // event has already been handled by now and it does no harm
            // to check again.
            addEnvelope( EVENTS_PENDING );
        }
    }

    protected void newData( DatagramPacket packet )
    {
        // So the tricky part here is figuring out the endpoint and
        // whether it's new or not.  In these UDP schemes, firewalls have
        // to be ported back to a specific machine so we will consider
        // the address + port (ie: SocketAddress) the defacto unique
        // ID.
        Endpoint p = getEndpoint( packet.getSocketAddress(), true );

        // We'll copy the data to trim it.
        byte[] data = new byte[packet.getLength()];
        System.arraycopy(packet.getData(), 0, data, 0, data.length);

        Envelope env = new Envelope( p, data, false );
        addEnvelope( env );
    }

    protected void enqueueWrite( Endpoint endpoint, DatagramPacket packet )
    {
        writer.execute( new MessageWriter(endpoint, packet) );
    } 

    protected class MessageWriter implements Runnable
    {
        private Endpoint endpoint;
        private DatagramPacket packet;
        
        public MessageWriter( Endpoint endpoint, DatagramPacket packet )
        {
            this.endpoint = endpoint;
            this.packet = packet;
        }
        
        public void run()
        {
            // Not guaranteed to always work but an extra datagram
            // to a dead connection isn't so big of a deal.
            if( !endpoint.isConnected() ) {
                return;
            }
            
            try {
                thread.getSocket().send(packet);
            } catch( Exception e ) {
                KernelException exc = new KernelException( "Error sending datagram to:" + address, e );
                exc.fillInStackTrace();
                reportError(exc);
            }
        } 
    }

    protected class HostThread extends Thread
    {
        private DatagramSocket socket;
        private AtomicBoolean go = new AtomicBoolean(true);

        private byte[] buffer = new byte[65535]; // slightly bigger than needed.

        public HostThread()
        {
            setName( "UDP Host@" + address );
            setDaemon(true);
        }

        protected DatagramSocket getSocket()
        {
            return socket;
        }

        public void connect() throws IOException
        {
            socket = new DatagramSocket( address );
            log.log( Level.INFO, "Hosting UDP connection:{0}.", address );
        }

        public void close() throws IOException, InterruptedException
        {
            // Set the thread to stop
            go.set(false);

            // Make sure the channel is closed
            socket.close();

            // And wait for it
            join();
        }

        public void run()
        {
            log.log( Level.INFO, "Kernel started for connection:{0}.", address );

            // An atomic is safest and costs almost nothing
            while( go.get() ) {
                try {
                    // Could reuse the packet but I don't see the
                    // point and it may lead to subtle bugs if not properly
                    // reset.
                    DatagramPacket packet = new DatagramPacket( buffer, buffer.length );
                    socket.receive(packet);

                    newData( packet );
                } catch( IOException e ) {
                    if( !go.get() )
                        return;
                    reportError( e );
                }
            }
        }
    }
}
