//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#ifndef GRPC_SRC_CPP_EXT_OTEL_OTEL_PLUGIN_H
#define GRPC_SRC_CPP_EXT_OTEL_OTEL_PLUGIN_H

#include <grpc/support/port_platform.h>

#include <stddef.h>
#include <stdint.h>

#include <bitset>
#include <memory>
#include <string>
#include <utility>

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "opentelemetry/metrics/async_instruments.h"
#include "opentelemetry/metrics/meter_provider.h"
#include "opentelemetry/metrics/observer_result.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include "opentelemetry/nostd/shared_ptr.h"

#include <grpcpp/ext/otel_plugin.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/metrics.h"
#include "src/core/lib/transport/metadata_batch.h"

namespace grpc {
namespace internal {

// An iterable container interface that can be used as a return type for the
// OpenTelemetry plugin's label injector.
class LabelsIterable {
 public:
  virtual ~LabelsIterable() = default;

  // Returns the key-value label at the current position or absl::nullopt if the
  // iterator has reached the end.
  virtual absl::optional<std::pair<absl::string_view, absl::string_view>>
  Next() = 0;

  virtual size_t Size() const = 0;

  // Resets position of iterator to the start.
  virtual void ResetIteratorPosition() = 0;
};

// An interface that allows you to add additional labels on the calls traced
// through the OpenTelemetry plugin.
class LabelsInjector {
 public:
  virtual ~LabelsInjector() {}
  // Read the incoming initial metadata to get the set of labels to be added to
  // metrics.
  virtual std::unique_ptr<LabelsIterable> GetLabels(
      grpc_metadata_batch* incoming_initial_metadata) const = 0;

  // Modify the outgoing initial metadata with metadata information to be sent
  // to the peer. On the server side, \a labels_from_incoming_metadata returned
  // from `GetLabels` should be provided as input here. On the client side, this
  // should be nullptr.
  virtual void AddLabels(
      grpc_metadata_batch* outgoing_initial_metadata,
      LabelsIterable* labels_from_incoming_metadata) const = 0;

  // Adds optional labels to the traced calls. Each entry in the span
  // corresponds to the CallAttemptTracer::OptionalLabelComponent enum. Returns
  // false when callback returns false.
  virtual bool AddOptionalLabels(
      bool is_client,
      absl::Span<const grpc_core::RefCountedStringValue> optional_labels,
      opentelemetry::nostd::function_ref<
          bool(opentelemetry::nostd::string_view,
               opentelemetry::common::AttributeValue)>
          callback) const = 0;

  // Gets the actual size of the optional labels that the Plugin is going to
  // produce through the AddOptionalLabels method.
  virtual size_t GetOptionalLabelsSize(
      bool is_client,
      absl::Span<const grpc_core::RefCountedStringValue> optional_labels)
      const = 0;
};

class InternalOpenTelemetryPluginOption
    : public grpc::OpenTelemetryPluginOption {
 public:
  ~InternalOpenTelemetryPluginOption() override = default;
  // Determines whether a plugin option is active on a given channel target
  virtual bool IsActiveOnClientChannel(absl::string_view target) const = 0;
  // Determines whether a plugin option is active on a given server
  virtual bool IsActiveOnServer(const grpc_core::ChannelArgs& args) const = 0;
  // Returns the LabelsInjector used by this plugin option, nullptr if none.
  virtual const grpc::internal::LabelsInjector* labels_injector() const = 0;
};

// Tags
absl::string_view OpenTelemetryMethodKey();
absl::string_view OpenTelemetryStatusKey();
absl::string_view OpenTelemetryTargetKey();

class OpenTelemetryPluginBuilderImpl {
 public:
  OpenTelemetryPluginBuilderImpl();
  ~OpenTelemetryPluginBuilderImpl();
  // If `SetMeterProvider()` is not called, no metrics are collected.
  OpenTelemetryPluginBuilderImpl& SetMeterProvider(
      std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider);
  // Methods to manipulate which instruments are enabled in the OpenTelemetry
  // Stats Plugin. The default set of instruments are -
  // grpc.client.attempt.started
  // grpc.client.attempt.duration
  // grpc.client.attempt.sent_total_compressed_message_size
  // grpc.client.attempt.rcvd_total_compressed_message_size
  // grpc.server.call.started
  // grpc.server.call.duration
  // grpc.server.call.sent_total_compressed_message_size
  // grpc.server.call.rcvd_total_compressed_message_size
  OpenTelemetryPluginBuilderImpl& EnableMetrics(
      absl::Span<const absl::string_view> metric_names);
  OpenTelemetryPluginBuilderImpl& DisableMetrics(
      absl::Span<const absl::string_view> metric_names);
  OpenTelemetryPluginBuilderImpl& DisableAllMetrics();
  // If set, \a server_selector is called per incoming call on the server
  // to decide whether to collect metrics on that call or not.
  // TODO(yashkt): We should only need to do this per server connection or even
  // per server. Change this when we have a ServerTracer.
  OpenTelemetryPluginBuilderImpl& SetServerSelector(
      absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
          server_selector);
  // If set, \a target_attribute_filter is called per channel to decide whether
  // to record the target attribute on client or to replace it with "other".
  // This helps reduce the cardinality on metrics in cases where many channels
  // are created with different targets in the same binary (which might happen
  // for example, if the channel target string uses IP addresses directly).
  OpenTelemetryPluginBuilderImpl& SetTargetAttributeFilter(
      absl::AnyInvocable<bool(absl::string_view /*target*/) const>
          target_attribute_filter);
  // If set, \a generic_method_attribute_filter is called per call with a
  // generic method type to decide whether to record the method name or to
  // replace it with "other". Non-generic or pre-registered methods remain
  // unaffected. If not set, by default, generic method names are replaced with
  // "other" when recording metrics.
  OpenTelemetryPluginBuilderImpl& SetGenericMethodAttributeFilter(
      absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
          generic_method_attribute_filter);
  OpenTelemetryPluginBuilderImpl& AddPluginOption(
      std::unique_ptr<InternalOpenTelemetryPluginOption> option);
  // Records \a optional_label_key on all metrics that provide it.
  OpenTelemetryPluginBuilderImpl& AddOptionalLabel(
      absl::string_view optional_label_key);
  // Set scope filter to choose which channels are recorded by this plugin.
  // Server-side recording remains unaffected.
  OpenTelemetryPluginBuilderImpl& SetChannelScopeFilter(
      absl::AnyInvocable<
          bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
          channel_scope_filter);
  absl::Status BuildAndRegisterGlobal();

  const absl::flat_hash_set<std::string>& TestOnlyEnabledMetrics() {
    return metrics_;
  }

 private:
  std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider_;
  std::unique_ptr<LabelsInjector> labels_injector_;
  absl::AnyInvocable<bool(absl::string_view /*target*/) const>
      target_attribute_filter_;
  absl::flat_hash_set<std::string> metrics_;
  absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
      generic_method_attribute_filter_;
  absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
      server_selector_;
  std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
      plugin_options_;
  std::set<absl::string_view> optional_label_keys_;
  absl::AnyInvocable<bool(
      const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
      channel_scope_filter_;
};

class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
 public:
  OpenTelemetryPlugin(
      const absl::flat_hash_set<std::string>& metrics,
      opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
          meter_provider,
      absl::AnyInvocable<bool(absl::string_view /*target*/) const>
          target_attribute_filter,
      absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
          generic_method_attribute_filter,
      absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
          server_selector,
      std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
          plugin_options,
      const std::set<absl::string_view>& optional_label_keys,
      absl::AnyInvocable<
          bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
          channel_scope_filter);

 private:
  class ClientCallTracer;
  class KeyValueIterable;
  class NPCMetricsKeyValueIterable;
  class ServerCallTracer;

  // Creates a convenience wrapper to help iterate over only those plugin
  // options that are active over a given channel/server.
  class ActivePluginOptionsView {
   public:
    static ActivePluginOptionsView MakeForClient(
        absl::string_view target, const OpenTelemetryPlugin* otel_plugin) {
      return ActivePluginOptionsView(
          [target](const InternalOpenTelemetryPluginOption& plugin_option) {
            return plugin_option.IsActiveOnClientChannel(target);
          },
          otel_plugin);
    }

    static ActivePluginOptionsView MakeForServer(
        const grpc_core::ChannelArgs& args,
        const OpenTelemetryPlugin* otel_plugin) {
      return ActivePluginOptionsView(
          [&args](const InternalOpenTelemetryPluginOption& plugin_option) {
            return plugin_option.IsActiveOnServer(args);
          },
          otel_plugin);
    }

    bool ForEach(absl::FunctionRef<
                     bool(const InternalOpenTelemetryPluginOption&, size_t)>
                     func,
                 const OpenTelemetryPlugin* otel_plugin) const {
      for (size_t i = 0; i < otel_plugin->plugin_options().size(); ++i) {
        const auto& plugin_option = otel_plugin->plugin_options()[i];
        if (active_mask_[i] && !func(*plugin_option, i)) {
          return false;
        }
      }
      return true;
    }

   private:
    explicit ActivePluginOptionsView(
        absl::FunctionRef<bool(const InternalOpenTelemetryPluginOption&)> func,
        const OpenTelemetryPlugin* otel_plugin) {
      for (size_t i = 0; i < otel_plugin->plugin_options().size(); ++i) {
        const auto& plugin_option = otel_plugin->plugin_options()[i];
        if (plugin_option != nullptr && func(*plugin_option)) {
          active_mask_.set(i);
        }
      }
    }

    std::bitset<64> active_mask_;
  };

  class ClientScopeConfig : public grpc_core::StatsPlugin::ScopeConfig {
   public:
    ClientScopeConfig(const OpenTelemetryPlugin* otel_plugin,
                      const OpenTelemetryPluginBuilder::ChannelScope& scope)
        : active_plugin_options_view_(ActivePluginOptionsView::MakeForClient(
              scope.target(), otel_plugin)),
          filtered_target_(
              // Use the original target string only if a filter on the
              // attribute is not registered or if the filter returns true,
              // otherwise use "other".
              otel_plugin->target_attribute_filter() == nullptr ||
                      otel_plugin->target_attribute_filter()(scope.target())
                  ? scope.target()
                  : "other") {}

    const ActivePluginOptionsView& active_plugin_options_view() const {
      return active_plugin_options_view_;
    }

    absl::string_view filtered_target() const { return filtered_target_; }

   private:
    ActivePluginOptionsView active_plugin_options_view_;
    std::string filtered_target_;
  };
  class ServerScopeConfig : public grpc_core::StatsPlugin::ScopeConfig {
   public:
    ServerScopeConfig(const OpenTelemetryPlugin* otel_plugin,
                      const grpc_core::ChannelArgs& args)
        : active_plugin_options_view_(
              ActivePluginOptionsView::MakeForServer(args, otel_plugin)) {}

    const ActivePluginOptionsView& active_plugin_options_view() const {
      return active_plugin_options_view_;
    }

   private:
    ActivePluginOptionsView active_plugin_options_view_;
  };

  struct ClientMetrics {
    struct Attempt {
      std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>> started;
      std::unique_ptr<opentelemetry::metrics::Histogram<double>> duration;
      std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>
          sent_total_compressed_message_size;
      std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>
          rcvd_total_compressed_message_size;
    } attempt;
  };
  struct ServerMetrics {
    struct Call {
      std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>> started;
      std::unique_ptr<opentelemetry::metrics::Histogram<double>> duration;
      std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>
          sent_total_compressed_message_size;
      std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>
          rcvd_total_compressed_message_size;
    } call;
  };

  // This object should be used inline.
  class CallbackMetricReporter : public grpc_core::CallbackMetricReporter {
   public:
    CallbackMetricReporter(OpenTelemetryPlugin* ot_plugin,
                           grpc_core::RegisteredMetricCallback* key)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(ot_plugin->mu_);

    void Report(
        grpc_core::GlobalInstrumentsRegistry::GlobalCallbackInt64GaugeHandle
            handle,
        int64_t value, absl::Span<const absl::string_view> label_values,
        absl::Span<const absl::string_view> optional_values)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(
            CallbackGaugeState<int64_t>::ot_plugin->mu_) override;
    void Report(
        grpc_core::GlobalInstrumentsRegistry::GlobalCallbackDoubleGaugeHandle
            handle,
        double value, absl::Span<const absl::string_view> label_values,
        absl::Span<const absl::string_view> optional_values)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(
            CallbackGaugeState<double>::ot_plugin->mu_) override;

   private:
    OpenTelemetryPlugin* ot_plugin_;
    grpc_core::RegisteredMetricCallback* key_;
  };

  // Returns the string form of \a key
  static absl::string_view OptionalLabelKeyToString(
      grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key);

  // Returns the OptionalLabelKey form of \a key if \a key is recognized and
  // is public, absl::nullopt otherwise.
  static absl::optional<
      grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey>
  OptionalLabelStringToKey(absl::string_view key);

  // StatsPlugin:
  std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
  IsEnabledForChannel(
      const OpenTelemetryPluginBuilder::ChannelScope& scope) const override;
  std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
  IsEnabledForServer(const grpc_core::ChannelArgs& args) const override;
  void AddCounter(
      grpc_core::GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
      uint64_t value, absl::Span<const absl::string_view> label_values,
      absl::Span<const absl::string_view> optional_values) override;
  void AddCounter(
      grpc_core::GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle,
      double value, absl::Span<const absl::string_view> label_values,
      absl::Span<const absl::string_view> optional_values) override;
  void RecordHistogram(
      grpc_core::GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle,
      uint64_t value, absl::Span<const absl::string_view> label_values,
      absl::Span<const absl::string_view> optional_values) override;
  void RecordHistogram(
      grpc_core::GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle,
      double value, absl::Span<const absl::string_view> label_values,
      absl::Span<const absl::string_view> optional_values) override;
  void SetGauge(
      grpc_core::GlobalInstrumentsRegistry::GlobalInt64GaugeHandle /*handle*/,
      int64_t /*value*/, absl::Span<const absl::string_view> /*label_values*/,
      absl::Span<const absl::string_view> /*optional_values*/) override {}
  void SetGauge(
      grpc_core::GlobalInstrumentsRegistry::GlobalDoubleGaugeHandle /*handle*/,
      double /*value*/, absl::Span<const absl::string_view> /*label_values*/,
      absl::Span<const absl::string_view> /*optional_values*/) override {}
  void AddCallback(grpc_core::RegisteredMetricCallback* callback)
      ABSL_LOCKS_EXCLUDED(mu_) override;
  void RemoveCallback(grpc_core::RegisteredMetricCallback* callback)
      ABSL_LOCKS_EXCLUDED(mu_) override;
  grpc_core::ClientCallTracer* GetClientCallTracer(
      const grpc_core::Slice& path, bool registered_method,
      std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config)
      override;
  grpc_core::ServerCallTracer* GetServerCallTracer(
      std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config)
      override;

  const absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>&
  server_selector() const {
    return server_selector_;
  }
  const absl::AnyInvocable<bool(absl::string_view /*target*/) const>&
  target_attribute_filter() const {
    return target_attribute_filter_;
  }
  const absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>&
  generic_method_attribute_filter() const {
    return generic_method_attribute_filter_;
  }
  const std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>&
  plugin_options() const {
    return plugin_options_;
  }

  template <typename ValueType>
  struct CallbackGaugeState {
    // It's possible to set values for multiple sets of labels at the same time
    // in a single callback. Key is a vector of label values and enabled
    // optional label values.
    using Cache = absl::flat_hash_map<std::vector<std::string>, ValueType>;
    grpc_core::GlobalInstrumentsRegistry::InstrumentID id;
    opentelemetry::nostd::shared_ptr<
        opentelemetry::metrics::ObservableInstrument>
        instrument;
    bool ot_callback_registered ABSL_GUARDED_BY(ot_plugin->mu_);
    // instrument1 ----- RegisteredMetricCallback1
    //               x
    // instrument2 ----- RegisteredMetricCallback2
    // One instrument can be registered by multiple callbacks.
    absl::flat_hash_map<grpc_core::RegisteredMetricCallback*, Cache> caches
        ABSL_GUARDED_BY(ot_plugin->mu_);
    OpenTelemetryPlugin* ot_plugin;

    static void CallbackGaugeCallback(
        opentelemetry::metrics::ObserverResult result, void* arg)
        ABSL_LOCKS_EXCLUDED(ot_plugin->mu_);

    void Observe(opentelemetry::metrics::ObserverResult& result,
                 const Cache& cache)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(ot_plugin->mu_);
  };

  // Instruments for per-call metrics.
  ClientMetrics client_;
  ServerMetrics server_;
  static constexpr int kOptionalLabelsSizeLimit = 64;
  using OptionalLabelsBitSet = std::bitset<kOptionalLabelsSizeLimit>;
  OptionalLabelsBitSet per_call_optional_label_bits_;
  // Instruments for non-per-call metrics.
  struct Disabled {};
  using Instrument = absl::variant<
      Disabled, std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>,
      std::unique_ptr<opentelemetry::metrics::Counter<double>>,
      std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>,
      std::unique_ptr<opentelemetry::metrics::Histogram<double>>,
      std::unique_ptr<CallbackGaugeState<int64_t>>,
      std::unique_ptr<CallbackGaugeState<double>>>;
  struct InstrumentData {
    Instrument instrument;
    OptionalLabelsBitSet optional_labels_bits;
  };
  std::vector<InstrumentData> instruments_data_;
  grpc_core::Mutex mu_;
  absl::flat_hash_map<grpc_core::RegisteredMetricCallback*,
                      grpc_core::Timestamp>
      callback_timestamps_ ABSL_GUARDED_BY(mu_);
  opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
      meter_provider_;
  absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
      server_selector_;
  absl::AnyInvocable<bool(absl::string_view /*target*/) const>
      target_attribute_filter_;
  absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
      generic_method_attribute_filter_;
  std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
      plugin_options_;
  absl::AnyInvocable<bool(
      const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
      channel_scope_filter_;
};

}  // namespace internal
}  // namespace grpc

#endif  // GRPC_SRC_CPP_EXT_OTEL_OTEL_PLUGIN_H
