| /* |
| * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| |
| package jdk.incubator.http; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.Semaphore; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| |
| import static javax.net.ssl.SSLEngineResult.Status.*; |
| import javax.net.ssl.*; |
| |
| import jdk.incubator.http.internal.common.AsyncWriteQueue; |
| import jdk.incubator.http.internal.common.ByteBufferPool; |
| import jdk.incubator.http.internal.common.ByteBufferReference; |
| import jdk.incubator.http.internal.common.Log; |
| import jdk.incubator.http.internal.common.Queue; |
| import jdk.incubator.http.internal.common.Utils; |
| import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; |
| import jdk.incubator.http.internal.common.ExceptionallyCloseable; |
| |
| /** |
| * Asynchronous wrapper around SSLEngine. send and receive is fully non |
| * blocking. When handshaking is required, a thread is created to perform |
| * the handshake and application level sends do not take place during this time. |
| * |
| * Is implemented using queues and functions operating on the receiving end |
| * of each queue. |
| * |
| * Application writes to: |
| * || |
| * \/ |
| * appOutputQ |
| * || |
| * \/ |
| * appOutputQ read by "upperWrite" method which does SSLEngine.wrap |
| * and does async write to PlainHttpConnection |
| * |
| * Reading side is as follows |
| * -------------------------- |
| * |
| * "upperRead" method reads off channelInputQ and calls SSLEngine.unwrap and |
| * when decrypted data is returned, it is passed to the user's Consumer<ByteBuffer> |
| * /\ |
| * || |
| * channelInputQ |
| * /\ |
| * || |
| * "lowerRead" method puts buffers into channelInputQ. It is invoked from |
| * OP_READ events from the selector. |
| * |
| * Whenever handshaking is required, the doHandshaking() method is called |
| * which creates a thread to complete the handshake. It takes over the |
| * channelInputQ from upperRead, and puts outgoing packets on channelOutputQ. |
| * Selector events are delivered to lowerRead and lowerWrite as normal. |
| * |
| * Errors |
| * |
| * Any exception thrown by the engine or channel, causes all Queues to be closed |
| * the channel to be closed, and the error is reported to the user's |
| * Consumer<Throwable> |
| */ |
| class AsyncSSLDelegate implements ExceptionallyCloseable, AsyncConnection { |
| |
| // outgoing buffers put in this queue first and may remain here |
| // while SSL handshaking happening. |
| final AsyncWriteQueue appOutputQ = new AsyncWriteQueue(this::upperWrite); |
| |
| // queue of wrapped ByteBuffers waiting to be sent on socket channel |
| //final Queue<ByteBuffer> channelOutputQ; |
| |
| // Bytes read into this queue before being unwrapped. Backup on this |
| // Q should only happen when the engine is stalled due to delegated tasks |
| final Queue<ByteBufferReference> channelInputQ; |
| |
| // input occurs through the read() method which is expected to be called |
| // when the selector signals some data is waiting to be read. All incoming |
| // handshake data is handled in this method, which means some calls to |
| // read() may return zero bytes of user data. This is not a sign of spinning, |
| // just that the handshake mechanics are being executed. |
| |
| final SSLEngine engine; |
| final SSLParameters sslParameters; |
| //final SocketChannel chan; |
| final HttpConnection lowerOutput; |
| final HttpClientImpl client; |
| // should be volatile to provide proper synchronization(visibility) action |
| volatile Consumer<ByteBufferReference> asyncReceiver; |
| volatile Consumer<Throwable> errorHandler; |
| |
| // Locks. |
| final Object reader = new Object(); |
| // synchronizing handshake state |
| final Semaphore handshaker = new Semaphore(1); |
| // flag set when frame or writer is blocked waiting for handshake to finish |
| //boolean writerBlocked; |
| //boolean readerBlocked; |
| |
| // alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket |
| |
| AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn) |
| { |
| SSLContext context = client.sslContext(); |
| //channelOutputQ = new Queue<>(); |
| //channelOutputQ.registerPutCallback(this::lowerWrite); |
| engine = context.createSSLEngine(); |
| engine.setUseClientMode(true); |
| SSLParameters sslp = client.sslParameters() |
| .orElseGet(context::getSupportedSSLParameters); |
| sslParameters = Utils.copySSLParameters(sslp); |
| if (alpn != null) { |
| sslParameters.setApplicationProtocols(alpn); |
| } |
| logParams(sslParameters); |
| engine.setSSLParameters(sslParameters); |
| this.lowerOutput = lowerOutput; |
| this.client = client; |
| this.channelInputQ = new Queue<>(); |
| this.channelInputQ.registerPutCallback(this::upperRead); |
| } |
| |
| @Override |
| public void writeAsync(ByteBufferReference[] src) throws IOException { |
| appOutputQ.put(src); |
| } |
| |
| @Override |
| public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException { |
| appOutputQ.putFirst(buffers); |
| } |
| |
| @Override |
| public void flushAsync() throws IOException { |
| if (appOutputQ.flush()) { |
| lowerOutput.flushAsync(); |
| } |
| } |
| |
| @Override |
| public void closeExceptionally(Throwable t) { |
| Utils.close(t, appOutputQ, channelInputQ, lowerOutput); |
| } |
| |
| @Override |
| public void close() { |
| Utils.close(appOutputQ, channelInputQ, lowerOutput); |
| } |
| |
| // The code below can be uncommented to shake out |
| // the implementation by inserting random delays and trigger |
| // handshake in the SelectorManager thread (upperRead) |
| // static final java.util.Random random = |
| // new java.util.Random(System.currentTimeMillis()); |
| |
| /** |
| * Attempts to wrap buffers from appOutputQ and place them on the |
| * channelOutputQ for writing. If handshaking is happening, then the |
| * process stalls and last buffers taken off the appOutputQ are put back |
| * into it until handshaking completes. |
| * |
| * This same method is called to try and resume output after a blocking |
| * handshaking operation has completed. |
| */ |
| private void upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { |
| // currently delayCallback is not used. Use it when it's needed to execute handshake in another thread. |
| try { |
| ByteBuffer[] buffers = ByteBufferReference.toBuffers(refs); |
| int bytes = Utils.remaining(buffers); |
| while (bytes > 0) { |
| EngineResult r = wrapBuffers(buffers); |
| int bytesProduced = r.bytesProduced(); |
| int bytesConsumed = r.bytesConsumed(); |
| bytes -= bytesConsumed; |
| if (bytesProduced > 0) { |
| lowerOutput.writeAsync(new ByteBufferReference[]{r.destBuffer}); |
| } |
| |
| // The code below can be uncommented to shake out |
| // the implementation by inserting random delays and trigger |
| // handshake in the SelectorManager thread (upperRead) |
| |
| // int sleep = random.nextInt(100); |
| // if (sleep > 20) { |
| // Thread.sleep(sleep); |
| // } |
| |
| // handshaking is happening or is needed |
| if (r.handshaking()) { |
| Log.logTrace("Write: needs handshake"); |
| doHandshakeNow("Write"); |
| } |
| } |
| ByteBufferReference.clear(refs); |
| } catch (Throwable t) { |
| closeExceptionally(t); |
| errorHandler.accept(t); |
| } |
| } |
| |
| private void startHandshake(String tag) { |
| Runnable run = () -> { |
| try { |
| doHandshakeNow(tag); |
| } catch (Throwable t) { |
| Log.logTrace("{0}: handshake failed: {1}", tag, t); |
| closeExceptionally(t); |
| errorHandler.accept(t); |
| } |
| }; |
| client.executor().execute(run); |
| } |
| |
| private void doHandshakeNow(String tag) |
| throws IOException, InterruptedException |
| { |
| handshaker.acquire(); |
| try { |
| channelInputQ.registerPutCallback(null); |
| lowerOutput.flushAsync(); |
| Log.logTrace("{0}: Starting handshake...", tag); |
| doHandshakeImpl(); |
| Log.logTrace("{0}: Handshake completed", tag); |
| channelInputQ.registerPutCallback(this::upperRead); |
| } finally { |
| handshaker.release(); |
| } |
| } |
| |
| /** |
| * Executes entire handshake in calling thread. |
| * Returns after handshake is completed or error occurs |
| */ |
| private void doHandshakeImpl() throws IOException { |
| while (true) { |
| SSLEngineResult.HandshakeStatus status = engine.getHandshakeStatus(); |
| switch(status) { |
| case NEED_TASK: { |
| List<Runnable> tasks = obtainTasks(); |
| for (Runnable task : tasks) { |
| task.run(); |
| } |
| } break; |
| case NEED_WRAP: |
| handshakeWrapAndSend(); |
| break; |
| case NEED_UNWRAP: case NEED_UNWRAP_AGAIN: |
| handshakeReceiveAndUnWrap(); |
| break; |
| case FINISHED: case NOT_HANDSHAKING: |
| return; |
| default: |
| throw new InternalError("Unexpected Handshake Status: " |
| + status); |
| } |
| } |
| } |
| |
| // acknowledge a received CLOSE request from peer |
| void doClosure() throws IOException { |
| //while (!wrapAndSend(emptyArray)) |
| //; |
| } |
| |
| List<Runnable> obtainTasks() { |
| List<Runnable> l = new ArrayList<>(); |
| Runnable r; |
| while ((r = engine.getDelegatedTask()) != null) { |
| l.add(r); |
| } |
| return l; |
| } |
| |
| @Override |
| public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver, |
| Consumer<Throwable> errorReceiver, |
| Supplier<ByteBufferReference> readBufferSupplier) { |
| this.asyncReceiver = asyncReceiver; |
| this.errorHandler = errorReceiver; |
| // readBufferSupplier is not used, |
| // because of AsyncSSLDelegate has its own appBufferPool |
| } |
| |
| @Override |
| public void startReading() { |
| // maybe this class does not need to implement AsyncConnection |
| } |
| |
| static class EngineResult { |
| final SSLEngineResult result; |
| final ByteBufferReference destBuffer; |
| |
| |
| // normal result |
| EngineResult(SSLEngineResult result) { |
| this(result, null); |
| } |
| |
| EngineResult(SSLEngineResult result, ByteBufferReference destBuffer) { |
| this.result = result; |
| this.destBuffer = destBuffer; |
| } |
| |
| boolean handshaking() { |
| SSLEngineResult.HandshakeStatus s = result.getHandshakeStatus(); |
| return s != FINISHED && s != NOT_HANDSHAKING; |
| } |
| |
| int bytesConsumed() { |
| return result.bytesConsumed(); |
| } |
| |
| int bytesProduced() { |
| return result.bytesProduced(); |
| } |
| |
| SSLEngineResult.HandshakeStatus handshakeStatus() { |
| return result.getHandshakeStatus(); |
| } |
| |
| SSLEngineResult.Status status() { |
| return result.getStatus(); |
| } |
| } |
| |
| EngineResult handshakeWrapAndSend() throws IOException { |
| EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER); |
| if (r.bytesProduced() > 0) { |
| lowerOutput.writeAsync(new ByteBufferReference[]{r.destBuffer}); |
| lowerOutput.flushAsync(); |
| } |
| return r; |
| } |
| |
| // called during handshaking. It blocks until a complete packet |
| // is available, unwraps it and returns. |
| void handshakeReceiveAndUnWrap() throws IOException { |
| ByteBufferReference ref = channelInputQ.take(); |
| while (true) { |
| // block waiting for input |
| EngineResult r = unwrapBuffer(ref.get()); |
| SSLEngineResult.Status status = r.status(); |
| if (status == BUFFER_UNDERFLOW) { |
| // wait for another buffer to arrive |
| ByteBufferReference ref1 = channelInputQ.take(); |
| ref = combine (ref, ref1); |
| continue; |
| } |
| // OK |
| // theoretically possible we could receive some user data |
| if (r.bytesProduced() > 0) { |
| asyncReceiver.accept(r.destBuffer); |
| } else { |
| r.destBuffer.clear(); |
| } |
| // it is also possible that a delegated task could be needed |
| // even though they are handled in the calling function |
| if (r.handshakeStatus() == NEED_TASK) { |
| obtainTasks().stream().forEach((task) -> task.run()); |
| } |
| |
| if (!ref.get().hasRemaining()) { |
| ref.clear(); |
| return; |
| } |
| } |
| } |
| |
| EngineResult wrapBuffer(ByteBuffer src) throws SSLException { |
| ByteBuffer[] bufs = new ByteBuffer[1]; |
| bufs[0] = src; |
| return wrapBuffers(bufs); |
| } |
| |
| private final ByteBufferPool netBufferPool = new ByteBufferPool(); |
| private final ByteBufferPool appBufferPool = new ByteBufferPool(); |
| |
| /** |
| * provides buffer of sslEngine@getPacketBufferSize(). |
| * used for encrypted buffers after wrap or before unwrap. |
| * @return ByteBufferReference |
| */ |
| public ByteBufferReference getNetBuffer() { |
| return netBufferPool.get(engine.getSession().getPacketBufferSize()); |
| } |
| |
| /** |
| * provides buffer of sslEngine@getApplicationBufferSize(). |
| * @return ByteBufferReference |
| */ |
| private ByteBufferReference getAppBuffer() { |
| return appBufferPool.get(engine.getSession().getApplicationBufferSize()); |
| } |
| |
| EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { |
| ByteBufferReference dst = getNetBuffer(); |
| while (true) { |
| SSLEngineResult sslResult = engine.wrap(src, dst.get()); |
| switch (sslResult.getStatus()) { |
| case BUFFER_OVERFLOW: |
| // Shouldn't happen. We allocated buffer with packet size |
| // get it again if net buffer size was changed |
| dst = getNetBuffer(); |
| break; |
| case CLOSED: |
| case OK: |
| dst.get().flip(); |
| return new EngineResult(sslResult, dst); |
| case BUFFER_UNDERFLOW: |
| // Shouldn't happen. Doesn't returns when wrap() |
| // underflow handled externally |
| return new EngineResult(sslResult); |
| default: |
| assert false; |
| } |
| } |
| } |
| |
| EngineResult unwrapBuffer(ByteBuffer srcbuf) throws IOException { |
| ByteBufferReference dst = getAppBuffer(); |
| while (true) { |
| SSLEngineResult sslResult = engine.unwrap(srcbuf, dst.get()); |
| switch (sslResult.getStatus()) { |
| case BUFFER_OVERFLOW: |
| // may happen only if app size buffer was changed. |
| // get it again if app buffer size changed |
| dst = getAppBuffer(); |
| break; |
| case CLOSED: |
| doClosure(); |
| throw new IOException("Engine closed"); |
| case BUFFER_UNDERFLOW: |
| dst.clear(); |
| return new EngineResult(sslResult); |
| case OK: |
| dst.get().flip(); |
| return new EngineResult(sslResult, dst); |
| } |
| } |
| } |
| |
| /** |
| * Asynchronous read input. Call this when selector fires. |
| * Unwrap done in upperRead because it also happens in |
| * doHandshake() when handshake taking place |
| */ |
| public void asyncReceive(ByteBufferReference buffer) { |
| try { |
| channelInputQ.put(buffer); |
| } catch (Throwable t) { |
| closeExceptionally(t); |
| errorHandler.accept(t); |
| } |
| } |
| |
| private ByteBufferReference pollInput() throws IOException { |
| return channelInputQ.poll(); |
| } |
| |
| private ByteBufferReference pollInput(ByteBufferReference next) throws IOException { |
| return next == null ? channelInputQ.poll() : next; |
| } |
| |
| public void upperRead() { |
| ByteBufferReference src; |
| ByteBufferReference next = null; |
| synchronized (reader) { |
| try { |
| src = pollInput(); |
| if (src == null) { |
| return; |
| } |
| while (true) { |
| EngineResult r = unwrapBuffer(src.get()); |
| switch (r.result.getStatus()) { |
| case BUFFER_UNDERFLOW: |
| // Buffer too small. Need to combine with next buf |
| next = pollInput(next); |
| if (next == null) { |
| // no data available. |
| // push buffer back until more data available |
| channelInputQ.pushback(src); |
| return; |
| } else { |
| src = shift(src, next); |
| if (!next.get().hasRemaining()) { |
| next.clear(); |
| next = null; |
| } |
| } |
| break; |
| case OK: |
| // check for any handshaking work |
| if (r.handshaking()) { |
| // handshaking is happening or is needed |
| // so we put the buffer back on Q to process again |
| // later. |
| Log.logTrace("Read: needs handshake"); |
| channelInputQ.pushback(src); |
| startHandshake("Read"); |
| return; |
| } |
| asyncReceiver.accept(r.destBuffer); |
| } |
| if (src.get().hasRemaining()) { |
| continue; |
| } |
| src.clear(); |
| src = pollInput(next); |
| next = null; |
| if (src == null) { |
| return; |
| } |
| } |
| } catch (Throwable t) { |
| closeExceptionally(t); |
| errorHandler.accept(t); |
| } |
| } |
| } |
| |
| ByteBufferReference shift(ByteBufferReference ref1, ByteBufferReference ref2) { |
| ByteBuffer buf1 = ref1.get(); |
| if (buf1.capacity() < engine.getSession().getPacketBufferSize()) { |
| ByteBufferReference newRef = getNetBuffer(); |
| ByteBuffer newBuf = newRef.get(); |
| newBuf.put(buf1); |
| buf1 = newBuf; |
| ref1.clear(); |
| ref1 = newRef; |
| } else { |
| buf1.compact(); |
| } |
| ByteBuffer buf2 = ref2.get(); |
| Utils.copy(buf2, buf1, Math.min(buf1.remaining(), buf2.remaining())); |
| buf1.flip(); |
| return ref1; |
| } |
| |
| |
| ByteBufferReference combine(ByteBufferReference ref1, ByteBufferReference ref2) { |
| ByteBuffer buf1 = ref1.get(); |
| ByteBuffer buf2 = ref2.get(); |
| int avail1 = buf1.capacity() - buf1.remaining(); |
| if (buf2.remaining() < avail1) { |
| buf1.compact(); |
| buf1.put(buf2); |
| buf1.flip(); |
| ref2.clear(); |
| return ref1; |
| } |
| int newsize = buf1.remaining() + buf2.remaining(); |
| ByteBuffer newbuf = ByteBuffer.allocate(newsize); // getting rid of buffer pools |
| newbuf.put(buf1); |
| newbuf.put(buf2); |
| newbuf.flip(); |
| ref1.clear(); |
| ref2.clear(); |
| return ByteBufferReference.of(newbuf); |
| } |
| |
| SSLParameters getSSLParameters() { |
| return sslParameters; |
| } |
| |
| static void logParams(SSLParameters p) { |
| if (!Log.ssl()) { |
| return; |
| } |
| |
| if (p == null) { |
| Log.logSSL("SSLParameters: Null params"); |
| return; |
| } |
| |
| final StringBuilder sb = new StringBuilder("SSLParameters:"); |
| final List<Object> params = new ArrayList<>(); |
| if (p.getCipherSuites() != null) { |
| for (String cipher : p.getCipherSuites()) { |
| sb.append("\n cipher: {") |
| .append(params.size()).append("}"); |
| params.add(cipher); |
| } |
| } |
| |
| // SSLParameters.getApplicationProtocols() can't return null |
| for (String approto : p.getApplicationProtocols()) { |
| sb.append("\n application protocol: {") |
| .append(params.size()).append("}"); |
| params.add(approto); |
| } |
| |
| if (p.getProtocols() != null) { |
| for (String protocol : p.getProtocols()) { |
| sb.append("\n protocol: {") |
| .append(params.size()).append("}"); |
| params.add(protocol); |
| } |
| } |
| |
| if (p.getServerNames() != null) { |
| for (SNIServerName sname : p.getServerNames()) { |
| sb.append("\n server name: {") |
| .append(params.size()).append("}"); |
| params.add(sname.toString()); |
| } |
| } |
| sb.append('\n'); |
| |
| Log.logSSL(sb.toString(), params.toArray()); |
| } |
| |
| String getSessionInfo() { |
| StringBuilder sb = new StringBuilder(); |
| String application = engine.getApplicationProtocol(); |
| SSLSession sess = engine.getSession(); |
| String cipher = sess.getCipherSuite(); |
| String protocol = sess.getProtocol(); |
| sb.append("Handshake complete alpn: ") |
| .append(application) |
| .append(", Cipher: ") |
| .append(cipher) |
| .append(", Protocol: ") |
| .append(protocol); |
| return sb.toString(); |
| } |
| } |