/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
package com.google.android.utils.chre;

import androidx.annotation.NonNull;

import com.google.android.chre.utils.pigweed.ChreRpcClient;
import com.google.protobuf.Empty;
import com.google.protobuf.MessageLite;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import dev.chre.rpc.proto.ChreApiTest;
import dev.pigweed.pw_rpc.Call.ServerStreamingFuture;
import dev.pigweed.pw_rpc.Call.UnaryFuture;
import dev.pigweed.pw_rpc.MethodClient;
import dev.pigweed.pw_rpc.Service;
import dev.pigweed.pw_rpc.UnaryResult;
| |
| /** |
| * A set of helper functions for tests that use the CHRE API Test nanoapp. |
| */ |
| public class ChreApiTestUtil { |
| /** |
| * The default timeout for an RPC call in seconds. |
| */ |
| public static final int RPC_TIMEOUT_IN_SECONDS = 5; |
| |
| /** |
| * The default timeout for an RPC call in milliseconds. |
| */ |
| public static final int RPC_TIMEOUT_IN_MS = RPC_TIMEOUT_IN_SECONDS * 1000; |
| |
| /** |
| * The default timeout for an RPC call in nanosecond. |
| */ |
| public static final long RPC_TIMEOUT_IN_NS = RPC_TIMEOUT_IN_SECONDS * 1000000000L; |
| |
| /** |
| * The number of threads for the executor that executes the futures. |
| * We need at least 2 here. One to process the RPCs for server streaming |
| * and one to process events (which has server streaming as a dependent). |
| * 2 is the minimum needed to run smoothly without timeout issues. |
| */ |
| private static final int NUM_THREADS_FOR_EXECUTOR = 2; |
| |
| /** |
| * Executor for use with server streaming RPCs. |
| */ |
| private final ExecutorService mExecutor = |
| Executors.newFixedThreadPool(NUM_THREADS_FOR_EXECUTOR); |
| |
| /** |
| * Storage for nanoapp streaming messages. This is a map from each RPC client to the |
| * list of messages received. |
| */ |
| private final Map<ChreRpcClient, List<MessageLite>> mNanoappStreamingMessages = |
| new HashMap<ChreRpcClient, List<MessageLite>>(); |
| |
| /** |
| * If true, there is an active server streaming RPC ongoing. |
| */ |
| private boolean mActiveServerStreamingRpc = false; |
| |
| /** |
| * Calls a server streaming RPC method on multiple RPC clients. The RPC will be initiated for |
| * each client, then we will give each client a maximum of RPC_TIMEOUT_IN_SECONDS seconds of |
| * timeout, getting the futures in sequential order. The responses will have the same size |
| * as the input rpcClients size. |
| * |
| * @param <RequestType> the type of the request (proto generated type). |
| * @param <ResponseType> the type of the response (proto generated type). |
| * @param rpcClients the RPC clients. |
| * @param method the fully-qualified method name. |
| * @param request the request. |
| * |
| * @return the proto responses or null if there was an error. |
| */ |
| public <RequestType extends MessageLite, ResponseType extends MessageLite> |
| List<List<ResponseType>> callConcurrentServerStreamingRpcMethodSync( |
| @NonNull List<ChreRpcClient> rpcClients, |
| @NonNull String method, |
| @NonNull RequestType request) throws Exception { |
| Objects.requireNonNull(rpcClients); |
| Objects.requireNonNull(method); |
| Objects.requireNonNull(request); |
| |
| Future<List<List<ResponseType>>> responseFuture = |
| callConcurrentServerStreamingRpcMethodAsync(rpcClients, method, request, |
| RPC_TIMEOUT_IN_MS); |
| return responseFuture == null |
| ? null |
| : responseFuture.get(RPC_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); |
| } |
| |
| /** |
| * Calls a server streaming RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout. |
| * |
| * @param <RequestType> the type of the request (proto generated type). |
| * @param <ResponseType> the type of the response (proto generated type). |
| * @param rpcClient the RPC client. |
| * @param method the fully-qualified method name. |
| * @param request the request. |
| * |
| * @return the proto response or null if there was an error. |
| */ |
| public <RequestType extends MessageLite, ResponseType extends MessageLite> List<ResponseType> |
| callServerStreamingRpcMethodSync( |
| @NonNull ChreRpcClient rpcClient, |
| @NonNull String method, |
| @NonNull RequestType request) throws Exception { |
| Objects.requireNonNull(rpcClient); |
| Objects.requireNonNull(method); |
| Objects.requireNonNull(request); |
| |
| List<List<ResponseType>> responses = callConcurrentServerStreamingRpcMethodSync( |
| Arrays.asList(rpcClient), |
| method, |
| request); |
| return responses == null || responses.isEmpty() ? null : responses.get(0); |
| } |
| |
| /** |
| * Calls a server streaming RPC method with RPC_TIMEOUT_IN_SECONDS seconds of |
| * timeout with an empty request. |
| * |
| * @param <ResponseType> the type of the response (proto generated type). |
| * @param rpcClient the RPC client. |
| * @param method the fully-qualified method name. |
| * |
| * @return the proto response or null if there was an error. |
| */ |
| public <ResponseType extends MessageLite> List<ResponseType> |
| callServerStreamingRpcMethodSync( |
| @NonNull ChreRpcClient rpcClient, |
| @NonNull String method) throws Exception { |
| Objects.requireNonNull(rpcClient); |
| Objects.requireNonNull(method); |
| |
| Empty request = Empty.newBuilder().build(); |
| return callServerStreamingRpcMethodSync(rpcClient, method, request); |
| } |
| |
| /** |
| * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout for concurrent |
| * instances of the ChreApiTest nanoapp. |
| * |
| * @param <RequestType> the type of the request (proto generated type). |
| * @param <ResponseType> the type of the response (proto generated type). |
| * @param rpcClients the RPC clients corresponding to the instances of the |
| * ChreApiTest nanoapp. |
| * @param method the fully-qualified method name. |
| * @param requests the list of requests. |
| * |
| * @return the proto response or null if there was an error. |
| */ |
| public static <RequestType extends MessageLite, ResponseType extends MessageLite> |
| List<ResponseType> callConcurrentUnaryRpcMethodSync( |
| @NonNull List<ChreRpcClient> rpcClients, |
| @NonNull String method, |
| @NonNull List<RequestType> requests) throws Exception { |
| Objects.requireNonNull(rpcClients); |
| Objects.requireNonNull(method); |
| Objects.requireNonNull(requests); |
| if (rpcClients.size() != requests.size()) { |
| return null; |
| } |
| |
| List<UnaryFuture<ResponseType>> responseFutures = |
| new ArrayList<UnaryFuture<ResponseType>>(); |
| Iterator<ChreRpcClient> rpcClientsIter = rpcClients.iterator(); |
| Iterator<RequestType> requestsIter = requests.iterator(); |
| while (rpcClientsIter.hasNext() && requestsIter.hasNext()) { |
| ChreRpcClient rpcClient = rpcClientsIter.next(); |
| RequestType request = requestsIter.next(); |
| MethodClient methodClient = rpcClient.getMethodClient(method); |
| responseFutures.add(methodClient.invokeUnaryFuture(request)); |
| } |
| |
| List<ResponseType> responses = new ArrayList<ResponseType>(); |
| boolean success = true; |
| long endTimeInMs = System.currentTimeMillis() + RPC_TIMEOUT_IN_MS; |
| for (UnaryFuture<ResponseType> responseFuture: responseFutures) { |
| try { |
| UnaryResult<ResponseType> responseResult = responseFuture.get( |
| Math.max(0, endTimeInMs - System.currentTimeMillis()), |
| TimeUnit.MILLISECONDS); |
| responses.add(responseResult.response()); |
| } catch (Exception exception) { |
| success = false; |
| } |
| } |
| return success ? responses : null; |
| } |
| |
| /** |
| * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout for concurrent |
| * instances of the ChreApiTest nanoapp. |
| * |
| * @param <RequestType> the type of the request (proto generated type). |
| * @param <ResponseType> the type of the response (proto generated type). |
| * @param rpcClients the RPC clients corresponding to the instances of the |
| * ChreApiTest nanoapp. |
| * @param method the fully-qualified method name. |
| * @param request the request. |
| * |
| * @return the proto response or null if there was an error. |
| */ |
| public static <RequestType extends MessageLite, ResponseType extends MessageLite> |
| List<ResponseType> callConcurrentUnaryRpcMethodSync( |
| @NonNull List<ChreRpcClient> rpcClients, |
| @NonNull String method, |
| @NonNull RequestType request) throws Exception { |
| Objects.requireNonNull(rpcClients); |
| Objects.requireNonNull(method); |
| Objects.requireNonNull(request); |
| |
| List<RequestType> requests = new ArrayList<RequestType>(); |
| for (int i = 0; i < rpcClients.size(); ++i) { |
| requests.add(request); |
| } |
| return callConcurrentUnaryRpcMethodSync(rpcClients, method, requests); |
| } |
| |
| /** |
| * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout. |
| * |
| * @param <RequestType> the type of the request (proto generated type). |
| * @param <ResponseType> the type of the response (proto generated type). |
| * @param rpcClient the RPC client. |
| * @param method the fully-qualified method name. |
| * @param request the request. |
| * |
| * @return the proto response or null if there was an error. |
| */ |
| public static <RequestType extends MessageLite, ResponseType extends MessageLite> ResponseType |
| callUnaryRpcMethodSync( |
| @NonNull ChreRpcClient rpcClient, |
| @NonNull String method, |
| @NonNull RequestType request) throws Exception { |
| Objects.requireNonNull(rpcClient); |
| Objects.requireNonNull(method); |
| Objects.requireNonNull(request); |
| |
| List<ResponseType> responses = callConcurrentUnaryRpcMethodSync(Arrays.asList(rpcClient), |
| method, request); |
| return responses == null || responses.isEmpty() ? null : responses.get(0); |
| } |
| |
| /** |
| * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout with an empty request. |
| * |
| * @param <ResponseType> the type of the response (proto generated type). |
| * @param rpcClient the RPC client. |
| * @param method the fully-qualified method name. |
| * |
| * @return the proto response or null if there was an error. |
| */ |
| public static <ResponseType extends MessageLite> ResponseType |
| callUnaryRpcMethodSync(@NonNull ChreRpcClient rpcClient, @NonNull String method) |
| throws Exception { |
| Objects.requireNonNull(rpcClient); |
| Objects.requireNonNull(method); |
| |
| Empty request = Empty.newBuilder().build(); |
| return callUnaryRpcMethodSync(rpcClient, method, request); |
| } |
| |
| /** |
| * Gathers events that match the eventTypes for each RPC client. This gathers |
| * events until eventCount events are gathered or timeoutInNs nanoseconds has passed. |
| * The host will wait until 2 * timeoutInNs to timeout receiving the response. |
| * The responses will have the same size as the input rpcClients size. |
| * |
| * @param rpcClients the RPC clients. |
| * @param eventTypes the types of event to gather. |
| * @param eventCount the number of events to gather. |
| * |
| * @return the events future. |
| */ |
| public Future<List<List<ChreApiTest.GeneralEventsMessage>>> gatherEventsConcurrent( |
| @NonNull List<ChreRpcClient> rpcClients, List<Integer> eventTypes, int eventCount, |
| long timeoutInNs) throws Exception { |
| Objects.requireNonNull(rpcClients); |
| |
| ChreApiTest.GatherEventsInput input = ChreApiTest.GatherEventsInput.newBuilder() |
| .addAllEventTypes(eventTypes) |
| .setEventTypeCount(eventTypes.size()) |
| .setEventCount(eventCount) |
| .setTimeoutInNs(timeoutInNs) |
| .build(); |
| return callConcurrentServerStreamingRpcMethodAsync(rpcClients, |
| "chre.rpc.ChreApiTestService.GatherEvents", input, |
| TimeUnit.NANOSECONDS.toMillis(2 * timeoutInNs)); |
| } |
| |
| /** |
| * Gathers events that match the eventType for each RPC client. This gathers |
| * events until eventCount events are gathered or timeoutInNs nanoseconds has passed. |
| * The host will wait until 2 * timeoutInNs to timeout receiving the response. |
| * The responses will have the same size as the input rpcClients size. |
| * |
| * @param rpcClients the RPC clients. |
| * @param eventType the type of event to gather. |
| * @param eventCount the number of events to gather. |
| * |
| * @return the events future. |
| */ |
| public Future<List<List<ChreApiTest.GeneralEventsMessage>>> gatherEventsConcurrent( |
| @NonNull List<ChreRpcClient> rpcClients, int eventType, int eventCount, |
| long timeoutInNs) throws Exception { |
| Objects.requireNonNull(rpcClients); |
| |
| return gatherEventsConcurrent(rpcClients, Arrays.asList(eventType), |
| eventCount, timeoutInNs); |
| } |
| |
| /** |
| * Gathers events that match the eventTypes for the RPC client. This gathers |
| * events until eventCount events are gathered or timeoutInNs nanoseconds has passed. |
| * The host will wait until 2 * timeoutInNs to timeout receiving the response. |
| * |
| * @param rpcClient the RPC client. |
| * @param eventTypes the types of event to gather. |
| * @param eventCount the number of events to gather. |
| * |
| * @return the events future. |
| */ |
| public Future<List<ChreApiTest.GeneralEventsMessage>> gatherEvents( |
| @NonNull ChreRpcClient rpcClient, List<Integer> eventTypes, int eventCount, |
| long timeoutInNs) throws Exception { |
| Objects.requireNonNull(rpcClient); |
| |
| Future<List<List<ChreApiTest.GeneralEventsMessage>>> eventsConcurrentFuture = |
| gatherEventsConcurrent(Arrays.asList(rpcClient), eventTypes, eventCount, |
| timeoutInNs); |
| return eventsConcurrentFuture == null ? null : mExecutor.submit(() -> { |
| List<List<ChreApiTest.GeneralEventsMessage>> events = |
| eventsConcurrentFuture.get(2 * timeoutInNs, TimeUnit.NANOSECONDS); |
| return events == null || events.size() == 0 ? null : events.get(0); |
| }); |
| } |
| |
| /** |
| * Gathers events that match the eventType for the RPC client. This gathers |
| * events until eventCount events are gathered or timeoutInNs nanoseconds has passed. |
| * The host will wait until 2 * timeoutInNs to timeout receiving the response. |
| * |
| * @param rpcClient the RPC client. |
| * @param eventType the type of event to gather. |
| * @param eventCount the number of events to gather. |
| * |
| * @return the events future. |
| */ |
| public Future<List<ChreApiTest.GeneralEventsMessage>> gatherEvents( |
| @NonNull ChreRpcClient rpcClient, int eventType, int eventCount, |
| long timeoutInNs) throws Exception { |
| Objects.requireNonNull(rpcClient); |
| |
| return gatherEvents(rpcClient, Arrays.asList(eventType), eventCount, timeoutInNs); |
| } |
| |
| /** |
| * Gets the RPC service for the CHRE API Test nanoapp. |
| */ |
| public static Service getChreApiService() { |
| return new Service("chre.rpc.ChreApiTestService", |
| Service.unaryMethod( |
| "ChreBleGetCapabilities", |
| Empty.class, |
| ChreApiTest.Capabilities.class), |
| Service.unaryMethod( |
| "ChreBleGetFilterCapabilities", |
| Empty.class, |
| ChreApiTest.Capabilities.class), |
| Service.serverStreamingMethod( |
| "ChreBleStartScanSync", |
| ChreApiTest.ChreBleStartScanAsyncInput.class, |
| ChreApiTest.GeneralSyncMessage.class), |
| Service.serverStreamingMethod( |
| "ChreBleStopScanSync", |
| Empty.class, |
| ChreApiTest.GeneralSyncMessage.class), |
| Service.unaryMethod( |
| "ChreSensorFindDefault", |
| ChreApiTest.ChreSensorFindDefaultInput.class, |
| ChreApiTest.ChreSensorFindDefaultOutput.class), |
| Service.unaryMethod( |
| "ChreGetSensorInfo", |
| ChreApiTest.ChreHandleInput.class, |
| ChreApiTest.ChreGetSensorInfoOutput.class), |
| Service.unaryMethod( |
| "ChreGetSensorSamplingStatus", |
| ChreApiTest.ChreHandleInput.class, |
| ChreApiTest.ChreGetSensorSamplingStatusOutput.class), |
| Service.unaryMethod( |
| "ChreSensorConfigure", |
| ChreApiTest.ChreSensorConfigureInput.class, |
| ChreApiTest.Status.class), |
| Service.unaryMethod( |
| "ChreSensorConfigureModeOnly", |
| ChreApiTest.ChreSensorConfigureModeOnlyInput.class, |
| ChreApiTest.Status.class), |
| Service.unaryMethod( |
| "ChreAudioGetSource", |
| ChreApiTest.ChreHandleInput.class, |
| ChreApiTest.ChreAudioGetSourceOutput.class), |
| Service.unaryMethod( |
| "ChreConfigureHostEndpointNotifications", |
| ChreApiTest.ChreConfigureHostEndpointNotificationsInput.class, |
| ChreApiTest.Status.class), |
| Service.unaryMethod( |
| "ChreGetHostEndpointInfo", |
| ChreApiTest.ChreGetHostEndpointInfoInput.class, |
| ChreApiTest.ChreGetHostEndpointInfoOutput.class), |
| Service.serverStreamingMethod( |
| "GatherEvents", |
| ChreApiTest.GatherEventsInput.class, |
| ChreApiTest.GeneralEventsMessage.class)); |
| } |
| |
| /** |
| * Calls a server streaming RPC method with timeoutInMs milliseconds of timeout on |
| * multiple RPC clients. This returns a Future for the result. The responses will have the same |
| * size as the input rpcClients size. |
| * |
| * @param <RequestType> the type of the request (proto generated type). |
| * @param <ResponseType> the type of the response (proto generated type). |
| * @param rpcClients the RPC clients. |
| * @param method the fully-qualified method name. |
| * @param requests the list of requests. |
| * @param timeoutInMs the timeout in milliseconds. |
| * |
| * @return the Future for the response for null if there was an error. |
| */ |
| private <RequestType extends MessageLite, ResponseType extends MessageLite> |
| Future<List<List<ResponseType>>> callConcurrentServerStreamingRpcMethodAsync( |
| @NonNull List<ChreRpcClient> rpcClients, |
| @NonNull String method, |
| @NonNull List<RequestType> requests, |
| long timeoutInMs) throws Exception { |
| Objects.requireNonNull(rpcClients); |
| Objects.requireNonNull(method); |
| Objects.requireNonNull(requests); |
| if (rpcClients.size() != requests.size()) { |
| return null; |
| } |
| |
| List<ServerStreamingFuture> responseFutures = new ArrayList<ServerStreamingFuture>(); |
| synchronized (mNanoappStreamingMessages) { |
| if (mActiveServerStreamingRpc) { |
| return null; |
| } |
| |
| Iterator<ChreRpcClient> rpcClientsIter = rpcClients.iterator(); |
| Iterator<RequestType> requestsIter = requests.iterator(); |
| while (rpcClientsIter.hasNext() && requestsIter.hasNext()) { |
| ChreRpcClient rpcClient = rpcClientsIter.next(); |
| RequestType request = requestsIter.next(); |
| MethodClient methodClient = rpcClient.getMethodClient(method); |
| ServerStreamingFuture responseFuture = methodClient.invokeServerStreamingFuture( |
| request, |
| (ResponseType response) -> { |
| synchronized (mNanoappStreamingMessages) { |
| mNanoappStreamingMessages.putIfAbsent(rpcClient, |
| new ArrayList<MessageLite>()); |
| mNanoappStreamingMessages.get(rpcClient).add(response); |
| } |
| }); |
| responseFutures.add(responseFuture); |
| } |
| mActiveServerStreamingRpc = true; |
| } |
| |
| final List<ChreRpcClient> rpcClientsFinal = rpcClients; |
| Future<List<List<ResponseType>>> responseFuture = mExecutor.submit(() -> { |
| boolean success = true; |
| long endTimeInMs = System.currentTimeMillis() + timeoutInMs; |
| for (ServerStreamingFuture future: responseFutures) { |
| try { |
| future.get(Math.max(0, endTimeInMs - System.currentTimeMillis()), |
| TimeUnit.MILLISECONDS); |
| } catch (Exception exception) { |
| success = false; |
| } |
| } |
| |
| synchronized (mNanoappStreamingMessages) { |
| List<List<ResponseType>> responses = null; |
| if (success) { |
| responses = new ArrayList<List<ResponseType>>(); |
| for (ChreRpcClient rpcClient: rpcClientsFinal) { |
| List<MessageLite> messages = mNanoappStreamingMessages.get(rpcClient); |
| List<ResponseType> responseList = new ArrayList<ResponseType>(); |
| if (messages != null) { |
| // Only needed to cast the type. |
| for (MessageLite message: messages) { |
| responseList.add((ResponseType) message); |
| } |
| } |
| |
| responses.add(responseList); |
| } |
| } |
| |
| mNanoappStreamingMessages.clear(); |
| mActiveServerStreamingRpc = false; |
| return responses; |
| } |
| }); |
| return responseFuture; |
| } |
| |
| /** |
| * Calls a server streaming RPC method with timeoutInMs milliseconds of timeout on |
| * multiple RPC clients. This returns a Future for the result. The responses will have the same |
| * size as the input rpcClients size. |
| * |
| * @param <RequestType> the type of the request (proto generated type). |
| * @param <ResponseType> the type of the response (proto generated type). |
| * @param rpcClients the RPC clients. |
| * @param method the fully-qualified method name. |
| * @param request the request. |
| * @param timeoutInMs the timeout in milliseconds. |
| * |
| * @return the Future for the response for null if there was an error. |
| */ |
| private <RequestType extends MessageLite, ResponseType extends MessageLite> |
| Future<List<List<ResponseType>>> callConcurrentServerStreamingRpcMethodAsync( |
| @NonNull List<ChreRpcClient> rpcClients, |
| @NonNull String method, |
| @NonNull RequestType request, |
| long timeoutInMs) throws Exception { |
| Objects.requireNonNull(rpcClients); |
| Objects.requireNonNull(method); |
| Objects.requireNonNull(request); |
| |
| ArrayList<RequestType> requests = new ArrayList<RequestType>(); |
| for (int i = 0; i < rpcClients.size(); ++i) { |
| requests.add(request); |
| } |
| return callConcurrentServerStreamingRpcMethodAsync(rpcClients, method, |
| requests, timeoutInMs); |
| } |
| } |