testng.xml working, need to remove debug messages and clean up.
diff --git a/src/main/java/org/testng/remote/RemoteTestNG.java b/src/main/java/org/testng/remote/RemoteTestNG.java
index d52941c..d9233e9 100644
--- a/src/main/java/org/testng/remote/RemoteTestNG.java
+++ b/src/main/java/org/testng/remote/RemoteTestNG.java
@@ -18,7 +18,6 @@
 import org.testng.remote.strprotocol.MessageHub;

 import org.testng.remote.strprotocol.RemoteTestListener;

 import org.testng.remote.strprotocol.SerializedMessageSender;

-import org.testng.remote.strprotocol.StringMessageSender;

 import org.testng.remote.strprotocol.SuiteMessage;

 import org.testng.reporters.JUnitXMLReporter;

 import org.testng.reporters.TestHTMLReporter;

@@ -34,7 +33,7 @@
  * @author Cedric Beust <cedric@beust.com>

  */

 public class RemoteTestNG extends TestNG {

-  private static final String LOCALHOST = "127.0.0.1";

+  private static final String LOCALHOST = "localhost";

 

   // The following constants are referenced by the Eclipse plug-in, make sure you

   // modify the plug-in as well if you change any of them.

@@ -76,9 +75,9 @@
   @Override

   public void run() {

     List<IMessageSender> senders = Lists.newArrayList();

-    if (m_port != null) {

-      senders.add(new StringMessageSender(m_host, m_port));

-    }

+//    if (m_port != null) {

+//      senders.add(new StringMessageSender(m_host, m_port));

+//    }

     if (m_serPort != null) {

       senders.add(new SerializedMessageSender(m_host, m_serPort));

     }

@@ -102,7 +101,8 @@
         }

 

         GenericMessage gm= new GenericMessage(MessageHelper.GENERIC_SUITE_COUNT);

-        gm.addProperty("suiteCount", suites.size()).addProperty("testCount", testCount);

+        gm.setSuiteCount(suites.size());

+        gm.setTestCount(testCount);

         msh.sendMessage(gm);

 

         addListener(new RemoteSuiteListener(msh));

@@ -160,9 +160,9 @@
     new JCommander(Arrays.asList(cla, ra), args);

     m_debug = cla.debug;

     if (m_debug) {

-      while (true) {

+//      while (true) {

         initAndRun(args, cla, ra);

-      }

+//      }

     }

     else {

       initAndRun(args, cla, ra);

@@ -174,6 +174,7 @@
     if (m_debug) {

       // In debug mode, override the port and the XML file to a fixed location

       cla.port = Integer.parseInt(DEBUG_PORT);

+      ra.serPort = cla.port;

       cla.suiteFiles = Arrays.asList(new String[] {

           DEBUG_SUITE_DIRECTORY + DEBUG_SUITE_FILE

       });

diff --git a/src/main/java/org/testng/remote/strprotocol/AbstractRemoteTestRunnerClient.java b/src/main/java/org/testng/remote/strprotocol/AbstractRemoteTestRunnerClient.java
index 435c365..f6f3019 100755
--- a/src/main/java/org/testng/remote/strprotocol/AbstractRemoteTestRunnerClient.java
+++ b/src/main/java/org/testng/remote/strprotocol/AbstractRemoteTestRunnerClient.java
@@ -18,9 +18,8 @@
 
 import org.testng.TestNGException;
 
-import java.io.BufferedReader;
+import java.io.EOFException;
 import java.io.IOException;
-import java.io.PrintWriter;
 import java.net.ServerSocket;
 import java.net.Socket;
 
@@ -40,8 +39,9 @@
    */
   private ServerSocket       fServerSocket;
   private Socket             fSocket;
-  private PrintWriter        m_outputWriter;
-  private BufferedReader     m_inputReader;
+  private ServerConnection m_serverConnection;
+//  private PrintWriter        m_outputWriter;
+//  private BufferedReader     m_inputReader;
 
   /**
    * Start listening to a test run. Start a server connection that
@@ -52,6 +52,7 @@
                                           ServerConnection serverConnection) {
     m_suiteListeners= suiteListeners;
     m_testListeners= testListeners;
+    m_serverConnection = serverConnection;
 
     serverConnection.start();
   }
@@ -65,19 +66,19 @@
   }
 
   private synchronized void shutdown() {
-    if(m_outputWriter != null) {
-      m_outputWriter.close();
-      m_outputWriter = null;
-    }
-    try {
-      if(m_inputReader != null) {
-        m_inputReader.close();
-        m_inputReader = null;
-      }
-    }
-    catch(IOException e) {
-      e.printStackTrace();
-    }
+//    if(m_outputWriter != null) {
+//      m_outputWriter.close();
+//      m_outputWriter = null;
+//    }
+//    try {
+//      if(m_inputReader != null) {
+//        m_inputReader.close();
+//        m_inputReader = null;
+//      }
+//    }
+//    catch(IOException e) {
+//      e.printStackTrace();
+//    }
     try {
       if(fSocket != null) {
         fSocket.close();
@@ -99,7 +100,7 @@
   }
 
   public boolean isRunning() {
-    return fSocket != null;
+    return m_serverConnection.getMessageSender() != null;
   }
 
   /**
@@ -107,10 +108,7 @@
    */
   public synchronized void stopTest() {
     if(isRunning()) {
-      if(null != m_outputWriter) {
-        m_outputWriter.println(MessageHelper.STOP_MSG);
-        m_outputWriter.flush();
-      }
+      m_serverConnection.getMessageSender().sendStop();
       shutdown();
     }
   }
@@ -172,28 +170,46 @@
       m_messageMarshaller = messageMarshaller;
     }
 
+    IMessageSender getMessageSender() {
+      return m_messageMarshaller;
+    }
+
     @Override
     public void run() {
-      IMessage message = m_messageMarshaller.receiveMessage();
-      while (message != null) {
-        if (message instanceof GenericMessage) {
-          notifyStart((GenericMessage) message);
+      try {
+        IMessage message = m_messageMarshaller.receiveMessage();
+        while (message != null) {
+          if (message instanceof GenericMessage) {
+            notifyStart((GenericMessage) message);
+          }
+          else if (message instanceof SuiteMessage) {
+            notifySuiteEvents((SuiteMessage) message);
+          }
+          else if (message instanceof TestMessage) {
+            notifyTestEvents((TestMessage) message);
+          }
+          else if (message instanceof TestResultMessage) {
+            notifyResultEvents((TestResultMessage) message);
+          }
+          else {
+            throw new TestNGException("Unknown message type:" + message);
+          }
+          if (isRunning()) {
+            m_messageMarshaller.sendAck();
+          }
+          message = m_messageMarshaller.receiveMessage();
         }
-        else if (message instanceof SuiteMessage) {
-          notifySuiteEvents((SuiteMessage) message);
-        }
-        else if (message instanceof TestMessage) {
-          notifyTestEvents((TestMessage) message);
-        }
-        else if (message instanceof TestResultMessage) {
-          notifyResultEvents((TestResultMessage) message);
-        }
-        else {
-          throw new TestNGException("Unknown message type:" + message);
-        }
-        message = m_messageMarshaller.receiveMessage();
       }
-      m_messageMarshaller.shutDown();
+      catch (EOFException ex) {
+        // Expected
+      }
+      catch(Exception ex) {
+        ex.printStackTrace();
+      }
+      finally {
+        m_messageMarshaller.shutDown();
+        m_messageMarshaller = null;
+      }
 //      try {
 //        fServerSocket = new ServerSocket(fServerPort);
 //        fSocket = fServerSocket.accept();
diff --git a/src/main/java/org/testng/remote/strprotocol/BaseMessageSender.java b/src/main/java/org/testng/remote/strprotocol/BaseMessageSender.java
index b90ef57..4524bea 100644
--- a/src/main/java/org/testng/remote/strprotocol/BaseMessageSender.java
+++ b/src/main/java/org/testng/remote/strprotocol/BaseMessageSender.java
@@ -1,13 +1,15 @@
 package org.testng.remote.strprotocol;
 
 import org.testng.TestNGException;
-import org.testng.remote.RemoteTestNG;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
 import java.net.ConnectException;
 import java.net.ServerSocket;
@@ -22,13 +24,16 @@
 
   /** Outgoing message stream. */
   protected OutputStream m_outStream;
+  /** Used to send ACK and STOP */
+  private PrintWriter m_outWriter;
 
   /** Incoming message stream. */
   protected volatile InputStream m_inStream;
-  protected BufferedReader m_inReader;
+  /** Used to receive ACK and STOP */
+  protected volatile BufferedReader m_inReader;
 
   private ReaderThread m_readerThread;
-  protected InputStream m_receiverInputStream;
+//  protected InputStream m_receiverInputStream;
 
   public BaseMessageSender(String host, int port) {
     m_host = host;
@@ -47,10 +52,14 @@
     while (true) {
       try {
         m_clientSocket = new Socket(m_host, m_port);
-  
-        m_outStream = m_clientSocket.getOutputStream();
-        m_inStream = m_clientSocket.getInputStream();
+        p("Received a connection from Eclipse on " + m_host + ":" + m_port);
 
+        // Output streams
+        m_outStream = m_clientSocket.getOutputStream();
+        m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream)));
+
+        // Input streams
+        m_inStream = m_clientSocket.getInputStream();
         try {
           m_inReader = new BufferedReader(new InputStreamReader(m_inStream,
               "UTF-8")); //$NON-NLS-1$
@@ -59,7 +68,7 @@
           // Should never happen
           m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
         }
-  
+
         p("Connection established, starting reader thread");
         m_readerThread = new ReaderThread();
         m_readerThread.start();
@@ -77,15 +86,68 @@
     }
   }
 
+  private void _initSockets() throws IOException {
+    m_clientSocket = new Socket(m_host, m_port);
+    p("Received a connection from Eclipse on " + m_host + ":" + m_port);
+
+    // Output streams
+    m_outStream = m_clientSocket.getOutputStream();
+    m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream)));
+
+    // Input streams
+    m_inStream = m_clientSocket.getInputStream();
+    try {
+      m_inReader = new BufferedReader(new InputStreamReader(m_inStream,
+          "UTF-8")); //$NON-NLS-1$
+    }
+    catch(UnsupportedEncodingException ueex) {
+      // Should never happen
+      m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
+    }
+
+  }
+
+  private void sendAdminMessage(String message) {
+    p("Sending admin message " + message);
+    m_outWriter.println(message);
+    m_outWriter.flush();
+  }
+
+  @Override
+  public void sendAck() {
+//    p("Writing ack as an int");
+//    try {
+//      m_outStream.write(1);
+//      m_outStream.flush();
+//    } catch (IOException e) {
+//      // TODO Auto-generated catch block
+//      e.printStackTrace();
+//    }
+//    p("Done writing ack as an int");
+    sendAdminMessage(MessageHelper.ACK_MSG);
+  }
+
+  @Override
+  public void sendStop() {
+    sendAdminMessage(MessageHelper.STOP_MSG);
+  }
+
   @Override
   public void initReceiver() {
-    if (m_receiverInputStream != null) {
+    if (m_inStream != null) {
       p("Receiver already initialized");
     }
     ServerSocket serverSocket;
     try {
       serverSocket = new ServerSocket(m_port);
-      m_receiverInputStream = serverSocket.accept().getInputStream();
+      Socket socket = serverSocket.accept();
+      m_inStream = socket.getInputStream();
+      m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
+      m_outStream = socket.getOutputStream();
+      m_outWriter = new PrintWriter(new OutputStreamWriter(m_outStream));
+
+//      initSockets();
+
     } catch (IOException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
@@ -132,7 +194,8 @@
   }
 
   private static void p(String msg) {
-    if (RemoteTestNG.isVerbose()) {
+    if (true) {
+//    if (RemoteTestNG.isVerbose()) {
       System.out.println("[BaseMessageSender] " + msg); //$NON-NLS-1$
     }
   }
@@ -146,13 +209,27 @@
       super("ReaderThread"); //$NON-NLS-1$
     }
 
+//    @Override
+    public void _run() {
+      try {
+        p("ReaderThread reading from instream");
+        int ack = m_inStream.read();
+        p("ReaderThread read int:" + ack);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+
     @Override
     public void run() {
       try {
-        String message;
-        while((m_inReader != null) && (message = m_inReader.readLine()) != null) {
-          if(m_debug) {
-            p("Reply:" + message); //$NON-NLS-1$
+        p("ReaderThread waiting for an admin message");
+        String message = m_inReader.readLine();
+        p("ReaderThread received admin message:" + message);
+        while (message != null) {
+          if (m_debug) {
+            p("Admin message:" + message); //$NON-NLS-1$
           }
           boolean acknowledge = MessageHelper.ACK_MSG.equals(message);
           boolean stop = MessageHelper.STOP_MSG.equals(message);
@@ -164,10 +241,26 @@
               break;
             }
           }
+          message = m_inReader.readLine();
         }
+//        while((m_reader != null) && (message = m_reader.readLine()) != null) {
+//          if (m_debug) {
+//            p("Admin message:" + message); //$NON-NLS-1$
+//          }
+//          boolean acknowledge = MessageHelper.ACK_MSG.equals(message);
+//          boolean stop = MessageHelper.STOP_MSG.equals(message);
+//          if(acknowledge || stop) {
+//            synchronized(m_lock) {
+//              m_lock.notifyAll();
+//            }
+//            if (stop) {
+//              break;
+//            }
+//          }
+//        }
       }
       catch(IOException ioe) {
-        ;
+        // ioe.printStackTrace();
       }
     }
   }
diff --git a/src/main/java/org/testng/remote/strprotocol/GenericMessage.java b/src/main/java/org/testng/remote/strprotocol/GenericMessage.java
index ecb3e50..7616836 100755
--- a/src/main/java/org/testng/remote/strprotocol/GenericMessage.java
+++ b/src/main/java/org/testng/remote/strprotocol/GenericMessage.java
@@ -1,9 +1,5 @@
 package org.testng.remote.strprotocol;
 
-import org.testng.collections.Maps;
-
-import java.util.Iterator;
-import java.util.Map;
 
 
 
@@ -15,31 +11,50 @@
  * @author <a href='mailto:the_mindstorm[at]evolva[dot]ro'>Alexandru Popescu</a>
  */
 public class GenericMessage implements IStringMessage {
-  protected Map m_properties;
+//  protected Map m_properties;
   protected final int m_messageType;
+  private int m_suiteCount;
+
+  private int m_testCount;
 
   public GenericMessage(final int type) {
-    this(type, Maps.newHashMap());
-  }
-
-  public GenericMessage(final int type, Map props) {
     m_messageType = type;
-    m_properties = props;
   }
 
-  public GenericMessage addProperty(final String propName, final Object propValue) {
-    m_properties.put(propName, propValue);
-
-    return this;
+  public int getSuiteCount() {
+    return m_suiteCount;
   }
 
+  public void setSuiteCount(int suiteCount) {
+    m_suiteCount = suiteCount;
+  }
+
+  public int getTestCount() {
+    return m_testCount;
+  }
+
+  public void setTestCount(int testCount) {
+    m_testCount = testCount;
+  }
+
+//  public GenericMessage(final int type, Map props) {
+//    m_messageType = type;
+//    m_properties = props;
+//  }
+
+//  public GenericMessage addProperty(final String propName, final Object propValue) {
+//    m_properties.put(propName, propValue);
+//
+//    return this;
+//  }
+
   public GenericMessage addProperty(final String propName, final int propValue) {
     return addProperty(propName, Integer.valueOf(propValue));
   }
 
-  public String getProperty(final String propName) {
-    return (String) m_properties.get(propName);
-  }
+//  public String getProperty(final String propName) {
+//    return (String) m_properties.get(propName);
+//  }
 
   /**
    * @see net.noco.testng.runner.IStringMessage#getMessageAsString()
@@ -49,16 +64,18 @@
     StringBuffer buf = new StringBuffer();
 
     buf.append(m_messageType);
+    buf.append(MessageHelper.DELIMITER).append("testCount").append(getTestCount())
+        .append(MessageHelper.DELIMITER).append("suiteCount").append(getSuiteCount());
 
-        for(Iterator it = m_properties.entrySet().iterator(); it.hasNext(); ) {
-          Map.Entry entry = (Map.Entry) it.next();
-
-          buf.append(MessageHelper.DELIMITER)
-              .append(entry.getKey())
-              .append(MessageHelper.DELIMITER)
-              .append(entry.getValue())
-              ;
-        }
+//        for(Iterator it = m_properties.entrySet().iterator(); it.hasNext(); ) {
+//          Map.Entry entry = (Map.Entry) it.next();
+//
+//          buf.append(MessageHelper.DELIMITER)
+//              .append(entry.getKey())
+//              .append(MessageHelper.DELIMITER)
+//              .append(entry.getValue())
+//              ;
+//        }
 
 
     return buf.toString();
diff --git a/src/main/java/org/testng/remote/strprotocol/IMessage.java b/src/main/java/org/testng/remote/strprotocol/IMessage.java
index a82398f..3aeaa4a 100755
--- a/src/main/java/org/testng/remote/strprotocol/IMessage.java
+++ b/src/main/java/org/testng/remote/strprotocol/IMessage.java
@@ -1,10 +1,12 @@
 package org.testng.remote.strprotocol;
 
+import java.io.Serializable;
+
 
 /**
  * Marker interface for messages exchanged between RemoteTestNG and a client.
  *
  * @author Cedric Beust <cedric@beust.com>
  */
-public interface IMessage {
+public interface IMessage extends Serializable {
 }
diff --git a/src/main/java/org/testng/remote/strprotocol/IMessageSender.java b/src/main/java/org/testng/remote/strprotocol/IMessageSender.java
index a385a0f..264c506 100644
--- a/src/main/java/org/testng/remote/strprotocol/IMessageSender.java
+++ b/src/main/java/org/testng/remote/strprotocol/IMessageSender.java
@@ -10,7 +10,16 @@
 
   void sendMessage(IMessage message) throws Exception;
 
-  IMessage receiveMessage();
+  /**
+   * Will return null or throw EOFException when the connection has been severed.
+   */
+  IMessage receiveMessage() throws Exception;
 
   void shutDown();
+
+  // These two methods should probably be in a separate class since they should all be
+  // the same for implementers of this interface.
+  void sendAck();
+
+  void sendStop();
 }
diff --git a/src/main/java/org/testng/remote/strprotocol/MessageHelper.java b/src/main/java/org/testng/remote/strprotocol/MessageHelper.java
index 4fb871a..cff7cf1 100755
--- a/src/main/java/org/testng/remote/strprotocol/MessageHelper.java
+++ b/src/main/java/org/testng/remote/strprotocol/MessageHelper.java
@@ -3,10 +3,8 @@
 
 import org.testng.ITestResult;
 import org.testng.collections.Lists;
-import org.testng.collections.Maps;
 
 import java.util.List;
-import java.util.Map;
 import java.util.regex.Pattern;
 
 
@@ -53,13 +51,17 @@
       return new GenericMessage(Integer.parseInt(messageParts[0]));
     }
     else {
-      Map props = Maps.newHashMap();
+      GenericMessage result = new GenericMessage(Integer.parseInt(messageParts[0]));
 
       for(int i = 1; i < messageParts.length; i+=2) {
-        props.put(messageParts[i], messageParts[i + 1]);
+        if ("testCount".equals(messageParts[i])) {
+          result.setTestCount(Integer.parseInt(messageParts[i + 1]));
+        } else if ("suiteCount".equals(messageParts[i])) {
+          result.setSuiteCount(Integer.parseInt(messageParts[i + 1]));
+        }
       }
 
-      return new GenericMessage(Integer.parseInt(messageParts[0]), props);
+      return result;
     }
   }
 
diff --git a/src/main/java/org/testng/remote/strprotocol/SerializedMessageSender.java b/src/main/java/org/testng/remote/strprotocol/SerializedMessageSender.java
index 3268dea..e0f6eb9 100644
--- a/src/main/java/org/testng/remote/strprotocol/SerializedMessageSender.java
+++ b/src/main/java/org/testng/remote/strprotocol/SerializedMessageSender.java
@@ -1,31 +1,68 @@
 package org.testng.remote.strprotocol;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
 
 public class SerializedMessageSender extends BaseMessageSender {
 
+  private ObjectOutputStream m_oos;
+  private ObjectInputStream m_ios;
+
   public SerializedMessageSender(String host, int port) {
     super(host, port);
   }
 
-  @Override
-  public void sendMessage(IMessage message) throws IOException {
-    ObjectOutputStream oos = new ObjectOutputStream(m_outStream);
-
-    synchronized(m_lock) {
-      oos.writeObject(message);
-      oos.flush();
-      try {
-        m_lock.wait();
-      }
-      catch(InterruptedException e) { }
+  private void initStreams(boolean output) {
+    try {
+      if (output) m_oos = new ObjectOutputStream(m_outStream);
+      else m_ios = new ObjectInputStream(m_inStream);
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
     }
   }
 
   @Override
-  public IMessage receiveMessage() {
-    return null;
+  public void sendMessage(IMessage message) throws IOException {
+    synchronized(m_lock) {
+      m_oos = new ObjectOutputStream(m_outStream);
+      p("Sending message " + message);
+      m_oos.writeObject(message);
+      m_oos.flush();
+
+      try {
+        p("Message sent, waiting for lock...");
+        m_lock.wait();
+        p("... lock done");
+      }
+      catch(InterruptedException e) {
+      }
+    }
+  }
+
+
+  @Override
+  public IMessage receiveMessage() throws IOException, ClassNotFoundException {
+
+    IMessage result = null;
+    try {
+      m_ios = new ObjectInputStream(m_inStream);
+//      synchronized(m_input) {
+        result = (IMessage) m_ios.readObject();
+        p("Received message " + result);
+//        sendAck();
+//      }
+    }
+    catch(EOFException ex) {
+      // ignore
+    }
+    return result;
+  }
+
+  private static void p(String s) {
+    System.out.println("[SerializedMessageSender] " + s);
   }
 }
diff --git a/src/main/java/org/testng/remote/strprotocol/StringMessageSender.java b/src/main/java/org/testng/remote/strprotocol/StringMessageSender.java
index c1295f3..1c2aa12 100644
--- a/src/main/java/org/testng/remote/strprotocol/StringMessageSender.java
+++ b/src/main/java/org/testng/remote/strprotocol/StringMessageSender.java
@@ -76,9 +76,9 @@
 
     if (m_inReader == null) {
       try {
-        m_inReader = new BufferedReader(new InputStreamReader(m_receiverInputStream, "UTF-8"));
+        m_inReader = new BufferedReader(new InputStreamReader(m_inStream, "UTF-8"));
       } catch (UnsupportedEncodingException e) {
-        m_inReader = new BufferedReader(new InputStreamReader(m_receiverInputStream));
+        m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
       }
     }
     try {