Add the ability to set the number of worker threads

There is a long comment in MockWebServer detailing the reason.
This is to fix a flaky test that was issuing two requests in
quick succession and getting unexpected results.

Change-Id: I9af3f7f83fafc72002d874dd251fdf5a1df5d487
diff --git a/src/main/java/com/google/mockwebserver/MockWebServer.java b/src/main/java/com/google/mockwebserver/MockWebServer.java
index f7930bc..afcacc5 100644
--- a/src/main/java/com/google/mockwebserver/MockWebServer.java
+++ b/src/main/java/com/google/mockwebserver/MockWebServer.java
@@ -70,11 +70,14 @@
     private int bodyLimit = Integer.MAX_VALUE;
     private ServerSocket serverSocket;
     private SSLSocketFactory sslSocketFactory;
-    private ExecutorService executor;
+    private ExecutorService acceptExecutor;
+    private ExecutorService requestExecutor;
     private boolean tunnelProxy;
     private Dispatcher dispatcher = new QueueDispatcher();
 
     private int port = -1;
+    private int workerThreads = Integer.MAX_VALUE;
+
 
     public int getPort() {
         if (port == -1) {
@@ -120,6 +123,10 @@
         return hostName.contains(".") ? hostName : ".local";
     }
 
+    public void setWorkerThreads(int threads) {
+        this.workerThreads = threads;
+    }
+
     /**
      * Sets the number of bytes of the POST body to keep in memory to the given
      * limit.
@@ -184,15 +191,21 @@
      *     specific port is unavailable.
      */
     public void play(int port) throws IOException {
-        if (executor != null) {
+        if (acceptExecutor != null) {
             throw new IllegalStateException("play() already called");
         }
-        executor = Executors.newCachedThreadPool();
+        // The acceptExecutor handles the Socket.accept() and hands each request off to the
+        // requestExecutor. It also handles shutdown.
+        acceptExecutor = Executors.newSingleThreadExecutor();
+        // The requestExecutor has a fixed number of worker threads. In order to get strict
+        // guarantees that requests are handled in the order in which they are accepted
+        // workerThreads should be set to 1.
+        requestExecutor = Executors.newFixedThreadPool(workerThreads);
         serverSocket = new ServerSocket(port);
         serverSocket.setReuseAddress(true);
 
         this.port = serverSocket.getLocalPort();
-        executor.execute(namedRunnable("MockWebServer-accept-" + port, new Runnable() {
+        acceptExecutor.execute(namedRunnable("MockWebServer-accept-" + port, new Runnable() {
             public void run() {
                 try {
                     acceptConnections();
@@ -218,9 +231,14 @@
                     }
                 }
                 try {
-                    executor.shutdown();
+                    acceptExecutor.shutdown();
                 } catch (Throwable e) {
-                    logger.log(Level.WARNING, "MockWebServer executor shutdown failed", e);
+                    logger.log(Level.WARNING, "MockWebServer acceptExecutor shutdown failed", e);
+                }
+                try {
+                    requestExecutor.shutdown();
+                } catch (Throwable e) {
+                    logger.log(Level.WARNING, "MockWebServer requestExecutor shutdown failed", e);
                 }
             }
 
@@ -253,7 +271,7 @@
 
     private void serveConnection(final Socket raw) {
         String name = "MockWebServer-" + raw.getRemoteSocketAddress();
-        executor.execute(namedRunnable(name, new Runnable() {
+        requestExecutor.execute(namedRunnable(name, new Runnable() {
             int sequenceNumber = 0;
 
             public void run() {
@@ -335,6 +353,23 @@
                   return false;
                 }
                 writeResponse(out, response);
+
+                // For socket policies that poison the socket after the response is written:
+                // The client has received the response and will no longer be blocked after
+                // writeResponse() has returned. A client can then re-use the connection before
+                // the socket is poisoned (i.e. keep-alive / connection pooling). The second
+                // request/response may fail at the beginning, middle, end, or even succeed
+                // depending on scheduling. Delays can be required in tests to improve the chances
+                // of sockets being in a known state when subsequent requests are made.
+                //
+                // For SHUTDOWN_OUTPUT_AT_END the client may detect a problem with its input socket
+                // after the request has been made but before the server has chosen a response.
+                // For clients that perform retries, this can cause the client to issue a retry
+                // request. The retry handler may call dispatcher.dispatch(request) before the
+                // initial, failed request handler does and cause non-obvious response ordering.
+                // Setting workerThreads = 1 ensures that the dispatcher is called for requests in
+                // the order they are received.
+
                 if (response.getSocketPolicy() == SocketPolicy.DISCONNECT_AT_END) {
                     in.close();
                     out.close();