| /* |
| * Copyright (c) 1996, 1997, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| package sun.rmi.transport.tcp; |
| |
| import java.io.*; |
| |
| /** |
| * MultiplexInputStream manages receiving data over a connection managed |
| * by a ConnectionMultiplexer object. This object is responsible for |
| * requesting more bytes of data as space in its internal buffer becomes |
| * available. |
| * |
| * @author Peter Jones |
| */ |
| final class MultiplexInputStream extends InputStream { |
| |
| /** object managing multiplexed connection */ |
| private ConnectionMultiplexer manager; |
| |
| /** information about the connection this is the input stream for */ |
| private MultiplexConnectionInfo info; |
| |
| /** input buffer */ |
| private byte buffer[]; |
| |
| /** number of real data bytes present in buffer */ |
| private int present = 0; |
| |
| /** current position to read from in input buffer */ |
| private int pos = 0; |
| |
| /** pending number of bytes this stream has requested */ |
| private int requested = 0; |
| |
| /** true if this connection has been disconnected */ |
| private boolean disconnected = false; |
| |
| /** |
| * lock acquired to access shared variables: |
| * buffer, present, pos, requested, & disconnected |
| * WARNING: Any of the methods manager.send*() should not be |
| * invoked while this lock is held, since they could potentially |
| * block if the underlying connection's transport buffers are |
| * full, and the manager may need to acquire this lock to process |
| * and consume data coming over the underlying connection. |
| */ |
| private Object lock = new Object(); |
| |
| /** level at which more data is requested when read past */ |
| private int waterMark; |
| |
| /** data structure for holding reads of one byte */ |
| private byte temp[] = new byte[1]; |
| |
| /** |
| * Create a new MultiplexInputStream for the given manager. |
| * @param manager object that manages this connection |
| * @param info structure for connection this stream reads from |
| * @param bufferLength length of input buffer |
| */ |
| MultiplexInputStream( |
| ConnectionMultiplexer manager, |
| MultiplexConnectionInfo info, |
| int bufferLength) |
| { |
| this.manager = manager; |
| this.info = info; |
| |
| buffer = new byte[bufferLength]; |
| waterMark = bufferLength / 2; |
| } |
| |
| /** |
| * Read a byte from the connection. |
| */ |
| public synchronized int read() throws IOException |
| { |
| int n = read(temp, 0, 1); |
| if (n != 1) |
| return -1; |
| return temp[0] & 0xFF; |
| } |
| |
| /** |
| * Read a subarray of bytes from connection. This method blocks for |
| * at least one byte, and it returns the number of bytes actually read, |
| * or -1 if the end of the stream was detected. |
| * @param b array to read bytes into |
| * @param off offset of beginning of bytes to read into |
| * @param len number of bytes to read |
| */ |
| public synchronized int read(byte b[], int off, int len) throws IOException |
| { |
| if (len <= 0) |
| return 0; |
| |
| int moreSpace; |
| synchronized (lock) { |
| if (pos >= present) |
| pos = present = 0; |
| else if (pos >= waterMark) { |
| System.arraycopy(buffer, pos, buffer, 0, present - pos); |
| present -= pos; |
| pos = 0; |
| } |
| int freeSpace = buffer.length - present; |
| moreSpace = Math.max(freeSpace - requested, 0); |
| } |
| if (moreSpace > 0) |
| manager.sendRequest(info, moreSpace); |
| synchronized (lock) { |
| requested += moreSpace; |
| while ((pos >= present) && !disconnected) { |
| try { |
| lock.wait(); |
| } catch (InterruptedException e) { |
| } |
| } |
| if (disconnected && pos >= present) |
| return -1; |
| |
| int available = present - pos; |
| if (len < available) { |
| System.arraycopy(buffer, pos, b, off, len); |
| pos += len; |
| return len; |
| } |
| else { |
| System.arraycopy(buffer, pos, b, off, available); |
| pos = present = 0; |
| // could send another request here, if len > available?? |
| return available; |
| } |
| } |
| } |
| |
| /** |
| * Return the number of bytes immediately available for reading. |
| */ |
| public int available() throws IOException |
| { |
| synchronized (lock) { |
| return present - pos; |
| } |
| } |
| |
| /** |
| * Close this connection. |
| */ |
| public void close() throws IOException |
| { |
| manager.sendClose(info); |
| } |
| |
| /** |
| * Receive bytes transmitted from connection at remote endpoint. |
| * @param length number of bytes transmitted |
| * @param in input stream with those bytes ready to be read |
| */ |
| void receive(int length, DataInputStream in) |
| throws IOException |
| { |
| /* TO DO: Optimize so that data received from stream can be loaded |
| * directly into user's buffer if there is a pending read(). |
| */ |
| synchronized (lock) { |
| if ((pos > 0) && ((buffer.length - present) < length)) { |
| System.arraycopy(buffer, pos, buffer, 0, present - pos); |
| present -= pos; |
| pos = 0; |
| } |
| if ((buffer.length - present) < length) |
| throw new IOException("Receive buffer overflow"); |
| in.readFully(buffer, present, length); |
| present += length; |
| requested -= length; |
| lock.notifyAll(); |
| } |
| } |
| |
| /** |
| * Disconnect this stream from all connection activity. |
| */ |
| void disconnect() |
| { |
| synchronized (lock) { |
| disconnected = true; |
| lock.notifyAll(); |
| } |
| } |
| } |