Skip data pulling when metric is not active

Statsd is pulling data when the metric is not active. This cl causes
statsd to skip data pulls when the metric is not active

Test: unit tests
Test: cts
Bug: 129910938
Change-Id: Ibc9459f38c2f6128b8a1f10c6dea683bfe07b22a
diff --git a/bin/src/StatsLogProcessor.h b/bin/src/StatsLogProcessor.h
index f4db0af..6178a4b 100644
--- a/bin/src/StatsLogProcessor.h
+++ b/bin/src/StatsLogProcessor.h
@@ -235,9 +235,12 @@
     FRIEND_TEST(GaugeMetricE2eTest, TestMultipleFieldsForPushedEvent);
     FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvents);
     FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvent_LateAlarm);
+    FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEventsWithActivation);
+    FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEventsNoCondition);
     FRIEND_TEST(GaugeMetricE2eTest, TestConditionChangeToTrueSamplePulledEvents);
     FRIEND_TEST(ValueMetricE2eTest, TestPulledEvents);
     FRIEND_TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm);
+    FRIEND_TEST(ValueMetricE2eTest, TestPulledEvents_WithActivation);
 
     FRIEND_TEST(DimensionInConditionE2eTest, TestCreateCountMetric_NoLink_OR_CombinationCondition);
     FRIEND_TEST(DimensionInConditionE2eTest, TestCreateCountMetric_Link_OR_CombinationCondition);
diff --git a/bin/src/external/StatsPullerManager.h b/bin/src/external/StatsPullerManager.h
index 45f6b35..4ea1386 100644
--- a/bin/src/external/StatsPullerManager.h
+++ b/bin/src/external/StatsPullerManager.h
@@ -120,8 +120,11 @@
 
     FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvents);
     FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvent_LateAlarm);
+    FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEventsWithActivation);
+    FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEventsNoCondition);
     FRIEND_TEST(ValueMetricE2eTest, TestPulledEvents);
     FRIEND_TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm);
+    FRIEND_TEST(ValueMetricE2eTest, TestPulledEvents_WithActivation);
 };
 
 }  // namespace statsd
diff --git a/bin/src/metrics/GaugeMetricProducer.cpp b/bin/src/metrics/GaugeMetricProducer.cpp
index 2f9afa5..49fe7ef 100644
--- a/bin/src/metrics/GaugeMetricProducer.cpp
+++ b/bin/src/metrics/GaugeMetricProducer.cpp
@@ -141,9 +141,6 @@
 
     // Adjust start for partial bucket
     mCurrentBucketStartTimeNs = startTimeNs;
-    if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
-        pullAndMatchEventsLocked(startTimeNs);
-    }
 
     VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d",
          (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs,
@@ -315,6 +312,12 @@
     }
 }
 
+void GaugeMetricProducer::prepareFistBucketLocked() {
+    if (mIsActive && mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
+        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
+    }
+}
+
 void GaugeMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
     bool triggerPuller = false;
     switch(mSamplingType) {
diff --git a/bin/src/metrics/GaugeMetricProducer.h b/bin/src/metrics/GaugeMetricProducer.h
index 9b99fb1..d3007c8 100644
--- a/bin/src/metrics/GaugeMetricProducer.h
+++ b/bin/src/metrics/GaugeMetricProducer.h
@@ -122,6 +122,8 @@
     void flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                   const int64_t& nextBucketStartTimeNs) override;
 
+    void prepareFistBucketLocked() override;
+
     void pullAndMatchEventsLocked(const int64_t timestampNs);
 
     const int mWhatMatcherIndex;
diff --git a/bin/src/metrics/MetricProducer.cpp b/bin/src/metrics/MetricProducer.cpp
index 5ed95ed..e22b853 100644
--- a/bin/src/metrics/MetricProducer.cpp
+++ b/bin/src/metrics/MetricProducer.cpp
@@ -25,6 +25,9 @@
 using std::map;
 
 void MetricProducer::onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event) {
+    if (!mIsActive) {
+        return;
+    }
     int64_t eventTimeNs = event.GetElapsedTimestampNs();
     // this is old event, maybe statsd restarted?
     if (eventTimeNs < mTimeBaseNs) {
diff --git a/bin/src/metrics/MetricProducer.h b/bin/src/metrics/MetricProducer.h
index 70fbd47..750566d 100644
--- a/bin/src/metrics/MetricProducer.h
+++ b/bin/src/metrics/MetricProducer.h
@@ -250,6 +250,11 @@
         mActivationType = activationType;
     }
 
+    void prepareFistBucket() {
+        std::lock_guard<std::mutex> lock(mMutex);
+        prepareFistBucketLocked();
+    }
+
     void flushIfExpire(int64_t elapsedTimestampNs);
 
 protected:
@@ -281,6 +286,7 @@
 
     void setActiveLocked(int64_t currentTimeNs, int64_t remainingTtlNs);
 
+    virtual void prepareFistBucketLocked() {};
     /**
      * Flushes the current bucket if the eventTime is after the current bucket's end time. This will
        also flush the current partial bucket in memory.
diff --git a/bin/src/metrics/MetricsManager.h b/bin/src/metrics/MetricsManager.h
index 00ae3b7..d05bb8b 100644
--- a/bin/src/metrics/MetricsManager.h
+++ b/bin/src/metrics/MetricsManager.h
@@ -266,9 +266,12 @@
     FRIEND_TEST(GaugeMetricE2eTest, TestMultipleFieldsForPushedEvent);
     FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvents);
     FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvent_LateAlarm);
+    FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEventsWithActivation);
+    FRIEND_TEST(GaugeMetricE2eTest, TestRandomSamplePulledEventsNoCondition);
     FRIEND_TEST(GaugeMetricE2eTest, TestConditionChangeToTrueSamplePulledEvents);
     FRIEND_TEST(ValueMetricE2eTest, TestPulledEvents);
     FRIEND_TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm);
+    FRIEND_TEST(ValueMetricE2eTest, TestPulledEvents_WithActivation);
     FRIEND_TEST(DimensionInConditionE2eTest, TestCreateCountMetric_NoLink_OR_CombinationCondition);
     FRIEND_TEST(DimensionInConditionE2eTest, TestCreateCountMetric_Link_OR_CombinationCondition);
     FRIEND_TEST(DimensionInConditionE2eTest, TestDurationMetric_NoLink_OR_CombinationCondition);
diff --git a/bin/src/metrics/ValueMetricProducer.cpp b/bin/src/metrics/ValueMetricProducer.cpp
index c44ea8a..0bd6e62 100644
--- a/bin/src/metrics/ValueMetricProducer.cpp
+++ b/bin/src/metrics/ValueMetricProducer.cpp
@@ -144,6 +144,9 @@
     mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
                           HasPositionALL(metric.dimensions_in_condition());
 
+    int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
+    mCurrentBucketNum += numBucketsForward;
+
     flushIfNeededLocked(startTimeNs);
 
     if (mIsPulled) {
@@ -156,10 +159,6 @@
     // Adjust start for partial bucket
     mCurrentBucketStartTimeNs = startTimeNs;
     mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);
-    // Kicks off the puller immediately if condition is true and diff based.
-    if (mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
-        pullAndMatchEventsLocked(startTimeNs, mCondition);
-    }
     VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
          (long long)mBucketSizeNs, (long long)mTimeBaseNs);
 }
@@ -171,6 +170,13 @@
     }
 }
 
+void ValueMetricProducer::prepareFistBucketLocked() {
+    // Kicks off the puller immediately if condition is true and diff based.
+    if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
+        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs, mCondition);
+    }
+}
+
 void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                            const int64_t eventTime) {
     VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
diff --git a/bin/src/metrics/ValueMetricProducer.h b/bin/src/metrics/ValueMetricProducer.h
index 8c19995..1821dea 100644
--- a/bin/src/metrics/ValueMetricProducer.h
+++ b/bin/src/metrics/ValueMetricProducer.h
@@ -113,6 +113,8 @@
     void flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                   const int64_t& nextBucketStartTimeNs) override;
 
+    void prepareFistBucketLocked() override;
+
     void dropDataLocked(const int64_t dropTimeNs) override;
 
     // Calculate previous bucket end time based on current time.
diff --git a/bin/src/metrics/metrics_manager_util.cpp b/bin/src/metrics/metrics_manager_util.cpp
index 082382c..31b424e 100644
--- a/bin/src/metrics/metrics_manager_util.cpp
+++ b/bin/src/metrics/metrics_manager_util.cpp
@@ -760,6 +760,12 @@
     return true;
 }
 
+void prepareFistBucket(const vector<sp<MetricProducer>>& allMetricProducers) {
+    for (const auto& metric: allMetricProducers) {
+        metric->prepareFistBucket();
+    }
+}
+
 bool initStatsdConfig(const ConfigKey& key, const StatsdConfig& config, UidMap& uidMap,
                       const sp<StatsPullerManager>& pullerManager,
                       const sp<AlarmMonitor>& anomalyAlarmMonitor,
@@ -817,6 +823,8 @@
         return false;
     }
 
+    prepareFistBucket(allMetricProducers);
+
     return true;
 }
 
diff --git a/bin/tests/e2e/GaugeMetric_e2e_pull_test.cpp b/bin/tests/e2e/GaugeMetric_e2e_pull_test.cpp
index 946eccf..f01ad06 100644
--- a/bin/tests/e2e/GaugeMetric_e2e_pull_test.cpp
+++ b/bin/tests/e2e/GaugeMetric_e2e_pull_test.cpp
@@ -28,7 +28,10 @@
 
 namespace {
 
-StatsdConfig CreateStatsdConfig(const GaugeMetric::SamplingType sampling_type) {
+const int64_t metricId = 123456;
+
+StatsdConfig CreateStatsdConfig(const GaugeMetric::SamplingType sampling_type,
+                                bool useCondition = true) {
     StatsdConfig config;
     config.add_allowed_log_source("AID_ROOT"); // LogEvent defaults to UID of root.
     auto atomMatcher = CreateSimpleAtomMatcher("TestMatcher", android::util::SUBSYSTEM_SLEEP_STATE);
@@ -40,9 +43,11 @@
     *config.add_predicate() = screenIsOffPredicate;
 
     auto gaugeMetric = config.add_gauge_metric();
-    gaugeMetric->set_id(123456);
+    gaugeMetric->set_id(metricId);
     gaugeMetric->set_what(atomMatcher.id());
-    gaugeMetric->set_condition(screenIsOffPredicate.id());
+    if (useCondition) {
+        gaugeMetric->set_condition(screenIsOffPredicate.id());
+    }
     gaugeMetric->set_sampling_type(sampling_type);
     gaugeMetric->mutable_gauge_fields_filter()->set_include_all(true);
     *gaugeMetric->mutable_dimensions_in_what() =
@@ -158,7 +163,7 @@
     EXPECT_EQ(1, data.bucket_info(1).atom_size());
     EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs + 1,
               data.bucket_info(1).elapsed_timestamp_nanos(0));
-    EXPECT_EQ(configAddedTimeNs + 55, data.bucket_info(0).elapsed_timestamp_nanos(0));
+    EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs + 1, data.bucket_info(1).elapsed_timestamp_nanos(0));
     EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos());
     EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos());
     EXPECT_TRUE(data.bucket_info(1).atom(0).subsystem_sleep_state().subsystem_name().empty());
@@ -400,6 +405,209 @@
     EXPECT_GT(data.bucket_info(2).atom(0).subsystem_sleep_state().time_millis(), 0);
 }
 
+TEST(GaugeMetricE2eTest, TestRandomSamplePulledEventsWithActivation) {
+    auto config = CreateStatsdConfig(GaugeMetric::RANDOM_ONE_SAMPLE, /*useCondition=*/false);
+
+    int64_t baseTimeNs = getElapsedRealtimeNs();
+    int64_t configAddedTimeNs = 10 * 60 * NS_PER_SEC + baseTimeNs;
+    int64_t bucketSizeNs =
+        TimeUnitToBucketSizeInMillis(config.gauge_metric(0).bucket()) * 1000000;
+
+    auto batterySaverStartMatcher = CreateBatterySaverModeStartAtomMatcher();
+    *config.add_atom_matcher() = batterySaverStartMatcher;
+    const int64_t ttlNs = 2 * bucketSizeNs; // Two buckets.
+    auto metric_activation = config.add_metric_activation();
+    metric_activation->set_metric_id(metricId);
+    metric_activation->set_activation_type(MetricActivation::ACTIVATE_IMMEDIATELY);
+    auto event_activation = metric_activation->add_event_activation();
+    event_activation->set_atom_matcher_id(batterySaverStartMatcher.id());
+    event_activation->set_ttl_seconds(ttlNs / 1000000000);
+
+    ConfigKey cfgKey;
+    auto processor = CreateStatsLogProcessor(
+        baseTimeNs, configAddedTimeNs, config, cfgKey);
+    EXPECT_EQ(processor->mMetricsManagers.size(), 1u);
+    EXPECT_TRUE(processor->mMetricsManagers.begin()->second->isConfigValid());
+    processor->mPullerManager->ForceClearPullerCache();
+
+    int startBucketNum = processor->mMetricsManagers.begin()->second->
+            mAllMetricProducers[0]->getCurrentBucketNum();
+    EXPECT_GT(startBucketNum, (int64_t)0);
+    EXPECT_FALSE(processor->mMetricsManagers.begin()->second->mAllMetricProducers[0]->isActive());
+
+    // When creating the config, the gauge metric producer should register the alarm at the
+    // end of the current bucket.
+    EXPECT_EQ((size_t)1, processor->mPullerManager->mReceivers.size());
+    EXPECT_EQ(bucketSizeNs,
+              processor->mPullerManager->mReceivers.begin()->second.front().intervalNs);
+    int64_t& nextPullTimeNs =
+            processor->mPullerManager->mReceivers.begin()->second.front().nextPullTimeNs;
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + bucketSizeNs, nextPullTimeNs);
+
+    // Pulling alarm arrives on time and reset the sequential pulling alarm.
+    // Event should not be kept.
+    processor->informPullAlarmFired(nextPullTimeNs + 1);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 2 * bucketSizeNs, nextPullTimeNs);
+    EXPECT_FALSE(processor->mMetricsManagers.begin()->second->mAllMetricProducers[0]->isActive());
+
+    const int64_t activationNs = configAddedTimeNs + bucketSizeNs + (2 * 1000 * 1000); // 2 millis.
+    auto batterySaverOnEvent = CreateBatterySaverOnEvent(activationNs);
+    processor->OnLogEvent(batterySaverOnEvent.get());
+    EXPECT_TRUE(processor->mMetricsManagers.begin()->second->mAllMetricProducers[0]->isActive());
+
+    // This event should be kept. 1 total.
+    processor->informPullAlarmFired(nextPullTimeNs + 1);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 3 * bucketSizeNs,
+              nextPullTimeNs);
+
+    // This event should be kept. 2 total.
+    processor->informPullAlarmFired(nextPullTimeNs + 2);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 4 * bucketSizeNs, nextPullTimeNs);
+
+    // Create random event to deactivate metric.
+    auto deactivationEvent = CreateScreenBrightnessChangedEvent(50, activationNs + ttlNs + 1);
+    processor->OnLogEvent(deactivationEvent.get());
+    EXPECT_FALSE(processor->mMetricsManagers.begin()->second->mAllMetricProducers[0]->isActive());
+
+    // Event should not be kept. 2 total.
+    processor->informPullAlarmFired(nextPullTimeNs + 3);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 5 * bucketSizeNs, nextPullTimeNs);
+
+    processor->informPullAlarmFired(nextPullTimeNs + 2);
+
+    ConfigMetricsReportList reports;
+    vector<uint8_t> buffer;
+    processor->onDumpReport(cfgKey, configAddedTimeNs + 7 * bucketSizeNs + 10, false, true,
+                            ADB_DUMP, FAST, &buffer);
+    EXPECT_TRUE(buffer.size() > 0);
+    EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
+    backfillDimensionPath(&reports);
+    backfillStringInReport(&reports);
+    backfillStartEndTimestamp(&reports);
+    EXPECT_EQ(1, reports.reports_size());
+    EXPECT_EQ(1, reports.reports(0).metrics_size());
+    StatsLogReport::GaugeMetricDataWrapper gaugeMetrics;
+    sortMetricDataByDimensionsValue(
+            reports.reports(0).metrics(0).gauge_metrics(), &gaugeMetrics);
+    EXPECT_GT((int)gaugeMetrics.data_size(), 0);
+
+    auto data = gaugeMetrics.data(0);
+    EXPECT_EQ(android::util::SUBSYSTEM_SLEEP_STATE, data.dimensions_in_what().field());
+    EXPECT_EQ(1, data.dimensions_in_what().value_tuple().dimensions_value_size());
+    EXPECT_EQ(1 /* subsystem name field */,
+              data.dimensions_in_what().value_tuple().dimensions_value(0).field());
+    EXPECT_FALSE(data.dimensions_in_what().value_tuple().dimensions_value(0).value_str().empty());
+    EXPECT_EQ(2, data.bucket_info_size());
+
+    EXPECT_EQ(1, data.bucket_info(0).atom_size());
+    EXPECT_EQ(1, data.bucket_info(0).elapsed_timestamp_nanos_size());
+    EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs + 1, data.bucket_info(0).elapsed_timestamp_nanos(0));
+    EXPECT_EQ(0, data.bucket_info(0).wall_clock_timestamp_nanos_size());
+    EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos());
+    EXPECT_EQ(baseTimeNs + 5 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos());
+    EXPECT_TRUE(data.bucket_info(0).atom(0).subsystem_sleep_state().subsystem_name().empty());
+    EXPECT_GT(data.bucket_info(0).atom(0).subsystem_sleep_state().time_millis(), 0);
+
+    EXPECT_EQ(1, data.bucket_info(1).atom_size());
+    EXPECT_EQ(1, data.bucket_info(1).elapsed_timestamp_nanos_size());
+    EXPECT_EQ(baseTimeNs + 5 * bucketSizeNs + 2, data.bucket_info(1).elapsed_timestamp_nanos(0));
+    EXPECT_EQ(0, data.bucket_info(1).wall_clock_timestamp_nanos_size());
+    EXPECT_EQ(MillisToNano(NanoToMillis(baseTimeNs + 5 * bucketSizeNs)),
+            data.bucket_info(1).start_bucket_elapsed_nanos());
+    EXPECT_EQ(MillisToNano(NanoToMillis(activationNs + ttlNs + 1)),
+            data.bucket_info(1).end_bucket_elapsed_nanos());
+    EXPECT_TRUE(data.bucket_info(1).atom(0).subsystem_sleep_state().subsystem_name().empty());
+    EXPECT_GT(data.bucket_info(1).atom(0).subsystem_sleep_state().time_millis(), 0);
+}
+
+TEST(GaugeMetricE2eTest, TestRandomSamplePulledEventsNoCondition) {
+    auto config = CreateStatsdConfig(GaugeMetric::RANDOM_ONE_SAMPLE, /*useCondition=*/false);
+
+    int64_t baseTimeNs = getElapsedRealtimeNs();
+    int64_t configAddedTimeNs = 10 * 60 * NS_PER_SEC + baseTimeNs;
+    int64_t bucketSizeNs =
+        TimeUnitToBucketSizeInMillis(config.gauge_metric(0).bucket()) * 1000000;
+
+    ConfigKey cfgKey;
+    auto processor = CreateStatsLogProcessor(
+        baseTimeNs, configAddedTimeNs, config, cfgKey);
+    EXPECT_EQ(processor->mMetricsManagers.size(), 1u);
+    EXPECT_TRUE(processor->mMetricsManagers.begin()->second->isConfigValid());
+    processor->mPullerManager->ForceClearPullerCache();
+
+    int startBucketNum = processor->mMetricsManagers.begin()->second->
+            mAllMetricProducers[0]->getCurrentBucketNum();
+    EXPECT_GT(startBucketNum, (int64_t)0);
+
+    // When creating the config, the gauge metric producer should register the alarm at the
+    // end of the current bucket.
+    EXPECT_EQ((size_t)1, processor->mPullerManager->mReceivers.size());
+    EXPECT_EQ(bucketSizeNs,
+              processor->mPullerManager->mReceivers.begin()->second.front().intervalNs);
+    int64_t& nextPullTimeNs =
+            processor->mPullerManager->mReceivers.begin()->second.front().nextPullTimeNs;
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + bucketSizeNs, nextPullTimeNs);
+
+    // Pulling alarm arrives on time and reset the sequential pulling alarm.
+    processor->informPullAlarmFired(nextPullTimeNs + 1);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 2 * bucketSizeNs, nextPullTimeNs);
+
+    processor->informPullAlarmFired(nextPullTimeNs + 4);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 3 * bucketSizeNs,
+              nextPullTimeNs);
+
+    ConfigMetricsReportList reports;
+    vector<uint8_t> buffer;
+    processor->onDumpReport(cfgKey, configAddedTimeNs + 7 * bucketSizeNs + 10, false, true,
+                            ADB_DUMP, FAST, &buffer);
+    EXPECT_TRUE(buffer.size() > 0);
+    EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
+    backfillDimensionPath(&reports);
+    backfillStringInReport(&reports);
+    backfillStartEndTimestamp(&reports);
+    EXPECT_EQ(1, reports.reports_size());
+    EXPECT_EQ(1, reports.reports(0).metrics_size());
+    StatsLogReport::GaugeMetricDataWrapper gaugeMetrics;
+    sortMetricDataByDimensionsValue(
+            reports.reports(0).metrics(0).gauge_metrics(), &gaugeMetrics);
+    EXPECT_GT((int)gaugeMetrics.data_size(), 0);
+
+    auto data = gaugeMetrics.data(0);
+    EXPECT_EQ(android::util::SUBSYSTEM_SLEEP_STATE, data.dimensions_in_what().field());
+    EXPECT_EQ(1, data.dimensions_in_what().value_tuple().dimensions_value_size());
+    EXPECT_EQ(1 /* subsystem name field */,
+              data.dimensions_in_what().value_tuple().dimensions_value(0).field());
+    EXPECT_FALSE(data.dimensions_in_what().value_tuple().dimensions_value(0).value_str().empty());
+    EXPECT_EQ(3, data.bucket_info_size());
+
+    EXPECT_EQ(1, data.bucket_info(0).atom_size());
+    EXPECT_EQ(1, data.bucket_info(0).elapsed_timestamp_nanos_size());
+    EXPECT_EQ(configAddedTimeNs, data.bucket_info(0).elapsed_timestamp_nanos(0));
+    EXPECT_EQ(0, data.bucket_info(0).wall_clock_timestamp_nanos_size());
+    EXPECT_EQ(baseTimeNs + 2 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos());
+    EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos());
+    EXPECT_TRUE(data.bucket_info(0).atom(0).subsystem_sleep_state().subsystem_name().empty());
+    EXPECT_GT(data.bucket_info(0).atom(0).subsystem_sleep_state().time_millis(), 0);
+
+    EXPECT_EQ(1, data.bucket_info(1).atom_size());
+    EXPECT_EQ(1, data.bucket_info(1).elapsed_timestamp_nanos_size());
+    EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs + 1, data.bucket_info(1).elapsed_timestamp_nanos(0));
+    EXPECT_EQ(0, data.bucket_info(1).wall_clock_timestamp_nanos_size());
+    EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos());
+    EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos());
+    EXPECT_TRUE(data.bucket_info(1).atom(0).subsystem_sleep_state().subsystem_name().empty());
+    EXPECT_GT(data.bucket_info(1).atom(0).subsystem_sleep_state().time_millis(), 0);
+
+    EXPECT_EQ(1, data.bucket_info(2).atom_size());
+    EXPECT_EQ(1, data.bucket_info(2).elapsed_timestamp_nanos_size());
+    EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs + 4, data.bucket_info(2).elapsed_timestamp_nanos(0));
+    EXPECT_EQ(0, data.bucket_info(2).wall_clock_timestamp_nanos_size());
+    EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(2).start_bucket_elapsed_nanos());
+    EXPECT_EQ(baseTimeNs + 5 * bucketSizeNs, data.bucket_info(2).end_bucket_elapsed_nanos());
+    EXPECT_TRUE(data.bucket_info(2).atom(0).subsystem_sleep_state().subsystem_name().empty());
+    EXPECT_GT(data.bucket_info(2).atom(0).subsystem_sleep_state().time_millis(), 0);
+}
+
 #else
 GTEST_LOG_(INFO) << "This test does nothing.\n";
 #endif
diff --git a/bin/tests/e2e/ValueMetric_pull_e2e_test.cpp b/bin/tests/e2e/ValueMetric_pull_e2e_test.cpp
index b316562..e967eb3 100644
--- a/bin/tests/e2e/ValueMetric_pull_e2e_test.cpp
+++ b/bin/tests/e2e/ValueMetric_pull_e2e_test.cpp
@@ -28,7 +28,9 @@
 
 namespace {
 
-StatsdConfig CreateStatsdConfig() {
+const int64_t metricId = 123456;
+
+StatsdConfig CreateStatsdConfig(bool useCondition = true) {
     StatsdConfig config;
     config.add_allowed_log_source("AID_ROOT"); // LogEvent defaults to UID of root.
     auto pulledAtomMatcher =
@@ -41,9 +43,11 @@
     *config.add_predicate() = screenIsOffPredicate;
 
     auto valueMetric = config.add_value_metric();
-    valueMetric->set_id(123456);
+    valueMetric->set_id(metricId);
     valueMetric->set_what(pulledAtomMatcher.id());
-    valueMetric->set_condition(screenIsOffPredicate.id());
+    if (useCondition) {
+        valueMetric->set_condition(screenIsOffPredicate.id());
+    }
     *valueMetric->mutable_value_field() =
             CreateDimensions(android::util::SUBSYSTEM_SLEEP_STATE, {4 /* time sleeping field */});
     *valueMetric->mutable_dimensions_in_what() =
@@ -263,6 +267,99 @@
     EXPECT_EQ(1, data.bucket_info(2).values_size());
 }
 
+TEST(ValueMetricE2eTest, TestPulledEvents_WithActivation) {
+    auto config = CreateStatsdConfig(false);
+    int64_t baseTimeNs = getElapsedRealtimeNs();
+    int64_t configAddedTimeNs = 10 * 60 * NS_PER_SEC + baseTimeNs;
+    int64_t bucketSizeNs =
+        TimeUnitToBucketSizeInMillis(config.value_metric(0).bucket()) * 1000000;
+
+    auto batterySaverStartMatcher = CreateBatterySaverModeStartAtomMatcher();
+    *config.add_atom_matcher() = batterySaverStartMatcher;
+    const int64_t ttlNs = 2 * bucketSizeNs; // Two buckets.
+    auto metric_activation = config.add_metric_activation();
+    metric_activation->set_metric_id(metricId);
+    metric_activation->set_activation_type(MetricActivation::ACTIVATE_IMMEDIATELY);
+    auto event_activation = metric_activation->add_event_activation();
+    event_activation->set_atom_matcher_id(batterySaverStartMatcher.id());
+    event_activation->set_ttl_seconds(ttlNs / 1000000000);
+
+    ConfigKey cfgKey;
+    auto processor = CreateStatsLogProcessor(
+        baseTimeNs, configAddedTimeNs, config, cfgKey);
+    EXPECT_EQ(processor->mMetricsManagers.size(), 1u);
+    EXPECT_TRUE(processor->mMetricsManagers.begin()->second->isConfigValid());
+    processor->mPullerManager->ForceClearPullerCache();
+
+    int startBucketNum = processor->mMetricsManagers.begin()->second->
+            mAllMetricProducers[0]->getCurrentBucketNum();
+    EXPECT_GT(startBucketNum, (int64_t)0);
+    EXPECT_FALSE(processor->mMetricsManagers.begin()->second->mAllMetricProducers[0]->isActive());
+
+    // When creating the config, the value metric producer should register the alarm at the
+    // end of the current bucket.
+    EXPECT_EQ((size_t)1, processor->mPullerManager->mReceivers.size());
+    EXPECT_EQ(bucketSizeNs,
+              processor->mPullerManager->mReceivers.begin()->second.front().intervalNs);
+    int64_t& expectedPullTimeNs =
+            processor->mPullerManager->mReceivers.begin()->second.front().nextPullTimeNs;
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + bucketSizeNs, expectedPullTimeNs);
+
+    // Pulling alarm arrives on time and reset the sequential pulling alarm.
+    processor->informPullAlarmFired(expectedPullTimeNs + 1);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 2 * bucketSizeNs, expectedPullTimeNs);
+
+    const int64_t activationNs = configAddedTimeNs + bucketSizeNs + (2 * 1000 * 1000); // 2 millis.
+    auto batterySaverOnEvent = CreateBatterySaverOnEvent(activationNs);
+    processor->OnLogEvent(batterySaverOnEvent.get());
+    EXPECT_TRUE(processor->mMetricsManagers.begin()->second->mAllMetricProducers[0]->isActive());
+
+    processor->informPullAlarmFired(expectedPullTimeNs + 1);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 3 * bucketSizeNs, expectedPullTimeNs);
+
+    processor->informPullAlarmFired(expectedPullTimeNs + 1);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 4 * bucketSizeNs, expectedPullTimeNs);
+
+    // Create random event to deactivate metric.
+    auto deactivationEvent = CreateScreenBrightnessChangedEvent(50, activationNs + ttlNs + 1);
+    processor->OnLogEvent(deactivationEvent.get());
+    EXPECT_FALSE(processor->mMetricsManagers.begin()->second->mAllMetricProducers[0]->isActive());
+
+    processor->informPullAlarmFired(expectedPullTimeNs + 1);
+    EXPECT_EQ(baseTimeNs + startBucketNum * bucketSizeNs + 5 * bucketSizeNs, expectedPullTimeNs);
+
+    processor->informPullAlarmFired(expectedPullTimeNs + 1);
+
+    ConfigMetricsReportList reports;
+    vector<uint8_t> buffer;
+    processor->onDumpReport(cfgKey, configAddedTimeNs + 7 * bucketSizeNs + 10, false, true,
+                            ADB_DUMP, FAST, &buffer);
+    EXPECT_TRUE(buffer.size() > 0);
+    EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
+    backfillDimensionPath(&reports);
+    backfillStringInReport(&reports);
+    backfillStartEndTimestamp(&reports);
+    EXPECT_EQ(1, reports.reports_size());
+    EXPECT_EQ(1, reports.reports(0).metrics_size());
+    StatsLogReport::ValueMetricDataWrapper valueMetrics;
+    sortMetricDataByDimensionsValue(
+            reports.reports(0).metrics(0).value_metrics(), &valueMetrics);
+    EXPECT_GT((int)valueMetrics.data_size(), 0);
+
+    auto data = valueMetrics.data(0);
+    EXPECT_EQ(android::util::SUBSYSTEM_SLEEP_STATE, data.dimensions_in_what().field());
+    EXPECT_EQ(1, data.dimensions_in_what().value_tuple().dimensions_value_size());
+    EXPECT_EQ(1 /* subsystem name field */,
+              data.dimensions_in_what().value_tuple().dimensions_value(0).field());
+    EXPECT_FALSE(data.dimensions_in_what().value_tuple().dimensions_value(0).value_str().empty());
+    // We have 1 full bucket, the two surrounding the activation are dropped.
+    EXPECT_EQ(1, data.bucket_info_size());
+
+    EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos());
+    EXPECT_EQ(baseTimeNs + 5 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos());
+    EXPECT_EQ(1, data.bucket_info(0).values_size());
+}
+
 #else
 GTEST_LOG_(INFO) << "This test does nothing.\n";
 #endif
diff --git a/bin/tests/metrics/GaugeMetricProducer_test.cpp b/bin/tests/metrics/GaugeMetricProducer_test.cpp
index 6286823..b9a5867 100644
--- a/bin/tests/metrics/GaugeMetricProducer_test.cpp
+++ b/bin/tests/metrics/GaugeMetricProducer_test.cpp
@@ -79,6 +79,8 @@
                                       logEventMatcherIndex, eventMatcherWizard,
                                       -1, -1, tagId, 5, 600 * NS_PER_SEC + NS_PER_SEC / 2,
                                       pullerManager);
+    gaugeProducer.prepareFistBucket();
+
 
     EXPECT_EQ(600500000000, gaugeProducer.mCurrentBucketStartTimeNs);
     EXPECT_EQ(10, gaugeProducer.mCurrentBucketNum);
@@ -124,6 +126,8 @@
                                       tagId, -1, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
 
+    gaugeProducer.prepareFistBucket();
+
     vector<shared_ptr<LogEvent>> allData;
     allData.clear();
     shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
@@ -207,6 +211,7 @@
                                       logEventMatcherIndex, eventMatcherWizard,
                                       -1 /* -1 means no pulling */, -1, tagId, bucketStartTimeNs,
                                       bucketStartTimeNs, pullerManager);
+    gaugeProducer.prepareFistBucket();
 
     sp<AnomalyTracker> anomalyTracker = gaugeProducer.addAnomalyTracker(alert, alarmMonitor);
     EXPECT_TRUE(anomalyTracker != nullptr);
@@ -298,6 +303,7 @@
     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
                                       logEventMatcherIndex, eventMatcherWizard, tagId, -1, tagId,
                                       bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+    gaugeProducer.prepareFistBucket();
 
     vector<shared_ptr<LogEvent>> allData;
     shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
@@ -364,6 +370,7 @@
                                       logEventMatcherIndex, eventMatcherWizard,
                                       tagId, -1, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    gaugeProducer.prepareFistBucket();
 
     vector<shared_ptr<LogEvent>> allData;
     shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
@@ -424,6 +431,7 @@
     GaugeMetricProducer gaugeProducer(kConfigKey, metric, 1, wizard,
                                       logEventMatcherIndex, eventMatcherWizard, tagId, -1, tagId,
                                       bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+    gaugeProducer.prepareFistBucket();
 
     gaugeProducer.onConditionChanged(true, bucketStartTimeNs + 8);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
@@ -521,6 +529,7 @@
     GaugeMetricProducer gaugeProducer(kConfigKey, metric, 1, wizard,
                                       logEventMatcherIndex, eventMatcherWizard, tagId, -1, tagId,
                                       bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+    gaugeProducer.prepareFistBucket();
 
     gaugeProducer.onSlicedConditionMayChange(true, bucketStartTimeNs + 8);
 
@@ -574,6 +583,7 @@
                                       logEventMatcherIndex, eventMatcherWizard,
                                       tagId, -1, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    gaugeProducer.prepareFistBucket();
 
     Alert alert;
     alert.set_id(101);
@@ -682,6 +692,7 @@
                                       logEventMatcherIndex, eventMatcherWizard,
                                       tagId, triggerId, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    gaugeProducer.prepareFistBucket();
 
     vector<shared_ptr<LogEvent>> allData;
 
@@ -766,6 +777,7 @@
                                       logEventMatcherIndex, eventMatcherWizard,
                                       tagId, triggerId, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    gaugeProducer.prepareFistBucket();
 
     vector<shared_ptr<LogEvent>> allData;
 
diff --git a/bin/tests/metrics/ValueMetricProducer_test.cpp b/bin/tests/metrics/ValueMetricProducer_test.cpp
index 43a3c7b..148ed5a 100644
--- a/bin/tests/metrics/ValueMetricProducer_test.cpp
+++ b/bin/tests/metrics/ValueMetricProducer_test.cpp
@@ -105,6 +105,7 @@
                 kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
                 logEventMatcherIndex, eventMatcherWizard, tagId,
                 bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+        valueProducer->prepareFistBucket();
         return valueProducer;
     }
 
@@ -124,6 +125,7 @@
                 new ValueMetricProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
                                         eventMatcherWizard, tagId, bucketStartTimeNs,
                                         bucketStartTimeNs, pullerManager);
+        valueProducer->prepareFistBucket();
         valueProducer->mCondition = ConditionState::kFalse;
         return valueProducer;
     }
@@ -167,6 +169,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
                                       logEventMatcherIndex, eventMatcherWizard, -1, startTimeBase,
                                       22, pullerManager);
+    valueProducer.prepareFistBucket();
 
     EXPECT_EQ(startTimeBase, valueProducer.calcPreviousBucketEndTime(60 * NS_PER_SEC + 10));
     EXPECT_EQ(startTimeBase, valueProducer.calcPreviousBucketEndTime(60 * NS_PER_SEC + 10));
@@ -196,6 +199,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
                                       logEventMatcherIndex, eventMatcherWizard, -1, 5,
                                       600 * NS_PER_SEC + NS_PER_SEC / 2, pullerManager);
+    valueProducer.prepareFistBucket();
 
     EXPECT_EQ(600500000000, valueProducer.mCurrentBucketStartTimeNs);
     EXPECT_EQ(10, valueProducer.mCurrentBucketNum);
@@ -377,6 +381,7 @@
             kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
             logEventMatcherIndex, eventMatcherWizard, tagId,
             bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+    valueProducer->prepareFistBucket();
 
     vector<shared_ptr<LogEvent>> allData;
     allData.clear();
@@ -665,6 +670,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    valueProducer.prepareFistBucket();
 
     shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
     event1->write(1);
@@ -722,6 +728,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, tagId, bucketStartTimeNs,
                                       bucketStartTimeNs, pullerManager);
+    valueProducer.prepareFistBucket();
 
     vector<shared_ptr<LogEvent>> allData;
     allData.clear();
@@ -772,6 +779,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, tagId, bucketStartTimeNs,
                                       bucketStartTimeNs, pullerManager);
+    valueProducer.prepareFistBucket();
 
     vector<shared_ptr<LogEvent>> allData;
     allData.clear();
@@ -846,6 +854,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    valueProducer.prepareFistBucket();
 
     shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
     event1->write(1);
@@ -888,6 +897,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    valueProducer.prepareFistBucket();
     valueProducer.mCondition = ConditionState::kFalse;
 
     shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
@@ -962,6 +972,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
                                       logEventMatcherIndex, eventMatcherWizard, -1 /*not pulled*/,
                                       bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+    valueProducer.prepareFistBucket();
 
     sp<AnomalyTracker> anomalyTracker = valueProducer.addAnomalyTracker(alert, alarmMonitor);
 
@@ -1258,6 +1269,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    valueProducer.prepareFistBucket();
 
     shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
     event1->write(1);
@@ -1302,6 +1314,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    valueProducer.prepareFistBucket();
 
     shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
     event1->write(1);
@@ -1348,6 +1361,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    valueProducer.prepareFistBucket();
 
     shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
     event1->write(1);
@@ -1398,6 +1412,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    valueProducer.prepareFistBucket();
 
     shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
     event1->write(1);
@@ -1443,6 +1458,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    valueProducer.prepareFistBucket();
 
     shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
     event1->write(1);
@@ -1516,6 +1532,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs,
                                       pullerManager);
+    valueProducer.prepareFistBucket();
 
     shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
     event1->write(1);
@@ -2064,7 +2081,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, tagId, bucket2StartTimeNs,
                                       bucket2StartTimeNs, pullerManager);
-
+    valueProducer.prepareFistBucket();
     valueProducer.mCondition = ConditionState::kFalse;
 
     // Event should be skipped since it is from previous bucket.
@@ -2845,6 +2862,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, tagId, bucketStartTimeNs,
                                       bucketStartTimeNs, pullerManager);
+    valueProducer.prepareFistBucket();
 
     ProtoOutputStream output;
     std::set<string> strSet;
@@ -2887,6 +2905,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, tagId, bucketStartTimeNs,
                                       bucketStartTimeNs, pullerManager);
+    valueProducer.prepareFistBucket();
 
     vector<shared_ptr<LogEvent>> allData;
     allData.clear();
@@ -2950,6 +2969,7 @@
     ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
                                       eventMatcherWizard, tagId, bucketStartTimeNs,
                                       bucketStartTimeNs, pullerManager);
+    valueProducer.prepareFistBucket();
 
     ProtoOutputStream output;
     std::set<string> strSet;