A base for scheduled listener to report time-series.

Bug: 139753910
Test: build and run test
Change-Id: I6068b3fdf311c779cdd466a6ac7042da92795710
(cherry picked from commit 493927f1fa79bcf7b19b356fe100e8e0392409be)
diff --git a/libraries/device-collectors/src/main/java/android/device/collectors/ScheduledRunCollectionListener.java b/libraries/device-collectors/src/main/java/android/device/collectors/ScheduledRunCollectionListener.java
new file mode 100644
index 0000000..ce319c3
--- /dev/null
+++ b/libraries/device-collectors/src/main/java/android/device/collectors/ScheduledRunCollectionListener.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package android.device.collectors;
+
+import android.device.collectors.util.SendToInstrumentation;
+import android.os.Bundle;
+import android.os.SystemClock;
+import android.util.Log;
+import androidx.annotation.VisibleForTesting;
+
+import com.android.helpers.ICollectorHelper;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.runner.Description;
+import org.junit.runner.Result;
+
+/**
+ * Extend this class for a periodic metric collection which relies on ICollectorHelper to collect
+ * metrics and dump the time-series in csv format. In case of system crashes, the time series up to
+ * the point where the crash happened will still be stored.
+ *
+ * In case of running tests with Tradefed file pulller, use the option
+ * {@link file-puller-log-collector:directory-keys} from {{@link FilePullerLogCollector} to
+ * specify the directory path under which the output file should be pulled from (i.e.
+ * <external_storage>/test_results, where <external_storage> is /sdcard for Android phones and
+ * /storage/emulated/10 for Android Auto), instead of using
+ * {@link file-puller-log-collector:pull-pattern-keys}.
+ */
+public class ScheduledRunCollectionListener<T extends Number> extends ScheduledRunMetricListener {
+    private static final String LOG_TAG = ScheduledRunCollectionListener.class.getSimpleName();
+    private static final String TIME_SERIES_PREFIX = "time_series_";
+    @VisibleForTesting public static final String OUTPUT_ROOT = "test_results";
+    @VisibleForTesting public static final String OUTPUT_FILE_PATH = "%s_time_series_path";
+
+    @VisibleForTesting
+    public static final String TIME_SERIES_HEADER =
+            String.format("%-20s,%-100s,%-20s", "time", "metric_key", "value");
+
+    private static final String TIME_SERIES_BODY = "%-20d,%-100s,%-20s";
+    @VisibleForTesting public static final String MEAN_SUFFIX = "-mean";
+    @VisibleForTesting public static final String MAX_SUFFIX = "-max";
+    @VisibleForTesting public static final String MIN_SUFFIX = "-min";
+
+    protected ICollectorHelper<T> mHelper;
+    private TimeSeriesCsvWriter mTimeSeriesCsvWriter;
+    private TimeSeriesStatistics mTimeSeriesStatistics;
+    private long mStartTime;
+
+    public ScheduledRunCollectionListener() {}
+
+    @VisibleForTesting
+    ScheduledRunCollectionListener(Bundle argsBundle, ICollectorHelper helper) {
+        super(argsBundle);
+        mHelper = helper;
+    }
+
+    /**
+     * Write a time-series in csv format to the given destination under external storage as an
+     * unpivoted table like:
+     *
+     * time  ,metric_key ,value
+     * 0     ,metric1    ,5
+     * 0     ,metric2    ,10
+     * 0     ,metric3    ,15
+     * 1000  ,metric1    ,6
+     * 1000  ,metric2    ,11
+     * 1000  ,metric3    ,16
+     */
+    private class TimeSeriesCsvWriter {
+        private File mDestFile;
+        private boolean mIsHeaderWritten = false;
+
+        private TimeSeriesCsvWriter(Path destination) {
+            // Create parent directory if it doesn't exist.
+            File destDir = createAndEmptyDirectory(destination.getParent().toString());
+            mDestFile = new File(destDir, destination.getFileName().toString());
+        }
+
+        private void write(Map<String, T> dataPoint, long timeStamp) {
+            try (BufferedWriter writer = new BufferedWriter(new FileWriter(mDestFile, true))) {
+                if (!mIsHeaderWritten) {
+                    writer.append(TIME_SERIES_HEADER);
+                    writer.append("\n");
+                    mIsHeaderWritten = true;
+                }
+
+                for (String key : dataPoint.keySet()) {
+                    writer.append(
+                            String.format(TIME_SERIES_BODY, timeStamp, key, dataPoint.get(key)));
+                    writer.append("\n");
+                }
+            } catch (IOException e) {
+                Log.e(
+                        LOG_TAG,
+                        String.format("Fail to output time series due to : %s.", e.getMessage()));
+            }
+        }
+    }
+
+    private class TimeSeriesStatistics {
+        Map<String, T> minMap = new HashMap<>();
+        Map<String, T> maxMap = new HashMap<>();
+        Map<String, Double> sumMap = new HashMap<>();
+        Map<String, Long> countMap = new HashMap<>();
+
+        private void update(Map<String, T> dataPoint) {
+            for (String key : dataPoint.keySet()) {
+                T value = dataPoint.get(key);
+                // Add / replace min.
+                minMap.computeIfPresent(key, (k, v) -> compareAsDouble(value, v) == -1 ? value : v);
+                minMap.computeIfAbsent(key, k -> value);
+                // Add / replace max.
+                maxMap.computeIfPresent(key, (k, v) -> compareAsDouble(value, v) == 1 ? value : v);
+                maxMap.computeIfAbsent(key, k -> value);
+                // Add / update sum.
+                sumMap.put(key, value.doubleValue() + sumMap.getOrDefault(key, 0.));
+                // Add / update count.
+                countMap.put(key, 1 + countMap.getOrDefault(key, 0L));
+            }
+        }
+
+        private Map<String, String> getStatistics() {
+            Map<String, String> res = new HashMap<>();
+            for (String key : minMap.keySet()) {
+                res.put(key + MIN_SUFFIX, minMap.get(key).toString());
+            }
+            for (String key : maxMap.keySet()) {
+                res.put(key + MAX_SUFFIX, maxMap.get(key).toString());
+            }
+            for (String key : sumMap.keySet()) {
+                if (countMap.containsKey(key)) {
+                    double mean = sumMap.get(key) / countMap.get(key);
+                    res.put(key + MEAN_SUFFIX, Double.toString(mean));
+                }
+            }
+            return res;
+        }
+
+        /** Compare to Number objects. Return -1 if the n1 < n2; 0 if n1 == n2; 1 if n1 > n2. */
+        private int compareAsDouble(Number n1, Number n2) {
+            Double d1 = Double.valueOf(n1.doubleValue());
+            Double d2 = Double.valueOf(n2.doubleValue());
+            return d1.compareTo(d2);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    void onStart(DataRecord runData, Description description) {
+        setupAdditionalArgs();
+        Path path =
+                Paths.get(
+                        OUTPUT_ROOT,
+                        getClass().getSimpleName(),
+                        String.format(
+                                "%s%s-%d.csv",
+                                TIME_SERIES_PREFIX,
+                                getClass().getSimpleName(),
+                                UUID.randomUUID().hashCode()));
+        mTimeSeriesCsvWriter = new TimeSeriesCsvWriter(path);
+        mTimeSeriesStatistics = new TimeSeriesStatistics();
+        mStartTime = SystemClock.uptimeMillis();
+        mHelper.startCollecting();
+        // Send to stdout the path where the time-series files will be stored.
+        Bundle filePathBundle = new Bundle();
+        filePathBundle.putString(
+                String.format(OUTPUT_FILE_PATH, getClass().getSimpleName()),
+                mTimeSeriesCsvWriter.mDestFile.toString());
+        SendToInstrumentation.sendBundle(getInstrumentation(), filePathBundle);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    void onEnd(DataRecord runData, Result result) {
+        mHelper.stopCollecting();
+        for (Map.Entry<String, String> entry : mTimeSeriesStatistics.getStatistics().entrySet()) {
+            runData.addStringMetric(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void collect(DataRecord runData, Description description) throws InterruptedException {
+        long timeStamp = SystemClock.uptimeMillis() - mStartTime;
+        Map<String, T> dataPoint = mHelper.getMetrics();
+        mTimeSeriesCsvWriter.write(dataPoint, timeStamp);
+        mTimeSeriesStatistics.update(dataPoint);
+    }
+
+    /**
+     * To add listener specific extra args implement this method in the sub class and add the
+     * listener specific args.
+     */
+    public void setupAdditionalArgs() {
+        // NO-OP by default
+    }
+
+    protected void createHelperInstance(ICollectorHelper helper) {
+        mHelper = helper;
+    }
+}
+
diff --git a/libraries/device-collectors/src/test/java/android/device/collectors/ScheduledRunCollectionListenerTest.java b/libraries/device-collectors/src/test/java/android/device/collectors/ScheduledRunCollectionListenerTest.java
new file mode 100644
index 0000000..f573dc2
--- /dev/null
+++ b/libraries/device-collectors/src/test/java/android/device/collectors/ScheduledRunCollectionListenerTest.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package android.device.collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import android.app.Instrumentation;
+import android.device.collectors.util.SendToInstrumentation;
+import android.os.Bundle;
+import android.os.Environment;
+import androidx.test.InstrumentationRegistry;
+import androidx.test.runner.AndroidJUnit4;
+
+import com.android.helpers.ICollectorHelper;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.Description;
+import org.junit.runner.Result;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Android Unit tests for {@link ScheduledRunCollectionListener}. */
+@RunWith(AndroidJUnit4.class)
+public class ScheduledRunCollectionListenerTest {
+
+    private static final String TEST_METRIC_KEY = "test_metric_key";
+    private static final Integer[] TEST_METRIC_VALUES = {0, 1, 2, 3, 4};
+    private static final long TEST_INTERVAL = 100L;
+    private static final long TEST_DURATION = 450L;
+    // Collects at 0ms, 100ms, 200ms, and so on.
+    private static final long NUMBER_OF_COLLECTIONS = TEST_DURATION / TEST_INTERVAL + 1;
+    private static final String DATA_REGEX =
+            "(?<timestamp>[0-9]+)\\s+," + TEST_METRIC_KEY + "\\s+,(?<value>[0-9])\\s+";
+
+    @Mock private ICollectorHelper mHelper;
+
+    @Mock private Instrumentation mInstrumentation;
+
+    private ScheduledRunCollectionListener mListener;
+
+    private ScheduledRunCollectionListener initListener() {
+        Bundle b = new Bundle();
+        b.putString(ScheduledRunCollectionListener.INTERVAL_ARG_KEY, Long.toString(TEST_INTERVAL));
+        doReturn(true).when(mHelper).startCollecting();
+        Map<String, Integer> first = new HashMap<>();
+        first.put(TEST_METRIC_KEY, TEST_METRIC_VALUES[0]);
+        Map<String, Integer>[] rest =
+                Arrays.stream(TEST_METRIC_VALUES)
+                        .skip(1)
+                        .map(
+                                testMetricValue -> {
+                                    Map<String, Integer> m = new HashMap<>();
+                                    m.put(TEST_METRIC_KEY, testMetricValue);
+                                    return m;
+                                })
+                        .toArray(Map[]::new);
+        // <code>thenReturn</code> call signature requires thenReturn(T value, T... values).
+        when(mHelper.getMetrics()).thenReturn(first, rest);
+        doReturn(true).when(mHelper).stopCollecting();
+        ScheduledRunCollectionListener listener =
+                new ScheduledRunCollectionListener<Integer>(b, mHelper);
+        // Mock getUiAutomation method for the purpose of enabling createAndEmptyDirectory method
+        // from BaseMetricListener.
+        doReturn(InstrumentationRegistry.getInstrumentation().getUiAutomation())
+                .when(mInstrumentation)
+                .getUiAutomation();
+        listener.setInstrumentation(mInstrumentation);
+        return listener;
+    }
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        mListener = initListener();
+    }
+
+    @After
+    public void tearDown() {
+        // Remove files/directories that have been created.
+        Path outputFilePath =
+                Paths.get(
+                        Environment.getExternalStorageDirectory().toString(),
+                        ScheduledRunCollectionListener.OUTPUT_ROOT,
+                        ScheduledRunCollectionListener.class.getSimpleName());
+        mListener.executeCommandBlocking("rm -rf " + outputFilePath.toString());
+    }
+
+    @Test
+    public void testCompleteRun() throws Exception {
+        testRun(true);
+    }
+
+    @Test
+    public void testIncompleteRun() throws Exception {
+        testRun(false);
+    }
+
+    @Test
+    public void testInstrumentationResult() throws Exception {
+        Description runDescription = Description.createSuiteDescription("run");
+        mListener.testRunStarted(runDescription);
+
+        Thread.sleep(TEST_DURATION);
+        mListener.testRunFinished(new Result());
+        // AJUR runner is then gonna call instrumentationRunFinished.
+        Bundle result = new Bundle();
+        mListener.instrumentationRunFinished(System.out, result, new Result());
+        int expectedMin = Arrays.stream(TEST_METRIC_VALUES).min(Integer::compare).get();
+        assertEquals(
+                expectedMin,
+                Integer.parseInt(
+                        result.getString(
+                                TEST_METRIC_KEY + ScheduledRunCollectionListener.MIN_SUFFIX)));
+        int expectedMax = Arrays.stream(TEST_METRIC_VALUES).max(Integer::compare).get();
+        assertEquals(
+                expectedMax,
+                Integer.parseInt(
+                        result.getString(
+                                TEST_METRIC_KEY + ScheduledRunCollectionListener.MAX_SUFFIX)));
+        double expectedMean =
+                Arrays.stream(TEST_METRIC_VALUES).mapToDouble(i -> i.doubleValue()).sum()
+                        / NUMBER_OF_COLLECTIONS;
+        assertEquals(
+                expectedMean,
+                Double.parseDouble(
+                        result.getString(
+                                TEST_METRIC_KEY + ScheduledRunCollectionListener.MEAN_SUFFIX)),
+                0.1);
+    }
+
+    private void testRun(boolean isComplete) throws Exception {
+        Description runDescription = Description.createSuiteDescription("run");
+        mListener.testRunStarted(runDescription);
+
+        Thread.sleep(TEST_DURATION);
+        // If incomplete run happens, for example, when a system crash happens half way through the
+        // run, <code>testRunFinished</code> method will be skipped, but the output file should
+        // still be present, and the time-series up to the point when the crash happens should still
+        // be recorded.
+        if (isComplete) {
+            mListener.testRunFinished(new Result());
+        }
+
+        ArgumentCaptor<Bundle> bundle = ArgumentCaptor.forClass(Bundle.class);
+
+        // Verify that the path of the time-series output file has been sent to instrumentation.
+        verify(mInstrumentation, atLeast(1))
+                .sendStatus(eq(SendToInstrumentation.INST_STATUS_IN_PROGRESS), bundle.capture());
+        Bundle pathBundle = bundle.getAllValues().get(0);
+        String pathKey =
+                String.format(
+                        ScheduledRunCollectionListener.OUTPUT_FILE_PATH,
+                        ScheduledRunCollectionListener.class.getSimpleName());
+        String path = pathBundle.getString(pathKey);
+        assertNotNull(path);
+
+        // Check the output file exists.
+        File outputFile = new File(path);
+        assertTrue(outputFile.exists());
+
+        // Check that output file contains some of the periodic run results, sample results are
+        // like:
+        //
+        // time  ,metric_key       ,value
+        // 2     ,test_metric_key  ,0
+        // 102   ,test_metric_key  ,0
+        // 203   ,test_metric_key  ,0
+        // ...
+        List<String> lines = Files.readAllLines(outputFile.toPath(), Charset.defaultCharset());
+        assertEquals(NUMBER_OF_COLLECTIONS, lines.size() - 1);
+        assertEquals(lines.get(0), ScheduledRunCollectionListener.TIME_SERIES_HEADER);
+        for (int i = 1; i != lines.size(); ++i) {
+            Pattern p = Pattern.compile(DATA_REGEX);
+            Matcher m = p.matcher(lines.get(i));
+            assertTrue(m.matches());
+            long timestamp = Long.parseLong(m.group("timestamp"));
+            long delta = TEST_INTERVAL / 2;
+            assertEquals((i - 1) * TEST_INTERVAL, timestamp, delta);
+            Integer value = Integer.valueOf(m.group("value"));
+            assertEquals(TEST_METRIC_VALUES[i - 1], value);
+        }
+
+        // For incomplete run, invoke testRunFinished in the end to prevent resource leak.
+        if (!isComplete) {
+            mListener.testRunFinished(new Result());
+        }
+    }
+}