/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define STATSD_DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "NumericValueMetricProducer.h"

#include <stdlib.h>

#include <algorithm>

#include "FieldValue.h"
#include "guardrail/StatsdStats.h"
#include "metrics/HistogramValue.h"
#include "metrics/NumericValue.h"
#include "metrics/parsing_utils/metrics_manager_util.h"
#include "stats_log_util.h"

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_DOUBLE;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::shared_ptr;
using std::string;
using std::unordered_map;

namespace android {
namespace os {
namespace statsd {

namespace {  // anonymous namespace
// Proto field numbers used when serializing this metric's report. They are file-local
// compile-time constants and must stay in sync with the proto definitions.

// for StatsLogReport
constexpr int FIELD_ID_VALUE_METRICS = 7;

// for ValueBucketInfo
constexpr int FIELD_ID_VALUE_INDEX = 1;
constexpr int FIELD_ID_VALUE_LONG = 2;
constexpr int FIELD_ID_VALUE_DOUBLE = 3;
constexpr int FIELD_ID_VALUE_SAMPLESIZE = 4;
constexpr int FIELD_ID_VALUE_HISTOGRAM = 5;
constexpr int FIELD_ID_VALUES = 9;
constexpr int FIELD_ID_BUCKET_NUM = 4;
constexpr int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
constexpr int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
constexpr int FIELD_ID_CONDITION_TRUE_NS = 10;
constexpr int FIELD_ID_CONDITION_CORRECTION_NS = 11;

// Typed zero values, used as the default diff base when a dimension has no base yet.
const NumericValue ZERO_LONG(static_cast<int64_t>(0));
const NumericValue ZERO_DOUBLE(static_cast<double>(0));

// Converts a NumericValue to double: an int64_t payload is widened to double, anything
// else is read as a double and falls back to 0 when no double is stored.
double toDouble(const NumericValue& value) {
    if (value.is<int64_t>()) {
        return static_cast<double>(value.getValue<int64_t>());
    }
    return value.getValueOrDefault<double>(0);
}

}  // anonymous namespace

// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
//
// Builds a producer from the ValueMetric config. All shared state is forwarded to the
// ValueMetricProducer base; the members initialized here are the numeric-specific knobs
// read from `metric` and `whatOptions`.
NumericValueMetricProducer::NumericValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions,
        const wp<ConfigMetadataProvider> configMetadataProvider)
    : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
                          conditionOptions, stateOptions, activationOptions, guardrailOptions,
                          configMetadataProvider),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationTypes(whatOptions.aggregationTypes),
      // When the config does not set include_sample_size explicitly, sample sizes are
      // included only if one of the aggregation types is AVG.
      mIncludeSampleSize(metric.has_include_sample_size()
                                 ? metric.include_sample_size()
                                 : hasAvgAggregationType(whatOptions.aggregationTypes)),
      // When the config does not set use_diff explicitly, diffing defaults to isPulled().
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      // No pull has been accumulated yet, so there is no global diff base.
      mHasGlobalBase(false),
      // The config value is in seconds; fall back to StatsdStats::kPullMaxDelayNs when unset.
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mDedupedFieldMatchers(dedupFieldMatchers(whatOptions.fieldMatchers)),
      mBinStartsList(whatOptions.binStartsList) {
    // TODO(b/186677791): Use initializer list to initialize mUploadThreshold.
    // mUploadThreshold is only assigned when the config provides a threshold.
    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }
}

// Invalidates the current bucket through the base class and, for drop reasons after which
// the stored diff bases can no longer be trusted, also clears those bases.
void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                         const BucketDropReason reason) {
    ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason);

    // Any other drop reason leaves the bases untouched.
    const bool shouldResetBase = reason == BucketDropReason::DUMP_REPORT_REQUESTED ||
                                 reason == BucketDropReason::EVENT_IN_WRONG_BUCKET ||
                                 reason == BucketDropReason::CONDITION_UNKNOWN ||
                                 reason == BucketDropReason::PULL_FAILED ||
                                 reason == BucketDropReason::PULL_DELAYED ||
                                 reason == BucketDropReason::DIMENSION_GUARDRAIL_REACHED;
    if (shouldResetBase) {
        resetBase();
    }
}

void NumericValueMetricProducer::resetBase() {
    for (auto& [_, dimInfo] : mDimInfos) {
        for (NumericValue& base : dimInfo.dimExtras) {
            base.reset();
        }
    }
    mHasGlobalBase = false;
}

void NumericValueMetricProducer::writePastBucketAggregateToProto(
        const int aggIndex, const NumericValue& value, const int sampleSize,
        ProtoOutputStream* const protoOutput) const {
    uint64_t valueToken =
            protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
    protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex);
    if (mIncludeSampleSize) {
        protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_SAMPLESIZE, sampleSize);
    }
    if (value.is<int64_t>()) {
        const int64_t val = value.getValue<int64_t>();
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)val);
        VLOG("\t\t value %d: %lld", aggIndex, (long long)val);
    } else if (value.is<double>()) {
        const double val = value.getValue<double>();
        protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, val);
        VLOG("\t\t value %d: %.2f", aggIndex, val);
    } else if (value.is<HistogramValue>()) {
        const HistogramValue& val = value.getValue<HistogramValue>();
        const uint64_t histToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_HISTOGRAM);
        val.toProto(*protoOutput);
        protoOutput->end(histToken);
        VLOG("\t\t value %d: %s", aggIndex, val.toString().c_str());
    } else {
        VLOG("Wrong value type for ValueMetric output");
    }
    protoOutput->end(valueToken);
}

// Reacts to a change of the metric's active state. For diff-based metrics that become
// inactive, only the diff base is cleared; other counters are kept because the bucket may
// still accumulate more value.
void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
                                                                    const bool isActive) {
    const bool becameInactive = !isActive;
    if (becameInactive && mUseDiff) {
        resetBase();
    }
}

// Only called when mIsActive and the event is NOT too late.
//
// For diff-based metrics, a true -> false condition transition clears the diff base. Other
// counts are left alone because the bucket may still accumulate more value.
void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition,
                                                                  const ConditionState newCondition,
                                                                  const int64_t eventTimeNs) {
    const bool wentFromTrueToFalse =
            oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse;
    if (mUseDiff && wentFromTrueToFalse) {
        resetBase();
    }
}

// Kicks off the puller immediately, at the current bucket start time, when the metric is
// active, pulled, diff based and its condition is true. Otherwise does nothing.
void NumericValueMetricProducer::prepareFirstBucketLocked() {
    if (!mIsActive || !isPulled() || mCondition != ConditionState::kTrue || !mUseDiff) {
        return;
    }
    pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
}

void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) {
        ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs);
        invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
        return;
    }

    accumulateEvents(allData, timestampNs, timestampNs);
}

// Rounds currentTimeNs down to a bucket boundary: mTimeBaseNs plus the number of whole
// buckets (integer division) that fit between mTimeBaseNs and currentTimeNs.
int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    const int64_t wholeBucketsElapsed = (currentTimeNs - mTimeBaseNs) / mBucketSizeNs;
    return mTimeBaseNs + wholeBucketsElapsed * mBucketSizeNs;
}

// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrade which are not based on
// AlarmManager might have arrived earlier and close the bucket.
//
// Takes mMutex, processes the pull result while the condition is true, and always finishes
// by giving the current bucket a chance to flush at originalPullTimeNs.
void NumericValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                              PullResult pullResult, int64_t originalPullTimeNs) {
    lock_guard<mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        switch (pullResult) {
            case PullResult::PULL_RESULT_FAIL:
                // Without the pulled data we won't be able to compute a diff.
                invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
                break;
            case PullResult::PULL_RESULT_SUCCESS: {
                if (originalPullTimeNs < getCurrentBucketEndTimeNs()) {
                    // The pull lands in the middle of the current bucket: process the data
                    // as-is, without snapping it to the nearest bucket end.
                    accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
                    break;
                }
                // For scheduled pulled data, the effective event time is snapped to the
                // nearest bucket end. When waking up from a deep sleep state, the data is
                // attributed to the previous bucket end. If the sleep was long but not very
                // long, we will be in the immediate next bucket, and the previous bucket may
                // get a larger number because we pull later than the real bucket end.
                //
                // If the sleep was very long, more than one bucket is skipped. In that case
                // the diff base will be cleared and this new data will serve as the new
                // diff base.
                const int64_t snappedEventTimeNs =
                        calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                const int64_t boundaryDelayNs = originalPullTimeNs - snappedEventTimeNs;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(mMetricId, boundaryDelayNs);
                accumulateEvents(allData, originalPullTimeNs, snappedEventTimeNs);
                break;
            }
            default:
                // Any other pull result is ignored here.
                break;
        }
    }

    // We can probably flush the bucket. Since the snapped bucket end time was used as the
    // event time above, the current bucket will not have been flushed yet.
    flushIfNeededLocked(originalPullTimeNs);
}

// Adds the value fields of newEvent into the aggregate event held in eventValues.
// eventValues.second and newValueIndices are parallel lists of positions into the
// respective events' field values; a position of -1 on either side means that slot is
// skipped. Logs an error and leaves the aggregate untouched when the lists differ in size.
void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues,
                                                    const LogEvent& newEvent,
                                                    const vector<int>& newValueIndices) const {
    const vector<int>& aggregateIndices = eventValues.second;
    const size_t numSlots = aggregateIndices.size();
    if (numSlots != newValueIndices.size()) {
        ALOGE("NumericValueMetricProducer value indices sizes don't match");
        return;
    }

    vector<FieldValue>* const aggregatedValues = eventValues.first.getMutableValues();
    const vector<FieldValue>& incomingValues = newEvent.getValues();
    for (size_t slot = 0; slot < numSlots; ++slot) {
        const int incomingPos = newValueIndices[slot];
        const int aggregatePos = aggregateIndices[slot];
        if (incomingPos == -1 || aggregatePos == -1) {
            continue;
        }
        (*aggregatedValues)[aggregatePos].mValue += incomingValues[incomingPos].mValue;
    }
}

// Process events retrieved from a pull.
//
// allData is the raw pulled data, originalPullTimeNs is when the pull was requested (used
// only to measure the pull delay) and eventElapsedTimeNs is the timestamp stamped onto
// every matched event before it is handed to onMatchedLogEventLocked().
//
// The current bucket is invalidated, and nothing is accumulated, when the event time is
// late for the current bucket or when the pull took longer than mMaxPullDelayNs.
void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData,
                                                  int64_t originalPullTimeNs,
                                                  int64_t eventElapsedTimeNs) {
    if (isEventLateLocked(eventElapsedTimeNs)) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    // Pull delay is measured from the original pull request time until now.
    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullAtomId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    if (mUseDiff) {
        // An extra aggregation step is needed to sum values with matching dimensions
        // before calculating the diff between sums of consecutive pulls.
        std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents;
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult != MatchingState::kMatched) {
                continue;
            }

            // Get dimensions_in_what key and value indices.
            HashableDimensionKey dimensionsInWhat;
            vector<int> valueIndices(mDedupedFieldMatchers.size(), -1);
            // Prefer the transformed event when the matcher produced one.
            const LogEvent& eventRef = transformedEvent == nullptr ? *data : *transformedEvent;
            // NOTE: a filterValues() failure is only recorded in stats; the event is still
            // aggregated below.
            if (!filterValues(mDimensionsInWhat, mDedupedFieldMatchers, eventRef.getValues(),
                              dimensionsInWhat, valueIndices)) {
                StatsdStats::getInstance().noteBadValueType(mMetricId);
            }

            // Store new event in map or combine values in existing event.
            auto it = aggregateEvents.find(dimensionsInWhat);
            if (it == aggregateEvents.end()) {
                aggregateEvents.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(dimensionsInWhat),
                                        std::forward_as_tuple(eventRef, valueIndices));
            } else {
                combineValueFields(it->second, eventRef, valueIndices);
            }
        }

        // Feed one pre-summed event per dimension into the regular matched-event path.
        for (auto& [dimKey, eventInfo] : aggregateEvents) {
            eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first);
        }
    } else {
        // Without diffing, every matched event is processed individually on a local copy so
        // that its timestamp can be overwritten.
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult == MatchingState::kMatched) {
                LogEvent localCopy = transformedEvent == nullptr ? *data : *transformedEvent;
                localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
            }
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedMetricDimensionKeys)
    // then we clear the data from mDimInfos to reset the base and current state key.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData &&
            containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks,
                                      mStateChangePrimaryKey.first)) {
            auto it = mDimInfos.find(whatKey);
            if (it != mDimInfos.end()) {
                mDimInfos.erase(it);
            }
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    // A pull has now been processed, so a global base is considered available.
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}

// Guardrail for mCurrentFullBucket. Returns true when adding newKey would push the number
// of tracked keys above mDimensionHardLimit, in which case the caller should drop the data.
// Keys that are already tracked never hit the guardrail. The drop is logged only the first
// time, tracked through mHasHitGuardrail.
bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
    // An already tracked key does not grow the map.
    if (mCurrentFullBucket.count(newKey) > 0) {
        return false;
    }

    // Nothing to enforce while the map is still below the soft limit.
    const bool softLimitReached = mCurrentFullBucket.size() > mDimensionSoftLimit - 1;
    if (!softLimitReached) {
        return false;
    }

    // Adding the key is still allowed up to (and including) the hard limit.
    const size_t tupleCountWithNewKey = mCurrentFullBucket.size() + 1;
    if (tupleCountWithNewKey <= mDimensionHardLimit) {
        return false;
    }

    // Above the allowed threshold: don't add more tuples, drop the data.
    if (!mHasHitGuardrail) {
        ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
              (long long)mMetricId, newKey.toString().c_str());
        mHasHitGuardrail = true;
    }
    return true;
}

namespace {
// Extracts the aggregation input selected by `matcher` from `event`.
//
// With an all-position matcher (client-aggregated histogram) every matching field must be
// an INT; the matching values are collected, in order, as the bin counts of a
// HistogramValue. Otherwise only the first matching field is considered and converted to
// an int64_t (INT, LONG) or a double (FLOAT, DOUBLE). An empty NumericValue is returned
// when nothing matches or a matching field has an unsupported type.
NumericValue getAggregationInputValue(const LogEvent& event, const Matcher& matcher) {
    const vector<FieldValue>& fieldValues = event.getValues();

    if (matcher.hasAllPositionMatcher()) {  // client-aggregated histogram
        vector<int> binCounts;
        for (const FieldValue& candidate : fieldValues) {
            if (candidate.mField.matches(matcher)) {
                if (candidate.mValue.getType() != INT) {
                    return NumericValue{};
                }
                binCounts.push_back(candidate.mValue.int_value);
            }
        }
        return NumericValue(HistogramValue(binCounts));
    }

    const auto firstMatch = std::find_if(
            fieldValues.begin(), fieldValues.end(),
            [&matcher](const FieldValue& candidate) { return candidate.mField.matches(matcher); });
    if (firstMatch == fieldValues.end()) {
        return NumericValue{};
    }

    const auto& matched = firstMatch->mValue;
    if (matched.type == INT) {
        return NumericValue(static_cast<int64_t>(matched.int_value));
    }
    if (matched.type == LONG) {
        return NumericValue(static_cast<int64_t>(matched.long_value));
    }
    if (matched.type == FLOAT) {
        return NumericValue(static_cast<double>(matched.float_value));
    }
    if (matched.type == DOUBLE) {
        return NumericValue(static_cast<double>(matched.double_value));
    }
    return NumericValue{};
}

// Adds `value`, converted to float, to `histValue` using the given bin configuration.
// Logs an error and leaves the histogram unchanged when no bin configuration is provided.
void addValueToHistogram(const NumericValue& value, const optional<const BinStarts>& binStarts,
                         HistogramValue& histValue) {
    if (!binStarts.has_value()) {
        ALOGE("Missing bin configuration!");
        return;
    }
    const float sample = static_cast<float>(toDouble(value));
    histValue.addValue(sample, *binStarts);
}

}  // anonymous namespace

// Folds the value fields of `event` into `intervals` (one interval per entry of
// mFieldMatchers) and, for diff-based metrics, updates the per-field diff `bases`.
//
// @param eventTimeNs timestamp used for anomaly detection and skipped-bucket detection.
// @param eventKey    dimension key the event belongs to; used to look up the full-bucket
//                    value and reported to the anomaly trackers.
// @param event       the matched event to read values from.
// @param intervals   per-field aggregates of the current bucket; indexed like
//                    mFieldMatchers.
// @param bases       per-field diff bases; grown to mFieldMatchers.size() if shorter.
// @return true if the event updated at least one base or interval. If a field cannot be
//         read from the event, processing stops and the flag accumulated so far is
//         returned.
bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs,
                                                 const MetricDimensionKey& eventKey,
                                                 const LogEvent& event, vector<Interval>& intervals,
                                                 Bases& bases) {
    if (bases.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of bases to %zu", mFieldMatchers.size());
        bases.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    bool seenNewData = false;
    for (size_t i = 0; i < mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = intervals[i];
        interval.aggIndex = i;
        NumericValue& base = bases[i];
        NumericValue value = getAggregationInputValue(event, matcher);
        if (!value.hasValue()) {
            // A missing/unsupported field aborts processing of the remaining fields too.
            VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return seenNewData;
        }

        if (value.is<HistogramValue>() && !value.getValue<HistogramValue>().isValid()) {
            ALOGE("Invalid histogram at %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            if (mUseDiff) {
                // The next diff must not be computed against a base older than the bad sample.
                base.reset();
            }
            continue;
        }

        if (mUseDiff) {
            if (!base.hasValue()) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    if (value.is<int64_t>()) {
                        base = ZERO_LONG;
                    } else if (value.is<double>()) {
                        base = ZERO_DOUBLE;
                    } else if (value.is<HistogramValue>()) {
                        base = HistogramValue();
                    }
                } else {
                    // no base. just update base and return.
                    base = value;

                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;

                    seenNewData = true;
                    // Continue (instead of return) here in order to set base value for other bases
                    continue;
                }
            }
            NumericValue diff{};
            if (value.is<HistogramValue>()) {
                diff = value - base;
                seenNewData = true;
                // The new sample becomes the base even when the diff turns out to be an error.
                base = value;
                if (diff == HistogramValue::ERROR_BINS_MISMATCH) {
                    ALOGE("Value %zu from event %s does not have enough bins", i,
                          event.ToString().c_str());
                    StatsdStats::getInstance().noteBadValueType(mMetricId);
                    continue;
                }
                if (diff == HistogramValue::ERROR_BIN_COUNT_TOO_HIGH) {
                    ALOGE("Value %zu from event %s has decreasing bin count", i,
                          event.ToString().c_str());
                    StatsdStats::getInstance().noteBadValueType(mMetricId);
                    continue;
                }
            } else {
                seenNewData = true;
                switch (mValueDirection) {
                    case ValueMetric::INCREASING:
                        if (value >= base) {
                            diff = value - base;
                        } else if (mUseAbsoluteValueOnReset) {
                            // The counter went backwards: treat the new value as the diff.
                            diff = value;
                        } else {
                            VLOG("Unexpected decreasing value");
                            StatsdStats::getInstance().notePullDataError(mPullAtomId);
                            base = value;
                            // If we've got bad data, do not use anomaly detection
                            useAnomalyDetection = false;
                            continue;
                        }
                        break;
                    case ValueMetric::DECREASING:
                        if (base >= value) {
                            diff = base - value;
                        } else if (mUseAbsoluteValueOnReset) {
                            // The counter went forwards: treat the new value as the diff.
                            diff = value;
                        } else {
                            VLOG("Unexpected increasing value");
                            StatsdStats::getInstance().notePullDataError(mPullAtomId);
                            base = value;
                            // If we've got bad data, do not use anomaly detection
                            useAnomalyDetection = false;
                            continue;
                        }
                        break;
                    case ValueMetric::ANY:
                        diff = value - base;
                        break;
                    default:
                        break;
                }
                base = value;
            }
            // From here on the diff, not the raw sample, is what gets aggregated.
            value = diff;
        }

        const ValueMetric::AggregationType aggType = getAggregationTypeLocked(i);
        if (interval.hasValue()) {
            switch (aggType) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                case ValueMetric::AVG:
                    interval.aggregate += value;
                    break;
                case ValueMetric::MIN:
                    interval.aggregate = min(value, interval.aggregate);
                    break;
                case ValueMetric::MAX:
                    interval.aggregate = max(value, interval.aggregate);
                    break;
                case ValueMetric::HISTOGRAM:
                    if (value.is<HistogramValue>()) {
                        // client-aggregated histogram: add the corresponding bin counts.
                        NumericValue sum = interval.aggregate + value;
                        if (sum == HistogramValue::ERROR_BINS_MISMATCH) {
                            ALOGE("Value %zu from event %s has too many bins", i,
                                  event.ToString().c_str());
                            StatsdStats::getInstance().noteBadValueType(mMetricId);
                            continue;
                        }
                        interval.aggregate = sum;
                    } else {
                        // statsd-aggregated histogram: add the raw value to histogram.
                        addValueToHistogram(value, getBinStarts(i),
                                            interval.aggregate.getValue<HistogramValue>());
                    }
                    break;
                default:
                    break;
            }
        } else if (aggType == ValueMetric::HISTOGRAM && !value.is<HistogramValue>()) {
            // statsd-aggregated histogram: add raw value to histogram.
            interval.aggregate = HistogramValue();
            addValueToHistogram(value, getBinStarts(i),
                                interval.aggregate.getValue<HistogramValue>());
        } else {
            // First sample of this interval in the bucket.
            interval.aggregate = value;
        }
        seenNewData = true;
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propagate proper values down stream when anomaly support doubles
        // Use a fixed 64-bit type: `long` is only 32 bits wide on ILP32 builds and would
        // truncate the int64_t aggregate and the accumulated full-bucket value.
        int64_t wholeBucketVal = intervals[0].aggregate.getValueOrDefault<int64_t>(0);
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
    return seenNewData;
}

// Builds the PastBucket for [mCurrentBucketStartTimeNs, bucketEndTimeNs) from the given
// intervals. Only intervals that hold a value are emitted, and zero diffs are dropped when
// mSkipZeroDiffOutput is set for a diff-based metric. Sample sizes are emitted alongside
// the aggregates when mIncludeSampleSize is set.
PastBucket<NumericValue> NumericValueMetricProducer::buildPartialBucket(
        int64_t bucketEndTimeNs, vector<Interval>& intervals) {
    PastBucket<NumericValue> result;
    result.mBucketStartNs = mCurrentBucketStartTimeNs;
    result.mBucketEndNs = bucketEndTimeNs;

    if (intervals.empty()) {
        return result;
    }

    // The first value field acts as a "gatekeeper" - if it holds a value that does not pass
    // the specified threshold, then all interval values are discarded for this bucket.
    Interval& gatekeeper = intervals[0];
    if (gatekeeper.hasValue() && !valuePassesThreshold(gatekeeper)) {
        return result;
    }

    for (const Interval& current : intervals) {
        if (!current.hasValue()) {
            continue;
        }
        // skip the output if the diff is zero
        const bool suppressZeroDiff =
                mSkipZeroDiffOutput && mUseDiff && current.aggregate.isZero();
        if (suppressZeroDiff) {
            continue;
        }

        result.aggIndex.push_back(current.aggIndex);
        result.aggregates.push_back(getFinalValue(current));
        if (mIncludeSampleSize) {
            result.sampleSizes.push_back(current.sampleSize);
        }
    }
    return result;
}

// Also invalidates current bucket if multiple buckets have been skipped
//
// After the base class has closed the bucket, the bucket is appended to the full bucket
// when at least one anomaly tracker is registered. The full bucket counts as reached when
// eventTimeNs lies past getCurrentBucketEndTimeNs().
void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs,
                                                    const int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
    if (mAnomalyTrackers.empty()) {
        return;
    }
    const bool fullBucketReached = eventTimeNs > getCurrentBucketEndTimeNs();
    appendToFullBucket(fullBucketReached);
}

// Starts the next sliced bucket through the base class. For a diff-based metric that has
// no global base while mCondition is non-zero, the next bucket will be incomplete; for now
// mCurrentBucketIsSkipped is nevertheless set to false (see the TODO below).
void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs);

    if (!mUseDiff || mHasGlobalBase || !mCondition) {
        return;
    }
    // If we do not have a global base when the condition is true,
    // we will have incomplete bucket for the next bucket.
    // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here.
    mCurrentBucketIsSkipped = false;
}

// Folds the first value field of every current sliced bucket into the anomaly-detection state.
//
// - Skipped bucket: nothing is accumulated; at a full-bucket boundary the partial sums gathered
//   so far are dropped as well.
// - Partial bucket (isFullBucketReached == false): values are added to mCurrentFullBucket.
// - Full bucket with no earlier partial data: values are reported straight to the trackers.
// - Full bucket with earlier partial data: values are merged into mCurrentFullBucket, the sums
//   are reported to the trackers, and mCurrentFullBucket is cleared.
//
// Only intervals[0] is used, read as int64_t (0 if it holds another type).
// TODO: fix this when anomaly can accept double values
void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        // Current bucket is invalid, we do not add it to the full bucket. If it also ends the
        // full bucket, the full bucket contains invalid data and is ignored entirely.
        if (isFullBucketReached) {
            mCurrentFullBucket.clear();
        }
        return;
    }

    if (!isFullBucketReached) {
        // Accumulate partial bucket.
        // NOTE(review): unlike the merge path below, no hitFullBucketGuardRailLocked() check is
        // applied before inserting into mCurrentFullBucket here - confirm this is intended.
        for (const auto& [dimensionKey, slicedBucket] : mCurrentSlicedBucket) {
            if (slicedBucket.intervals.empty()) {
                continue;
            }
            const auto& firstInterval = slicedBucket.intervals[0];
            if (!firstInterval.hasValue()) {
                continue;
            }
            mCurrentFullBucket[dimensionKey] +=
                    firstInterval.aggregate.getValueOrDefault<int64_t>(0);
        }
        return;
    }

    if (mCurrentFullBucket.empty()) {
        // Skip aggregating the partial buckets since there's no previous partial bucket; the
        // current values go to the anomaly trackers as they are.
        for (const auto& [dimensionKey, slicedBucket] : mCurrentSlicedBucket) {
            if (slicedBucket.intervals.empty()) {
                continue;
            }
            const auto& firstInterval = slicedBucket.intervals[0];
            if (!firstInterval.hasValue()) {
                continue;
            }
            const int64_t longVal = firstInterval.aggregate.getValueOrDefault<int64_t>(0);
            for (auto& tracker : mAnomalyTrackers) {
                if (tracker != nullptr) {
                    tracker->addPastBucket(dimensionKey, longVal, mCurrentBucketNum);
                }
            }
        }
        return;
    }

    // Full bucket with earlier partial data: merge the current values into the running sums.
    // The guardrail is consulted first so that it runs for every dimension key.
    for (const auto& [dimensionKey, slicedBucket] : mCurrentSlicedBucket) {
        if (hitFullBucketGuardRailLocked(dimensionKey) || slicedBucket.intervals.empty()) {
            continue;
        }
        const auto& firstInterval = slicedBucket.intervals[0];
        if (firstInterval.hasValue()) {
            mCurrentFullBucket[dimensionKey] +=
                    firstInterval.aggregate.getValueOrDefault<int64_t>(0);
        }
    }

    // Send the merged sums to every registered anomaly tracker, then start over.
    for (const auto& [dimensionKey, total] : mCurrentFullBucket) {
        for (auto& tracker : mAnomalyTrackers) {
            if (tracker != nullptr) {
                tracker->addPastBucket(dimensionKey, total, mCurrentBucketNum);
            }
        }
    }
    mCurrentFullBucket.clear();
}

// Returns the bin-start configuration for the given value field. When mBinStartsList holds
// exactly one entry, that entry is returned for every index; otherwise the entry at
// `valueFieldIndex` is returned. The index is not range-checked here.
const optional<const BinStarts>& NumericValueMetricProducer::getBinStarts(
        int valueFieldIndex) const {
    if (mBinStartsList.size() == 1) {
        return mBinStartsList[0];
    }
    return mBinStartsList[valueFieldIndex];
}

// Estimate for the size of NumericValues: an int32 index, the value's own reported size and,
// when sample sizes are included, one more int32.
size_t NumericValueMetricProducer::getAggregatedValueSize(const NumericValue& value) const {
    const size_t indexBytes = sizeof(int32_t);
    const size_t sampleSizeBytes = mIncludeSampleSize ? sizeof(int32_t) : 0;
    return indexBytes + value.getSize() + sampleSizeBytes;
}

size_t NumericValueMetricProducer::byteSizeLocked() const {
    sp<ConfigMetadataProvider> configMetadataProvider = getConfigMetadataProvider();
    if (configMetadataProvider != nullptr && configMetadataProvider->useV2SoftMemoryCalculation()) {
        bool dimensionGuardrailHit = StatsdStats::getInstance().hasHitDimensionGuardrail(mMetricId);
        return computeOverheadSizeLocked(!mPastBuckets.empty() || !mSkippedBuckets.empty(),
                                         dimensionGuardrailHit) +
               mTotalDataSize;
    }
    size_t totalSize = 0;
    for (const auto& [_, buckets] : mPastBuckets) {
        totalSize += buckets.size() * kBucketSize;
        // TODO(b/189283526): Add bytes used to store PastBucket.aggIndex vector
    }
    return totalSize;
}

// Reports whether the interval's final value satisfies the configured upload threshold.
//
// Returns true when no threshold is configured. Otherwise the final value (see getFinalValue())
// is converted to double and compared against the threshold that is set:
//   lt_int / lt_float   -> value <  threshold
//   gt_int / gt_float   -> value >  threshold
//   lte_int             -> value <= threshold
//   gte_int             -> value >= threshold
// The float variants are strict, matching their names and the lt_int / gt_int cases; a value
// exactly equal to an lt_float / gt_float threshold does not pass.
// Returns false (and logs an error) when the threshold message has no comparison set.
bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const {
    if (mUploadThreshold == nullopt) {
        return true;
    }

    const double doubleValue = toDouble(getFinalValue(interval));

    switch (mUploadThreshold->value_comparison_case()) {
        case UploadThreshold::kLtInt:
            return doubleValue < static_cast<double>(mUploadThreshold->lt_int());
        case UploadThreshold::kGtInt:
            return doubleValue > static_cast<double>(mUploadThreshold->gt_int());
        case UploadThreshold::kLteInt:
            return doubleValue <= static_cast<double>(mUploadThreshold->lte_int());
        case UploadThreshold::kGteInt:
            return doubleValue >= static_cast<double>(mUploadThreshold->gte_int());
        case UploadThreshold::kLtFloat:
            // Strict comparison: "lt" must not admit values equal to the threshold.
            return doubleValue < static_cast<double>(mUploadThreshold->lt_float());
        case UploadThreshold::kGtFloat:
            // Strict comparison: "gt" must not admit values equal to the threshold.
            return doubleValue > static_cast<double>(mUploadThreshold->gt_float());
        default:
            ALOGE("Value metric no upload threshold type used");
            return false;
    }
}

// Converts an interval's stored aggregate into the value that is reported:
// - histogram aggregates are returned in their compacted form;
// - for AVG aggregation the stored aggregate is divided by the interval's sample size;
// - every other aggregation type returns the stored aggregate unchanged.
NumericValue NumericValueMetricProducer::getFinalValue(const Interval& interval) const {
    const auto& aggregate = interval.aggregate;

    if (aggregate.is<HistogramValue>()) {
        return aggregate.getValue<HistogramValue>().getCompactedHistogramValue();
    }

    if (getAggregationTypeLocked(interval.aggIndex) == ValueMetric::AVG) {
        // NOTE(review): presumably sampleSize is non-zero whenever the interval has a value;
        // confirm that callers only pass intervals for which hasValue() is true.
        const double total = toDouble(aggregate);
        return NumericValue(total / interval.sampleSize);
    }

    return aggregate;
}

// Supplies the proto field ids used when dumping this metric: the value-metrics field of
// StatsLogReport followed by the bucket number, bucket start/end, condition-true and
// condition-correction field ids, in that order.
NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const {
    DumpProtoFields fields{
            FIELD_ID_VALUE_METRICS,
            FIELD_ID_BUCKET_NUM,
            FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_CONDITION_TRUE_NS,
            FIELD_ID_CONDITION_CORRECTION_NS,
    };
    return fields;
}

// Maps the kind of lost atom to the severity of the resulting data corruption:
// - lost "what" atom: unrecoverable for diffed metrics, reset-on-dump otherwise;
// - lost condition or state atom: unrecoverable;
// - any other value: none.
// The atom id and the corruption reason do not influence the result.
MetricProducer::DataCorruptionSeverity NumericValueMetricProducer::determineCorruptionSeverity(
        int32_t atomId, DataCorruptedReason /*reason*/, LostAtomType atomType) const {
    switch (atomType) {
        case LostAtomType::kWhat:
            if (mUseDiff) {
                return DataCorruptionSeverity::kUnrecoverable;
            }
            return DataCorruptionSeverity::kResetOnDump;
        case LostAtomType::kCondition:
        case LostAtomType::kState:
            return DataCorruptionSeverity::kUnrecoverable;
    }
    return DataCorruptionSeverity::kNone;
}

}  // namespace statsd
}  // namespace os
}  // namespace android
