Audio Test Harness Client Libraries - Implement the GrpcAudioCaptureStream:

- Add PipedCaptureChunkStreamObserver to handle the receipt of messages back from the Audio Test Harness gRPC Server.
- Updates the GrpcAudioCaptureStream to utilize PipedInputStream as the backend so that data received from the gRPC is available for any reader with a small buffer.
- Adds tests for above implementation

Test: Unit tests provided.
Bug: 168812332
Bug: 168812333
Change-Id: I71e55d7738c3726b19924f76ad0956b30a6802c6
diff --git a/libraries/audio-test-harness/client-lib/Android.bp b/libraries/audio-test-harness/client-lib/Android.bp
index 0daf9bd..dd46127 100644
--- a/libraries/audio-test-harness/client-lib/Android.bp
+++ b/libraries/audio-test-harness/client-lib/Android.bp
@@ -51,6 +51,7 @@
         "audiotestharness-client-corelib",
         "grpc-java-okhttp-client-lite",
         "audiotestharness-servicegrpclib-lite",
+        "audiotestharness-commonlib-lite",
         "guava",
     ],
     sdk_version: "current",
@@ -68,6 +69,7 @@
     static_libs: [
         "audiotestharness-client-grpclib",
         "grpc-java-core-inprocess",
+        "grpc-java-testing",
         "junit",
         "junit-params",
         "mockito",
diff --git a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/core/AudioTestHarnessCommunicationException.java b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/core/AudioTestHarnessCommunicationException.java
new file mode 100644
index 0000000..8922d0b
--- /dev/null
+++ b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/core/AudioTestHarnessCommunicationException.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.media.audiotestharness.client.core;
+
+/**
+ * {@link RuntimeException} used when the Audio Test Harness client libraries run into an issue
+ * while attempting to communicate with the host-side server.
+ */
+public class AudioTestHarnessCommunicationException extends RuntimeException {
+    public AudioTestHarnessCommunicationException(String message, Exception cause) {
+        super(message, cause);
+    }
+}
diff --git a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStream.java b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStream.java
index 4c7b909..aa9f9b7 100644
--- a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStream.java
+++ b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStream.java
@@ -17,69 +17,251 @@
 package com.android.media.audiotestharness.client.grpc;
 
 import com.android.media.audiotestharness.client.core.AudioCaptureStream;
+import com.android.media.audiotestharness.common.Defaults;
 import com.android.media.audiotestharness.proto.AudioTestHarnessGrpc;
+import com.android.media.audiotestharness.proto.AudioTestHarnessService;
+
+import com.google.common.base.Preconditions;
+
+import io.grpc.Context;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
 
 import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
-/** {@link AudioCaptureStream} that utilizes gRPC as its transfer mechanism. */
+/**
+ * {@link AudioCaptureStream} that utilizes gRPC as its transfer mechanism.
+ *
+ * <p>Utilizes Piped I/O streams with the gRPC call writing to the sending side of the pipe, and the
+ * exposed methods from the {@link java.io.InputStream} class being passed to the receiving end of
+ * the pipe.
+ */
 public class GrpcAudioCaptureStream extends AudioCaptureStream {
+    private static final Logger LOGGER = Logger.getLogger(GrpcAudioCaptureStream.class.getName());
 
-    // TODO(b/168817017): Implement this class to utlize gRPC and Piped I/O streams to provide an
-    // InputStream from which the audio samples can be read.
+    /**
+     * Size of the buffer used by the {@link PipedInputStream} to cache internal messages received
+     * over the gRPC connection before they are read by the client.
+     *
+     * <p>This value is currently equal to 35 chunks, 8960 bytes, or just about 100ms of audio
+     * recorded at CD quality.
+     */
+    private static final int BUFFER_SIZE = 35 * Defaults.CAPTURE_CHUNK_TARGET_SIZE_BYTES;
 
-    private final AudioTestHarnessGrpc.AudioTestHarnessStub mAudioTestHarnessStub;
+    private final Context.CancellableContext mCancellableContext;
+    private final PipedInputStream mInputStream;
+    private final PipedOutputStream mOutputStream;
 
-    private GrpcAudioCaptureStream(AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub) {
-        mAudioTestHarnessStub = audioTestHarnessStub;
+    /**
+     * {@link Throwable} field used when the underlying gRPC call has an error. This error is
+     * propagated back from the gRPC thread through a callback within the {@link
+     * PipedCaptureChunkStreamObserver}. This field is volatile, as it will only be read by or
+     * written to by single separate threads, but we want to make sure the reading thread is
+     * immediately notified when an error occurs. Furthermore, this is safe since the underlying
+     * Throwable will be immutable.
+     */
+    private volatile Throwable mGrpcError = null;
+
+    private GrpcAudioCaptureStream(
+            Context.CancellableContext cancellableContext,
+            PipedInputStream inputStream,
+            PipedOutputStream outputStream) {
+        mCancellableContext = cancellableContext;
+        mInputStream = inputStream;
+        mOutputStream = outputStream;
     }
 
     static GrpcAudioCaptureStream create(
-            AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub) {
-        return new GrpcAudioCaptureStream(audioTestHarnessStub);
+            AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub,
+            ScheduledExecutorService scheduledExecutorService)
+            throws IOException {
+        Preconditions.checkNotNull(audioTestHarnessStub, "audioTestHarnessStub cannot be null.");
+        Preconditions.checkNotNull(
+                scheduledExecutorService, "scheduledExecutorService cannot be null.");
+
+        // Create the piped streams that back the stream stream itself.
+        PipedInputStream pipedInputStream = new PipedInputStream(BUFFER_SIZE);
+        PipedOutputStream pipedOutputStream;
+        try {
+            pipedOutputStream = new PipedOutputStream(pipedInputStream);
+        } catch (IOException ioe) {
+            throw new IOException(
+                    "Unable to create Capture Stream due to pipe creation failure", ioe);
+        }
+
+        // Start the gRPC call with a context that can be used for cancellation later.
+        Context.CancellableContext grpcContext =
+                Context.current()
+                        .withCancellation()
+                        .withDeadlineAfter(
+                                Defaults.SYSTEM_TIMEOUT.getSeconds(),
+                                TimeUnit.SECONDS,
+                                scheduledExecutorService);
+
+        GrpcAudioCaptureStream captureStream =
+                new GrpcAudioCaptureStream(grpcContext, pipedInputStream, pipedOutputStream);
+
+        try {
+            grpcContext.call(
+                    () -> {
+                        audioTestHarnessStub.capture(
+                                AudioTestHarnessService.CaptureRequest.getDefaultInstance(),
+                                new PipedCaptureChunkStreamObserver(
+                                        pipedOutputStream,
+                                        (throwable) -> captureStream.mGrpcError = throwable));
+                        return true;
+                    });
+        } catch (Exception e) {
+            throw new IOException("Audio Test Harness gRPC Communication Error", e);
+        }
+
+        return captureStream;
     }
 
     @Override
     public int read(byte[] b) throws IOException {
-        return super.read(b);
+        if (mGrpcError != null) {
+            throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+        }
+
+        try {
+            return mInputStream.read(b);
+        } catch (IOException ioe) {
+            throw new IOException("Audio Test Harness gRPC Internal Error", ioe);
+        }
     }
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
-        return super.read(b, off, len);
+        if (mGrpcError != null) {
+            throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+        }
+
+        try {
+            return mInputStream.read(b, off, len);
+        } catch (IOException ioe) {
+            throw new IOException("Audio Test Harness gRPC Internal Error", mGrpcError);
+        }
     }
 
     @Override
     public long skip(long n) throws IOException {
-        return super.skip(n);
+        if (mGrpcError != null) {
+            throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+        }
+
+        return mInputStream.skip(n);
     }
 
     @Override
     public int available() throws IOException {
-        return super.available();
+        if (mGrpcError != null) {
+            throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+        }
+
+        return mInputStream.available();
     }
 
     @Override
     public void close() throws IOException {
-        super.close();
+        mCancellableContext.cancel(
+                Status.CANCELLED.withDescription("Capture stopped by client").asException());
+
+        mInputStream.close();
+        mOutputStream.close();
     }
 
     @Override
     public synchronized void mark(int readlimit) {
-        super.mark(readlimit);
+        mInputStream.mark(readlimit);
     }
 
     @Override
     public synchronized void reset() throws IOException {
-        super.reset();
+        if (mGrpcError != null) {
+            throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+        }
+
+        try {
+            mInputStream.reset();
+        } catch (IOException ioe) {
+            throw new IOException("Audio Test Harness gRPC Internal Error", ioe);
+        }
     }
 
     @Override
     public boolean markSupported() {
-        return super.markSupported();
+        return mInputStream.markSupported();
     }
 
     @Override
     public int read() throws IOException {
-        return 0;
+        if (mGrpcError != null) {
+            throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+        }
+
+        try {
+            return mInputStream.read();
+        } catch (IOException ioe) {
+            throw new IOException("Audio Test Harness gRPC Internal Error", ioe);
+        }
+    }
+
+    /**
+     * {@link StreamObserver} that publishes audio samples received over a gRPC connection to a
+     * piped output stream.
+     */
+    private static final class PipedCaptureChunkStreamObserver
+            implements StreamObserver<AudioTestHarnessService.CaptureChunk> {
+        private static final Logger LOGGER =
+                Logger.getLogger(PipedCaptureChunkStreamObserver.class.getName());
+
+        private final PipedOutputStream mPipedOutputStream;
+        private final Consumer<Throwable> mOnErrorCallback;
+
+        private PipedCaptureChunkStreamObserver(
+                PipedOutputStream pipedOutputStream, Consumer<Throwable> onErrorCallback) {
+            mPipedOutputStream = pipedOutputStream;
+            mOnErrorCallback = onErrorCallback;
+        }
+
+        @Override
+        public void onNext(AudioTestHarnessService.CaptureChunk value) {
+            try {
+                mPipedOutputStream.write(value.getData().toByteArray());
+            } catch (IOException ioe) {
+                LOGGER.log(
+                        Level.WARNING,
+                        "Unable to write segment of audio data, data may have been lost",
+                        ioe);
+            }
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            // Immediately propagate the throwable back to the callback.
+            mOnErrorCallback.accept(t);
+            LOGGER.log(Level.WARNING, "onError called: ", t);
+
+            // On error, close the stream so that any corresponding input streams will also
+            // throw exceptions when attempting to read.
+            try {
+                mPipedOutputStream.close();
+            } catch (IOException ioe) {
+                LOGGER.log(Level.WARNING, "Unable to close the pipedOutputStream", ioe);
+            }
+        }
+
+        @Override
+        public void onCompleted() {
+            LOGGER.log(Level.FINE, "onCompleted called");
+        }
     }
 }
diff --git a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamFactory.java b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamFactory.java
index 5d2fd46..a40f1d2 100644
--- a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamFactory.java
+++ b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamFactory.java
@@ -20,17 +20,27 @@
 
 import com.google.common.base.Preconditions;
 
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+
 /** Factory for the {@link GrpcAudioCaptureStream}. */
 public class GrpcAudioCaptureStreamFactory {
-    private GrpcAudioCaptureStreamFactory() {}
 
-    public static GrpcAudioCaptureStreamFactory create() {
-        return new GrpcAudioCaptureStreamFactory();
+    private final ScheduledExecutorService mScheduledExecutorService;
+
+    private GrpcAudioCaptureStreamFactory(ScheduledExecutorService scheduledExecutorService) {
+        mScheduledExecutorService = scheduledExecutorService;
     }
 
-    GrpcAudioCaptureStream newStream(
-            AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub) {
+    public static GrpcAudioCaptureStreamFactory create(
+            ScheduledExecutorService scheduledExecutorService) {
+        Preconditions.checkNotNull(scheduledExecutorService);
+        return new GrpcAudioCaptureStreamFactory(scheduledExecutorService);
+    }
+
+    GrpcAudioCaptureStream newStream(AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub)
+            throws IOException {
         Preconditions.checkNotNull(audioTestHarnessStub, "audioTestHarnessStub cannot be null");
-        return GrpcAudioCaptureStream.create(audioTestHarnessStub);
+        return GrpcAudioCaptureStream.create(audioTestHarnessStub, mScheduledExecutorService);
     }
 }
diff --git a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClient.java b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClient.java
index b8fa93b..9e1cad7 100644
--- a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClient.java
+++ b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClient.java
@@ -18,6 +18,7 @@
 
 import com.android.media.audiotestharness.client.core.AudioCaptureStream;
 import com.android.media.audiotestharness.client.core.AudioTestHarnessClient;
+import com.android.media.audiotestharness.client.core.AudioTestHarnessCommunicationException;
 import com.android.media.audiotestharness.proto.AudioTestHarnessGrpc;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -27,8 +28,9 @@
 import io.grpc.ManagedChannelBuilder;
 
 import java.io.IOException;
-import java.util.concurrent.Executor;
+import java.util.Locale;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -52,17 +54,23 @@
     }
 
     public static GrpcAudioTestHarnessClient.Builder builder() {
-        return new Builder().setCaptureStreamFactory(GrpcAudioCaptureStreamFactory.create());
+        return new Builder();
     }
 
     @Override
     public AudioCaptureStream startCapture() {
-        AudioCaptureStream newStream =
-                mGrpcAudioCaptureStreamFactory.newStream(
-                        AudioTestHarnessGrpc.newStub(mManagedChannel));
+        AudioCaptureStream newStream;
+
+        try {
+            newStream =
+                    mGrpcAudioCaptureStreamFactory.newStream(
+                            AudioTestHarnessGrpc.newStub(mManagedChannel));
+        } catch (IOException ioe) {
+            throw new AudioTestHarnessCommunicationException(
+                    "Unable to start a new capture stream.", ioe);
+        }
 
         mAudioCaptureStreams.add(newStream);
-
         return newStream;
     }
 
@@ -88,7 +96,7 @@
 
         private String mHostname;
         private int mPort;
-        private Executor mExecutor;
+        private ScheduledExecutorService mExecutor;
         private GrpcAudioCaptureStreamFactory mGrpcAudioCaptureStreamFactory;
         private ManagedChannel mManagedChannel;
 
@@ -98,7 +106,11 @@
             Preconditions.checkNotNull(hostname, "Hostname cannot be null");
             Preconditions.checkArgument(
                     port >= MIN_PORT && port <= MAX_PORT,
-                    String.format("Port expected in range [%d, %d]", MIN_PORT, MAX_PORT));
+                    String.format(
+                            Locale.getDefault(),
+                            "Port expected in range [%d, %d]",
+                            MIN_PORT,
+                            MAX_PORT));
 
             mHostname = hostname;
             mPort = port;
@@ -106,7 +118,7 @@
             return this;
         }
 
-        public Builder setExecutor(Executor executor) {
+        public Builder setExecutor(ScheduledExecutorService executor) {
             mExecutor = executor;
             return this;
         }
@@ -131,7 +143,12 @@
                 Preconditions.checkState(mHostname != null, "Address must be set.");
 
                 if (mExecutor == null) {
-                    mExecutor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
+                    mExecutor = Executors.newScheduledThreadPool(DEFAULT_NUM_THREADS);
+                }
+
+                if (mGrpcAudioCaptureStreamFactory == null) {
+                    mGrpcAudioCaptureStreamFactory =
+                            GrpcAudioCaptureStreamFactory.create(mExecutor);
                 }
 
                 mManagedChannel =
diff --git a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/AudioTestHarnessTestImpl.java b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/AudioTestHarnessTestImpl.java
new file mode 100644
index 0000000..e460f7c
--- /dev/null
+++ b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/AudioTestHarnessTestImpl.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.media.audiotestharness.client.grpc;
+
+import com.android.media.audiotestharness.proto.AudioTestHarnessGrpc;
+import com.android.media.audiotestharness.proto.AudioTestHarnessService;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Test implementation of the {@link AudioTestHarnessGrpc.AudioTestHarnessImplBase} that contains
+ * stubbed methods.
+ */
+public class AudioTestHarnessTestImpl extends AudioTestHarnessGrpc.AudioTestHarnessImplBase {
+
+    public static final byte[] MESSAGE = {0x0, 0x1, 0x2, 0x3};
+
+    /**
+     * Test implementation of the <code>Capture</code> procedure that simply ignores the request and
+     * sends back a predefined response containing the {@link #MESSAGE} bytes.
+     */
+    @Override
+    public void capture(
+            AudioTestHarnessService.CaptureRequest request,
+            StreamObserver<AudioTestHarnessService.CaptureChunk> responseObserver) {
+        responseObserver.onNext(
+                AudioTestHarnessService.CaptureChunk.newBuilder()
+                        .setData(ByteString.copyFrom(MESSAGE))
+                        .build());
+        responseObserver.onCompleted();
+    }
+}
diff --git a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamTests.java b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamTests.java
index b490169..9bab2da 100644
--- a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamTests.java
+++ b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamTests.java
@@ -16,25 +16,242 @@
 
 package com.android.media.audiotestharness.client.grpc;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+
+import com.android.media.audiotestharness.proto.AudioTestHarnessGrpc;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 @RunWith(JUnit4.class)
 public class GrpcAudioCaptureStreamTests {
 
+    @Rule public GrpcCleanupRule mGrpcCleanupRule = new GrpcCleanupRule();
+
+    @Rule public MockitoRule mMockitoRule = MockitoJUnit.rule();
+
+    @Rule public ExpectedException mExceptionRule = ExpectedException.none();
+
+    @Mock ScheduledExecutorService mScheduledExecutorService;
+
+    private ListeningExecutorService mListeningExecutorService;
+
     private GrpcAudioCaptureStreamFactory mGrpcAudioCaptureStreamFactory;
 
+    private AudioTestHarnessGrpc.AudioTestHarnessStub mAudioTestHarnessStub;
+
     @Before
     public void setUp() throws Exception {
-        mGrpcAudioCaptureStreamFactory = GrpcAudioCaptureStreamFactory.create();
+        mListeningExecutorService = MoreExecutors.newDirectExecutorService();
+
+        String serverName = InProcessServerBuilder.generateName();
+
+        mGrpcCleanupRule.register(
+                InProcessServerBuilder.forName(serverName)
+                        .executor(mListeningExecutorService)
+                        .addService(new AudioTestHarnessTestImpl())
+                        .build()
+                        .start());
+
+        ManagedChannel channel =
+                mGrpcCleanupRule.register(
+                        InProcessChannelBuilder.forName(serverName)
+                                .executor(mListeningExecutorService)
+                                .build());
+        mAudioTestHarnessStub = AudioTestHarnessGrpc.newStub(channel);
+
+        mGrpcAudioCaptureStreamFactory =
+                GrpcAudioCaptureStreamFactory.create(mScheduledExecutorService);
+    }
+
+    @Test
+    public void create_returnsNotNullInstance() throws Exception {
+        assertNotNull(
+                GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService));
+    }
+
+    @Test
+    public void create_schedulesCancellationTaskWithProperDeadline() throws Exception {
+        ArgumentCaptor<Long> timeValueCaptor = ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<TimeUnit> timeUnitCaptor = ArgumentCaptor.forClass(TimeUnit.class);
+
+        GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+
+        // Grab the timeout from the call to the Scheduled Executor Service as a Duration.
+        verify(mScheduledExecutorService)
+                .schedule((Runnable) any(), timeValueCaptor.capture(), timeUnitCaptor.capture());
+        Duration actualScheduledDuration =
+                Duration.ofNanos(
+                        TimeUnit.NANOSECONDS.convert(
+                                timeValueCaptor.getValue(), timeUnitCaptor.getValue()));
+
+        // Ensure that we are within our expected timeout within a 1s epsilon.
+        Duration difference = actualScheduledDuration.minus(Duration.ofHours(1)).abs();
+        assertTrue(difference.getSeconds() < 1L);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void newStream_throwsNullPointerException_nullStub() throws Exception {
+        assertNotNull(mGrpcAudioCaptureStreamFactory.newStream(/* audioTestHarnessStub= */ null));
     }
 
     @Test(expected = NullPointerException.class)
     public void create_throwsNullPointerException_nullStub() throws Exception {
-        assertNotNull(mGrpcAudioCaptureStreamFactory.newStream(/* audioTestHarnessStub= */ null));
+        GrpcAudioCaptureStream.create(/* audioTestHarnessStub= */ null, mScheduledExecutorService);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void create_throwsNullPointerException_nullScheduledExecutorService() throws Exception {
+        GrpcAudioCaptureStream.create(mAudioTestHarnessStub, /* scheduledExecutorService= */ null);
+    }
+
+    @Test
+    public void available_throwsProperIOException_grpcError() throws Exception {
+        expectGrpcCommunicationErrorException();
+        GrpcAudioCaptureStream grpcAudioCaptureStream = buildCaptureStreamWithPendingGrpcError();
+
+        grpcAudioCaptureStream.available();
+    }
+
+    @Test
+    public void read_returnsProperDataFromGrpc() throws Exception {
+        GrpcAudioCaptureStream grpcAudioCaptureStream =
+                GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+
+        byte[] readBytes = new byte[AudioTestHarnessTestImpl.MESSAGE.length];
+
+        int numBytesRead = grpcAudioCaptureStream.read(readBytes);
+
+        assertEquals(AudioTestHarnessTestImpl.MESSAGE.length, numBytesRead);
+        assertArrayEquals(AudioTestHarnessTestImpl.MESSAGE, readBytes);
+    }
+
+    @Test
+    public void read_singleByte_throwsProperIOException_whenStreamClosed() throws Exception {
+        expectInternalErrorException();
+        GrpcAudioCaptureStream grpcAudioCaptureStream =
+                GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+        grpcAudioCaptureStream.close();
+
+        grpcAudioCaptureStream.read();
+    }
+
+    @Test
+    public void read_multipleBytes_throwsProperIOException_whenStreamClosed() throws Exception {
+        expectInternalErrorException();
+        GrpcAudioCaptureStream grpcAudioCaptureStream =
+                GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+
+        grpcAudioCaptureStream.close();
+
+        grpcAudioCaptureStream.read(new byte[AudioTestHarnessTestImpl.MESSAGE.length]);
+    }
+
+    @Test
+    public void read_multipleBytesWithOffset_throwsProperIOException_whenStreamClosed()
+            throws Exception {
+        expectInternalErrorException();
+        GrpcAudioCaptureStream grpcAudioCaptureStream =
+                GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+        grpcAudioCaptureStream.close();
+
+        grpcAudioCaptureStream.read(
+                new byte[AudioTestHarnessTestImpl.MESSAGE.length], /* off= */ 1, /* len= */ 2);
+    }
+
+    @Test
+    public void read_throwsIOException_grpcError() throws Exception {
+        expectGrpcCommunicationErrorException();
+        GrpcAudioCaptureStream grpcAudioCaptureStream = buildCaptureStreamWithPendingGrpcError();
+
+        grpcAudioCaptureStream.read(new byte[AudioTestHarnessTestImpl.MESSAGE.length]);
+    }
+
+    @Test
+    public void reset_throwsProperIOException_whenStreamClosed() throws Exception {
+        expectInternalErrorException();
+        GrpcAudioCaptureStream grpcAudioCaptureStream =
+                GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+        grpcAudioCaptureStream.close();
+
+        grpcAudioCaptureStream.reset();
+    }
+
+    @Test
+    public void reset_throwsProperIOException_grpcError() throws Exception {
+        expectGrpcCommunicationErrorException();
+        GrpcAudioCaptureStream grpcAudioCaptureStream = buildCaptureStreamWithPendingGrpcError();
+
+        grpcAudioCaptureStream.reset();
+    }
+
+    @Test
+    public void skip_throwsProperIOException_grpcError() throws Exception {
+        expectGrpcCommunicationErrorException();
+        GrpcAudioCaptureStream grpcAudioCaptureStream = buildCaptureStreamWithPendingGrpcError();
+
+        grpcAudioCaptureStream.skip(/* n= */ 1);
+    }
+
+    /**
+     * Builds a new {@link GrpcAudioCaptureStream} with a pending gRPC error internally.
+     *
+     * <p>This method is used to verify that gRPC errors take precedence and are propagated properly
+     * to callers.
+     */
+    private GrpcAudioCaptureStream buildCaptureStreamWithPendingGrpcError() throws Exception {
+        AudioTestHarnessGrpc.AudioTestHarnessStub disconnectedStub =
+                AudioTestHarnessGrpc.newStub(
+                        mGrpcCleanupRule.register(
+                                ManagedChannelBuilder.forAddress("localhost", 12345)
+                                        .executor(mListeningExecutorService)
+                                        .build()));
+
+        GrpcAudioCaptureStream grpcAudioCaptureStream =
+                GrpcAudioCaptureStream.create(disconnectedStub, mScheduledExecutorService);
+
+        // Read once from the stream, giving the gRPC exception time to propagate. Even with
+        // the direct executor this is necessary and *should* be deterministic.
+        grpcAudioCaptureStream.read(new byte[1]);
+        return grpcAudioCaptureStream;
+    }
+
+    /** Configures the exception rule to expect the gRPC Communication Error exception. */
+    private void expectGrpcCommunicationErrorException() throws Exception {
+        mExceptionRule.expectMessage("Audio Test Harness gRPC Communication Error");
+        mExceptionRule.expectCause(ArgumentMatchers.notNull());
+    }
+
+    /** Configures the exception rule to expect the gRPC Internal Error exception. */
+    private void expectInternalErrorException() throws Exception {
+        mExceptionRule.expectMessage("Audio Test Harness gRPC Internal Error");
+        mExceptionRule.expectCause(ArgumentMatchers.notNull());
     }
 }
diff --git a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClientTests.java b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClientTests.java
index 0beb234..c0cb4b7 100644
--- a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClientTests.java
+++ b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClientTests.java
@@ -81,7 +81,7 @@
             {
                 GrpcAudioTestHarnessClient.builder()
                         .setAddress("service.google.com", 49152)
-                        .setExecutor(Executors.newSingleThreadExecutor())
+                        .setExecutor(Executors.newSingleThreadScheduledExecutor())
             },
             {
                 GrpcAudioTestHarnessClient.builder()
@@ -133,7 +133,7 @@
         verify(mGrpcAudioCaptureStream).close();
     }
 
-    public GrpcAudioTestHarnessClient initMocksAndClient() {
+    public GrpcAudioTestHarnessClient initMocksAndClient() throws Exception {
         when(mGrpcAudioCaptureStreamFactory.newStream(any())).thenReturn(mGrpcAudioCaptureStream);
         return GrpcAudioTestHarnessClient.builder()
                 .setManagedChannel(mManagedChannel)
diff --git a/libraries/audio-test-harness/common/src/main/java/com/android/media/audiotestharness/common/Defaults.java b/libraries/audio-test-harness/common/src/main/java/com/android/media/audiotestharness/common/Defaults.java
index 7ee2a8a..e106ccb 100644
--- a/libraries/audio-test-harness/common/src/main/java/com/android/media/audiotestharness/common/Defaults.java
+++ b/libraries/audio-test-harness/common/src/main/java/com/android/media/audiotestharness/common/Defaults.java
@@ -19,6 +19,8 @@
 import com.android.media.audiotestharness.proto.AudioDeviceOuterClass.AudioDevice;
 import com.android.media.audiotestharness.proto.AudioFormatOuterClass.AudioFormat;
 
+import java.time.Duration;
+
 /**
  * Contains all the defaults for the Audio Test Harness system shared between the client and server
  * libraries.
@@ -51,4 +53,13 @@
                     .setName("UMM6 [plughw:0,0]")
                     .addCapabilities(AudioDevice.Capability.CAPTURE)
                     .build();
+
+    /** Target size for each chunk captured and sent from host to client. */
+    public static final int CAPTURE_CHUNK_TARGET_SIZE_BYTES = 256;
+
+    /**
+     * Timeout for all calls between client and host at which point any outstanding calls will be
+     * cancelled or aborted.
+     */
+    public static final Duration SYSTEM_TIMEOUT = Duration.ofHours(1);
 }