8199433: (se) select(Consumer<SelectionKey> action) as alternative to selected-key set

Reviewed-by: bpb
diff --git a/src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java b/src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
index 8996b3c..1ed0523 100644
--- a/src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
+++ b/src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayDeque;
@@ -34,6 +35,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static sun.nio.ch.EPoll.EPOLLIN;
 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
@@ -97,7 +99,9 @@
     }
 
     @Override
-    protected int doSelect(long timeout) throws IOException {
+    protected int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
 
         // epoll_wait timeout is int
@@ -130,7 +134,7 @@
             end(blocking);
         }
         processDeregisterQueue();
-        return updateSelectedKeys(numEntries);
+        return processEvents(numEntries, action);
     }
 
     /**
@@ -171,13 +175,13 @@
     }
 
     /**
-     * Update the keys of file descriptors that were polled and add them to
-     * the selected-key set.
+     * Process the polled events.
      * If the interrupt fd has been selected, drain it and clear the interrupt.
      */
-    private int updateSelectedKeys(int numEntries) throws IOException {
+    private int processEvents(int numEntries, Consumer<SelectionKey> action)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
-        assert Thread.holdsLock(nioSelectedKeys());
 
         boolean interrupted = false;
         int numKeysUpdated = 0;
@@ -190,17 +194,7 @@
                 SelectionKeyImpl ski = fdToKey.get(fd);
                 if (ski != null) {
                     int rOps = EPoll.getEvents(event);
-                    if (selectedKeys.contains(ski)) {
-                        if (ski.translateAndUpdateReadyOps(rOps)) {
-                            numKeysUpdated++;
-                        }
-                    } else {
-                        ski.translateAndSetReadyOps(rOps);
-                        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
-                            selectedKeys.add(ski);
-                            numKeysUpdated++;
-                        }
-                    }
+                    numKeysUpdated += processReadyEvents(rOps, ski, action);
                 }
             }
         }
diff --git a/src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java b/src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java
index 6215b13..56925b0 100644
--- a/src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java
+++ b/src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayDeque;
@@ -34,6 +35,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static sun.nio.ch.KQueue.EVFILT_READ;
 import static sun.nio.ch.KQueue.EVFILT_WRITE;
@@ -100,7 +102,9 @@
     }
 
     @Override
-    protected int doSelect(long timeout) throws IOException {
+    protected int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
 
         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
@@ -132,7 +136,7 @@
             end(blocking);
         }
         processDeregisterQueue();
-        return updateSelectedKeys(numEntries);
+        return processEvents(numEntries, action);
     }
 
     /**
@@ -180,13 +184,13 @@
     }
 
     /**
-     * Update the keys of file descriptors that were polled and add them to
-     * the selected-key set.
+     * Process the polled events.
      * If the interrupt fd has been selected, drain it and clear the interrupt.
      */
-    private int updateSelectedKeys(int numEntries) throws IOException {
+    private int processEvents(int numEntries, Consumer<SelectionKey> action)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
-        assert Thread.holdsLock(nioSelectedKeys());
 
         int numKeysUpdated = 0;
         boolean interrupted = false;
@@ -214,22 +218,10 @@
                     } else if (filter == EVFILT_WRITE) {
                         rOps |= Net.POLLOUT;
                     }
-
-                    if (selectedKeys.contains(ski)) {
-                        if (ski.translateAndUpdateReadyOps(rOps)) {
-                            // file descriptor may be polled more than once per poll
-                            if (ski.lastPolled != pollCount) {
-                                numKeysUpdated++;
-                                ski.lastPolled = pollCount;
-                            }
-                        }
-                    } else {
-                        ski.translateAndSetReadyOps(rOps);
-                        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
-                            selectedKeys.add(ski);
-                            numKeysUpdated++;
-                            ski.lastPolled = pollCount;
-                        }
+                    int updated = processReadyEvents(rOps, ski, action);
+                    if (updated > 0 && ski.lastPolled != pollCount) {
+                        numKeysUpdated++;
+                        ski.lastPolled = pollCount;
                     }
                 }
             }
diff --git a/src/java.base/share/classes/java/nio/channels/SelectionKey.java b/src/java.base/share/classes/java/nio/channels/SelectionKey.java
index ceb7f57..2a96d3d 100644
--- a/src/java.base/share/classes/java/nio/channels/SelectionKey.java
+++ b/src/java.base/share/classes/java/nio/channels/SelectionKey.java
@@ -291,7 +291,7 @@
      * detects that the corresponding channel is ready for reading, has reached
      * end-of-stream, has been remotely shut down for further reading, or has
      * an error pending, then it will add {@code OP_READ} to the key's
-     * ready-operation set and add the key to its selected-key&nbsp;set.  </p>
+     * ready-operation set.  </p>
      */
     public static final int OP_READ = 1 << 0;
 
@@ -303,8 +303,7 @@
      * href="Selector.html#selop">selection operation</a>.  If the selector
      * detects that the corresponding channel is ready for writing, has been
      * remotely shut down for further writing, or has an error pending, then it
-     * will add {@code OP_WRITE} to the key's ready set and add the key to its
-     * selected-key&nbsp;set.  </p>
+     * will add {@code OP_WRITE} to the key's ready set.  </p>
      */
     public static final int OP_WRITE = 1 << 2;
 
@@ -316,8 +315,7 @@
      * href="Selector.html#selop">selection operation</a>.  If the selector
      * detects that the corresponding socket channel is ready to complete its
      * connection sequence, or has an error pending, then it will add
-     * {@code OP_CONNECT} to the key's ready set and add the key to its
-     * selected-key&nbsp;set.  </p>
+     * {@code OP_CONNECT} to the key's ready set.  </p>
      */
     public static final int OP_CONNECT = 1 << 3;
 
@@ -329,8 +327,7 @@
      * href="Selector.html#selop">selection operation</a>.  If the selector
      * detects that the corresponding server-socket channel is ready to accept
      * another connection, or has an error pending, then it will add
-     * {@code OP_ACCEPT} to the key's ready set and add the key to its
-     * selected-key&nbsp;set.  </p>
+     * {@code OP_ACCEPT} to the key's ready set.  </p>
      */
     public static final int OP_ACCEPT = 1 << 4;
 
diff --git a/src/java.base/share/classes/java/nio/channels/Selector.java b/src/java.base/share/classes/java/nio/channels/Selector.java
index 7b0f39c..6ba1948 100644
--- a/src/java.base/share/classes/java/nio/channels/Selector.java
+++ b/src/java.base/share/classes/java/nio/channels/Selector.java
@@ -28,7 +28,9 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.channels.spi.SelectorProvider;
+import java.util.Objects;
 import java.util.Set;
+import java.util.function.Consumer;
 
 
 /**
@@ -56,7 +58,8 @@
  *
  *   <li><p> The <i>selected-key set</i> is the set of keys such that each
  *   key's channel was detected to be ready for at least one of the operations
- *   identified in the key's interest set during a prior selection operation.
+ *   identified in the key's interest set during a prior selection operation
+ *   that adds keys or updates keys in the set.
  *   This set is returned by the {@link #selectedKeys() selectedKeys} method.
  *   The selected-key set is always a subset of the key set. </p></li>
  *
@@ -92,6 +95,27 @@
  * <a id="selop"></a>
  * <h2>Selection</h2>
  *
+ * <p> A selection operation queries the underlying operating system for an
+ * update as to the readiness of each registered channel to perform any of the
+ * operations identified by its key's interest set.  There are two forms of
+ * selection operation:
+ *
+ * <ol>
+ *
+ *   <li><p> The {@link #select()}, {@link #select(long)}, and {@link #selectNow()}
+ *   methods add the keys of channels ready to perform an operation to the
+ *   selected-key set, or update the ready-operation set of keys already in the
+ *   selected-key set. </p></li>
+ *
+ *   <li><p> The {@link #select(Consumer)}, {@link #select(Consumer, long)}, and
+ *   {@link #selectNow(Consumer)} methods perform an <i>action</i> on the key
+ *   of each channel that is ready to perform an operation.  These methods do
+ *   not add to the selected-key set. </p></li>
+ *
+ * </ol>
+ *
+ * <h3>Selection operations that add to the selected-key set</h3>
+ *
  * <p> During each selection operation, keys may be added to and removed from a
  * selector's selected-key set and may be removed from its key and
  * cancelled-key sets.  Selection is performed by the {@link #select()}, {@link
@@ -141,6 +165,45 @@
  * difference between the three selection methods. </p>
  *
  *
+ * <h3>Selection operations that perform an action on selected keys</h3>
+ *
+ * <p> During each selection operation, keys may be removed from the selector's
+ * key, selected-key, and cancelled-key sets.  Selection is performed by the
+ * {@link #select(Consumer)}, {@link #select(Consumer,long)}, and {@link
+ * #selectNow(Consumer)} methods, and involves three steps:  </p>
+ *
+ * <ol>
+ *
+ *   <li><p> Each key in the cancelled-key set is removed from each key set of
+ *   which it is a member, and its channel is deregistered.  This step leaves
+ *   the cancelled-key set empty. </p></li>
+ *
+ *   <li><p> The underlying operating system is queried for an update as to the
+ *   readiness of each remaining channel to perform any of the operations
+ *   identified by its key's interest set as of the moment that the selection
+ *   operation began.
+ *
+ *   <p> For a channel that is ready for at least one such operation, the
+ *   ready-operation set of the channel's key is set to identify exactly the
+ *   operations for which the channel is ready and the <i>action</i> specified
+ *   to the {@code select} method is invoked to consume the channel's key.  Any
+ *   readiness information previously recorded in the ready set is discarded
+ *   prior to invoking the <i>action</i>.
+ *
+ *   <p> Alternatively, where a channel is ready for more than one operation,
+ *   the <i>action</i> may be invoked more than once with the channel's key and
+ *   ready-operation set modified to a subset of the operations for which the
+ *   channel is ready.  Where the <i>action</i> is invoked more than once for
+ *   the same key then its ready-operation set never contains operation bits
+ *   that were contained in the set at previous calls to the <i>action</i>
+ *   in the same selection operation.  </p></li>
+ *
+ *   <li><p> If any keys were added to the cancelled-key set while step (2) was
+ *   in progress then they are processed as in step (1). </p></li>
+ *
+ * </ol>
+ *
+ *
  * <h2>Concurrency</h2>
  *
  * <p> A Selector and its key set are safe for use by multiple concurrent
@@ -156,13 +219,12 @@
  *
  * <p> Keys may be cancelled and channels may be closed at any time.  Hence the
  * presence of a key in one or more of a selector's key sets does not imply
- * that the key is valid or that its channel is open.  Application code should
+ * that the key is valid or that its channel is open. Application code should
  * be careful to synchronize and check these conditions as necessary if there
  * is any possibility that another thread will cancel a key or close a channel.
  *
- * <p> A thread blocked in one of the {@link #select()} or {@link
- * #select(long)} methods may be interrupted by some other thread in one of
- * three ways:
+ * <p> A thread blocked in a selection operation may be interrupted by some
+ * other thread in one of three ways:
  *
  * <ul>
  *
@@ -356,18 +418,188 @@
     public abstract int select() throws IOException;
 
     /**
+     * Selects and performs an action on the keys whose corresponding channels
+     * are ready for I/O operations.
+     *
+     * <p> This method performs a blocking <a href="#selop">selection
+     * operation</a>.  It wakes up from querying the operating system only when
+     * at least one channel is selected, this selector's {@link #wakeup wakeup}
+     * method is invoked, the current thread is interrupted, or the given
+     * timeout period expires, whichever comes first.
+     *
+     * <p> The specified <i>action</i>'s {@link Consumer#accept(Object) accept}
+     * method is invoked with the key for each channel that is ready to perform
+     * an operation identified by its key's interest set.  The {@code accept}
+     * method may be invoked more than once for the same key but with the
+     * ready-operation set containing a subset of the operations for which the
+     * channel is ready (as described above).  The {@code accept} method is
+     * invoked while synchronized on the selector and its selected-key set.
+     * Great care must be taken to avoid deadlocking with other threads that
+     * also synchronize on these objects.  Selection operations are not reentrant
+     * in general and consequently the <i>action</i> should take great care not
+     * to attempt a selection operation on the same selector.  The behavior when
+     * attempting a reentrant selection operation is implementation specific and
+     * therefore not specified.  If the <i>action</i> closes the selector then
+     * {@code ClosedSelectorException} is thrown when the action completes.
+     * The <i>action</i> is not prohibited from closing channels registered with
+     * the selector, nor prohibited from cancelling keys or changing a key's
+     * interest set.  If a channel is selected but its key is cancelled or its
+     * interest set changed before the <i>action</i> is performed on the key
+     * then it is implementation specific as to whether the <i>action</i> is
+     * invoked (it may be invoked with an {@link SelectionKey#isValid() invalid}
+     * key).  Exceptions thrown by the action are relayed to the caller.
+     *
+     * <p> This method does not offer real-time guarantees: It schedules the
+     * timeout as if by invoking the {@link Object#wait(long)} method.
+     *
+     * @implSpec The default implementation removes all keys from the
+     * selected-key set, invokes {@link #select(long) select(long)} with the
+     * given timeout and then performs the action for each key added to the
+     * selected-key set.  The default implementation does not detect the action
+     * performing a reentrant selection operation.  The selected-key set may
+     * or may not be empty on completion of the default implementation.
+     *
+     * @param  action   The action to perform
+     *
+     * @param  timeout  If positive, block for up to {@code timeout}
+     *                  milliseconds, more or less, while waiting for a
+     *                  channel to become ready; if zero, block indefinitely;
+     *                  must not be negative
+     *
+     * @return  The number of unique keys consumed, possibly zero
+     *
+     * @throws  IOException
+     *          If an I/O error occurs
+     *
+     * @throws  ClosedSelectorException
+     *          If this selector is closed or is closed by the action
+     *
+     * @throws  IllegalArgumentException
+     *          If the value of the timeout argument is negative
+     *
+     * @since 11
+     */
+    public int select(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
+        if (timeout < 0)
+            throw new IllegalArgumentException("Negative timeout");
+        return doSelect(Objects.requireNonNull(action), timeout);
+    }
+
+    /**
+     * Selects and performs an action on the keys whose corresponding channels
+     * are ready for I/O operations.
+     *
+     * <p> This method performs a blocking <a href="#selop">selection
+     * operation</a>.  It wakes up from querying the operating system only when
+     * at least one channel is selected, this selector's {@link #wakeup wakeup}
+     * method is invoked, or the current thread is interrupted, whichever comes
+     * first.
+     *
+     * <p> This method is equivalent to invoking the 2-arg
+     * {@link #select(Consumer, long) select} method with a timeout of {@code 0}
+     * to block indefinitely.  </p>
+     *
+     * @implSpec The default implementation invokes the 2-arg {@code select}
+     * method with a timeout of {@code 0}.
+     *
+     * @param  action   The action to perform
+     *
+     * @return  The number of unique keys consumed, possibly zero
+     *
+     * @throws  IOException
+     *          If an I/O error occurs
+     *
+     * @throws  ClosedSelectorException
+     *          If this selector is closed or is closed by the action
+     *
+     * @since 11
+     */
+    public int select(Consumer<SelectionKey> action) throws IOException {
+        return select(action, 0);
+    }
+
+    /**
+     * Selects and performs an action on the keys whose corresponding channels
+     * are ready for I/O operations.
+     *
+     * <p> This method performs a non-blocking <a href="#selop">selection
+     * operation</a>.
+     *
+     * <p> Invoking this method clears the effect of any previous invocations
+     * of the {@link #wakeup wakeup} method.  </p>
+     *
+     * @implSpec The default implementation removes all keys from the
+     * selected-key set, invokes {@link #selectNow() selectNow()} and then
+     * performs the action for each key added to the selected-key set.  The
+     * default implementation does not detect the action performing a reentrant
+     * selection operation.  The selected-key set may or may not be empty on
+     * completion of the default implementation.
+     *
+     * @param  action   The action to perform
+     *
+     * @return  The number of unique keys consumed, possibly zero
+     *
+     * @throws  IOException
+     *          If an I/O error occurs
+     *
+     * @throws  ClosedSelectorException
+     *          If this selector is closed or is closed by the action
+     *
+     * @since 11
+     */
+    public int selectNow(Consumer<SelectionKey> action) throws IOException {
+        return doSelect(Objects.requireNonNull(action), -1);
+    }
+
+    /**
+     * Default implementation of select(Consumer) and selectNow(Consumer).
+     */
+    private int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
+        synchronized (this) {
+            Set<SelectionKey> selectedKeys = selectedKeys();
+            synchronized (selectedKeys) {
+                selectedKeys.clear();
+                int numKeySelected;
+                if (timeout < 0) {
+                    numKeySelected = selectNow();
+                } else {
+                    numKeySelected = select(timeout);
+                }
+
+                // copy selected-key set as action may remove keys
+                Set<SelectionKey> keysToConsume = Set.copyOf(selectedKeys);
+                assert keysToConsume.size() == numKeySelected;
+                selectedKeys.clear();
+
+                // invoke action for each selected key
+                keysToConsume.forEach(k -> {
+                    action.accept(k);
+                    if (!isOpen())
+                        throw new ClosedSelectorException();
+                });
+
+                return numKeySelected;
+            }
+        }
+    }
+
+
+    /**
      * Causes the first selection operation that has not yet returned to return
      * immediately.
      *
-     * <p> If another thread is currently blocked in an invocation of the
-     * {@link #select()} or {@link #select(long)} methods then that invocation
-     * will return immediately.  If no selection operation is currently in
-     * progress then the next invocation of one of these methods will return
-     * immediately unless the {@link #selectNow()} method is invoked in the
-     * meantime.  In any case the value returned by that invocation may be
-     * non-zero.  Subsequent invocations of the {@link #select()} or {@link
-     * #select(long)} methods will block as usual unless this method is invoked
-     * again in the meantime.
+     * <p> If another thread is currently blocked in a selection operation then
+     * that invocation will return immediately.  If no selection operation is
+     * currently in progress then the next invocation of a selection operation
+     * will return immediately unless {@link #selectNow()} or {@link
+     * #selectNow(Consumer)} is invoked in the meantime.  In any case the value
+     * returned by that invocation may be non-zero.  Subsequent selection
+     * operations will block as usual unless this method is invoked again in the
+     * meantime.
      *
      * <p> Invoking this method more than once between two successive selection
      * operations has the same effect as invoking it just once.  </p>
@@ -398,5 +630,4 @@
      *          If an I/O error occurs
      */
     public abstract void close() throws IOException;
-
 }
diff --git a/src/java.base/share/classes/sun/nio/ch/SelectorImpl.java b/src/java.base/share/classes/sun/nio/ch/SelectorImpl.java
index 281c95b..7bb2ba8 100644
--- a/src/java.base/share/classes/sun/nio/ch/SelectorImpl.java
+++ b/src/java.base/share/classes/sun/nio/ch/SelectorImpl.java
@@ -36,8 +36,10 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 
 /**
@@ -51,12 +53,15 @@
     private final Set<SelectionKey> keys;
 
     // The set of keys with data ready for an operation
-    protected final Set<SelectionKey> selectedKeys;
+    private final Set<SelectionKey> selectedKeys;
 
     // Public views of the key sets
     private final Set<SelectionKey> publicKeys;             // Immutable
     private final Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
 
+    // used to check for reentrancy
+    private boolean inSelect;
+
     protected SelectorImpl(SelectorProvider sp) {
         super(sp);
         keys = ConcurrentHashMap.newKeySet();
@@ -83,13 +88,6 @@
     }
 
     /**
-     * Returns the public view of the selected-key set
-     */
-    protected final Set<SelectionKey> nioSelectedKeys() {
-        return publicSelectedKeys;
-    }
-
-    /**
      * Marks the beginning of a select operation that might block
      */
     protected final void begin(boolean blocking) {
@@ -106,16 +104,27 @@
     /**
      * Selects the keys for channels that are ready for I/O operations.
      *
+     * @param action  the action to perform, can be null
      * @param timeout timeout in milliseconds to wait, 0 to not wait, -1 to
      *                wait indefinitely
      */
-    protected abstract int doSelect(long timeout) throws IOException;
+    protected abstract int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException;
 
-    private int lockAndDoSelect(long timeout) throws IOException {
+    private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
         synchronized (this) {
             ensureOpen();
-            synchronized (publicSelectedKeys) {
-                return doSelect(timeout);
+            if (inSelect)
+                throw new IllegalStateException("select in progress");
+            inSelect = true;
+            try {
+                synchronized (publicSelectedKeys) {
+                    return doSelect(action, timeout);
+                }
+            } finally {
+                inSelect = false;
             }
         }
     }
@@ -124,17 +133,39 @@
     public final int select(long timeout) throws IOException {
         if (timeout < 0)
             throw new IllegalArgumentException("Negative timeout");
-        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
+        return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
     }
 
     @Override
     public final int select() throws IOException {
-        return lockAndDoSelect(-1);
+        return lockAndDoSelect(null, -1);
     }
 
     @Override
     public final int selectNow() throws IOException {
-        return lockAndDoSelect(0);
+        return lockAndDoSelect(null, 0);
+    }
+
+    @Override
+    public final int select(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
+        Objects.requireNonNull(action);
+        if (timeout < 0)
+            throw new IllegalArgumentException("Negative timeout");
+        return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
+    }
+
+    @Override
+    public final int select(Consumer<SelectionKey> action) throws IOException {
+        Objects.requireNonNull(action);
+        return lockAndDoSelect(action, -1);
+    }
+
+    @Override
+    public final int selectNow(Consumer<SelectionKey> action) throws IOException {
+        Objects.requireNonNull(action);
+        return lockAndDoSelect(action, 0);
     }
 
     /**
@@ -240,6 +271,39 @@
     }
 
     /**
+     * Invoked by selection operations to handle ready events. If an action
+     * is specified then it is invoked to handle the key, otherwise the key
+     * is added to the selected-key set (or updated when it is already in the
+     * set).
+     */
+    protected final int processReadyEvents(int rOps,
+                                           SelectionKeyImpl ski,
+                                           Consumer<SelectionKey> action) {
+        if (action != null) {
+            ski.translateAndSetReadyOps(rOps);
+            if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
+                action.accept(ski);
+                ensureOpen();
+                return 1;
+            }
+        } else {
+            assert Thread.holdsLock(publicSelectedKeys);
+            if (selectedKeys.contains(ski)) {
+                if (ski.translateAndUpdateReadyOps(rOps)) {
+                    return 1;
+                }
+            } else {
+                ski.translateAndSetReadyOps(rOps);
+                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
+                    selectedKeys.add(ski);
+                    return 1;
+                }
+            }
+        }
+        return 0;
+    }
+
+    /**
      * Invoked by interestOps to ensure the interest ops are updated at the
      * next selection operation.
      */
diff --git a/src/java.base/solaris/classes/sun/nio/ch/DevPollSelectorImpl.java b/src/java.base/solaris/classes/sun/nio/ch/DevPollSelectorImpl.java
index 67aff8d..02167e9 100644
--- a/src/java.base/solaris/classes/sun/nio/ch/DevPollSelectorImpl.java
+++ b/src/java.base/solaris/classes/sun/nio/ch/DevPollSelectorImpl.java
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayDeque;
@@ -34,6 +35,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static sun.nio.ch.DevPollArrayWrapper.NUM_POLLFDS;
 import static sun.nio.ch.DevPollArrayWrapper.POLLREMOVE;
@@ -85,7 +87,9 @@
     }
 
     @Override
-    protected int doSelect(long timeout) throws IOException {
+    protected int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
 
         long to = timeout;
@@ -117,7 +121,7 @@
             end(blocking);
         }
         processDeregisterQueue();
-        return updateSelectedKeys(numEntries);
+        return processEvents(numEntries, action);
     }
 
     /**
@@ -165,13 +169,13 @@
     }
 
     /**
-     * Update the keys of file descriptors that were polled and add them to
-     * the selected-key set.
+     * Process the polled events.
      * If the interrupt fd has been selected, drain it and clear the interrupt.
      */
-    private int updateSelectedKeys(int numEntries) throws IOException {
+    private int processEvents(int numEntries, Consumer<SelectionKey> action)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
-        assert Thread.holdsLock(nioSelectedKeys());
 
         boolean interrupted = false;
         int numKeysUpdated = 0;
@@ -183,17 +187,7 @@
                 SelectionKeyImpl ski = fdToKey.get(fd);
                 if (ski != null) {
                     int rOps = pollWrapper.getReventOps(i);
-                    if (selectedKeys.contains(ski)) {
-                        if (ski.translateAndUpdateReadyOps(rOps)) {
-                            numKeysUpdated++;
-                        }
-                    } else {
-                        ski.translateAndSetReadyOps(rOps);
-                        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
-                            selectedKeys.add(ski);
-                            numKeysUpdated++;
-                        }
-                    }
+                    numKeysUpdated += processReadyEvents(rOps, ski, action);
                 }
             }
         }
diff --git a/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java b/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java
index ff5ab90..8f82c43 100644
--- a/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java
+++ b/src/java.base/solaris/classes/sun/nio/ch/EventPortSelectorImpl.java
@@ -35,6 +35,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_FD;
 import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_USER;
@@ -97,7 +98,9 @@
     }
 
     @Override
-    protected int doSelect(long timeout) throws IOException {
+    protected int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
 
         long to = timeout;
@@ -129,7 +132,7 @@
             end(blocking);
         }
         processDeregisterQueue();
-        return processPortEvents(numEvents);
+        return processPortEvents(numEvents, action);
     }
 
     /**
@@ -166,19 +169,21 @@
     }
 
     /**
-     * Process the port events. This method updates the keys of file descriptors
-     * that were polled. It also re-queues the key so that the file descriptor
-     * is re-associated at the next select operation.
-     *
-     * @return the number of selection keys updated.
+     * Process the polled events and re-queue the selected keys so the file
+     * descriptors are re-associated at the next select operation.
      */
-    private int processPortEvents(int numEvents) throws IOException {
+    private int processPortEvents(int numEvents, Consumer<SelectionKey> action)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
-        assert Thread.holdsLock(nioSelectedKeys());
 
         int numKeysUpdated = 0;
         boolean interrupted = false;
 
+        // Process the polled events while holding the update lock. This allows
+        // keys to be queued for ready file descriptors so they can be
+        // re-associated at the next select. The selected-key can be updated
+        // in this pass.
         synchronized (updateLock) {
             for (int i = 0; i < numEvents; i++) {
                 short source = getSource(i);
@@ -186,22 +191,15 @@
                     int fd = getDescriptor(i);
                     SelectionKeyImpl ski = fdToKey.get(fd);
                     if (ski != null) {
-                        int rOps = getEventOps(i);
-                        if (selectedKeys.contains(ski)) {
-                            if (ski.translateAndUpdateReadyOps(rOps)) {
-                                numKeysUpdated++;
-                            }
-                        } else {
-                            ski.translateAndSetReadyOps(rOps);
-                            if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
-                                selectedKeys.add(ski);
-                                numKeysUpdated++;
-                            }
-                        }
-
-                        // re-queue key so it re-associated at next select
                         ski.registeredEvents(0);
                         updateKeys.addLast(ski);
+
+                        // update selected-key set if no action specified
+                        if (action == null) {
+                            int rOps = getEventOps(i);
+                            numKeysUpdated += processReadyEvents(rOps, ski, null);
+                        }
+
                     }
                 } else if (source == PORT_SOURCE_USER) {
                     interrupted = true;
@@ -211,6 +209,22 @@
             }
         }
 
+        // if an action specified then iterate over the polled events again so
+        // that the action is performed without holding the update lock.
+        if (action != null) {
+            for (int i = 0; i < numEvents; i++) {
+                short source = getSource(i);
+                if (source == PORT_SOURCE_FD) {
+                    int fd = getDescriptor(i);
+                    SelectionKeyImpl ski = fdToKey.get(fd);
+                    if (ski != null) {
+                        int rOps = getEventOps(i);
+                        numKeysUpdated += processReadyEvents(rOps, ski, action);
+                    }
+                }
+            }
+        }
+
         if (interrupted) {
             clearInterrupt();
         }
diff --git a/src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java b/src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java
index cd2a4af..2326dbd 100644
--- a/src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java
+++ b/src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java
@@ -26,6 +26,7 @@
 
 import java.io.IOException;
 import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayDeque;
@@ -33,6 +34,7 @@
 import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import jdk.internal.misc.Unsafe;
 
@@ -92,7 +94,9 @@
     }
 
     @Override
-    protected int doSelect(long timeout) throws IOException {
+    protected int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
 
         int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout
@@ -125,7 +129,7 @@
         }
 
         processDeregisterQueue();
-        return updateSelectedKeys();
+        return processEvents(action);
     }
 
     /**
@@ -157,13 +161,13 @@
     }
 
     /**
-     * Update the keys of file descriptors that were polled and add them to
-     * the selected-key set.
+     * Process the polled events.
      * If the interrupt fd has been selected, drain it and clear the interrupt.
      */
-    private int updateSelectedKeys() throws IOException {
+    private int processEvents(Consumer<SelectionKey> action)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
-        assert Thread.holdsLock(nioSelectedKeys());
         assert pollArraySize > 0 && pollArraySize == pollKeys.size();
 
         int numKeysUpdated = 0;
@@ -173,17 +177,7 @@
                 SelectionKeyImpl ski = pollKeys.get(i);
                 assert ski.getFDVal() == getDescriptor(i);
                 if (ski.isValid()) {
-                    if (selectedKeys.contains(ski)) {
-                        if (ski.translateAndUpdateReadyOps(rOps)) {
-                            numKeysUpdated++;
-                        }
-                    } else {
-                        ski.translateAndSetReadyOps(rOps);
-                        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
-                            selectedKeys.add(ski);
-                            numKeysUpdated++;
-                        }
-                    }
+                    numKeysUpdated += processReadyEvents(rOps, ski, action);
                 }
             }
         }
diff --git a/src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java b/src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
index 7f2f1d6..230ceae 100644
--- a/src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
+++ b/src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.Pipe;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayDeque;
@@ -36,6 +37,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * A multi-threaded implementation of Selector for Windows.
@@ -139,7 +141,9 @@
     }
 
     @Override
-    protected int doSelect(long timeout) throws IOException {
+    protected int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
         this.timeout = timeout; // set selector timeout
         processUpdateQueue();
@@ -173,7 +177,7 @@
         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
         finishLock.checkForException();
         processDeregisterQueue();
-        int updated = updateSelectedKeys();
+        int updated = updateSelectedKeys(action);
         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
         resetWakeupSocket();
         return updated;
@@ -349,16 +353,16 @@
         private native int poll0(long pollAddress, int numfds,
              int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
 
-        private int processSelectedKeys(long updateCount) {
+        private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
             int numKeysUpdated = 0;
-            numKeysUpdated += processFDSet(updateCount, readFds,
+            numKeysUpdated += processFDSet(updateCount, action, readFds,
                                            Net.POLLIN,
                                            false);
-            numKeysUpdated += processFDSet(updateCount, writeFds,
+            numKeysUpdated += processFDSet(updateCount, action, writeFds,
                                            Net.POLLCONN |
                                            Net.POLLOUT,
                                            false);
-            numKeysUpdated += processFDSet(updateCount, exceptFds,
+            numKeysUpdated += processFDSet(updateCount, action, exceptFds,
                                            Net.POLLIN |
                                            Net.POLLCONN |
                                            Net.POLLOUT,
@@ -372,7 +376,9 @@
          *
          * me.updateCount <= updateCount
          */
-        private int processFDSet(long updateCount, int[] fds, int rOps,
+        private int processFDSet(long updateCount,
+                                 Consumer<SelectionKey> action,
+                                 int[] fds, int rOps,
                                  boolean isExceptFds)
         {
             int numKeysUpdated = 0;
@@ -401,20 +407,10 @@
                     continue;
                 }
 
-                if (selectedKeys.contains(sk)) { // Key in selected set
-                    if (sk.translateAndUpdateReadyOps(rOps)) {
-                        if (me.updateCount != updateCount) {
-                            me.updateCount = updateCount;
-                            numKeysUpdated++;
-                        }
-                    }
-                } else { // Key is not in selected set yet
-                    sk.translateAndSetReadyOps(rOps);
-                    if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
-                        selectedKeys.add(sk);
-                        me.updateCount = updateCount;
-                        numKeysUpdated++;
-                    }
+                int updated = processReadyEvents(rOps, sk, action);
+                if (updated > 0 && me.updateCount != updateCount) {
+                    me.updateCount = updateCount;
+                    numKeysUpdated++;
                 }
             }
             return numKeysUpdated;
@@ -509,12 +505,12 @@
 
     // Update ops of the corresponding Channels. Add the ready keys to the
     // ready queue.
-    private int updateSelectedKeys() {
+    private int updateSelectedKeys(Consumer<SelectionKey> action) {
         updateCount++;
         int numKeysUpdated = 0;
-        numKeysUpdated += subSelector.processSelectedKeys(updateCount);
+        numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
         for (SelectThread t: threads) {
-            numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
+            numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
         }
         return numKeysUpdated;
     }
diff --git a/test/jdk/java/nio/channels/Selector/SelectWithConsumer.java b/test/jdk/java/nio/channels/Selector/SelectWithConsumer.java
new file mode 100644
index 0000000..171f2e8
--- /dev/null
+++ b/test/jdk/java/nio/channels/Selector/SelectWithConsumer.java
@@ -0,0 +1,755 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/* @test
+ * @bug 8199433
+ * @run testng SelectWithConsumer
+ * @summary Unit test for Selector select(Consumer), select(Consumer,long) and
+ *          selectNow(Consumer)
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.Pipe;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import static java.util.concurrent.TimeUnit.*;
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
+@Test
+public class SelectWithConsumer {
+
+    /**
+     * Invoke the select methods that take an action and check that the
+     * accumulated ready ops notified to the action matches the expected ops.
+     */
+    void testActionInvoked(SelectionKey key, int expectedOps) throws Exception {
+        var callerThread = Thread.currentThread();
+        var sel = key.selector();
+        var interestOps = key.interestOps();
+        var notifiedOps = new AtomicInteger();
+
+        // select(Consumer)
+        if (expectedOps == 0)
+            sel.wakeup(); // ensure select does not block
+        notifiedOps.set(0);
+        int n = sel.select(k -> {
+            assertTrue(Thread.currentThread() == callerThread);
+            assertTrue(k == key);
+            int readyOps = key.readyOps();
+            assertTrue((readyOps & interestOps) != 0);
+            assertTrue((readyOps & notifiedOps.get()) == 0);
+            notifiedOps.set(notifiedOps.get() | readyOps);
+        });
+        assertTrue((n == 1) ^ (expectedOps == 0));
+        assertTrue(notifiedOps.get() == expectedOps);
+
+        // select(Consumer, timeout)
+        notifiedOps.set(0);
+        n = sel.select(k -> {
+            assertTrue(Thread.currentThread() == callerThread);
+            assertTrue(k == key);
+            int readyOps = key.readyOps();
+            assertTrue((readyOps & interestOps) != 0);
+            assertTrue((readyOps & notifiedOps.get()) == 0);
+            notifiedOps.set(notifiedOps.get() | readyOps);
+        }, 1000);
+        assertTrue((n == 1) ^ (expectedOps == 0));
+        assertTrue(notifiedOps.get() == expectedOps);
+
+        // selectNow(Consumer)
+        notifiedOps.set(0);
+        n = sel.selectNow(k -> {
+            assertTrue(Thread.currentThread() == callerThread);
+            assertTrue(k == key);
+            int readyOps = key.readyOps();
+            assertTrue((readyOps & interestOps) != 0);
+            assertTrue((readyOps & notifiedOps.get()) == 0);
+            notifiedOps.set(notifiedOps.get() | readyOps);
+        });
+        assertTrue((n == 1) ^ (expectedOps == 0));
+        assertTrue(notifiedOps.get() == expectedOps);
+    }
+
+    /**
+     * Test that an action is performed when a channel is ready for reading.
+     */
+    public void testReadable() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SinkChannel sink = p.sink();
+            Pipe.SourceChannel source = p.source();
+            source.configureBlocking(false);
+            SelectionKey key = source.register(sel, SelectionKey.OP_READ);
+
+            // write to sink to ensure source is readable
+            scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
+
+            // test that action is invoked
+            testActionInvoked(key, SelectionKey.OP_READ);
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test that an action is performed when a channel is ready for writing.
+     */
+    public void testWritable() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SourceChannel source = p.source();
+            Pipe.SinkChannel sink = p.sink();
+            sink.configureBlocking(false);
+            SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE);
+
+            // test that action is invoked
+            testActionInvoked(key, SelectionKey.OP_WRITE);
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test that an action is performed when a channel is ready for both
+     * reading and writing.
+     */
+    public void testReadableAndWriteable() throws Exception {
+        ServerSocketChannel ssc = null;
+        SocketChannel sc = null;
+        SocketChannel peer = null;
+        try (Selector sel = Selector.open()) {
+            ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
+            sc = SocketChannel.open(ssc.getLocalAddress());
+            sc.configureBlocking(false);
+            SelectionKey key = sc.register(sel, (SelectionKey.OP_READ |
+                                                 SelectionKey.OP_WRITE));
+
+            // accept connection and write data so the source is readable
+            peer = ssc.accept();
+            peer.write(messageBuffer());
+
+            // test that action is invoked
+            testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE));
+        } finally {
+            if (ssc != null) ssc.close();
+            if (sc != null) sc.close();
+            if (peer != null) peer.close();
+        }
+    }
+
+    /**
+     * Test that the action is called for two selected channels
+     */
+    public void testTwoChannels() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SourceChannel source = p.source();
+            Pipe.SinkChannel sink = p.sink();
+            source.configureBlocking(false);
+            sink.configureBlocking(false);
+            SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
+            SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);
+
+            // write to sink to ensure that the source is readable
+            sink.write(messageBuffer());
+
+            var counter = new AtomicInteger();
+
+            // select(Consumer)
+            counter.set(0);
+            int n = sel.select(k -> {
+                counter.incrementAndGet();
+                if (k == key1) {
+                    assertTrue(k.isReadable());
+                } else if (k == key2) {
+                    assertTrue(k.isWritable());
+                } else {
+                    assertTrue(false);
+                }
+            });
+            assertTrue(n == 2);
+            assertTrue(counter.get() == 2);
+
+            // select(Consumer, timeout)
+            counter.set(0);
+            n = sel.select(k -> {
+                counter.incrementAndGet();
+                if (k == key1) {
+                    assertTrue(k.isReadable());
+                } else if (k == key2) {
+                    assertTrue(k.isWritable());
+                } else {
+                    assertTrue(false);
+                }
+            }, 1000);
+            assertTrue(n == 2);
+            assertTrue(counter.get() == 2);
+
+            // selectNow(Consumer)
+            counter.set(0);
+            n = sel.selectNow(k -> {
+                counter.incrementAndGet();
+                if (k == key1) {
+                    assertTrue(k.isReadable());
+                } else if (k == key2) {
+                    assertTrue(k.isWritable());
+                } else {
+                    assertTrue(false);
+                }
+            });
+            assertTrue(n == 2);
+            assertTrue(counter.get() == 2);
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test calling select twice, the action should be invoked each time
+     */
+    public void testRepeatedSelect1() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SourceChannel source = p.source();
+            Pipe.SinkChannel sink = p.sink();
+            source.configureBlocking(false);
+            SelectionKey key = source.register(sel, SelectionKey.OP_READ);
+
+            // write to sink to ensure that the source is readable
+            sink.write(messageBuffer());
+
+            // test that action is invoked
+            testActionInvoked(key, SelectionKey.OP_READ);
+            testActionInvoked(key, SelectionKey.OP_READ);
+
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test calling select twice. An I/O operation is performed after the
+     * first select so the channel will not be selected by the second select.
+     */
+    public void testRepeatedSelect2() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SourceChannel source = p.source();
+            Pipe.SinkChannel sink = p.sink();
+            source.configureBlocking(false);
+            SelectionKey key = source.register(sel, SelectionKey.OP_READ);
+
+            // write to sink to ensure that the source is readable
+            sink.write(messageBuffer());
+
+            // test that action is invoked
+            testActionInvoked(key, SelectionKey.OP_READ);
+
+            // read all bytes
+            int n;
+            ByteBuffer bb = ByteBuffer.allocate(100);
+            do {
+                n = source.read(bb);
+                bb.clear();
+            } while (n > 0);
+
+            // test that action is not invoked
+            testActionInvoked(key, 0);
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test timeout
+     */
+    public void testTimeout() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SourceChannel source = p.source();
+            Pipe.SinkChannel sink = p.sink();
+            source.configureBlocking(false);
+            source.register(sel, SelectionKey.OP_READ);
+            long start = System.currentTimeMillis();
+            int n = sel.select(k -> assertTrue(false), 1000L);
+            long duration = System.currentTimeMillis() - start;
+            assertTrue(n == 0);
+            assertTrue(duration > 500, "select took " + duration + " ms");
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test wakeup prior to select
+     */
+    public void testWakeupBeforeSelect() throws Exception {
+        // select(Consumer)
+        try (Selector sel = Selector.open()) {
+            sel.wakeup();
+            int n = sel.select(k -> assertTrue(false));
+            assertTrue(n == 0);
+        }
+
+        // select(Consumer, timeout)
+        try (Selector sel = Selector.open()) {
+            sel.wakeup();
+            long start = System.currentTimeMillis();
+            int n = sel.select(k -> assertTrue(false), 60*1000);
+            long duration = System.currentTimeMillis() - start;
+            assertTrue(n == 0);
+            assertTrue(duration < 5000, "select took " + duration + " ms");
+        }
+    }
+
+    /**
+     * Test wakeup during select
+     */
+    public void testWakeupDuringSelect() throws Exception {
+        // select(Consumer)
+        try (Selector sel = Selector.open()) {
+            scheduleWakeup(sel, 1, SECONDS);
+            int n = sel.select(k -> assertTrue(false));
+            assertTrue(n == 0);
+        }
+
+        // select(Consumer, timeout)
+        try (Selector sel = Selector.open()) {
+            scheduleWakeup(sel, 1, SECONDS);
+            long start = System.currentTimeMillis();
+            int n = sel.select(k -> assertTrue(false), 60*1000);
+            long duration = System.currentTimeMillis() - start;
+            assertTrue(n == 0);
+            assertTrue(duration > 500 && duration < 10*1000,
+                    "select took " + duration + " ms");
+        }
+    }
+
+    /**
+     * Test invoking select with interrupt status set
+     */
+    public void testInterruptBeforeSelect() throws Exception {
+        // select(Consumer)
+        try (Selector sel = Selector.open()) {
+            Thread.currentThread().interrupt();
+            int n = sel.select(k -> assertTrue(false));
+            assertTrue(n == 0);
+            assertTrue(Thread.currentThread().isInterrupted());
+            assertTrue(sel.isOpen());
+        } finally {
+            Thread.currentThread().interrupted();  // clear interrupt status
+        }
+
+        // select(Consumer, timeout)
+        try (Selector sel = Selector.open()) {
+            Thread.currentThread().interrupt();
+            long start = System.currentTimeMillis();
+            int n = sel.select(k -> assertTrue(false), 60*1000);
+            long duration = System.currentTimeMillis() - start;
+            assertTrue(n == 0);
+            assertTrue(duration < 5000, "select took " + duration + " ms");
+            assertTrue(Thread.currentThread().isInterrupted());
+            assertTrue(sel.isOpen());
+        } finally {
+            Thread.currentThread().interrupted();  // clear interrupt status
+        }
+    }
+
+    /**
+     * Test interrupt thread during select
+     */
+    public void testInterruptDuringSelect() throws Exception {
+        // select(Consumer)
+        try (Selector sel = Selector.open()) {
+            scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
+            int n = sel.select(k -> assertTrue(false));
+            assertTrue(n == 0);
+            assertTrue(Thread.currentThread().isInterrupted());
+            assertTrue(sel.isOpen());
+        } finally {
+            Thread.currentThread().interrupted();  // clear interrupt status
+        }
+
+        // select(Consumer, timeout)
+        try (Selector sel = Selector.open()) {
+            scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
+            long start = System.currentTimeMillis();
+            int n = sel.select(k -> assertTrue(false), 60*1000);
+            long duration = System.currentTimeMillis() - start;
+            assertTrue(n == 0);
+            assertTrue(duration > 500 && duration < 5000,
+                    "select took " + duration + " ms");
+            assertTrue(Thread.currentThread().isInterrupted());
+            assertTrue(sel.isOpen());
+        } finally {
+            Thread.currentThread().interrupted();  // clear interrupt status
+        }
+    }
+
+    /**
+     * Test invoking select on a closed selector
+     */
+    @Test(expectedExceptions = ClosedSelectorException.class)
+    public void testClosedSelector1() throws Exception {
+        Selector sel = Selector.open();
+        sel.close();
+        sel.select(k -> assertTrue(false));
+    }
+    @Test(expectedExceptions = ClosedSelectorException.class)
+    public void testClosedSelector2() throws Exception {
+        Selector sel = Selector.open();
+        sel.close();
+        sel.select(k -> assertTrue(false), 1000);
+    }
+    @Test(expectedExceptions = ClosedSelectorException.class)
+    public void testClosedSelector3() throws Exception {
+        Selector sel = Selector.open();
+        sel.close();
+        sel.selectNow(k -> assertTrue(false));
+    }
+
+    /**
+     * Test closing selector while in a selection operation
+     */
+    public void testCloseDuringSelect() throws Exception {
+        // select(Consumer)
+        try (Selector sel = Selector.open()) {
+            scheduleClose(sel, 3, SECONDS);
+            int n = sel.select(k -> assertTrue(false));
+            assertTrue(n == 0);
+            assertFalse(sel.isOpen());
+        }
+
+        // select(Consumer, timeout)
+        try (Selector sel = Selector.open()) {
+            scheduleClose(sel, 3, SECONDS);
+            long start = System.currentTimeMillis();
+            int n = sel.select(k -> assertTrue(false), 60*1000);
+            long duration = System.currentTimeMillis() - start;
+            assertTrue(n == 0);
+            assertTrue(duration > 2000 && duration < 10*1000,
+                    "select took " + duration + " ms");
+            assertFalse(sel.isOpen());
+        }
+    }
+
+    /**
+     * Test action closing selector
+     */
+    @Test(expectedExceptions = ClosedSelectorException.class)
+    public void testActionClosingSelector() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SourceChannel source = p.source();
+            Pipe.SinkChannel sink = p.sink();
+            source.configureBlocking(false);
+            SelectionKey key = source.register(sel, SelectionKey.OP_READ);
+
+            // write to sink to ensure that the source is readable
+            sink.write(messageBuffer());
+
+            // should relay ClosedSelectorException
+            sel.select(k -> {
+                assertTrue(k == key);
+                try {
+                    sel.close();
+                } catch (IOException ioe) { }
+            });
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test that the action is invoked while synchronized on the selector and
+     * its selected-key set.
+     */
+    public void testLocks() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SourceChannel source = p.source();
+            Pipe.SinkChannel sink = p.sink();
+            source.configureBlocking(false);
+            SelectionKey key = source.register(sel, SelectionKey.OP_READ);
+
+            // write to sink to ensure that the source is readable
+            sink.write(messageBuffer());
+
+            // select(Consumer)
+            sel.select(k -> {
+                assertTrue(k == key);
+                assertTrue(Thread.holdsLock(sel));
+                assertFalse(Thread.holdsLock(sel.keys()));
+                assertTrue(Thread.holdsLock(sel.selectedKeys()));
+            });
+
+            // select(Consumer, timeout)
+            sel.select(k -> {
+                assertTrue(k == key);
+                assertTrue(Thread.holdsLock(sel));
+                assertFalse(Thread.holdsLock(sel.keys()));
+                assertTrue(Thread.holdsLock(sel.selectedKeys()));
+            }, 1000L);
+
+            // selectNow(Consumer)
+            sel.selectNow(k -> {
+                assertTrue(k == key);
+                assertTrue(Thread.holdsLock(sel));
+                assertFalse(Thread.holdsLock(sel.keys()));
+                assertTrue(Thread.holdsLock(sel.selectedKeys()));
+            });
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test that selection operations remove cancelled keys from the selector's
+     * key and selected-key sets.
+     */
+    public void testCancel() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SinkChannel sink = p.sink();
+            Pipe.SourceChannel source = p.source();
+
+            // write to sink to ensure that the source is readable
+            sink.write(messageBuffer());
+
+            sink.configureBlocking(false);
+            source.configureBlocking(false);
+            SelectionKey key1 = sink.register(sel, SelectionKey.OP_WRITE);
+            SelectionKey key2 = source.register(sel, SelectionKey.OP_READ);
+
+            sel.selectNow();
+            assertTrue(sel.keys().contains(key1));
+            assertTrue(sel.keys().contains(key2));
+            assertTrue(sel.selectedKeys().contains(key1));
+            assertTrue(sel.selectedKeys().contains(key2));
+
+            // cancel key1
+            key1.cancel();
+            int n = sel.selectNow(k -> assertTrue(k == key2));
+            assertTrue(n == 1);
+            assertFalse(sel.keys().contains(key1));
+            assertTrue(sel.keys().contains(key2));
+            assertFalse(sel.selectedKeys().contains(key1));
+            assertTrue(sel.selectedKeys().contains(key2));
+
+            // cancel key2
+            key2.cancel();
+            n = sel.selectNow(k -> assertTrue(false));
+            assertTrue(n == 0);
+            assertFalse(sel.keys().contains(key1));
+            assertFalse(sel.keys().contains(key2));
+            assertFalse(sel.selectedKeys().contains(key1));
+            assertFalse(sel.selectedKeys().contains(key2));
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test an action invoking select()
+     */
+    public void testReentrantSelect1() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SinkChannel sink = p.sink();
+            Pipe.SourceChannel source = p.source();
+            source.configureBlocking(false);
+            source.register(sel, SelectionKey.OP_READ);
+
+            // write to sink to ensure that the source is readable
+            scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
+
+            int n = sel.select(k -> {
+                try {
+                    sel.select();
+                    assertTrue(false);
+                } catch (IOException ioe) {
+                    throw new RuntimeException(ioe);
+                } catch (IllegalStateException expected) {
+                }
+            });
+            assertTrue(n == 1);
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test an action invoking selectNow()
+     */
+    public void testReentrantSelect2() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SinkChannel sink = p.sink();
+            Pipe.SourceChannel source = p.source();
+
+            // write to sink to ensure that the source is readable
+            scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
+
+            source.configureBlocking(false);
+            source.register(sel, SelectionKey.OP_READ);
+            int n = sel.select(k -> {
+                try {
+                    sel.selectNow();
+                    assertTrue(false);
+                } catch (IOException ioe) {
+                    throw new RuntimeException(ioe);
+                } catch (IllegalStateException expected) {
+                }
+            });
+            assertTrue(n == 1);
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Test an action invoking select(Consumer)
+     */
+    public void testReentrantSelect3() throws Exception {
+        Pipe p = Pipe.open();
+        try (Selector sel = Selector.open()) {
+            Pipe.SinkChannel sink = p.sink();
+            Pipe.SourceChannel source = p.source();
+
+            // write to sink to ensure that the source is readable
+            scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
+
+            source.configureBlocking(false);
+            source.register(sel, SelectionKey.OP_READ);
+            int n = sel.select(k -> {
+                try {
+                    sel.select(x -> assertTrue(false));
+                    assertTrue(false);
+                } catch (IOException ioe) {
+                    throw new RuntimeException(ioe);
+                } catch (IllegalStateException expected) {
+                }
+            });
+            assertTrue(n == 1);
+        } finally {
+            closePipe(p);
+        }
+    }
+
+    /**
+     * Negative timeout
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testNegativeTimeout() throws Exception {
+        try (Selector sel = Selector.open()) {
+            sel.select(k -> { }, -1L);
+        }
+    }
+
+    /**
+     * Null action
+     */
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNull1() throws Exception {
+        try (Selector sel = Selector.open()) {
+            sel.select(null);
+        }
+    }
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNull2() throws Exception {
+        try (Selector sel = Selector.open()) {
+            sel.select(null, 1000);
+        }
+    }
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNull3() throws Exception {
+        try (Selector sel = Selector.open()) {
+            sel.selectNow(null);
+        }
+    }
+
+
+    // -- support methods ---
+
+    private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1);
+
+    @AfterTest
+    void shutdownThreadPool() {
+        POOL.shutdown();
+    }
+
+    void scheduleWakeup(Selector sel, long delay, TimeUnit unit) {
+        POOL.schedule(() -> sel.wakeup(), delay, unit);
+    }
+
+    void scheduleInterrupt(Thread t, long delay, TimeUnit unit) {
+        POOL.schedule(() -> t.interrupt(), delay, unit);
+    }
+
+    void scheduleClose(Closeable c, long delay, TimeUnit unit) {
+        POOL.schedule(() -> {
+            try {
+                c.close();
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+            }
+        }, delay, unit);
+    }
+
+    void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) {
+        POOL.schedule(() -> {
+            try {
+                sink.write(buf);
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+            }
+        }, delay, unit);
+    }
+
+    static void closePipe(Pipe p) {
+        try { p.sink().close(); } catch (IOException ignore) { }
+        try { p.source().close(); } catch (IOException ignore) { }
+    }
+
+    static ByteBuffer messageBuffer() {
+        try {
+            return ByteBuffer.wrap("message".getBytes("UTF-8"));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}