//
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/cpp/ext/gcp/observability_logging_sink.h"

#include <algorithm>
#include <initializer_list>
#include <map>
#include <utility>

#include "absl/numeric/int128.h"
#include "absl/strings/escaping.h"
#include "absl/strings/match.h"
#include "absl/strings/str_format.h"
#include "absl/types/optional.h"
#include "google/api/monitored_resource.pb.h"
#include "google/logging/v2/log_entry.pb.h"
#include "google/logging/v2/logging.grpc.pb.h"
#include "google/logging/v2/logging.pb.h"
#include "google/protobuf/text_format.h"

#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/status.h>

#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/uuid_v4.h"
#include "src/core/lib/json/json.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"

// IWYU pragma: no_include "google/protobuf/struct.pb.h"
// IWYU pragma: no_include "google/protobuf/timestamp.pb.h"

namespace grpc {
namespace internal {

using grpc_core::LoggingSink;

ObservabilityLoggingSink::ObservabilityLoggingSink(
    GcpObservabilityConfig::CloudLogging logging_config, std::string project_id,
    std::map<std::string, std::string> labels)
    : project_id_(std::move(project_id)),
      labels_(labels.begin(), labels.end()) {
  for (auto& client_rpc_event_config : logging_config.client_rpc_events) {
    client_configs_.emplace_back(client_rpc_event_config);
  }
  for (auto& server_rpc_event_config : logging_config.server_rpc_events) {
    server_configs_.emplace_back(server_rpc_event_config);
  }
  absl::optional<std::string> authority_env =
      grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
  absl::optional<std::string> endpoint_env =
      grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
  if (authority_env.has_value() && !authority_env->empty()) {
    authority_ = std::move(*endpoint_env);
  }
}

LoggingSink::Config ObservabilityLoggingSink::FindMatch(
    bool is_client, absl::string_view service, absl::string_view method) {
  const auto& configs = is_client ? client_configs_ : server_configs_;
  if (service.empty() || method.empty()) {
    return LoggingSink::Config();
  }
  for (const auto& config : configs) {
    for (const auto& config_method : config.parsed_methods) {
      if ((config_method.service == "*") ||
          ((service == config_method.service) &&
           ((config_method.method == "*") ||
            (method == config_method.method)))) {
        if (config.exclude) {
          return LoggingSink::Config();
        }
        return LoggingSink::Config(config.max_metadata_bytes,
                                   config.max_message_bytes);
      }
    }
  }
  return LoggingSink::Config();
}

namespace {

std::string EventTypeToString(LoggingSink::Entry::EventType type) {
  switch (type) {
    case LoggingSink::Entry::EventType::kClientHeader:
      return "CLIENT_HEADER";
    case LoggingSink::Entry::EventType::kServerHeader:
      return "SERVER_HEADER";
    case LoggingSink::Entry::EventType::kClientMessage:
      return "CLIENT_MESSAGE";
    case LoggingSink::Entry::EventType::kServerMessage:
      return "SERVER_MESSAGE";
    case LoggingSink::Entry::EventType::kClientHalfClose:
      return "CLIENT_HALF_CLOSE";
    case LoggingSink::Entry::EventType::kServerTrailer:
      return "SERVER_TRAILER";
    case LoggingSink::Entry::EventType::kCancel:
      return "CANCEL";
    case LoggingSink::Entry::EventType::kUnkown:
    default:
      return "EVENT_TYPE_UNKNOWN";
  }
}

std::string LoggerToString(LoggingSink::Entry::Logger type) {
  switch (type) {
    case LoggingSink::Entry::Logger::kClient:
      return "CLIENT";
    case LoggingSink::Entry::Logger::kServer:
      return "SERVER";
    case LoggingSink::Entry::Logger::kUnkown:
    default:
      return "LOGGER_UNKNOWN";
  }
}

void PayloadToJsonStructProto(LoggingSink::Entry::Payload payload,
                              ::google::protobuf::Struct* payload_proto) {
  grpc_core::Json::Object payload_json;
  if (!payload.metadata.empty()) {
    auto* metadata_proto =
        (*payload_proto->mutable_fields())["metadata"].mutable_struct_value();
    for (auto& metadata : payload.metadata) {
      if (absl::EndsWith(metadata.first, "-bin")) {
        (*metadata_proto->mutable_fields())[metadata.first].set_string_value(
            absl::WebSafeBase64Escape(metadata.second));
      } else {
        (*metadata_proto->mutable_fields())[metadata.first].set_string_value(
            std::move(metadata.second));
      }
    }
  }
  if (payload.timeout != grpc_core::Duration::Zero()) {
    (*payload_proto->mutable_fields())["timeout"].set_string_value(
        payload.timeout.ToJsonString());
  }
  if (payload.status_code != 0) {
    (*payload_proto->mutable_fields())["statusCode"].set_number_value(
        payload.status_code);
  }
  if (!payload.status_message.empty()) {
    (*payload_proto->mutable_fields())["statusMessage"].set_string_value(
        std::move(payload.status_message));
  }
  if (!payload.status_details.empty()) {
    (*payload_proto->mutable_fields())["statusDetails"].set_string_value(
        absl::Base64Escape(payload.status_details));
  }
  if (payload.message_length != 0) {
    (*payload_proto->mutable_fields())["messageLength"].set_number_value(
        payload.message_length);
  }
  if (!payload.message.empty()) {
    (*payload_proto->mutable_fields())["message"].set_string_value(
        absl::Base64Escape(payload.message));
  }
}

std::string AddressTypeToString(LoggingSink::Entry::Address::Type type) {
  switch (type) {
    case LoggingSink::Entry::Address::Type::kIpv4:
      return "TYPE_IPV4";
    case LoggingSink::Entry::Address::Type::kIpv6:
      return "TYPE_IPV6";
    case LoggingSink::Entry::Address::Type::kUnix:
      return "TYPE_UNIX";
    case LoggingSink::Entry::Address::Type::kUnknown:
    default:
      return "TYPE_UNKNOWN";
  }
}

void PeerToJsonStructProto(LoggingSink::Entry::Address peer,
                           ::google::protobuf::Struct* peer_json) {
  (*peer_json->mutable_fields())["type"].set_string_value(
      AddressTypeToString(peer.type));
  if (peer.type != LoggingSink::Entry::Address::Type::kUnknown) {
    (*peer_json->mutable_fields())["address"].set_string_value(
        std::move(peer.address));
    (*peer_json->mutable_fields())["ipPort"].set_number_value(peer.ip_port);
  }
}

}  // namespace

void EntryToJsonStructProto(LoggingSink::Entry entry,
                            ::google::protobuf::Struct* json_payload) {
  (*json_payload->mutable_fields())["callId"].set_string_value(
      grpc_core::GenerateUUIDv4(absl::Uint128High64(entry.call_id),
                                absl::Uint128Low64(entry.call_id)));
  (*json_payload->mutable_fields())["sequenceId"].set_number_value(
      entry.sequence_id);
  (*json_payload->mutable_fields())["type"].set_string_value(
      EventTypeToString(entry.type));
  (*json_payload->mutable_fields())["logger"].set_string_value(
      LoggerToString(entry.logger));
  PayloadToJsonStructProto(
      std::move(entry.payload),
      (*json_payload->mutable_fields())["payload"].mutable_struct_value());
  if (entry.payload_truncated) {
    (*json_payload->mutable_fields())["payloadTruncated"].set_bool_value(
        entry.payload_truncated);
  }
  PeerToJsonStructProto(
      std::move(entry.peer),
      (*json_payload->mutable_fields())["peer"].mutable_struct_value());
  (*json_payload->mutable_fields())["authority"].set_string_value(
      std::move(entry.authority));
  (*json_payload->mutable_fields())["serviceName"].set_string_value(
      std::move(entry.service_name));
  (*json_payload->mutable_fields())["methodName"].set_string_value(
      std::move(entry.method_name));
}

namespace {

uint64_t EstimateEntrySize(const LoggingSink::Entry& entry) {
  uint64_t size = sizeof(entry);
  for (const auto& pair : entry.payload.metadata) {
    size += pair.first.size() + pair.second.size();
  }
  size += entry.payload.status_message.size();
  size += entry.payload.status_details.size();
  size += entry.payload.message.size();
  size += entry.authority.size();
  size += entry.service_name.size();
  size += entry.method_name.size();
  return size;
}

}  // namespace

void ObservabilityLoggingSink::LogEntry(Entry entry) {
  auto entry_size = EstimateEntrySize(entry);
  grpc_core::MutexLock lock(&mu_);
  if (sink_closed_) return;
  entries_.push_back(std::move(entry));
  entries_memory_footprint_ += entry_size;
  MaybeTriggerFlushLocked();
}

void ObservabilityLoggingSink::RegisterEnvironmentResource(
    const EnvironmentAutoDetect::ResourceType* resource) {
  grpc_core::MutexLock lock(&mu_);
  resource_ = resource;
  MaybeTriggerFlushLocked();
}

void ObservabilityLoggingSink::FlushAndClose() {
  grpc_core::MutexLock lock(&mu_);
  sink_closed_ = true;
  if (entries_.empty()) return;
  MaybeTriggerFlushLocked();
  sink_flushed_after_close_.Wait(&mu_);
}

void ObservabilityLoggingSink::Flush() {
  std::vector<Entry> entries;
  google::logging::v2::LoggingServiceV2::StubInterface* stub = nullptr;
  const EnvironmentAutoDetect::ResourceType* resource = nullptr;
  {
    grpc_core::MutexLock lock(&mu_);
    if (flush_in_progress_) {
      return;
    }
    flush_in_progress_ = true;
    flush_timer_in_progress_ = false;
    flush_triggered_ = false;
    if (stub_ == nullptr) {
      std::string endpoint;
      absl::optional<std::string> endpoint_env =
          grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
      if (endpoint_env.has_value() && !endpoint_env->empty()) {
        endpoint = std::move(*endpoint_env);
      } else {
        endpoint = "logging.googleapis.com";
      }
      ChannelArguments args;
      // Disable observability for RPCs on this channel
      args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
      // Set keepalive time to 24 hrs to effectively disable keepalive ping, but
      // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect.
      args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS,
                  24 * 60 * 60 * 1000 /* 24 hours */);
      args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */);
      stub_ = google::logging::v2::LoggingServiceV2::NewStub(
          CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args));
    }
    stub = stub_.get();
    entries = std::move(entries_);
    entries_memory_footprint_ = 0;
    resource = resource_;
  }
  FlushEntriesHelper(stub, std::move(entries), resource);
}

void ObservabilityLoggingSink::FlushEntriesHelper(
    google::logging::v2::LoggingServiceV2::StubInterface* stub,
    std::vector<Entry> entries,
    const EnvironmentAutoDetect::ResourceType* resource) {
  if (entries.empty()) {
    return;
  }
  struct CallContext {
    ClientContext context;
    google::logging::v2::WriteLogEntriesRequest request;
    google::logging::v2::WriteLogEntriesResponse response;
  };
  CallContext* call = new CallContext;
  call->context.set_authority(authority_);
  call->context.set_deadline(
      (grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30))
          .as_timespec(GPR_CLOCK_MONOTONIC));
  call->request.set_log_name(
      absl::StrFormat("projects/%s/logs/"
                      "microservices.googleapis.com%%2Fobservability%%2fgrpc",
                      project_id_));
  (*call->request.mutable_labels()).insert(labels_.begin(), labels_.end());
  // Set the proper resource type and labels.
  call->request.mutable_resource()->set_type(resource->resource_type);
  call->request.mutable_resource()->mutable_labels()->insert(
      resource->labels.begin(), resource->labels.end());
  for (auto& entry : entries) {
    auto* proto_entry = call->request.add_entries();
    gpr_timespec timespec = entry.timestamp.as_timespec(GPR_CLOCK_REALTIME);
    proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec);
    proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec);
    // Add tracing details
    proto_entry->set_span_id(entry.span_id);
    proto_entry->set_trace(
        absl::StrFormat("projects/%s/traces/%s", project_id_, entry.trace_id));
    proto_entry->set_trace_sampled(entry.is_sampled);
    // TODO(yashykt): Check if we need to fill receive timestamp
    EntryToJsonStructProto(std::move(entry),
                           proto_entry->mutable_json_payload());
  }
  stub->async()->WriteLogEntries(
      &(call->context), &(call->request), &(call->response),
      [this, call](Status status) {
        if (!status.ok()) {
          gpr_log(
              GPR_ERROR,
              "GCP Observability Logging Error %d: %s. Dumping log entries.",
              status.error_code(), status.error_message().c_str());
          for (auto& entry : call->request.entries()) {
            std::string output;
            ::google::protobuf::TextFormat::PrintToString(entry.json_payload(),
                                                          &output);
            gpr_log(
                GPR_INFO, "Log Entry recorded at time: %s : %s",
                grpc_core::Timestamp::FromTimespecRoundUp(
                    gpr_timespec{entry.timestamp().seconds(),
                                 entry.timestamp().nanos(), GPR_CLOCK_REALTIME})
                    .ToString()
                    .c_str(),
                output.c_str());
          }
        }
        delete call;
        grpc_core::MutexLock lock(&mu_);
        flush_in_progress_ = false;
        if (sink_closed_ && entries_.empty()) {
          sink_flushed_after_close_.SignalAll();
        } else {
          MaybeTriggerFlushLocked();
        }
      });
}

void ObservabilityLoggingSink::MaybeTriggerFlush() {
  grpc_core::MutexLock lock(&mu_);
  return MaybeTriggerFlushLocked();
}

void ObservabilityLoggingSink::MaybeTriggerFlushLocked() {
  constexpr int kMaxEntriesBeforeDump = 100000;
  constexpr int kMaxMemoryFootprintBeforeDump = 10 * 1024 * 1024;
  constexpr int kMinEntriesBeforeFlush = 1000;
  constexpr int kMinMemoryFootprintBeforeFlush = 1 * 1024 * 1024;
  // Use this opportunity to fetch environment resource if not fetched already
  if (resource_ == nullptr && !registered_env_fetch_notification_) {
    auto& env_autodetect = EnvironmentAutoDetect::Get();
    resource_ = env_autodetect.resource();
    event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
    if (resource_ == nullptr) {
      registered_env_fetch_notification_ = true;
      env_autodetect.NotifyOnDone([this]() {
        RegisterEnvironmentResource(EnvironmentAutoDetect::Get().resource());
      });
    }
  }
  if (entries_.empty()) return;
  if (entries_.size() > kMaxEntriesBeforeDump ||
      entries_memory_footprint_ > kMaxMemoryFootprintBeforeDump) {
    // Buffer limits have been reached. Dump entries with gpr_log
    gpr_log(GPR_INFO, "Buffer limit reached. Dumping log entries.");
    for (auto& entry : entries_) {
      google::protobuf::Struct proto;
      std::string timestamp = entry.timestamp.ToString();
      EntryToJsonStructProto(std::move(entry), &proto);
      std::string output;
      ::google::protobuf::TextFormat::PrintToString(proto, &output);
      gpr_log(GPR_INFO, "Log Entry recorded at time: %s : %s",
              timestamp.c_str(), output.c_str());
    }
    entries_.clear();
    entries_memory_footprint_ = 0;
  } else if (resource_ != nullptr && !flush_in_progress_) {
    // Environment resource has been detected. Trigger flush if conditions
    // suffice.
    if ((entries_.size() >= kMinEntriesBeforeFlush ||
         entries_memory_footprint_ >= kMinMemoryFootprintBeforeFlush ||
         sink_closed_) &&
        !flush_triggered_) {
      // It is fine even if there were a flush with a timer in progress. What is
      // important is that a flush is triggered.
      flush_triggered_ = true;
      event_engine_->Run([this]() { Flush(); });
    } else if (!flush_timer_in_progress_) {
      flush_timer_in_progress_ = true;
      event_engine_->RunAfter(grpc_core::Duration::Seconds(1),
                              [this]() { Flush(); });
    }
  }
}

ObservabilityLoggingSink::Configuration::Configuration(
    const GcpObservabilityConfig::CloudLogging::RpcEventConfiguration&
        rpc_event_config)
    : exclude(rpc_event_config.exclude),
      max_metadata_bytes(rpc_event_config.max_metadata_bytes),
      max_message_bytes(rpc_event_config.max_message_bytes) {
  for (auto& parsed_method : rpc_event_config.parsed_methods) {
    parsed_methods.emplace_back(ParsedMethod{
        std::string(parsed_method.service), std::string(parsed_method.method)});
  }
}

}  // namespace internal
}  // namespace grpc
