Audio Test Harness Client Libraries - Implement the GrpcAudioCaptureStream:
- Add PipedCaptureChunkStreamObserver to handle the receipt of messages back from the Audio Test Harness gRPC Server.
- Updates the GrpcAudioCaptureStream to utilize PipedInputStream as the backend so that data received from the gRPC is available for any reader with a small buffer.
- Adds tests for above implementation
Test: Unit tests provided.
Bug: 168812332
Bug: 168812333
Change-Id: I71e55d7738c3726b19924f76ad0956b30a6802c6
diff --git a/libraries/audio-test-harness/client-lib/Android.bp b/libraries/audio-test-harness/client-lib/Android.bp
index 0daf9bd..dd46127 100644
--- a/libraries/audio-test-harness/client-lib/Android.bp
+++ b/libraries/audio-test-harness/client-lib/Android.bp
@@ -51,6 +51,7 @@
"audiotestharness-client-corelib",
"grpc-java-okhttp-client-lite",
"audiotestharness-servicegrpclib-lite",
+ "audiotestharness-commonlib-lite",
"guava",
],
sdk_version: "current",
@@ -68,6 +69,7 @@
static_libs: [
"audiotestharness-client-grpclib",
"grpc-java-core-inprocess",
+ "grpc-java-testing",
"junit",
"junit-params",
"mockito",
diff --git a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/core/AudioTestHarnessCommunicationException.java b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/core/AudioTestHarnessCommunicationException.java
new file mode 100644
index 0000000..8922d0b
--- /dev/null
+++ b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/core/AudioTestHarnessCommunicationException.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.media.audiotestharness.client.core;
+
+/**
+ * {@link RuntimeException} used when the Audio Test Harness client libraries run into an issue
+ * while attempting to communicate with the host-side server.
+ */
+public class AudioTestHarnessCommunicationException extends RuntimeException {
+ public AudioTestHarnessCommunicationException(String message, Exception cause) {
+ super(message, cause);
+ }
+}
diff --git a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStream.java b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStream.java
index 4c7b909..aa9f9b7 100644
--- a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStream.java
+++ b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStream.java
@@ -17,69 +17,251 @@
package com.android.media.audiotestharness.client.grpc;
import com.android.media.audiotestharness.client.core.AudioCaptureStream;
+import com.android.media.audiotestharness.common.Defaults;
import com.android.media.audiotestharness.proto.AudioTestHarnessGrpc;
+import com.android.media.audiotestharness.proto.AudioTestHarnessService;
+
+import com.google.common.base.Preconditions;
+
+import io.grpc.Context;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
-/** {@link AudioCaptureStream} that utilizes gRPC as its transfer mechanism. */
+/**
+ * {@link AudioCaptureStream} that utilizes gRPC as its transfer mechanism.
+ *
+ * <p>Utilizes Piped I/O streams with the gRPC call writing to the sending side of the pipe, and the
+ * exposed methods from the {@link java.io.InputStream} class being passed to the receiving end of
+ * the pipe.
+ */
public class GrpcAudioCaptureStream extends AudioCaptureStream {
+ private static final Logger LOGGER = Logger.getLogger(GrpcAudioCaptureStream.class.getName());
- // TODO(b/168817017): Implement this class to utlize gRPC and Piped I/O streams to provide an
- // InputStream from which the audio samples can be read.
+ /**
+ * Size of the buffer used by the {@link PipedInputStream} to cache internal messages received
+ * over the gRPC connection before they are read by the client.
+ *
+ * <p>This value is currently equal to 35 chunks, 8960 bytes, or just about 100ms of audio
+ * recorded at CD quality.
+ */
+ private static final int BUFFER_SIZE = 35 * Defaults.CAPTURE_CHUNK_TARGET_SIZE_BYTES;
- private final AudioTestHarnessGrpc.AudioTestHarnessStub mAudioTestHarnessStub;
+ private final Context.CancellableContext mCancellableContext;
+ private final PipedInputStream mInputStream;
+ private final PipedOutputStream mOutputStream;
- private GrpcAudioCaptureStream(AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub) {
- mAudioTestHarnessStub = audioTestHarnessStub;
+ /**
+ * {@link Throwable} field used when the underlying gRPC call has an error. This error is
+ * propagated back from the gRPC thread through a callback within the {@link
+ * PipedCaptureChunkStreamObserver}. This field is volatile, as it will only be read by or
+ * written to by single separate threads, but we want to make sure the reading thread is
+ * immediately notified when an error occurs. Furthermore, this is safe since the underlying
+ * Throwable will be immutable.
+ */
+ private volatile Throwable mGrpcError = null;
+
+ private GrpcAudioCaptureStream(
+ Context.CancellableContext cancellableContext,
+ PipedInputStream inputStream,
+ PipedOutputStream outputStream) {
+ mCancellableContext = cancellableContext;
+ mInputStream = inputStream;
+ mOutputStream = outputStream;
}
static GrpcAudioCaptureStream create(
- AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub) {
- return new GrpcAudioCaptureStream(audioTestHarnessStub);
+ AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub,
+ ScheduledExecutorService scheduledExecutorService)
+ throws IOException {
+ Preconditions.checkNotNull(audioTestHarnessStub, "audioTestHarnessStub cannot be null.");
+ Preconditions.checkNotNull(
+ scheduledExecutorService, "scheduledExecutorService cannot be null.");
+
+ // Create the piped streams that back the stream stream itself.
+ PipedInputStream pipedInputStream = new PipedInputStream(BUFFER_SIZE);
+ PipedOutputStream pipedOutputStream;
+ try {
+ pipedOutputStream = new PipedOutputStream(pipedInputStream);
+ } catch (IOException ioe) {
+ throw new IOException(
+ "Unable to create Capture Stream due to pipe creation failure", ioe);
+ }
+
+ // Start the gRPC call with a context that can be used for cancellation later.
+ Context.CancellableContext grpcContext =
+ Context.current()
+ .withCancellation()
+ .withDeadlineAfter(
+ Defaults.SYSTEM_TIMEOUT.getSeconds(),
+ TimeUnit.SECONDS,
+ scheduledExecutorService);
+
+ GrpcAudioCaptureStream captureStream =
+ new GrpcAudioCaptureStream(grpcContext, pipedInputStream, pipedOutputStream);
+
+ try {
+ grpcContext.call(
+ () -> {
+ audioTestHarnessStub.capture(
+ AudioTestHarnessService.CaptureRequest.getDefaultInstance(),
+ new PipedCaptureChunkStreamObserver(
+ pipedOutputStream,
+ (throwable) -> captureStream.mGrpcError = throwable));
+ return true;
+ });
+ } catch (Exception e) {
+ throw new IOException("Audio Test Harness gRPC Communication Error", e);
+ }
+
+ return captureStream;
}
@Override
public int read(byte[] b) throws IOException {
- return super.read(b);
+ if (mGrpcError != null) {
+ throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+ }
+
+ try {
+ return mInputStream.read(b);
+ } catch (IOException ioe) {
+ throw new IOException("Audio Test Harness gRPC Internal Error", ioe);
+ }
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
- return super.read(b, off, len);
+ if (mGrpcError != null) {
+ throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+ }
+
+ try {
+ return mInputStream.read(b, off, len);
+ } catch (IOException ioe) {
+ throw new IOException("Audio Test Harness gRPC Internal Error", mGrpcError);
+ }
}
@Override
public long skip(long n) throws IOException {
- return super.skip(n);
+ if (mGrpcError != null) {
+ throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+ }
+
+ return mInputStream.skip(n);
}
@Override
public int available() throws IOException {
- return super.available();
+ if (mGrpcError != null) {
+ throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+ }
+
+ return mInputStream.available();
}
@Override
public void close() throws IOException {
- super.close();
+ mCancellableContext.cancel(
+ Status.CANCELLED.withDescription("Capture stopped by client").asException());
+
+ mInputStream.close();
+ mOutputStream.close();
}
@Override
public synchronized void mark(int readlimit) {
- super.mark(readlimit);
+ mInputStream.mark(readlimit);
}
@Override
public synchronized void reset() throws IOException {
- super.reset();
+ if (mGrpcError != null) {
+ throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+ }
+
+ try {
+ mInputStream.reset();
+ } catch (IOException ioe) {
+ throw new IOException("Audio Test Harness gRPC Internal Error", ioe);
+ }
}
@Override
public boolean markSupported() {
- return super.markSupported();
+ return mInputStream.markSupported();
}
@Override
public int read() throws IOException {
- return 0;
+ if (mGrpcError != null) {
+ throw new IOException("Audio Test Harness gRPC Communication Error", mGrpcError);
+ }
+
+ try {
+ return mInputStream.read();
+ } catch (IOException ioe) {
+ throw new IOException("Audio Test Harness gRPC Internal Error", ioe);
+ }
+ }
+
+ /**
+ * {@link StreamObserver} that publishes audio samples received over a gRPC connection to a
+ * piped output stream.
+ */
+ private static final class PipedCaptureChunkStreamObserver
+ implements StreamObserver<AudioTestHarnessService.CaptureChunk> {
+ private static final Logger LOGGER =
+ Logger.getLogger(PipedCaptureChunkStreamObserver.class.getName());
+
+ private final PipedOutputStream mPipedOutputStream;
+ private final Consumer<Throwable> mOnErrorCallback;
+
+ private PipedCaptureChunkStreamObserver(
+ PipedOutputStream pipedOutputStream, Consumer<Throwable> onErrorCallback) {
+ mPipedOutputStream = pipedOutputStream;
+ mOnErrorCallback = onErrorCallback;
+ }
+
+ @Override
+ public void onNext(AudioTestHarnessService.CaptureChunk value) {
+ try {
+ mPipedOutputStream.write(value.getData().toByteArray());
+ } catch (IOException ioe) {
+ LOGGER.log(
+ Level.WARNING,
+ "Unable to write segment of audio data, data may have been lost",
+ ioe);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // Immediately propagate the throwable back to the callback.
+ mOnErrorCallback.accept(t);
+ LOGGER.log(Level.WARNING, "onError called: ", t);
+
+ // On error, close the stream so that any corresponding input streams will also
+ // throw exceptions when attempting to read.
+ try {
+ mPipedOutputStream.close();
+ } catch (IOException ioe) {
+ LOGGER.log(Level.WARNING, "Unable to close the pipedOutputStream", ioe);
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.log(Level.FINE, "onCompleted called");
+ }
}
}
diff --git a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamFactory.java b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamFactory.java
index 5d2fd46..a40f1d2 100644
--- a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamFactory.java
+++ b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamFactory.java
@@ -20,17 +20,27 @@
import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+
/** Factory for the {@link GrpcAudioCaptureStream}. */
public class GrpcAudioCaptureStreamFactory {
- private GrpcAudioCaptureStreamFactory() {}
- public static GrpcAudioCaptureStreamFactory create() {
- return new GrpcAudioCaptureStreamFactory();
+ private final ScheduledExecutorService mScheduledExecutorService;
+
+ private GrpcAudioCaptureStreamFactory(ScheduledExecutorService scheduledExecutorService) {
+ mScheduledExecutorService = scheduledExecutorService;
}
- GrpcAudioCaptureStream newStream(
- AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub) {
+ public static GrpcAudioCaptureStreamFactory create(
+ ScheduledExecutorService scheduledExecutorService) {
+ Preconditions.checkNotNull(scheduledExecutorService);
+ return new GrpcAudioCaptureStreamFactory(scheduledExecutorService);
+ }
+
+ GrpcAudioCaptureStream newStream(AudioTestHarnessGrpc.AudioTestHarnessStub audioTestHarnessStub)
+ throws IOException {
Preconditions.checkNotNull(audioTestHarnessStub, "audioTestHarnessStub cannot be null");
- return GrpcAudioCaptureStream.create(audioTestHarnessStub);
+ return GrpcAudioCaptureStream.create(audioTestHarnessStub, mScheduledExecutorService);
}
}
diff --git a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClient.java b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClient.java
index b8fa93b..9e1cad7 100644
--- a/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClient.java
+++ b/libraries/audio-test-harness/client-lib/src/main/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClient.java
@@ -18,6 +18,7 @@
import com.android.media.audiotestharness.client.core.AudioCaptureStream;
import com.android.media.audiotestharness.client.core.AudioTestHarnessClient;
+import com.android.media.audiotestharness.client.core.AudioTestHarnessCommunicationException;
import com.android.media.audiotestharness.proto.AudioTestHarnessGrpc;
import com.google.common.annotations.VisibleForTesting;
@@ -27,8 +28,9 @@
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
-import java.util.concurrent.Executor;
+import java.util.Locale;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -52,17 +54,23 @@
}
public static GrpcAudioTestHarnessClient.Builder builder() {
- return new Builder().setCaptureStreamFactory(GrpcAudioCaptureStreamFactory.create());
+ return new Builder();
}
@Override
public AudioCaptureStream startCapture() {
- AudioCaptureStream newStream =
- mGrpcAudioCaptureStreamFactory.newStream(
- AudioTestHarnessGrpc.newStub(mManagedChannel));
+ AudioCaptureStream newStream;
+
+ try {
+ newStream =
+ mGrpcAudioCaptureStreamFactory.newStream(
+ AudioTestHarnessGrpc.newStub(mManagedChannel));
+ } catch (IOException ioe) {
+ throw new AudioTestHarnessCommunicationException(
+ "Unable to start a new capture stream.", ioe);
+ }
mAudioCaptureStreams.add(newStream);
-
return newStream;
}
@@ -88,7 +96,7 @@
private String mHostname;
private int mPort;
- private Executor mExecutor;
+ private ScheduledExecutorService mExecutor;
private GrpcAudioCaptureStreamFactory mGrpcAudioCaptureStreamFactory;
private ManagedChannel mManagedChannel;
@@ -98,7 +106,11 @@
Preconditions.checkNotNull(hostname, "Hostname cannot be null");
Preconditions.checkArgument(
port >= MIN_PORT && port <= MAX_PORT,
- String.format("Port expected in range [%d, %d]", MIN_PORT, MAX_PORT));
+ String.format(
+ Locale.getDefault(),
+ "Port expected in range [%d, %d]",
+ MIN_PORT,
+ MAX_PORT));
mHostname = hostname;
mPort = port;
@@ -106,7 +118,7 @@
return this;
}
- public Builder setExecutor(Executor executor) {
+ public Builder setExecutor(ScheduledExecutorService executor) {
mExecutor = executor;
return this;
}
@@ -131,7 +143,12 @@
Preconditions.checkState(mHostname != null, "Address must be set.");
if (mExecutor == null) {
- mExecutor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
+ mExecutor = Executors.newScheduledThreadPool(DEFAULT_NUM_THREADS);
+ }
+
+ if (mGrpcAudioCaptureStreamFactory == null) {
+ mGrpcAudioCaptureStreamFactory =
+ GrpcAudioCaptureStreamFactory.create(mExecutor);
}
mManagedChannel =
diff --git a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/AudioTestHarnessTestImpl.java b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/AudioTestHarnessTestImpl.java
new file mode 100644
index 0000000..e460f7c
--- /dev/null
+++ b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/AudioTestHarnessTestImpl.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.media.audiotestharness.client.grpc;
+
+import com.android.media.audiotestharness.proto.AudioTestHarnessGrpc;
+import com.android.media.audiotestharness.proto.AudioTestHarnessService;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Test implementation of the {@link AudioTestHarnessGrpc.AudioTestHarnessImplBase} that contains
+ * stubbed methods.
+ */
+public class AudioTestHarnessTestImpl extends AudioTestHarnessGrpc.AudioTestHarnessImplBase {
+
+ public static final byte[] MESSAGE = {0x0, 0x1, 0x2, 0x3};
+
+ /**
+ * Test implementation of the <code>Capture</code> procedure that simply ignores the request and
+ * sends back a predefined response containing the {@link #MESSAGE} bytes.
+ */
+ @Override
+ public void capture(
+ AudioTestHarnessService.CaptureRequest request,
+ StreamObserver<AudioTestHarnessService.CaptureChunk> responseObserver) {
+ responseObserver.onNext(
+ AudioTestHarnessService.CaptureChunk.newBuilder()
+ .setData(ByteString.copyFrom(MESSAGE))
+ .build());
+ responseObserver.onCompleted();
+ }
+}
diff --git a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamTests.java b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamTests.java
index b490169..9bab2da 100644
--- a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamTests.java
+++ b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioCaptureStreamTests.java
@@ -16,25 +16,242 @@
package com.android.media.audiotestharness.client.grpc;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+
+import com.android.media.audiotestharness.proto.AudioTestHarnessGrpc;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
@RunWith(JUnit4.class)
public class GrpcAudioCaptureStreamTests {
+ @Rule public GrpcCleanupRule mGrpcCleanupRule = new GrpcCleanupRule();
+
+ @Rule public MockitoRule mMockitoRule = MockitoJUnit.rule();
+
+ @Rule public ExpectedException mExceptionRule = ExpectedException.none();
+
+ @Mock ScheduledExecutorService mScheduledExecutorService;
+
+ private ListeningExecutorService mListeningExecutorService;
+
private GrpcAudioCaptureStreamFactory mGrpcAudioCaptureStreamFactory;
+ private AudioTestHarnessGrpc.AudioTestHarnessStub mAudioTestHarnessStub;
+
@Before
public void setUp() throws Exception {
- mGrpcAudioCaptureStreamFactory = GrpcAudioCaptureStreamFactory.create();
+ mListeningExecutorService = MoreExecutors.newDirectExecutorService();
+
+ String serverName = InProcessServerBuilder.generateName();
+
+ mGrpcCleanupRule.register(
+ InProcessServerBuilder.forName(serverName)
+ .executor(mListeningExecutorService)
+ .addService(new AudioTestHarnessTestImpl())
+ .build()
+ .start());
+
+ ManagedChannel channel =
+ mGrpcCleanupRule.register(
+ InProcessChannelBuilder.forName(serverName)
+ .executor(mListeningExecutorService)
+ .build());
+ mAudioTestHarnessStub = AudioTestHarnessGrpc.newStub(channel);
+
+ mGrpcAudioCaptureStreamFactory =
+ GrpcAudioCaptureStreamFactory.create(mScheduledExecutorService);
+ }
+
+ @Test
+ public void create_returnsNotNullInstance() throws Exception {
+ assertNotNull(
+ GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService));
+ }
+
+ @Test
+ public void create_schedulesCancellationTaskWithProperDeadline() throws Exception {
+ ArgumentCaptor<Long> timeValueCaptor = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<TimeUnit> timeUnitCaptor = ArgumentCaptor.forClass(TimeUnit.class);
+
+ GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+
+ // Grab the timeout from the call to the Scheduled Executor Service as a Duration.
+ verify(mScheduledExecutorService)
+ .schedule((Runnable) any(), timeValueCaptor.capture(), timeUnitCaptor.capture());
+ Duration actualScheduledDuration =
+ Duration.ofNanos(
+ TimeUnit.NANOSECONDS.convert(
+ timeValueCaptor.getValue(), timeUnitCaptor.getValue()));
+
+ // Ensure that we are within our expected timeout within a 1s epsilon.
+ Duration difference = actualScheduledDuration.minus(Duration.ofHours(1)).abs();
+ assertTrue(difference.getSeconds() < 1L);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void newStream_throwsNullPointerException_nullStub() throws Exception {
+ assertNotNull(mGrpcAudioCaptureStreamFactory.newStream(/* audioTestHarnessStub= */ null));
}
@Test(expected = NullPointerException.class)
public void create_throwsNullPointerException_nullStub() throws Exception {
- assertNotNull(mGrpcAudioCaptureStreamFactory.newStream(/* audioTestHarnessStub= */ null));
+ GrpcAudioCaptureStream.create(/* audioTestHarnessStub= */ null, mScheduledExecutorService);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void create_throwsNullPointerException_nullScheduledExecutorService() throws Exception {
+ GrpcAudioCaptureStream.create(mAudioTestHarnessStub, /* scheduledExecutorService= */ null);
+ }
+
+ @Test
+ public void available_throwsProperIOException_grpcError() throws Exception {
+ expectGrpcCommunicationErrorException();
+ GrpcAudioCaptureStream grpcAudioCaptureStream = buildCaptureStreamWithPendingGrpcError();
+
+ grpcAudioCaptureStream.available();
+ }
+
+ @Test
+ public void read_returnsProperDataFromGrpc() throws Exception {
+ GrpcAudioCaptureStream grpcAudioCaptureStream =
+ GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+
+ byte[] readBytes = new byte[AudioTestHarnessTestImpl.MESSAGE.length];
+
+ int numBytesRead = grpcAudioCaptureStream.read(readBytes);
+
+ assertEquals(AudioTestHarnessTestImpl.MESSAGE.length, numBytesRead);
+ assertArrayEquals(AudioTestHarnessTestImpl.MESSAGE, readBytes);
+ }
+
+ @Test
+ public void read_singleByte_throwsProperIOException_whenStreamClosed() throws Exception {
+ expectInternalErrorException();
+ GrpcAudioCaptureStream grpcAudioCaptureStream =
+ GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+ grpcAudioCaptureStream.close();
+
+ grpcAudioCaptureStream.read();
+ }
+
+ @Test
+ public void read_multipleBytes_throwsProperIOException_whenStreamClosed() throws Exception {
+ expectInternalErrorException();
+ GrpcAudioCaptureStream grpcAudioCaptureStream =
+ GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+
+ grpcAudioCaptureStream.close();
+
+ grpcAudioCaptureStream.read(new byte[AudioTestHarnessTestImpl.MESSAGE.length]);
+ }
+
+ @Test
+ public void read_multipleBytesWithOffset_throwsProperIOException_whenStreamClosed()
+ throws Exception {
+ expectInternalErrorException();
+ GrpcAudioCaptureStream grpcAudioCaptureStream =
+ GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+ grpcAudioCaptureStream.close();
+
+ grpcAudioCaptureStream.read(
+ new byte[AudioTestHarnessTestImpl.MESSAGE.length], /* off= */ 1, /* len= */ 2);
+ }
+
+ @Test
+ public void read_throwsIOException_grpcError() throws Exception {
+ expectGrpcCommunicationErrorException();
+ GrpcAudioCaptureStream grpcAudioCaptureStream = buildCaptureStreamWithPendingGrpcError();
+
+ grpcAudioCaptureStream.read(new byte[AudioTestHarnessTestImpl.MESSAGE.length]);
+ }
+
+ @Test
+ public void reset_throwsProperIOException_whenStreamClosed() throws Exception {
+ expectInternalErrorException();
+ GrpcAudioCaptureStream grpcAudioCaptureStream =
+ GrpcAudioCaptureStream.create(mAudioTestHarnessStub, mScheduledExecutorService);
+ grpcAudioCaptureStream.close();
+
+ grpcAudioCaptureStream.reset();
+ }
+
+ @Test
+ public void reset_throwsProperIOException_grpcError() throws Exception {
+ expectGrpcCommunicationErrorException();
+ GrpcAudioCaptureStream grpcAudioCaptureStream = buildCaptureStreamWithPendingGrpcError();
+
+ grpcAudioCaptureStream.reset();
+ }
+
+ @Test
+ public void skip_throwsProperIOException_grpcError() throws Exception {
+ expectGrpcCommunicationErrorException();
+ GrpcAudioCaptureStream grpcAudioCaptureStream = buildCaptureStreamWithPendingGrpcError();
+
+ grpcAudioCaptureStream.skip(/* n= */ 1);
+ }
+
+ /**
+ * Builds a new {@link GrpcAudioCaptureStream} with a pending gRPC error internally.
+ *
+ * <p>This method is used to verify that gRPC errors take precedence and are propagated properly
+ * to callers.
+ */
+ private GrpcAudioCaptureStream buildCaptureStreamWithPendingGrpcError() throws Exception {
+ AudioTestHarnessGrpc.AudioTestHarnessStub disconnectedStub =
+ AudioTestHarnessGrpc.newStub(
+ mGrpcCleanupRule.register(
+ ManagedChannelBuilder.forAddress("localhost", 12345)
+ .executor(mListeningExecutorService)
+ .build()));
+
+ GrpcAudioCaptureStream grpcAudioCaptureStream =
+ GrpcAudioCaptureStream.create(disconnectedStub, mScheduledExecutorService);
+
+ // Read once from the stream, giving the gRPC exception time to propagate. Even with
+ // the direct executor this is necessary and *should* be deterministic.
+ grpcAudioCaptureStream.read(new byte[1]);
+ return grpcAudioCaptureStream;
+ }
+
+ /** Configures the exception rule to expect the gRPC Communication Error exception. */
+ private void expectGrpcCommunicationErrorException() throws Exception {
+ mExceptionRule.expectMessage("Audio Test Harness gRPC Communication Error");
+ mExceptionRule.expectCause(ArgumentMatchers.notNull());
+ }
+
+ /** Configures the exception rule to expect the gRPC Internal Error exception. */
+ private void expectInternalErrorException() throws Exception {
+ mExceptionRule.expectMessage("Audio Test Harness gRPC Internal Error");
+ mExceptionRule.expectCause(ArgumentMatchers.notNull());
}
}
diff --git a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClientTests.java b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClientTests.java
index 0beb234..c0cb4b7 100644
--- a/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClientTests.java
+++ b/libraries/audio-test-harness/client-lib/src/test/java/com/android/media/audiotestharness/client/grpc/GrpcAudioTestHarnessClientTests.java
@@ -81,7 +81,7 @@
{
GrpcAudioTestHarnessClient.builder()
.setAddress("service.google.com", 49152)
- .setExecutor(Executors.newSingleThreadExecutor())
+ .setExecutor(Executors.newSingleThreadScheduledExecutor())
},
{
GrpcAudioTestHarnessClient.builder()
@@ -133,7 +133,7 @@
verify(mGrpcAudioCaptureStream).close();
}
- public GrpcAudioTestHarnessClient initMocksAndClient() {
+ public GrpcAudioTestHarnessClient initMocksAndClient() throws Exception {
when(mGrpcAudioCaptureStreamFactory.newStream(any())).thenReturn(mGrpcAudioCaptureStream);
return GrpcAudioTestHarnessClient.builder()
.setManagedChannel(mManagedChannel)
diff --git a/libraries/audio-test-harness/common/src/main/java/com/android/media/audiotestharness/common/Defaults.java b/libraries/audio-test-harness/common/src/main/java/com/android/media/audiotestharness/common/Defaults.java
index 7ee2a8a..e106ccb 100644
--- a/libraries/audio-test-harness/common/src/main/java/com/android/media/audiotestharness/common/Defaults.java
+++ b/libraries/audio-test-harness/common/src/main/java/com/android/media/audiotestharness/common/Defaults.java
@@ -19,6 +19,8 @@
import com.android.media.audiotestharness.proto.AudioDeviceOuterClass.AudioDevice;
import com.android.media.audiotestharness.proto.AudioFormatOuterClass.AudioFormat;
+import java.time.Duration;
+
/**
* Contains all the defaults for the Audio Test Harness system shared between the client and server
* libraries.
@@ -51,4 +53,13 @@
.setName("UMM6 [plughw:0,0]")
.addCapabilities(AudioDevice.Capability.CAPTURE)
.build();
+
+ /** Target size for each chunk captured and sent from host to client. */
+ public static final int CAPTURE_CHUNK_TARGET_SIZE_BYTES = 256;
+
+ /**
+ * Timeout for all calls between client and host at which point any outstanding calls will be
+ * cancelled or aborted.
+ */
+ public static final Duration SYSTEM_TIMEOUT = Duration.ofHours(1);
}