[Cherry-pick] Add statsd_tests for callback subscriptions

Also:
- rename timestamp_nanos to elapsed_timestamp_nanos in ShellData
- initialize mCacheSize to 0 in the constructor initializer list

Bug: 274609221
Test: m
Test: statsd_test
Test: libstatspull_test
Merged-In: I500ff9c22e10edbff76d97b00a7fee894f2eb736
Change-Id: I500ff9c22e10edbff76d97b00a7fee894f2eb736
(cherry picked from commit f874417330297cfc26d05314728684c846107a41)
diff --git a/lib/libstatspull/tests/stats_subscription_test.cpp b/lib/libstatspull/tests/stats_subscription_test.cpp
index c2127ce..eb99de5 100644
--- a/lib/libstatspull/tests/stats_subscription_test.cpp
+++ b/lib/libstatspull/tests/stats_subscription_test.cpp
@@ -180,8 +180,8 @@
         ASSERT_TRUE(actualShellData.ParseFromArray(callbackData.payload.data(),
                                                    callbackData.payload.size()));
 
-        ASSERT_EQ(actualShellData.timestamp_nanos_size(), 3);
-        EXPECT_THAT(actualShellData.timestamp_nanos(), Each(Gt(0LL)));
+        ASSERT_EQ(actualShellData.elapsed_timestamp_nanos_size(), 3);
+        EXPECT_THAT(actualShellData.elapsed_timestamp_nanos(), Each(Gt(0LL)));
 
         ASSERT_EQ(actualShellData.atom_size(), 3);
 
@@ -238,8 +238,8 @@
         ASSERT_TRUE(actualShellData.ParseFromArray(callbackData.payload.data(),
                                                    callbackData.payload.size()));
 
-        ASSERT_EQ(actualShellData.timestamp_nanos_size(), 1);
-        EXPECT_THAT(actualShellData.timestamp_nanos(), Each(Gt(0LL)));
+        ASSERT_EQ(actualShellData.elapsed_timestamp_nanos_size(), 1);
+        EXPECT_THAT(actualShellData.elapsed_timestamp_nanos(), Each(Gt(0LL)));
 
         ASSERT_EQ(actualShellData.atom_size(), 1);
 
@@ -265,8 +265,8 @@
         ASSERT_TRUE(actualShellData.ParseFromArray(callbackData.payload.data(),
                                                    callbackData.payload.size()));
 
-        ASSERT_EQ(actualShellData.timestamp_nanos_size(), 1);
-        EXPECT_THAT(actualShellData.timestamp_nanos(), Each(Gt(0LL)));
+        ASSERT_EQ(actualShellData.elapsed_timestamp_nanos_size(), 1);
+        EXPECT_THAT(actualShellData.elapsed_timestamp_nanos(), Each(Gt(0LL)));
 
         ASSERT_EQ(actualShellData.atom_size(), 1);
 
@@ -293,8 +293,8 @@
         ASSERT_TRUE(actualShellData.ParseFromArray(callbackData.payload.data(),
                                                    callbackData.payload.size()));
 
-        ASSERT_EQ(actualShellData.timestamp_nanos_size(), 1);
-        EXPECT_THAT(actualShellData.timestamp_nanos(), Each(Gt(0LL)));
+        ASSERT_EQ(actualShellData.elapsed_timestamp_nanos_size(), 1);
+        EXPECT_THAT(actualShellData.elapsed_timestamp_nanos(), Each(Gt(0LL)));
 
         ASSERT_EQ(actualShellData.atom_size(), 1);
 
diff --git a/statsd/src/shell/ShellSubscriberClient.cpp b/statsd/src/shell/ShellSubscriberClient.cpp
index 7be93d3..ab047b9 100644
--- a/statsd/src/shell/ShellSubscriberClient.cpp
+++ b/statsd/src/shell/ShellSubscriberClient.cpp
@@ -30,7 +30,7 @@
 namespace statsd {
 
 const static int FIELD_ID_SHELL_DATA__ATOM = 1;
-const static int FIELD_ID_SHELL_DATA__TIMESTAMP_NANOS = 2;
+const static int FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS = 2;
 
 struct ReadConfigResult {
     vector<SimpleAtomMatcher> pushedMatchers;
@@ -38,7 +38,8 @@
 };
 
 // Read and parse single config. There should only one config in the input.
-static optional<ReadConfigResult> readConfig(const vector<uint8_t>& configBytes) {
+static optional<ReadConfigResult> readConfig(const vector<uint8_t>& configBytes,
+                                             int64_t startTimeMs) {
     // Parse the config.
     ShellSubscription config;
     if (!config.ParseFromArray(configBytes.data(), configBytes.size())) {
@@ -63,7 +64,8 @@
             }
         }
 
-        result.pullInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages, uids);
+        result.pullInfo.emplace_back(pulled.matcher(), startTimeMs, pulled.freq_millis(), packages,
+                                     uids);
         ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d",
               pulled.matcher().atom_id());
     }
@@ -71,6 +73,17 @@
     return result;
 }
 
+ShellSubscriberClient::PullInfo::PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs,
+                                          int64_t intervalMs,
+                                          const std::vector<std::string>& packages,
+                                          const std::vector<int32_t>& uids)
+    : mPullerMatcher(matcher),
+      mIntervalMs(intervalMs),
+      mPrevPullElapsedRealtimeMs(startTimeMs),
+      mPullPackages(packages),
+      mPullUids(uids) {
+}
+
 ShellSubscriberClient::ShellSubscriberClient(
         int out, const std::shared_ptr<IStatsSubscriptionCallback>& callback,
         const std::vector<SimpleAtomMatcher>& pushedMatchers,
@@ -84,7 +97,8 @@
       mCallback(callback),
       mTimeoutSec(timeoutSec),
       mStartTimeSec(startTimeSec),
-      mLastWriteMs(getElapsedRealtimeMillis()){};
+      mLastWriteMs(startTimeSec * 1000),
+      mCacheSize(0){};
 
 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
         int in, int out, int64_t timeoutSec, int64_t startTimeSec, const sp<UidMap>& uidMap,
@@ -110,7 +124,7 @@
         return nullptr;
     }
 
-    const optional<ReadConfigResult> readConfigResult = readConfig(buffer);
+    const optional<ReadConfigResult> readConfigResult = readConfig(buffer, startTimeSec * 1000);
     if (!readConfigResult.has_value()) {
         return nullptr;
     }
@@ -136,7 +150,8 @@
         return nullptr;
     }
 
-    const optional<ReadConfigResult> readConfigResult = readConfig(subscriptionConfig);
+    const optional<ReadConfigResult> readConfigResult =
+            readConfig(subscriptionConfig, startTimeSec * 1000);
     if (!readConfigResult.has_value()) {
         return nullptr;
     }
@@ -161,7 +176,7 @@
 
     const int64_t timestampNs = event.GetElapsedTimestampNs();
     mProtoOut.write(util::FIELD_TYPE_INT64 | util::FIELD_COUNT_REPEATED |
-                            FIELD_ID_SHELL_DATA__TIMESTAMP_NANOS,
+                            FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS,
                     static_cast<long long>(timestampNs));
 
     // Update byte size of cached data.
@@ -192,7 +207,7 @@
 int64_t ShellSubscriberClient::pullIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) {
     int64_t sleepTimeMs = INT64_MAX;
     for (PullInfo& pullInfo : mPulledInfo) {
-        if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
+        if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs < nowMillis) {
             vector<int32_t> uids;
             getUidsForPullAtom(&uids, pullInfo);
 
@@ -206,7 +221,7 @@
         }
 
         // Determine how long to sleep before doing more work.
-        int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval;
+        int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs;
         int64_t timeBeforePull = nextPullTime - nowMillis;  // guaranteed to be non-negative
         sleepTimeMs = min(sleepTimeMs, timeBeforePull);
     }
diff --git a/statsd/src/shell/ShellSubscriberClient.h b/statsd/src/shell/ShellSubscriberClient.h
index 93a3727..09480db 100644
--- a/statsd/src/shell/ShellSubscriberClient.h
+++ b/statsd/src/shell/ShellSubscriberClient.h
@@ -42,16 +42,11 @@
 class ShellSubscriberClient {
 public:
     struct PullInfo {
-        PullInfo(const SimpleAtomMatcher& matcher, int64_t interval,
-                 const std::vector<std::string>& packages, const std::vector<int32_t>& uids)
-            : mPullerMatcher(matcher),
-              mInterval(interval),
-              mPrevPullElapsedRealtimeMs(0),
-              mPullPackages(packages),
-              mPullUids(uids) {
-        }
+        PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs, int64_t interval,
+                 const std::vector<std::string>& packages, const std::vector<int32_t>& uids);
+
         const SimpleAtomMatcher mPullerMatcher;
-        const int64_t mInterval;
+        const int64_t mIntervalMs;
         int64_t mPrevPullElapsedRealtimeMs;
         const std::vector<std::string> mPullPackages;
         const std::vector<int32_t> mPullUids;
diff --git a/statsd/src/shell/shell_data.proto b/statsd/src/shell/shell_data.proto
index 3f2815f..fdfbcc3 100644
--- a/statsd/src/shell/shell_data.proto
+++ b/statsd/src/shell/shell_data.proto
@@ -26,5 +26,5 @@
 // The output of shell subscription, including both pulled and pushed subscriptions.
 message ShellData {
     repeated Atom atom = 1;
-    repeated int64 timestamp_nanos = 2 [packed = true];
+    repeated int64 elapsed_timestamp_nanos = 2 [packed = true];
 }
diff --git a/statsd/tests/shell/ShellSubscriber_test.cpp b/statsd/tests/shell/ShellSubscriber_test.cpp
index 612164e..26eec66 100644
--- a/statsd/tests/shell/ShellSubscriber_test.cpp
+++ b/statsd/tests/shell/ShellSubscriber_test.cpp
@@ -14,10 +14,12 @@
 
 #include "src/shell/ShellSubscriber.h"
 
+#include <aidl/android/os/StatsSubscriptionCallbackReason.h>
 #include <gtest/gtest.h>
 #include <stdio.h>
 #include <unistd.h>
 
+#include <optional>
 #include <vector>
 
 #include "frameworks/proto_logging/stats/atoms.pb.h"
@@ -25,14 +27,29 @@
 #include "src/shell/shell_config.pb.h"
 #include "src/shell/shell_data.pb.h"
 #include "stats_event.h"
+#include "statslog_statsdtest.h"
 #include "tests/metrics/metrics_test_helper.h"
 #include "tests/statsd_test_util.h"
 
+using ::aidl::android::os::StatsSubscriptionCallbackReason;
 using android::sp;
+using android::os::statsd::TestAtomReported;
+using android::os::statsd::TrainExperimentIds;
+using android::os::statsd::util::BytesField;
+using android::os::statsd::util::CPU_ACTIVE_TIME;
+using android::os::statsd::util::PLUGGED_STATE_CHANGED;
+using android::os::statsd::util::SCREEN_STATE_CHANGED;
+using android::os::statsd::util::TEST_ATOM_REPORTED;
 using std::vector;
 using testing::_;
+using testing::A;
+using testing::ByMove;
+using testing::DoAll;
 using testing::Invoke;
 using testing::NaggyMock;
+using testing::Return;
+using testing::SaveArg;
+using testing::SetArgPointee;
 using testing::StrictMock;
 
 namespace android {
@@ -64,12 +81,12 @@
     auto* atom1 = shellData.add_atom()->mutable_cpu_active_time();
     atom1->set_uid(kUid1);
     atom1->set_time_millis(kCpuTime1);
-    shellData.add_timestamp_nanos(kCpuActiveTimeEventTimestampNs);
+    shellData.add_elapsed_timestamp_nanos(kCpuActiveTimeEventTimestampNs);
 
     auto* atom2 = shellData.add_atom()->mutable_cpu_active_time();
     atom2->set_uid(kUid2);
     atom2->set_time_millis(kCpuTime2);
-    shellData.add_timestamp_nanos(kCpuActiveTimeEventTimestampNs);
+    shellData.add_elapsed_timestamp_nanos(kCpuActiveTimeEventTimestampNs);
 
     return shellData;
 }
@@ -78,7 +95,7 @@
 ShellSubscription getPulledConfig() {
     ShellSubscription config;
     auto* pull_config = config.add_pulled();
-    pull_config->mutable_matcher()->set_atom_id(10016);
+    pull_config->mutable_matcher()->set_atom_id(CPU_ACTIVE_TIME);
     pull_config->set_freq_millis(2000);
     return config;
 }
@@ -86,7 +103,7 @@
 // Utility to adjust CPU time for pulled events
 shared_ptr<LogEvent> makeCpuActiveTimeAtom(int32_t uid, int64_t timeMillis) {
     AStatsEvent* statsEvent = AStatsEvent_obtain();
-    AStatsEvent_setAtomId(statsEvent, 10016);
+    AStatsEvent_setAtomId(statsEvent, CPU_ACTIVE_TIME);
     AStatsEvent_overwriteTimestamp(statsEvent, kCpuActiveTimeEventTimestampNs);
     AStatsEvent_writeInt32(statsEvent, uid);
     AStatsEvent_writeInt64(statsEvent, timeMillis);
@@ -176,8 +193,318 @@
     // Not closing fds_datas[i][0] because this causes writes within ShellSubscriberClient to hang
 }
 
+unique_ptr<LogEvent> createTestAtomReportedEvent(const uint64_t timestampNs,
+                                                 const int32_t intFieldValue,
+                                                 const vector<int64_t>& expIds) {
+    TrainExperimentIds trainExpIds;
+    *trainExpIds.mutable_experiment_id() = {expIds.begin(), expIds.end()};
+    const vector<uint8_t> trainExpIdsBytes = protoToBytes(trainExpIds);
+    return CreateTestAtomReportedEvent(
+            timestampNs, /* attributionUids */ {1001},
+            /* attributionTags */ {"app1"}, intFieldValue, /*longField */ 0LL,
+            /* floatField */ 0.0f,
+            /* stringField */ "abc", /* boolField */ false, TestAtomReported::OFF, trainExpIdsBytes,
+            /* repeatedIntField */ {}, /* repeatedLongField */ {}, /* repeatedFloatField */ {},
+            /* repeatedStringField */ {}, /* repeatedBoolField */ {},
+            /* repeatedBoolFieldLength */ 0, /* repeatedEnumField */ {});
+}
+
+TestAtomReported createTestAtomReportedProto(const int32_t intFieldValue,
+                                             const vector<int64_t>& expIds) {
+    TestAtomReported t;
+    auto* attributionNode = t.add_attribution_node();
+    attributionNode->set_uid(1001);
+    attributionNode->set_tag("app1");
+    t.set_int_field(intFieldValue);
+    t.set_long_field(0);
+    t.set_float_field(0.0f);
+    t.set_string_field("abc");
+    t.set_boolean_field(false);
+    t.set_state(TestAtomReported_State_OFF);
+    *t.mutable_bytes_field()->mutable_experiment_id() = {expIds.begin(), expIds.end()};
+    return t;
+}
+
+class ShellSubscriberCallbackTest : public ::testing::Test {
+protected:
+    ShellSubscriberCallbackTest()
+        : uidMap(new NaggyMock<MockUidMap>()),
+          pullerManager(new StrictMock<MockStatsPullerManager>()),
+          shellSubscriber(uidMap, pullerManager),
+          callback(SharedRefBase::make<StrictMock<MockStatsSubscriptionCallback>>()),
+          reason(nullopt) {
+    }
+
+    void SetUp() override {
+        // Save callback arguments when it is invoked.
+        ON_CALL(*callback, onSubscriptionData(_, _))
+                .WillByDefault(DoAll(SaveArg<0>(&reason), SaveArg<1>(&payload),
+                                     Return(ByMove(Status::ok()))));
+
+        ShellSubscription config;
+        config.add_pushed()->set_atom_id(TEST_ATOM_REPORTED);
+        config.add_pushed()->set_atom_id(SCREEN_STATE_CHANGED);
+        configBytes = protoToBytes(config);
+    }
+
+    sp<MockUidMap> uidMap;
+    sp<MockStatsPullerManager> pullerManager;
+    ShellSubscriber shellSubscriber;
+    std::shared_ptr<MockStatsSubscriptionCallback> callback;
+    vector<uint8_t> configBytes;
+
+    // Capture callback arguments.
+    std::optional<StatsSubscriptionCallbackReason> reason;
+    vector<uint8_t> payload;
+};
+
+class ShellSubscriberCallbackPulledTest : public ShellSubscriberCallbackTest {
+protected:
+    void SetUp() override {
+        ShellSubscriberCallbackTest::SetUp();
+
+        const vector<int32_t> uids{AID_SYSTEM};
+        const vector<std::shared_ptr<LogEvent>> pulledData{
+                makeCpuActiveTimeAtom(/*uid=*/kUid1, /*timeMillis=*/kCpuTime1),
+                makeCpuActiveTimeAtom(/*uid=*/kUid2, /*timeMillis=*/kCpuTime2)};
+        ON_CALL(*pullerManager, Pull(CPU_ACTIVE_TIME, uids, _, _))
+                .WillByDefault(DoAll(SetArgPointee<3>(pulledData), Return(true)));
+
+        configBytes = protoToBytes(getPulledConfig());
+
+        // Used to call pullAndSendHeartbeatsIfNeeded directly without depending on sleep.
+        shellSubscriberClient = std::move(ShellSubscriberClient::create(
+                configBytes, callback, /* startTimeSec= */ 0, uidMap, pullerManager));
+    }
+
+    unique_ptr<ShellSubscriberClient> shellSubscriberClient;
+};
+
 }  // namespace
 
+TEST_F(ShellSubscriberCallbackTest, testAddSubscription) {
+    EXPECT_TRUE(shellSubscriber.startNewSubscription(configBytes, callback));
+}
+
+TEST_F(ShellSubscriberCallbackTest, testAddSubscriptionExceedMax) {
+    const size_t maxSubs = ShellSubscriber::getMaxSubscriptions();
+    vector<bool> results(maxSubs, false);
+    for (int i = 0; i < maxSubs; i++) {
+        results[i] = shellSubscriber.startNewSubscription(configBytes, callback);
+    }
+
+    // First maxSubs subscriptions should succeed.
+    EXPECT_THAT(results, Each(IsTrue()));
+
+    // Subsequent startNewSubscription should fail.
+    EXPECT_FALSE(shellSubscriber.startNewSubscription(configBytes, callback));
+}
+
+TEST_F(ShellSubscriberCallbackTest, testPushedEventsAreCached) {
+    // Expect callback to not be invoked
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(0));
+
+    shellSubscriber.startNewSubscription(configBytes, callback);
+
+    // Log an event that does NOT invoke the callack.
+    shellSubscriber.onLogEvent(*CreateScreenStateChangedEvent(
+            1000 /*timestamp*/, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON));
+}
+
+TEST_F(ShellSubscriberCallbackTest, testOverflowCacheIsFlushed) {
+    // Expect callback to be invoked once.
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(1));
+
+    shellSubscriber.startNewSubscription(configBytes, callback);
+
+    shellSubscriber.onLogEvent(*CreateScreenStateChangedEvent(
+            1000 /*timestamp*/, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON));
+
+    // Inflate size of TestAtomReported through the MODE_BYTES field.
+    const vector<int64_t> expIds = vector<int64_t>(200, INT64_MAX);
+
+    // This event should trigger cache overflow flush.
+    shellSubscriber.onLogEvent(*createTestAtomReportedEvent(/*timestampNs=*/1100,
+                                                            /*intFieldValue=*/1, expIds));
+
+    EXPECT_THAT(reason, Eq(StatsSubscriptionCallbackReason::STATSD_INITIATED));
+
+    // Get ShellData proto from the bytes payload of the callback.
+    ShellData actualShellData;
+    ASSERT_TRUE(actualShellData.ParseFromArray(payload.data(), payload.size()));
+
+    ShellData expectedShellData;
+    expectedShellData.add_atom()->mutable_screen_state_changed()->set_state(
+            ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
+    *expectedShellData.add_atom()->mutable_test_atom_reported() =
+            createTestAtomReportedProto(/* intFieldValue=*/1, expIds);
+    expectedShellData.add_elapsed_timestamp_nanos(1000);
+    expectedShellData.add_elapsed_timestamp_nanos(1100);
+
+    EXPECT_THAT(actualShellData, EqShellData(expectedShellData));
+}
+
+TEST_F(ShellSubscriberCallbackTest, testFlushTrigger) {
+    // Expect callback to be invoked once.
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(1));
+
+    shellSubscriber.startNewSubscription(configBytes, callback);
+
+    shellSubscriber.onLogEvent(*CreateScreenStateChangedEvent(
+            1000 /*timestamp*/, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON));
+
+    shellSubscriber.flushSubscription(callback);
+
+    EXPECT_THAT(reason, Eq(StatsSubscriptionCallbackReason::FLUSH_REQUESTED));
+
+    // Get ShellData proto from the bytes payload of the callback.
+    ShellData actualShellData;
+    ASSERT_TRUE(actualShellData.ParseFromArray(payload.data(), payload.size()));
+
+    ShellData expectedShellData;
+    expectedShellData.add_atom()->mutable_screen_state_changed()->set_state(
+            ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
+    expectedShellData.add_elapsed_timestamp_nanos(1000);
+
+    EXPECT_THAT(actualShellData, EqShellData(expectedShellData));
+}
+
+TEST_F(ShellSubscriberCallbackTest, testFlushTriggerEmptyCache) {
+    // Expect callback to be invoked once.
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(1));
+
+    shellSubscriber.startNewSubscription(configBytes, callback);
+
+    shellSubscriber.flushSubscription(callback);
+
+    EXPECT_THAT(reason, Eq(StatsSubscriptionCallbackReason::FLUSH_REQUESTED));
+
+    // Get ShellData proto from the bytes payload of the callback.
+    ShellData actualShellData;
+    ASSERT_TRUE(actualShellData.ParseFromArray(payload.data(), payload.size()));
+
+    ShellData expectedShellData;
+
+    EXPECT_THAT(actualShellData, EqShellData(expectedShellData));
+}
+
+TEST_F(ShellSubscriberCallbackTest, testUnsubscribe) {
+    // Expect callback to be invoked once.
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(1));
+
+    shellSubscriber.startNewSubscription(configBytes, callback);
+
+    shellSubscriber.onLogEvent(*CreateScreenStateChangedEvent(
+            1000 /*timestamp*/, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON));
+
+    shellSubscriber.unsubscribe(callback);
+
+    EXPECT_THAT(reason, Eq(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED));
+
+    // Get ShellData proto from the bytes payload of the callback.
+    ShellData actualShellData;
+    ASSERT_TRUE(actualShellData.ParseFromArray(payload.data(), payload.size()));
+
+    ShellData expectedShellData;
+    expectedShellData.add_atom()->mutable_screen_state_changed()->set_state(
+            ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
+    expectedShellData.add_elapsed_timestamp_nanos(1000);
+
+    EXPECT_THAT(actualShellData, EqShellData(expectedShellData));
+
+    // This event is ignored as the subscription has ended.
+    shellSubscriber.onLogEvent(*CreateScreenStateChangedEvent(
+            1000 /*timestamp*/, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON));
+
+    // This should be a no-op as we've already unsubscribed.
+    shellSubscriber.unsubscribe(callback);
+}
+
+TEST_F(ShellSubscriberCallbackTest, testUnsubscribeEmptyCache) {
+    // Expect callback to be invoked once.
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(1));
+
+    shellSubscriber.startNewSubscription(configBytes, callback);
+
+    shellSubscriber.unsubscribe(callback);
+
+    EXPECT_THAT(reason, Eq(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED));
+
+    // Get ShellData proto from the bytes payload of the callback.
+    ShellData actualShellData;
+    ASSERT_TRUE(actualShellData.ParseFromArray(payload.data(), payload.size()));
+
+    ShellData expectedShellData;
+
+    EXPECT_THAT(actualShellData, EqShellData(expectedShellData));
+}
+
+TEST_F(ShellSubscriberCallbackPulledTest, testPullIfNeededBeforeInterval) {
+    // Pull should not happen
+    EXPECT_CALL(*pullerManager, Pull(_, A<const vector<int32_t>&>(), _, _)).Times(Exactly(0));
+
+    // Expect callback to not be invoked.
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(0));
+
+    shellSubscriberClient->pullAndSendHeartbeatsIfNeeded(/* nowSecs= */ 0, /* nowMillis= */ 0,
+                                                         /* nowNanos= */ 0);
+}
+
+TEST_F(ShellSubscriberCallbackPulledTest, testPullAtInterval) {
+    // Pull should happen once. The data is cached.
+    EXPECT_CALL(*pullerManager, Pull(_, A<const vector<int32_t>&>(), _, _)).Times(Exactly(1));
+
+    // Expect callback to not be invoked.
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(0));
+
+    // This pull should NOT trigger a cache flush.
+    shellSubscriberClient->pullAndSendHeartbeatsIfNeeded(/* nowSecs= */ 3, /* nowMillis= */ 3000,
+                                                         /* nowNanos= */ 3'000'000'000);
+}
+
+TEST_F(ShellSubscriberCallbackPulledTest, testCachedPullIsFlushed) {
+    // Pull should happen once. The data is cached.
+    EXPECT_CALL(*pullerManager, Pull(_, A<const vector<int32_t>&>(), _, _)).Times(Exactly(1));
+
+    // This pull should NOT trigger a cache flush.
+    shellSubscriberClient->pullAndSendHeartbeatsIfNeeded(/* nowSecs= */ 3, /* nowMillis= */ 3000,
+                                                         /* nowNanos= */ 3'000'000'000);
+
+    // Expect callback to be invoked once flush is requested.
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(1));
+
+    // This should flush out data cached from the pull.
+    shellSubscriberClient->flush();
+
+    EXPECT_THAT(reason, Eq(StatsSubscriptionCallbackReason::FLUSH_REQUESTED));
+
+    // Get ShellData proto from the bytes payload of the callback.
+    ShellData actualShellData;
+    ASSERT_TRUE(actualShellData.ParseFromArray(payload.data(), payload.size()));
+
+    EXPECT_THAT(actualShellData, EqShellData(getExpectedPulledData()));
+}
+
+TEST_F(ShellSubscriberCallbackPulledTest, testPullAtCacheTimeout) {
+    // Pull should happen once. The data is flushed.
+    EXPECT_CALL(*pullerManager, Pull(_, A<const vector<int32_t>&>(), _, _)).Times(Exactly(1));
+
+    // Expect callback to be invoked.
+    EXPECT_CALL(*callback, onSubscriptionData(_, _)).Times(Exactly(1));
+
+    // This pull should trigger a cache flush.
+    shellSubscriberClient->pullAndSendHeartbeatsIfNeeded(/* nowSecs= */ 4, /* nowMillis= */ 4000,
+                                                         /* nowNanos= */ 4'000'000'000);
+
+    EXPECT_THAT(reason, Eq(StatsSubscriptionCallbackReason::STATSD_INITIATED));
+
+    // Get ShellData proto from the bytes payload of the callback.
+    ShellData actualShellData;
+    ASSERT_TRUE(actualShellData.ParseFromArray(payload.data(), payload.size()));
+
+    EXPECT_THAT(actualShellData, EqShellData(getExpectedPulledData()));
+}
+
 TEST(ShellSubscriberTest, testPushedSubscription) {
     sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
@@ -186,18 +513,18 @@
 
     // create a simple config to get screen events
     ShellSubscription config;
-    config.add_pushed()->set_atom_id(29);
+    config.add_pushed()->set_atom_id(SCREEN_STATE_CHANGED);
 
     // this is the expected screen event atom.
     vector<ShellData> expectedData;
     ShellData shellData1;
     shellData1.add_atom()->mutable_screen_state_changed()->set_state(
             ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
-    shellData1.add_timestamp_nanos(pushedList[0]->GetElapsedTimestampNs());
+    shellData1.add_elapsed_timestamp_nanos(pushedList[0]->GetElapsedTimestampNs());
     ShellData shellData2;
     shellData2.add_atom()->mutable_screen_state_changed()->set_state(
             ::android::view::DisplayStateEnum::DISPLAY_STATE_OFF);
-    shellData2.add_timestamp_nanos(pushedList[1]->GetElapsedTimestampNs());
+    shellData2.add_elapsed_timestamp_nanos(pushedList[1]->GetElapsedTimestampNs());
     expectedData.push_back(shellData1);
     expectedData.push_back(shellData2);
 
@@ -214,7 +541,7 @@
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
 
     const vector<int32_t> uids = {AID_SYSTEM};
-    EXPECT_CALL(*pullerManager, Pull(10016, uids, _, _))
+    EXPECT_CALL(*pullerManager, Pull(CPU_ACTIVE_TIME, uids, _, _))
             .WillRepeatedly(Invoke([](int tagId, const vector<int32_t>&, const int64_t,
                                       vector<std::shared_ptr<LogEvent>>* data) {
                 data->clear();
@@ -237,7 +564,7 @@
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
 
     const vector<int32_t> uids = {AID_SYSTEM};
-    EXPECT_CALL(*pullerManager, Pull(10016, uids, _, _))
+    EXPECT_CALL(*pullerManager, Pull(CPU_ACTIVE_TIME, uids, _, _))
             .WillRepeatedly(Invoke([](int tagId, const vector<int32_t>&, const int64_t,
                                       vector<std::shared_ptr<LogEvent>>* data) {
                 data->clear();
@@ -249,17 +576,17 @@
     vector<std::shared_ptr<LogEvent>> pushedList = getPushedEvents();
 
     ShellSubscription config = getPulledConfig();
-    config.add_pushed()->set_atom_id(29);
+    config.add_pushed()->set_atom_id(SCREEN_STATE_CHANGED);
 
     vector<ShellData> expectedData;
     ShellData shellData1;
     shellData1.add_atom()->mutable_screen_state_changed()->set_state(
             ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
-    shellData1.add_timestamp_nanos(pushedList[0]->GetElapsedTimestampNs());
+    shellData1.add_elapsed_timestamp_nanos(pushedList[0]->GetElapsedTimestampNs());
     ShellData shellData2;
     shellData2.add_atom()->mutable_screen_state_changed()->set_state(
             ::android::view::DisplayStateEnum::DISPLAY_STATE_OFF);
-    shellData2.add_timestamp_nanos(pushedList[1]->GetElapsedTimestampNs());
+    shellData2.add_elapsed_timestamp_nanos(pushedList[1]->GetElapsedTimestampNs());
     expectedData.push_back(getExpectedPulledData());
     expectedData.push_back(shellData1);
     expectedData.push_back(shellData2);
@@ -302,7 +629,7 @@
 
     // create a simple config to get screen events
     ShellSubscription config;
-    config.add_pushed()->set_atom_id(29);
+    config.add_pushed()->set_atom_id(SCREEN_STATE_CHANGED);
 
     size_t bufferSize = config.ByteSize();
     vector<uint8_t> buffer(bufferSize);
@@ -354,8 +681,8 @@
 
     // create a simple config to get screen events
     ShellSubscription configs[numConfigs];
-    configs[0].add_pushed()->set_atom_id(29);
-    configs[1].add_pushed()->set_atom_id(32);
+    configs[0].add_pushed()->set_atom_id(SCREEN_STATE_CHANGED);
+    configs[1].add_pushed()->set_atom_id(PLUGGED_STATE_CHANGED);
 
     vector<vector<uint8_t>> configBuffers;
     for (int i = 0; i < numConfigs; i++) {
@@ -396,14 +723,14 @@
     ShellData expected1;
     expected1.add_atom()->mutable_screen_state_changed()->set_state(
             ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
-    expected1.add_timestamp_nanos(pushedList[0]->GetElapsedTimestampNs());
+    expected1.add_elapsed_timestamp_nanos(pushedList[0]->GetElapsedTimestampNs());
     EXPECT_THAT(expected1, EqShellData(actual1));
 
     ShellData actual2 = readData(fds_datas[0][0]);
     ShellData expected2;
     expected2.add_atom()->mutable_screen_state_changed()->set_state(
             ::android::view::DisplayStateEnum::DISPLAY_STATE_OFF);
-    expected2.add_timestamp_nanos(pushedList[1]->GetElapsedTimestampNs());
+    expected2.add_elapsed_timestamp_nanos(pushedList[1]->GetElapsedTimestampNs());
     EXPECT_THAT(expected2, EqShellData(actual2));
 
     // Validate Config 2, repeating the process
@@ -411,14 +738,14 @@
     ShellData expected3;
     expected3.add_atom()->mutable_plugged_state_changed()->set_state(
             BatteryPluggedStateEnum::BATTERY_PLUGGED_USB);
-    expected3.add_timestamp_nanos(pushedList[2]->GetElapsedTimestampNs());
+    expected3.add_elapsed_timestamp_nanos(pushedList[2]->GetElapsedTimestampNs());
     EXPECT_THAT(expected3, EqShellData(actual3));
 
     ShellData actual4 = readData(fds_datas[1][0]);
     ShellData expected4;
     expected4.add_atom()->mutable_plugged_state_changed()->set_state(
             BatteryPluggedStateEnum::BATTERY_PLUGGED_NONE);
-    expected4.add_timestamp_nanos(pushedList[3]->GetElapsedTimestampNs());
+    expected4.add_elapsed_timestamp_nanos(pushedList[3]->GetElapsedTimestampNs());
     EXPECT_THAT(expected4, EqShellData(actual4));
 
     // Not closing fds_datas[i][0] because this causes writes within ShellSubscriberClient to hang
diff --git a/statsd/tests/statsd_test_util.h b/statsd/tests/statsd_test_util.h
index ebaa6a2..0123625 100644
--- a/statsd/tests/statsd_test_util.h
+++ b/statsd/tests/statsd_test_util.h
@@ -16,8 +16,10 @@
 
 #include <aidl/android/os/BnPendingIntentRef.h>
 #include <aidl/android/os/BnPullAtomCallback.h>
+#include <aidl/android/os/BnStatsSubscriptionCallback.h>
 #include <aidl/android/os/IPullAtomCallback.h>
 #include <aidl/android/os/IPullAtomResultReceiver.h>
+#include <aidl/android/os/StatsSubscriptionCallbackReason.h>
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
@@ -40,8 +42,10 @@
 
 using namespace testing;
 using ::aidl::android::os::BnPullAtomCallback;
+using ::aidl::android::os::BnStatsSubscriptionCallback;
 using ::aidl::android::os::IPullAtomCallback;
 using ::aidl::android::os::IPullAtomResultReceiver;
+using ::aidl::android::os::StatsSubscriptionCallbackReason;
 using android::util::ProtoReader;
 using google::protobuf::RepeatedPtrField;
 using Status = ::ndk::ScopedAStatus;
@@ -89,6 +93,14 @@
                         const StatsDimensionsValueParcel& dimensionsValueParcel));
 };
 
+class MockStatsSubscriptionCallback : public BnStatsSubscriptionCallback {
+public:
+    MOCK_METHOD(Status, onSubscriptionData,
+                (StatsSubscriptionCallbackReason in_reason,
+                 const std::vector<uint8_t>& in_subscriptionPayload),
+                (override));
+};
+
 class StatsServiceConfigTest : public ::testing::Test {
 protected:
     shared_ptr<StatsService> service;
@@ -755,6 +767,14 @@
 }
 
 StatsdStatsReport_PulledAtomStats getPulledAtomStats(int atom_id);
+
+template <typename P>
+std::vector<uint8_t> protoToBytes(const P& proto) {
+    const size_t byteSize = proto.ByteSizeLong();
+    vector<uint8_t> bytes(byteSize);
+    proto.SerializeToArray(bytes.data(), byteSize);
+    return bytes;
+}
 }  // namespace statsd
 }  // namespace os
 }  // namespace android