// blob: 43c23bfd3aa5737a05a42e681bc369bb9ecb1e63 [file] [log] [blame]
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okio
/**
 * A source and a sink that are attached. The sink's output is the source's input. Typically each
 * is accessed by its own thread: a producer thread writes data to the sink and a consumer thread
 * reads data from the source.
 *
 * This class uses a buffer to decouple source and sink. This buffer has a user-specified maximum
 * size. When a producer thread outruns its consumer the buffer fills up and eventually writes to
 * the sink will block until the consumer has caught up. Symmetrically, if a consumer outruns its
 * producer reads block until there is data to be read. Limits on the amount of time spent waiting
 * for the other party can be configured with [timeouts][Timeout] on the source and the
 * sink.
 *
 * When the sink is closed, source reads will continue to complete normally until the buffer has
 * been exhausted. At that point reads will return -1, indicating the end of the stream. But if the
 * source is closed first, writes to the sink will immediately fail with an [IOException].
 *
 * A pipe may be canceled to immediately fail writes to the sink and reads from the source.
 *
 * All mutable state is guarded by the monitor of the internal buffer; blocked readers and writers
 * wait on that same monitor and are woken with `notifyAll()`.
 */
class Pipe(internal val maxBufferSize: Long) {
  /** Bytes written to [sink] that [source] has not read yet. Also the lock for all state below. */
  internal val buffer = Buffer()

  /** True once [cancel] has been called. */
  internal var canceled = false

  /** True once [sink] has been closed while this pipe was not folded. */
  internal var sinkClosed = false

  /** True once [source] has been closed, or once [fold] has completed or failed. */
  internal var sourceClosed = false

  /** The sink installed by [fold], or null if this pipe hasn't been folded. */
  internal var foldedSink: Sink? = null

  init {
    require(maxBufferSize >= 1L) { "maxBufferSize < 1: $maxBufferSize" }
  }

  @get:JvmName("sink")
  val sink = object : Sink {
    private val timeout = Timeout()

    /**
     * Moves `byteCount` bytes from `source` into the pipe's buffer, blocking while the buffer is
     * full. If the pipe gets folded before all bytes are buffered, the bytes that remain are
     * written to the folded sink instead, outside of the lock.
     */
    override fun write(source: Buffer, byteCount: Long) {
      var remaining = byteCount
      var delegate: Sink? = null
      synchronized(buffer) {
        check(!sinkClosed) { "closed" }
        if (canceled) throw IOException("canceled")

        while (remaining > 0) {
          // Re-checked on every pass because fold() may complete while we wait below.
          foldedSink?.let {
            delegate = it
            return@synchronized
          }

          if (sourceClosed) throw IOException("source is closed")

          val bufferSpaceAvailable = maxBufferSize - buffer.size
          if (bufferSpaceAvailable == 0L) {
            timeout.waitUntilNotified(buffer) // Wait until the source drains the buffer.
            if (canceled) throw IOException("canceled")
            continue
          }

          val bytesToWrite = minOf(bufferSpaceAvailable, remaining)
          buffer.write(source, bytesToWrite)
          remaining -= bytesToWrite
          (buffer as Object).notifyAll() // Notify the source that it can resume reading.
        }
      }

      // Inside the lambda the receiver is the folded sink, so this writes the leftover bytes to it.
      delegate?.forward { write(source, remaining) }
    }

    /**
     * Flushes the folded sink if there is one. Otherwise this only validates state: it fails if
     * the source is closed while unread bytes remain in the buffer.
     */
    override fun flush() {
      var delegate: Sink? = null
      synchronized(buffer) {
        check(!sinkClosed) { "closed" }
        if (canceled) throw IOException("canceled")

        foldedSink?.let {
          delegate = it
          return@synchronized
        }

        if (sourceClosed && buffer.size > 0L) {
          throw IOException("source is closed")
        }
      }

      delegate?.forward { flush() }
    }

    /**
     * Closes the folded sink if there is one. Otherwise marks this sink closed and wakes up any
     * reader waiting for more bytes. Does nothing if this sink is already closed.
     */
    override fun close() {
      var delegate: Sink? = null
      synchronized(buffer) {
        if (sinkClosed) return

        foldedSink?.let {
          delegate = it
          return@synchronized
        }

        if (sourceClosed && buffer.size > 0L) throw IOException("source is closed")
        sinkClosed = true
        (buffer as Object).notifyAll() // Notify the source that no more bytes are coming.
      }

      delegate?.forward { close() }
    }

    override fun timeout(): Timeout = timeout
  }

  @get:JvmName("source")
  val source = object : Source {
    private val timeout = Timeout()

    /**
     * Reads up to `byteCount` buffered bytes into `sink`, blocking while the buffer is empty.
     * Returns -1 once the buffer is empty and the pipe's sink has been closed.
     */
    override fun read(sink: Buffer, byteCount: Long): Long {
      synchronized(buffer) {
        check(!sourceClosed) { "closed" }
        if (canceled) throw IOException("canceled")

        while (buffer.size == 0L) {
          if (sinkClosed) return -1L
          timeout.waitUntilNotified(buffer) // Wait until the sink fills the buffer.
          if (canceled) throw IOException("canceled")
        }

        val result = buffer.read(sink, byteCount)
        (buffer as Object).notifyAll() // Notify the sink that it can resume writing.
        return result
      }
    }

    /** Marks the source closed and wakes up any writer waiting for buffer space. */
    override fun close() {
      synchronized(buffer) {
        sourceClosed = true
        (buffer as Object).notifyAll() // Notify the sink that no more bytes are desired.
      }
    }

    override fun timeout(): Timeout = timeout
  }

  /**
   * Writes any buffered contents of this pipe to `sink`, then replace this pipe's source with
   * `sink`. This pipe's source is closed and attempts to read it will throw an
   * [IllegalStateException].
   *
   * If this pipe's sink has already been closed by the time the swap completes, `sink` is closed
   * too. Otherwise nothing would ever close it: closing an already-closed pipe sink returns
   * without forwarding to the folded sink.
   *
   * This method must not be called while concurrently accessing this pipe's source. It is safe,
   * however, to call this while concurrently writing this pipe's sink.
   *
   * @throws IllegalStateException if this pipe has already been folded.
   * @throws IOException if this pipe is canceled, or if writing, flushing or closing `sink` fails.
   */
  @Throws(IOException::class)
  fun fold(sink: Sink) {
    while (true) {
      // Either the buffer is empty and we can swap and return. Or the buffer is non-empty and we
      // must copy it to sink without holding any locks, then try it all again.
      var closed = false
      var done = false
      lateinit var sinkBuffer: Buffer
      synchronized(buffer) {
        check(foldedSink == null) { "sink already folded" }

        if (canceled) {
          foldedSink = sink
          throw IOException("canceled")
        }

        // Read on every pass: the pipe's sink may be closed while we copy outside the lock.
        closed = sinkClosed
        if (buffer.exhausted()) {
          sourceClosed = true
          foldedSink = sink
          done = true
          return@synchronized
        }

        sinkBuffer = Buffer()
        sinkBuffer.write(buffer, buffer.size)
        (buffer as Object).notifyAll() // Notify the sink that it can resume writing.
      }

      if (done) {
        // The pipe's sink was closed before the swap, so its close() will never be forwarded.
        // Propagate the close now, without holding the lock because closing may block.
        if (closed) sink.close()
        return
      }

      var success = false
      try {
        sink.write(sinkBuffer, sinkBuffer.size)
        sink.flush()
        success = true
      } finally {
        if (!success) {
          synchronized(buffer) {
            sourceClosed = true
            (buffer as Object).notifyAll() // Notify the sink that it can resume writing.
          }
        }
      }
    }
  }

  /** Runs `block` on this folded sink with its timeout intersected with the pipe sink's timeout. */
  private inline fun Sink.forward(block: Sink.() -> Unit) {
    this.timeout().intersectWith(this@Pipe.sink.timeout()) { this.block() }
  }

  @JvmName("-deprecated_sink")
  @Deprecated(
    message = "moved to val",
    replaceWith = ReplaceWith(expression = "sink"),
    level = DeprecationLevel.ERROR
  )
  fun sink() = sink

  @JvmName("-deprecated_source")
  @Deprecated(
    message = "moved to val",
    replaceWith = ReplaceWith(expression = "source"),
    level = DeprecationLevel.ERROR
  )
  fun source() = source

  /**
   * Fail any in-flight and future operations. After canceling:
   *
   *  * Any attempt to write or flush [sink] will fail immediately with an [IOException].
   *  * Any attempt to read [source] will fail immediately with an [IOException].
   *  * Any attempt to [fold] will fail immediately with an [IOException].
   *
   * Closing the source and the sink will complete normally even after a pipe has been canceled. If
   * this sink has been folded, closing it will close the folded sink. This operation may block.
   *
   * This operation may be called by any thread at any time. It is safe to call concurrently while
   * operating on the source or the sink.
   */
  fun cancel() {
    synchronized(buffer) {
      canceled = true
      buffer.clear() // Buffered bytes are discarded; they will never be delivered.
      (buffer as Object).notifyAll() // Notify the source and sink that they're canceled.
    }
  }
}