blob: a7fa252d0458013b924871883fa8b3f9a9510e2b [file] [log] [blame]
/*
* Copyright (C) 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package android.cts.statsd.subscriber;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import com.android.compatibility.common.util.CpuFeatures;
import com.android.internal.os.StatsdConfigProto;
import com.android.os.AtomsProto.Atom;
import com.android.os.AtomsProto.SystemUptime;
import com.android.os.ShellConfig;
import com.android.os.statsd.ShellDataProto;
import com.android.tradefed.device.CollectingByteOutputReceiver;
import com.android.tradefed.device.DeviceNotAvailableException;
import com.android.tradefed.device.ITestDevice;
import com.android.tradefed.log.LogUtil;
import com.android.tradefed.testtype.DeviceTestCase;
import com.google.common.io.Files;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import android.cts.statsd.atom.AtomTestCase;
/**
* Statsd shell data subscription test.
*/
public class ShellSubscriberTest extends AtomTestCase {
private int sizetBytes;
// ArrayList to keep track of all active spawned data-subscribe processes
ArrayList<Process> processList = new ArrayList<Process>();
@Override
protected void setUp() throws Exception {
super.setUp();
sizetBytes = getSizetBytes();
}
private void killProcess(Process process) throws Exception {
// Sending SIGINT to end subscription and then waiting to make sure
// process terminated. Note: some resources may need more time to
// be deconstructed; e.g. private thread
Runtime runtime = Runtime.getRuntime();
runtime.exec("kill -SIGINT " + String.valueOf(process.pid()));
process.waitFor();
}
@Override
protected void tearDown() throws Exception {
// Kill all remaining processes if any
for (Process process : processList) {
killProcess(process);
}
processList.clear();
super.tearDown();
}
public void testShellSubscription() {
if (sizetBytes < 0) {
return;
}
CollectingByteOutputReceiver receiver = startSubscription();
checkOutput(receiver);
}
// This is testShellSubscription but 5x
public void testShellSubscriptionReconnect() {
int numOfSubs = 5;
if (sizetBytes < 0) {
return;
}
for (int i = 0; i < numOfSubs; i++) {
CollectingByteOutputReceiver receiver = startSubscription();
checkOutput(receiver);
}
}
// Tests that multiple clients can run at once and ignores subscription requests
// after the subscription limit is hit (20 active subscriptions).
public void testShellMaxSubscriptions() {
// Maximum number of active subscriptions, set in ShellSubscriber.h
int maxSubs = 20;
if (sizetBytes < 0) {
return;
}
CollectingByteOutputReceiver[] receivers = new CollectingByteOutputReceiver[maxSubs + 1];
Process[] processes = new Process[maxSubs + 1];
ShellConfig.ShellSubscription config = createConfig();
byte[] validConfig = makeValidConfig(config);
// timeout of -1 means the subscription won't timeout
int timeout = -1;
try {
for (int i = 0; i < maxSubs; i++) {
processes[i] = runDataSubscribe(validConfig, timeout);
}
// Sleep 2.5 seconds to make sure all subscription clients are initialized before
// first pushed event
Thread.sleep(2500);
// arbitrary label = 1
doAppBreadcrumbReported(1);
// Sleep 2.5 seconds to make sure the processes read the breadcrumb before being killed.
Thread.sleep(2500);
for (int i = 1; i < maxSubs; i++) {
killProcess(processes[i]);
processList.remove(processes[i]);
receivers[i] = readData(processes[i]);
checkOutput(receivers[i]);
}
// Sleep 2.5 seconds to make sure the processes are killed and resources are released.
Thread.sleep(2500);
for (int i = 1; i < maxSubs; i++) {
processes[i] = runDataSubscribe(validConfig, timeout);
}
// Sleep 2.5 seconds to make sure all subscription clients are initialized before
// pushed event
Thread.sleep(2500);
// arbitrary label = 1
doAppBreadcrumbReported(1);
// Sleep 2.5 seconds to make sure the processes read the breadcrumb before being killed.
Thread.sleep(2500);
// ShellSubscriber only allows 20 subscriptions at a time. This is the 21st which will
// be ignored
processes[maxSubs] = runDataSubscribe(validConfig, timeout);
// Sleep 2.5 seconds to make sure the invalid subscription attempts to run.
Thread.sleep(2500);
for (int i = 0; i <= maxSubs; i++) {
killProcess(processes[i]);
processList.remove(processes[i]);
receivers[i] = readData(processes[i]);
}
// Sleep 2.5 seconds to make sure the processes are killed and resources are released.
Thread.sleep(2500);
} catch (Exception e) {
fail(e.getMessage());
}
for (int i = 0; i < maxSubs; i++) {
checkOutput(receivers[i]);
}
byte[] output = receivers[maxSubs].getOutput();
assertThat(output.length).isEqualTo(0);
}
private int getSizetBytes() {
try {
ITestDevice device = getDevice();
if (CpuFeatures.isArm64(device)) {
return 8;
}
if (CpuFeatures.isArm32(device)) {
return 4;
}
return -1;
} catch (DeviceNotAvailableException e) {
return -1;
}
}
// Choose a pulled atom that is likely to be supported on all devices (SYSTEM_UPTIME). Testing
// pushed atoms is trickier because executeShellCommand() is blocking, so we cannot push a
// breadcrumb event while the shell subscription is running.
private ShellConfig.ShellSubscription createConfig() {
return ShellConfig.ShellSubscription.newBuilder()
.addPushed((StatsdConfigProto.SimpleAtomMatcher.newBuilder()
.setAtomId(Atom.APP_BREADCRUMB_REPORTED_FIELD_NUMBER))
.build()).build();
}
private byte[] makeValidConfig(ShellConfig.ShellSubscription config) {
int length = config.toByteArray().length;
byte[] validConfig = new byte[sizetBytes + length];
System.arraycopy(IntToByteArrayLittleEndian(length), 0, validConfig, 0, sizetBytes);
System.arraycopy(config.toByteArray(), 0, validConfig, sizetBytes, length);
return validConfig;
}
private Process runDataSubscribe(byte[] validConfig, int timeout) throws Exception {
Runtime runtime = Runtime.getRuntime();
Process process = runtime.exec("adb shell cmd stats data-subscribe " + timeout);
LogUtil.CLog.d("Starting new shell subscription.");
processList.add(process);
OutputStream stdin = process.getOutputStream();
stdin.write(validConfig);
stdin.close();
return process;
}
private CollectingByteOutputReceiver readData(Process process) throws Exception {
// Reading shell_data and passing it to the receiver struct
InputStream stdout = process.getInputStream();
byte[] output = stdout.readAllBytes();
LogUtil.CLog.d("output.length in readData: " + output.length);
CollectingByteOutputReceiver receiver = new CollectingByteOutputReceiver();
receiver.addOutput(output, 0, output.length);
stdout.close();
return receiver;
}
private CollectingByteOutputReceiver startSubscription() {
ShellConfig.ShellSubscription config = createConfig();
LogUtil.CLog.d("Uploading the following config:\n" + config.toString());
byte[] validConfig = makeValidConfig(config);
int timeout = 2;
try {
Process process = runDataSubscribe(validConfig, timeout);
// Sleep a second to make sure subscription is initiated
Thread.sleep(1000);
// arbitrary label = 1
doAppBreadcrumbReported(1);
// Wait for process to timeout. If the process does not timeout, kill the process
if (!process.waitFor(2, TimeUnit.SECONDS)) {
killProcess(process);
}
processList.remove(process);
return readData(process);
} catch (Exception e) {
fail(e.getMessage());
}
return new CollectingByteOutputReceiver();
}
private byte[] IntToByteArrayLittleEndian(int length) {
ByteBuffer b = ByteBuffer.allocate(sizetBytes);
b.order(ByteOrder.LITTLE_ENDIAN);
b.putInt(length);
return b.array();
}
// We do not know how much data will be returned, but we can check the data format.
private void checkOutput(CollectingByteOutputReceiver receiver) {
int atomCount = 0;
int startIndex = 0;
byte[] output = receiver.getOutput();
LogUtil.CLog.d("output length in checkOutput: " + output.length);
assertThat(output.length).isGreaterThan(0);
while (output.length > startIndex) {
assertThat(output.length).isAtLeast(startIndex + sizetBytes);
int dataLength = readSizetFromByteArray(output, startIndex);
if (dataLength == 0) {
// We have received a heartbeat from statsd. This heartbeat isn't accompanied by any
// atoms so return to top of while loop.
startIndex += sizetBytes;
continue;
}
assertThat(output.length).isAtLeast(startIndex + sizetBytes + dataLength);
ShellDataProto.ShellData data = null;
try {
int dataStart = startIndex + sizetBytes;
int dataEnd = dataStart + dataLength;
data = ShellDataProto.ShellData.parseFrom(
Arrays.copyOfRange(output, dataStart, dataEnd));
} catch (InvalidProtocolBufferException e) {
fail("Failed to parse proto");
}
assertThat(data.getAtomCount()).isEqualTo(1);
assertThat(data.getAtom(0).hasAppBreadcrumbReported()).isTrue();
assertThat(data.getAtom(0).getAppBreadcrumbReported().getLabel()).isEqualTo(1);
assertThat(data.getAtom(0).getAppBreadcrumbReported().getState().getNumber())
.isEqualTo(1);
atomCount++;
startIndex += sizetBytes + dataLength;
}
assertThat(atomCount).isGreaterThan(0);
}
// Converts the bytes in range [startIndex, startIndex + sizetBytes) from a little-endian array
// into an integer. Even though sizetBytes could be greater than 4, we assume that the result
// will fit within an int.
private int readSizetFromByteArray(byte[] arr, int startIndex) {
int value = 0;
for (int j = 0; j < sizetBytes; j++) {
value += ((int) arr[j + startIndex] & 0xffL) << (8 * j);
}
return value;
}
}