blob: 3a41e7422bc75e2b749182320bb46c895de5dacc [file] [log] [blame]
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okio
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.Assert.fail
import org.junit.Rule
import org.junit.Test
import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.test.assertFailsWith
import org.junit.rules.Timeout as JUnitTimeout
class PipeKotlinTest {
@JvmField @Rule val timeout = JUnitTimeout(5, TimeUnit.SECONDS)
private val executorService = Executors.newScheduledThreadPool(1)
@After @Throws(Exception::class)
fun tearDown() {
executorService.shutdown()
}
@Test fun pipe() {
val pipe = Pipe(6)
pipe.sink.write(Buffer().writeUtf8("abc"), 3L)
val readBuffer = Buffer()
assertEquals(3L, pipe.source.read(readBuffer, 6L))
assertEquals("abc", readBuffer.readUtf8())
pipe.sink.close()
assertEquals(-1L, pipe.source.read(readBuffer, 6L))
pipe.source.close()
}
@Test fun fold() {
val pipe = Pipe(128)
val pipeSink = pipe.sink.buffer()
pipeSink.writeUtf8("hello")
pipeSink.emit()
val pipeSource = pipe.source.buffer()
assertEquals("hello", pipeSource.readUtf8(5))
val foldedSinkBuffer = Buffer()
var foldedSinkClosed = false
val foldedSink = object : ForwardingSink(foldedSinkBuffer) {
override fun close() {
foldedSinkClosed = true
super.close()
}
}
pipe.fold(foldedSink)
pipeSink.writeUtf8("world")
pipeSink.emit()
assertEquals("world", foldedSinkBuffer.readUtf8(5))
assertFailsWith<IllegalStateException> {
pipeSource.readUtf8()
}
pipeSink.close()
assertTrue(foldedSinkClosed)
}
@Test fun foldWritesPipeContentsToSink() {
val pipe = Pipe(128)
val pipeSink = pipe.sink.buffer()
pipeSink.writeUtf8("hello")
pipeSink.emit()
val foldSink = Buffer()
pipe.fold(foldSink)
assertEquals("hello", foldSink.readUtf8(5))
}
@Test fun foldUnblocksBlockedWrite() {
val pipe = Pipe(4)
val foldSink = Buffer()
val latch = CountDownLatch(1)
executorService.schedule(
{
pipe.fold(foldSink)
latch.countDown()
},
500, TimeUnit.MILLISECONDS
)
val sink = pipe.sink.buffer()
sink.writeUtf8("abcdefgh") // Blocks writing 8 bytes to a 4 byte pipe.
sink.close()
latch.await()
assertEquals("abcdefgh", foldSink.readUtf8())
}
@Test fun accessSourceAfterFold() {
val pipe = Pipe(100L)
pipe.fold(Buffer())
assertFailsWith<IllegalStateException> {
pipe.source.read(Buffer(), 1L)
}
}
@Test fun honorsPipeSinkTimeoutOnWritingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutWritingSink()
underlying.timeout.timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.sink.timeout().timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(smallerTimeoutNanos) {
pipe.sink.write(Buffer().writeUtf8("abc"), 3)
}
assertEquals(biggerTimeoutNanos, underlying.timeout().timeoutNanos())
}
@Test fun honorsUnderlyingTimeoutOnWritingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutWritingSink()
underlying.timeout.timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.sink.timeout().timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(smallerTimeoutNanos) {
pipe.sink.write(Buffer().writeUtf8("abc"), 3)
}
assertEquals(smallerTimeoutNanos, underlying.timeout().timeoutNanos())
}
@Test fun honorsPipeSinkTimeoutOnFlushingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutFlushingSink()
underlying.timeout.timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.sink.timeout().timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(smallerTimeoutNanos) {
pipe.sink.flush()
}
assertEquals(biggerTimeoutNanos, underlying.timeout().timeoutNanos())
}
@Test fun honorsUnderlyingTimeoutOnFlushingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutFlushingSink()
underlying.timeout.timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.sink.timeout().timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(smallerTimeoutNanos) {
pipe.sink.flush()
}
assertEquals(smallerTimeoutNanos, underlying.timeout().timeoutNanos())
}
@Test fun honorsPipeSinkTimeoutOnClosingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutClosingSink()
underlying.timeout.timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.sink.timeout().timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(smallerTimeoutNanos) {
pipe.sink.close()
}
assertEquals(biggerTimeoutNanos, underlying.timeout().timeoutNanos())
}
@Test fun honorsUnderlyingTimeoutOnClosingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutClosingSink()
underlying.timeout.timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.sink.timeout().timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(smallerTimeoutNanos) {
pipe.sink.close()
}
assertEquals(smallerTimeoutNanos, underlying.timeout().timeoutNanos())
}
@Test fun honorsPipeSinkTimeoutOnWritingWhenUnderlyingSinkTimeoutIsZero() {
val pipeSinkTimeoutNanos = smallerTimeoutNanos
val pipe = Pipe(4)
val underlying = TimeoutWritingSink()
pipe.sink.timeout().timeout(pipeSinkTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(pipeSinkTimeoutNanos) {
pipe.sink.write(Buffer().writeUtf8("abc"), 3)
}
assertEquals(0L, underlying.timeout().timeoutNanos())
}
@Test fun honorsUnderlyingSinkTimeoutOnWritingWhenPipeSinkTimeoutIsZero() {
val underlyingSinkTimeoutNanos = smallerTimeoutNanos
val pipe = Pipe(4)
val underlying = TimeoutWritingSink()
underlying.timeout().timeout(underlyingSinkTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(underlyingSinkTimeoutNanos) {
pipe.sink.write(Buffer().writeUtf8("abc"), 3)
}
assertEquals(underlyingSinkTimeoutNanos, underlying.timeout().timeoutNanos())
}
@Test fun honorsPipeSinkTimeoutOnFlushingWhenUnderlyingSinkTimeoutIsZero() {
val pipeSinkTimeoutNanos = smallerTimeoutNanos
val pipe = Pipe(4)
val underlying = TimeoutFlushingSink()
pipe.sink.timeout().timeout(pipeSinkTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(pipeSinkTimeoutNanos) {
pipe.sink.flush()
}
assertEquals(0L, underlying.timeout().timeoutNanos())
}
@Test fun honorsUnderlyingSinkTimeoutOnFlushingWhenPipeSinkTimeoutIsZero() {
val underlyingSinkTimeoutNanos = smallerTimeoutNanos
val pipe = Pipe(4)
val underlying = TimeoutFlushingSink()
underlying.timeout().timeout(underlyingSinkTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(underlyingSinkTimeoutNanos) {
pipe.sink.flush()
}
assertEquals(underlyingSinkTimeoutNanos, underlying.timeout().timeoutNanos())
}
@Test fun honorsPipeSinkTimeoutOnClosingWhenUnderlyingSinkTimeoutIsZero() {
val pipeSinkTimeoutNanos = smallerTimeoutNanos
val pipe = Pipe(4)
val underlying = TimeoutClosingSink()
pipe.sink.timeout().timeout(pipeSinkTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(pipeSinkTimeoutNanos) {
pipe.sink.close()
}
assertEquals(0L, underlying.timeout().timeoutNanos())
}
@Test fun honorsUnderlyingSinkTimeoutOnClosingWhenPipeSinkTimeoutIsZero() {
val underlyingSinkTimeoutNanos = smallerTimeoutNanos
val pipe = Pipe(4)
val underlying = TimeoutClosingSink()
underlying.timeout().timeout(underlyingSinkTimeoutNanos, TimeUnit.NANOSECONDS)
pipe.fold(underlying)
assertDuration(underlyingSinkTimeoutNanos) {
pipe.sink.close()
}
assertEquals(underlyingSinkTimeoutNanos, underlying.timeout().timeoutNanos())
}
@Test fun honorsPipeSinkDeadlineOnWritingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutWritingSink()
val underlyingOriginalDeadline = System.nanoTime() + biggerDeadlineNanos
underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + smallerDeadlineNanos)
pipe.fold(underlying)
assertDuration(smallerDeadlineNanos) {
pipe.sink.write(Buffer().writeUtf8("abc"), 3)
}
assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
}
@Test fun honorsPipeSinkDeadlineOnWritingWhenUnderlyingSinkHasNoDeadline() {
val deadlineNanos = smallerDeadlineNanos
val pipe = Pipe(4)
val underlying = TimeoutWritingSink()
underlying.timeout.clearDeadline()
pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + deadlineNanos)
pipe.fold(underlying)
assertDuration(deadlineNanos) {
pipe.sink.write(Buffer().writeUtf8("abc"), 3)
}
assertFalse(underlying.timeout().hasDeadline())
}
@Test fun honorsUnderlyingSinkDeadlineOnWritingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutWritingSink()
val underlyingOriginalDeadline = System.nanoTime() + smallerDeadlineNanos
underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + biggerDeadlineNanos)
pipe.fold(underlying)
assertDuration(smallerDeadlineNanos) {
pipe.sink.write(Buffer().writeUtf8("abc"), 3)
}
assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
}
@Test fun honorsUnderlyingSinkDeadlineOnWritingWhenPipeSinkHasNoDeadline() {
val deadlineNanos = smallerDeadlineNanos
val pipe = Pipe(4)
val underlying = TimeoutWritingSink()
val underlyingOriginalDeadline = System.nanoTime() + deadlineNanos
underlying.timeout().deadlineNanoTime(underlyingOriginalDeadline)
pipe.sink.timeout().clearDeadline()
pipe.fold(underlying)
assertDuration(deadlineNanos) {
pipe.sink.write(Buffer().writeUtf8("abc"), 3)
}
assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
}
@Test fun honorsPipeSinkDeadlineOnFlushingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutFlushingSink()
val underlyingOriginalDeadline = System.nanoTime() + biggerDeadlineNanos
underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + smallerDeadlineNanos)
pipe.fold(underlying)
assertDuration(smallerDeadlineNanos) {
pipe.sink.flush()
}
assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
}
@Test fun honorsPipeSinkDeadlineOnFlushingWhenUnderlyingSinkHasNoDeadline() {
val deadlineNanos = smallerDeadlineNanos
val pipe = Pipe(4)
val underlying = TimeoutFlushingSink()
underlying.timeout.clearDeadline()
pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + deadlineNanos)
pipe.fold(underlying)
assertDuration(deadlineNanos) {
pipe.sink.flush()
}
assertFalse(underlying.timeout().hasDeadline())
}
@Test fun honorsUnderlyingSinkDeadlineOnFlushingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutFlushingSink()
val underlyingOriginalDeadline = System.nanoTime() + smallerDeadlineNanos
underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + biggerDeadlineNanos)
pipe.fold(underlying)
assertDuration(smallerDeadlineNanos) {
pipe.sink.flush()
}
assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
}
@Test fun honorsUnderlyingSinkDeadlineOnFlushingWhenPipeSinkHasNoDeadline() {
val deadlineNanos = smallerDeadlineNanos
val pipe = Pipe(4)
val underlying = TimeoutFlushingSink()
val underlyingOriginalDeadline = System.nanoTime() + deadlineNanos
underlying.timeout().deadlineNanoTime(underlyingOriginalDeadline)
pipe.sink.timeout().clearDeadline()
pipe.fold(underlying)
assertDuration(deadlineNanos) {
pipe.sink.flush()
}
assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
}
@Test fun honorsPipeSinkDeadlineOnClosingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutClosingSink()
val underlyingOriginalDeadline = System.nanoTime() + biggerDeadlineNanos
underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + smallerDeadlineNanos)
pipe.fold(underlying)
assertDuration(smallerDeadlineNanos) {
pipe.sink.close()
}
assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
}
@Test fun honorsPipeSinkDeadlineOnClosingWhenUnderlyingSinkHasNoDeadline() {
val deadlineNanos = smallerDeadlineNanos
val pipe = Pipe(4)
val underlying = TimeoutClosingSink()
underlying.timeout.clearDeadline()
pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + deadlineNanos)
pipe.fold(underlying)
assertDuration(deadlineNanos) {
pipe.sink.close()
}
assertFalse(underlying.timeout().hasDeadline())
}
@Test fun honorsUnderlyingSinkDeadlineOnClosingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutClosingSink()
val underlyingOriginalDeadline = System.nanoTime() + smallerDeadlineNanos
underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + biggerDeadlineNanos)
pipe.fold(underlying)
assertDuration(smallerDeadlineNanos) {
pipe.sink.close()
}
assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
}
@Test fun honorsUnderlyingSinkDeadlineOnClosingWhenPipeSinkHasNoDeadline() {
val deadlineNanos = smallerDeadlineNanos
val pipe = Pipe(4)
val underlying = TimeoutClosingSink()
val underlyingOriginalDeadline = System.nanoTime() + deadlineNanos
underlying.timeout().deadlineNanoTime(underlyingOriginalDeadline)
pipe.sink.timeout().clearDeadline()
pipe.fold(underlying)
assertDuration(deadlineNanos) {
pipe.sink.close()
}
assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
}
@Test fun foldingTwiceThrows() {
val pipe = Pipe(128)
pipe.fold(Buffer())
assertFailsWith<IllegalStateException> {
pipe.fold(Buffer())
}
}
@Test fun sinkWriteThrowsIOExceptionUnblockBlockedWriter() {
val pipe = Pipe(4)
val foldFuture = executorService.schedule(
{
val foldFailure = assertFailsWith<IOException> {
pipe.fold(object : ForwardingSink(blackholeSink()) {
override fun write(source: Buffer, byteCount: Long) {
throw IOException("boom")
}
})
}
assertEquals("boom", foldFailure.message)
},
500, TimeUnit.MILLISECONDS
)
val writeFailure = assertFailsWith<IOException> {
val pipeSink = pipe.sink.buffer()
pipeSink.writeUtf8("abcdefghij")
pipeSink.emit() // Block writing 10 bytes to a 4 byte pipe.
}
assertEquals("source is closed", writeFailure.message)
foldFuture.get() // Confirm no unexpected exceptions.
}
@Test fun foldHoldsNoLocksWhenForwardingWrites() {
val pipe = Pipe(4)
val pipeSink = pipe.sink.buffer()
pipeSink.writeUtf8("abcd")
pipeSink.emit()
pipe.fold(object : ForwardingSink(blackholeSink()) {
override fun write(source: Buffer, byteCount: Long) {
assertFalse(Thread.holdsLock(pipe.buffer))
}
})
}
/**
* Flushing the pipe wasn't causing the sink to be flushed when it was later folded. This was
* causing problems because the folded data was stalled.
*/
@Test fun foldFlushesWhenThereIsFoldedData() {
val pipe = Pipe(128)
val pipeSink = pipe.sink.buffer()
pipeSink.writeUtf8("hello")
pipeSink.emit()
val ultimateSink = Buffer()
val unnecessaryWrapper = (ultimateSink as Sink).buffer()
pipe.fold(unnecessaryWrapper)
// Data should not have been flushed through the wrapper to the ultimate sink.
assertEquals("hello", ultimateSink.readUtf8())
}
@Test fun foldDoesNotFlushWhenThereIsNoFoldedData() {
val pipe = Pipe(128)
val ultimateSink = Buffer()
val unnecessaryWrapper = (ultimateSink as Sink).buffer()
unnecessaryWrapper.writeUtf8("hello")
pipe.fold(unnecessaryWrapper)
// Data should not have been flushed through the wrapper to the ultimate sink.
assertEquals("", ultimateSink.readUtf8())
}
@Test fun foldingClosesUnderlyingSinkWhenPipeSinkIsClose() {
val pipe = Pipe(128)
val pipeSink = pipe.sink.buffer()
pipeSink.writeUtf8("world")
pipeSink.close()
val foldedSinkBuffer = Buffer()
var foldedSinkClosed = false
val foldedSink = object : ForwardingSink(foldedSinkBuffer) {
override fun close() {
foldedSinkClosed = true
super.close()
}
}
pipe.fold(foldedSink)
assertEquals("world", foldedSinkBuffer.readUtf8(5))
assertTrue(foldedSinkClosed)
}
@Test fun cancelPreventsSinkWrite() {
val pipe = Pipe(8)
pipe.cancel()
val pipeSink = pipe.sink.buffer()
pipeSink.writeUtf8("hello world")
try {
pipeSink.emit()
fail()
} catch (e: IOException) {
assertEquals("canceled", e.message)
}
}
@Test fun cancelPreventsSinkFlush() {
val pipe = Pipe(8)
pipe.cancel()
try {
pipe.sink.flush()
fail()
} catch (e: IOException) {
assertEquals("canceled", e.message)
}
}
@Test fun sinkCloseAfterCancelDoesNotThrow() {
val pipe = Pipe(8)
pipe.cancel()
pipe.sink.close()
}
@Test fun cancelInterruptsSinkWrite() {
val pipe = Pipe(8)
executorService.schedule(
{
pipe.cancel()
},
smallerTimeoutNanos, TimeUnit.NANOSECONDS
)
val pipeSink = pipe.sink.buffer()
pipeSink.writeUtf8("hello world")
assertDuration(smallerTimeoutNanos) {
try {
pipeSink.emit()
fail()
} catch (e: IOException) {
assertEquals("canceled", e.message)
}
}
}
@Test fun cancelPreventsSourceRead() {
val pipe = Pipe(8)
pipe.cancel()
val pipeSource = pipe.source.buffer()
try {
pipeSource.require(1)
fail()
} catch (e: IOException) {
assertEquals("canceled", e.message)
}
}
@Test fun sourceCloseAfterCancelDoesNotThrow() {
val pipe = Pipe(8)
pipe.cancel()
pipe.source.close()
}
@Test fun cancelInterruptsSourceRead() {
val pipe = Pipe(8)
executorService.schedule(
{
pipe.cancel()
},
smallerTimeoutNanos, TimeUnit.NANOSECONDS
)
val pipeSource = pipe.source.buffer()
assertDuration(smallerTimeoutNanos) {
try {
pipeSource.require(1)
fail()
} catch (e: IOException) {
assertEquals("canceled", e.message)
}
}
}
@Test fun cancelPreventsSinkFold() {
val pipe = Pipe(8)
pipe.cancel()
var foldedSinkClosed = false
val foldedSink = object : ForwardingSink(Buffer()) {
override fun close() {
foldedSinkClosed = true
super.close()
}
}
try {
pipe.fold(foldedSink)
fail()
} catch (e: IOException) {
assertEquals("canceled", e.message)
}
// But the fold is still performed so close() closes everything.
assertFalse(foldedSinkClosed)
pipe.sink.close()
assertTrue(foldedSinkClosed)
}
@Test fun cancelInterruptsSinkFold() {
val pipe = Pipe(128)
val pipeSink = pipe.sink.buffer()
pipeSink.writeUtf8("hello")
pipeSink.emit()
var foldedSinkClosed = false
val foldedSink = object : ForwardingSink(Buffer()) {
override fun write(source: Buffer, byteCount: Long) {
assertEquals("hello", source.readUtf8(byteCount))
// Write bytes to the original pipe so the pipe write doesn't complete!
pipeSink.writeUtf8("more bytes")
pipeSink.emit()
// Cancel while the pipe is writing.
pipe.cancel()
}
override fun close() {
foldedSinkClosed = true
super.close()
}
}
try {
pipe.fold(foldedSink)
fail()
} catch (e: IOException) {
assertEquals("canceled", e.message)
}
// But the fold is still performed so close() closes everything.
assertFalse(foldedSinkClosed)
pipe.sink.close()
assertTrue(foldedSinkClosed)
}
private fun assertDuration(expected: Long, block: () -> Unit) {
val start = System.currentTimeMillis()
block()
val elapsed = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - start)
assertEquals(
expected.toDouble(), elapsed.toDouble(),
TimeUnit.MILLISECONDS.toNanos(200).toDouble()
)
}
/** Writes on this sink never complete. They can only time out. */
class TimeoutWritingSink : Sink {
val timeout = object : AsyncTimeout() {
override fun timedOut() {
synchronized(this@TimeoutWritingSink) {
(this@TimeoutWritingSink as Object).notifyAll()
}
}
}
override fun write(source: Buffer, byteCount: Long) {
timeout.enter()
try {
synchronized(this) {
(this as Object).wait()
}
} finally {
timeout.exit()
}
source.skip(byteCount)
}
override fun flush() = Unit
override fun close() = Unit
override fun timeout() = timeout
}
/** Flushes on this sink never complete. They can only time out. */
class TimeoutFlushingSink : Sink {
val timeout = object : AsyncTimeout() {
override fun timedOut() {
synchronized(this@TimeoutFlushingSink) {
(this@TimeoutFlushingSink as Object).notifyAll()
}
}
}
override fun write(source: Buffer, byteCount: Long) = source.skip(byteCount)
override fun flush() {
timeout.enter()
try {
synchronized(this) {
(this as Object).wait()
}
} finally {
timeout.exit()
}
}
override fun close() = Unit
override fun timeout() = timeout
}
/** Closes on this sink never complete. They can only time out. */
class TimeoutClosingSink : Sink {
val timeout = object : AsyncTimeout() {
override fun timedOut() {
synchronized(this@TimeoutClosingSink) {
(this@TimeoutClosingSink as Object).notifyAll()
}
}
}
override fun write(source: Buffer, byteCount: Long) = source.skip(byteCount)
override fun flush() = Unit
override fun close() {
timeout.enter()
try {
synchronized(this) {
(this as Object).wait()
}
} finally {
timeout.exit()
}
}
override fun timeout() = timeout
}
companion object {
val smallerTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(500L)
val biggerTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(1500L)
val smallerDeadlineNanos = TimeUnit.MILLISECONDS.toNanos(500L)
val biggerDeadlineNanos = TimeUnit.MILLISECONDS.toNanos(1500L)
}
}