Adds the concept of an invalid bucket.

Whenever a pull fails, we cannot compute a correct value for the
current bucket. Whether the failure happens at the beginning, in the
middle, or at the end of the bucket, we cannot trust any of the data
inside it.

Let's say we have a metric with a screen-on condition, a bucket size of
4 hours, and the following events:
- h+0 bucket start -- screen is on -- data pull fails
- h+2 screen off -- pull succeeds
- h+3 screen on -- pull succeeds
- h+4 bucket end -- screen is on -- pull succeeds

The current logic is wrong: it simply ignores any data between h+0 and
h+2. That timespan might be a large contributor to the total bucket
value, so it is wrong to keep any data from this bucket (see the
condensed sketch below).
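
In code terms, the fix routes every pull problem through a single
helper and gates the flush on the resulting flag. This is a condensed
sketch of the ValueMetricProducer changes in the diff below:

    void ValueMetricProducer::invalidateCurrentBucket() {
        if (!mCurrentBucketIsInvalid) {
            // Only report once per invalid bucket.
            StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
        }
        mCurrentBucketIsInvalid = true;
        // A failed pull also means the diff base is unusable.
        resetBase();
    }

    // At flush time, an invalid bucket is dropped instead of reported:
    if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
        // ... build the ValueBucket and append it to mPastBuckets ...
    } else {
        mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
    }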

We also extend the concept of invalid buckets to other problems, such
as a pull taking too long.
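
For example, a pull that finishes after mMaxPullDelayNs now taints the
bucket through the same helper (condensed from the diff below):

    const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket, so we will not have
        // a complete view of what's going on.
        invalidateCurrentBucket();
        return;
    }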

Bug: 123866830
Test: atest statsd_test
Change-Id: I300ab05cd7582cd2d7af9167de8d99b349071f0d
diff --git a/bin/src/guardrail/StatsdStats.cpp b/bin/src/guardrail/StatsdStats.cpp
index 37ccad5..a5bd5c6 100644
--- a/bin/src/guardrail/StatsdStats.cpp
+++ b/bin/src/guardrail/StatsdStats.cpp
@@ -448,6 +448,11 @@
     getAtomMetricStats(metricId).conditionChangeInNextBucket++;
 }
 
+void StatsdStats::noteInvalidatedBucket(int metricId) {
+    lock_guard<std::mutex> lock(mLock);
+    getAtomMetricStats(metricId).invalidatedBucket++;
+}
+
 StatsdStats::AtomMetricStats& StatsdStats::getAtomMetricStats(int metricId) {
     auto atomMetricStatsIter = mAtomMetricStats.find(metricId);
     if (atomMetricStatsIter != mAtomMetricStats.end()) {
diff --git a/bin/src/guardrail/StatsdStats.h b/bin/src/guardrail/StatsdStats.h
index 01e9ca1..cb17061 100644
--- a/bin/src/guardrail/StatsdStats.h
+++ b/bin/src/guardrail/StatsdStats.h
@@ -365,6 +365,11 @@
     void noteConditionChangeInNextBucket(int atomId);
 
     /**
+     * A bucket has been tagged as invalid.
+     */
+    void noteInvalidatedBucket(int metricId);
+
+    /**
      * Reset the historical stats. Including all stats in icebox, and the tracked stats about
      * metrics, matchers, and atoms. The active configs will be kept and StatsdStats will continue
      * to collect stats after reset() has been called.
@@ -408,6 +413,7 @@
         long skippedForwardBuckets = 0;
         long badValueType = 0;
         long conditionChangeInNextBucket = 0;
+        long invalidatedBucket = 0;
     } AtomMetricStats;
 
 private:
diff --git a/bin/src/metrics/ValueMetricProducer.cpp b/bin/src/metrics/ValueMetricProducer.cpp
index 9fe07e1..9fb78e7 100644
--- a/bin/src/metrics/ValueMetricProducer.cpp
+++ b/bin/src/metrics/ValueMetricProducer.cpp
@@ -104,6 +104,7 @@
       mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
       mUseZeroDefaultBase(metric.use_zero_default_base()),
       mHasGlobalBase(false),
+      mCurrentBucketIsInvalid(false),
       mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                       : StatsdStats::kPullMaxDelayNs),
       mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()) {
@@ -308,6 +309,15 @@
     }
 }
 
+void ValueMetricProducer::invalidateCurrentBucket() {
+    if (!mCurrentBucketIsInvalid) {
+        // Only report once per invalid bucket.
+        StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
+    }
+    mCurrentBucketIsInvalid = true;
+    resetBase();
+}
+
 void ValueMetricProducer::resetBase() {
     for (auto& slice : mCurrentSlicedBucket) {
         for (auto& interval : slice.second) {
@@ -323,6 +333,7 @@
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
              (long long)mCurrentBucketStartTimeNs);
         StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
+        invalidateCurrentBucket();
         return;
     }
 
@@ -346,19 +357,20 @@
     vector<std::shared_ptr<LogEvent>> allData;
     if (!mPullerManager->Pull(mPullTagId, &allData)) {
         ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
-        resetBase();
+        invalidateCurrentBucket();
         return;
     }
     const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
     if (pullDelayNs > mMaxPullDelayNs) {
         ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
               (long long)mMaxPullDelayNs);
         StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
-        StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
-        resetBase();
+        // We are missing one pull from the bucket, which means we will not have a complete view of
+        // what's going on.
+        invalidateCurrentBucket();
         return;
     }
-    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
 
     if (timestampNs < mCurrentBucketStartTimeNs) {
         // The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report
@@ -388,7 +400,7 @@
     if (mCondition) {
         if (!pullSuccess) {
             // If the pull failed, we won't be able to compute a diff.
-            resetBase();
+            invalidateCurrentBucket();
             return;
         }
 
@@ -687,31 +699,13 @@
     VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
          (int)mCurrentSlicedBucket.size());
     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
-
     int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
 
-    if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
+    if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
         // The current bucket is large enough to keep.
         for (const auto& slice : mCurrentSlicedBucket) {
-            ValueBucket bucket;
-            bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
-            bucket.mBucketEndNs = bucketEndTime;
-            for (const auto& interval : slice.second) {
-                if (interval.hasValue) {
-                    // skip the output if the diff is zero
-                    if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
-                        continue;
-                    }
-                    bucket.valueIndex.push_back(interval.valueIndex);
-                    if (mAggregationType != ValueMetric::AVG) {
-                        bucket.values.push_back(interval.value);
-                    } else {
-                        double sum = interval.value.type == LONG ? (double)interval.value.long_value
-                                                                 : interval.value.double_value;
-                        bucket.values.push_back(Value((double)sum / interval.sampleSize));
-                    }
-                }
-            }
+            ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
             // it will auto create new vector of ValuebucketInfo if the key is not found.
             if (bucket.valueIndex.size() > 0) {
                 auto& bucketList = mPastBuckets[slice.first];
@@ -722,6 +716,58 @@
         mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
     }
 
+    if (!mCurrentBucketIsInvalid) {
+        appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
+    }
+    initCurrentSlicedBucket();
+    mCurrentBucketIsInvalid = false;
+}
+
+ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
+                                                    const std::vector<Interval>& intervals) {
+    ValueBucket bucket;
+    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
+    bucket.mBucketEndNs = bucketEndTime;
+    for (const auto& interval : intervals) {
+        if (interval.hasValue) {
+            // skip the output if the diff is zero
+            if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
+                continue;
+            }
+            bucket.valueIndex.push_back(interval.valueIndex);
+            if (mAggregationType != ValueMetric::AVG) {
+                bucket.values.push_back(interval.value);
+            } else {
+                double sum = interval.value.type == LONG ? (double)interval.value.long_value
+                                                         : interval.value.double_value;
+                bucket.values.push_back(Value((double)sum / interval.sampleSize));
+            }
+        }
+    }
+    return bucket;
+}
+
+void ValueMetricProducer::initCurrentSlicedBucket() {
+    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
+        bool obsolete = true;
+        for (auto& interval : it->second) {
+            interval.hasValue = false;
+            interval.sampleSize = 0;
+            if (interval.seenNewData) {
+                obsolete = false;
+            }
+            interval.seenNewData = false;
+        }
+
+        if (obsolete) {
+            it = mCurrentSlicedBucket.erase(it);
+        } else {
+            it++;
+        }
+    }
+}
+
+void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
     if (eventTimeNs > fullBucketEndTimeNs) {  // If full bucket, send to anomaly tracker.
         // Accumulate partial buckets with current value and then send to anomaly tracker.
         if (mCurrentFullBucket.size() > 0) {
@@ -759,24 +805,6 @@
             mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
         }
     }
-
-    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
-        bool obsolete = true;
-        for (auto& interval : it->second) {
-            interval.hasValue = false;
-            interval.sampleSize = 0;
-            if (interval.seenNewData) {
-                obsolete = false;
-            }
-            interval.seenNewData = false;
-        }
-
-        if (obsolete) {
-            it = mCurrentSlicedBucket.erase(it);
-        } else {
-            it++;
-        }
-    }
 }
 
 size_t ValueMetricProducer::byteSizeLocked() const {
diff --git a/bin/src/metrics/ValueMetricProducer.h b/bin/src/metrics/ValueMetricProducer.h
index cab50aa..d9bec5d 100644
--- a/bin/src/metrics/ValueMetricProducer.h
+++ b/bin/src/metrics/ValueMetricProducer.h
@@ -103,6 +103,9 @@
     // Calculate previous bucket end time based on current time.
     int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs);
 
+    // Mark the current bucket as invalid: its data can no longer be trusted.
+    void invalidateCurrentBucket();
+
     const int mWhatMatcherIndex;
 
     sp<EventMatcherWizard> mEventMatcherWizard;
@@ -156,6 +159,11 @@
 
     void pullAndMatchEventsLocked(const int64_t timestampNs);
 
+    ValueBucket buildPartialBucket(int64_t bucketEndTime,
+                                   const std::vector<Interval>& intervals);
+    void initCurrentSlicedBucket();
+    void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs);
+
     // Reset diff base and mHasGlobalBase
     void resetBase();
 
@@ -187,6 +195,12 @@
     // diff against.
     bool mHasGlobalBase;
 
+    // Whether the current bucket is invalid. There was a problem collecting data in this bucket,
+    // so none of its data can be trusted.
+    //
+    // For instance, one pull failed.
+    bool mCurrentBucketIsInvalid;
+
     const int64_t mMaxPullDelayNs;
 
     const bool mSplitBucketForAppUpgrade;
@@ -221,6 +235,9 @@
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
+    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed);
+    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed);
+    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
 };
 
 }  // namespace statsd
diff --git a/bin/src/stats_log.proto b/bin/src/stats_log.proto
index cca09ac..5b91482 100644
--- a/bin/src/stats_log.proto
+++ b/bin/src/stats_log.proto
@@ -417,6 +417,7 @@
       optional int64 skipped_forward_buckets = 4;
       optional int64 bad_value_type = 5;
       optional int64 condition_change_in_next_bucket = 6;
+      optional int64 invalidated_bucket = 7;
     }
     repeated AtomMetricStats atom_metric_stats = 17;
 
diff --git a/bin/src/stats_log_util.cpp b/bin/src/stats_log_util.cpp
index 9c9985e..3cb7563 100644
--- a/bin/src/stats_log_util.cpp
+++ b/bin/src/stats_log_util.cpp
@@ -78,6 +78,7 @@
 const int FIELD_ID_SKIPPED_FORWARD_BUCKETS = 4;
 const int FIELD_ID_BAD_VALUE_TYPE = 5;
 const int FIELD_ID_CONDITION_CHANGE_IN_NEXT_BUCKET = 6;
+const int FIELD_ID_INVALIDATED_BUCKET = 7;
 
 namespace {
 
@@ -494,6 +495,8 @@
                        (long long)pair.second.badValueType);
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_CHANGE_IN_NEXT_BUCKET,
                        (long long)pair.second.conditionChangeInNextBucket);
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_INVALIDATED_BUCKET,
+                       (long long)pair.second.invalidatedBucket);
     protoOutput->end(token);
 }
 
diff --git a/bin/tests/metrics/ValueMetricProducer_test.cpp b/bin/tests/metrics/ValueMetricProducer_test.cpp
index df0ae38..6404552 100644
--- a/bin/tests/metrics/ValueMetricProducer_test.cpp
+++ b/bin/tests/metrics/ValueMetricProducer_test.cpp
@@ -2067,7 +2067,7 @@
     metric.mutable_value_field()->set_field(tagId);
     metric.mutable_value_field()->add_child()->set_field(2);
     metric.set_condition(StringToId("SCREEN_ON"));
-    metric.set_max_pull_delay_sec(0);
+    metric.set_max_pull_delay_sec(INT_MAX);
 
     UidMap uidMap;
     SimpleAtomMatcher atomMatcher;
@@ -2100,11 +2100,14 @@
     vector<shared_ptr<LogEvent>> allData;
     valueProducer.onDataPulled(allData, /** succeed */ false);
     EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
+
+    valueProducer.onConditionChanged(false, bucketStartTimeNs + 1);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval& curInterval =
             valueProducer.mCurrentSlicedBucket.begin()->second[0];
-
-    valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
-    EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
+    EXPECT_EQ(false, curInterval.hasBase);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(false, valueProducer.mHasGlobalBase);
 }
 
 TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) {
@@ -2173,6 +2176,250 @@
     EXPECT_EQ(false, valueProducer.mHasGlobalBase);
 }
 
+TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First onConditionChanged
+            .WillOnce(Return(false))
+            // Second onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+                event->write(tagId);
+                event->write(130);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.mCondition = true;
+
+    // Bucket start.
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
+    event->write(1);
+    event->write(110);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
+
+    // This will fail and should invalidate the whole bucket since we do not have all the data
+    // needed to compute the metric value when the screen was on.
+    valueProducer.onConditionChanged(false, bucketStartTimeNs + 2);
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 3);
+
+    // Bucket end.
+    allData.clear();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event2->write(1);
+    event2->write(140);
+    event2->init();
+    allData.push_back(event2);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
+
+    valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
+
+    EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+    // Contains the base from the last pull, which was successful.
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval& curInterval =
+            valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(true, curInterval.hasBase);
+    EXPECT_EQ(140, curInterval.base.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+                event->write(tagId);
+                event->write(120);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // Second onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+                event->write(tagId);
+                event->write(130);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.mCondition = true;
+
+    // Bucket start.
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
+    event->write(1);
+    event->write(110);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData, /** succeed */ false);
+
+    valueProducer.onConditionChanged(false, bucketStartTimeNs + 2);
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 3);
+
+    // Bucket end.
+    allData.clear();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event2->write(1);
+    event2->write(140);
+    event2->init();
+    allData.push_back(event2);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
+
+    valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
+
+    EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+    // Contains the base from the last pull, which was successful.
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval& curInterval =
+            valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(true, curInterval.hasBase);
+    EXPECT_EQ(140, curInterval.base.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+                event->write(tagId);
+                event->write(120);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // Second onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+                event->write(tagId);
+                event->write(130);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.mCondition = true;
+
+    // Bucket start.
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
+    event->write(1);
+    event->write(110);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
+
+    // Both condition-change pulls succeed here; it is the final pull at bucket end that fails
+    // and invalidates the whole bucket.
+    valueProducer.onConditionChanged(false, bucketStartTimeNs + 2);
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 3);
+
+    // Bucket end.
+    allData.clear();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event2->write(1);
+    event2->write(140);
+    event2->init();
+    allData.push_back(event2);
+    valueProducer.onDataPulled(allData, /** succeed */ false);
+
+    valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
+
+    EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+    // Last pull failed, so the base has been reset.
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval& curInterval =
+            valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(false, curInterval.hasBase);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android