//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/client_channel/subchannel_stream_client.h"

#include <inttypes.h>
#include <stdio.h>

#include <utility>

#include <grpc/status.h>
#include <grpc/support/log.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/transport/error_utils.h"

#define SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS 120
#define SUBCHANNEL_STREAM_RECONNECT_JITTER 0.2

namespace grpc_core {

using ::grpc_event_engine::experimental::EventEngine;

//
// SubchannelStreamClient
//

SubchannelStreamClient::SubchannelStreamClient(
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    std::unique_ptr<CallEventHandler> event_handler, const char* tracer)
    : InternallyRefCounted<SubchannelStreamClient>(tracer),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      tracer_(tracer),
      call_allocator_(
          connected_subchannel_->args()
              .GetObject<ResourceQuota>()
              ->memory_quota()
              ->CreateMemoryAllocator(
                  (tracer != nullptr) ? tracer : "SubchannelStreamClient")),
      event_handler_(std::move(event_handler)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(Duration::Seconds(
                  SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS))
              .set_multiplier(SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(SUBCHANNEL_STREAM_RECONNECT_JITTER)
              .set_max_backoff(Duration::Seconds(
                  SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS))),
      event_engine_(connected_subchannel_->args().GetObject<EventEngine>()) {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: created SubchannelStreamClient", tracer_, this);
  }
  StartCall();
}

SubchannelStreamClient::~SubchannelStreamClient() {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: destroying SubchannelStreamClient", tracer_,
            this);
  }
}

void SubchannelStreamClient::Orphan() {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient shutting down", tracer_,
            this);
  }
  {
    MutexLock lock(&mu_);
    event_handler_.reset();
    call_state_.reset();
    if (retry_timer_handle_.has_value()) {
      event_engine_->Cancel(*retry_timer_handle_);
      retry_timer_handle_.reset();
    }
  }
  Unref(DEBUG_LOCATION, "orphan");
}

void SubchannelStreamClient::StartCall() {
  MutexLock lock(&mu_);
  StartCallLocked();
}

void SubchannelStreamClient::StartCallLocked() {
  if (event_handler_ == nullptr) return;
  GPR_ASSERT(call_state_ == nullptr);
  if (event_handler_ != nullptr) {
    event_handler_->OnCallStartLocked(this);
  }
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient created CallState %p",
            tracer_, this, call_state_.get());
  }
  call_state_->StartCallLocked();
}

void SubchannelStreamClient::StartRetryTimerLocked() {
  if (event_handler_ != nullptr) {
    event_handler_->OnRetryTimerStartLocked(this);
  }
  const Duration timeout = retry_backoff_.NextAttemptTime() - Timestamp::Now();
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient health check call lost...",
            tracer_, this);
    if (timeout > Duration::Zero()) {
      gpr_log(GPR_INFO, "%s %p: ... will retry in %" PRId64 "ms.", tracer_,
              this, timeout.millis());
    } else {
      gpr_log(GPR_INFO, "%s %p: ... retrying immediately.", tracer_, this);
    }
  }
  retry_timer_handle_ = event_engine_->RunAfter(
      timeout, [self = Ref(DEBUG_LOCATION, "health_retry_timer")]() mutable {
        ApplicationCallbackExecCtx callback_exec_ctx;
        ExecCtx exec_ctx;
        self->OnRetryTimer();
        self.reset(DEBUG_LOCATION, "health_retry_timer");
      });
}

void SubchannelStreamClient::OnRetryTimer() {
  MutexLock lock(&mu_);
  if (event_handler_ != nullptr && retry_timer_handle_.has_value() &&
      call_state_ == nullptr) {
    if (GPR_UNLIKELY(tracer_ != nullptr)) {
      gpr_log(GPR_INFO,
              "%s %p: SubchannelStreamClient restarting health check call",
              tracer_, this);
    }
    StartCallLocked();
  }
  retry_timer_handle_.reset();
}

//
// SubchannelStreamClient::CallState
//

SubchannelStreamClient::CallState::CallState(
    RefCountedPtr<SubchannelStreamClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : subchannel_stream_client_(std::move(health_check_client)),
      pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
      arena_(Arena::Create(subchannel_stream_client_->connected_subchannel_
                               ->GetInitialCallSizeEstimate(),
                           &subchannel_stream_client_->call_allocator_)),
      payload_(context_) {}

SubchannelStreamClient::CallState::~CallState() {
  if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient destroying CallState %p",
            subchannel_stream_client_->tracer_, subchannel_stream_client_.get(),
            this);
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (context_[i].destroy != nullptr) {
      context_[i].destroy(context_[i].value);
    }
  }
  // Unset the call combiner cancellation closure.  This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack.
  call_combiner_.SetNotifyOnCancel(nullptr);
}

void SubchannelStreamClient::CallState::Orphan() {
  call_combiner_.Cancel(absl::CancelledError());
  Cancel();
}

void SubchannelStreamClient::CallState::StartCallLocked() {
  SubchannelCall::Args args = {
      subchannel_stream_client_->connected_subchannel_,
      &pollent_,
      Slice::FromStaticString("/grpc.health.v1.Health/Watch"),
      gpr_get_cycle_counter(),  // start_time
      Timestamp::InfFuture(),   // deadline
      arena_.get(),
      context_,
      &call_combiner_,
  };
  grpc_error_handle error;
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (!error.ok() || subchannel_stream_client_->event_handler_ == nullptr) {
    gpr_log(GPR_ERROR,
            "SubchannelStreamClient %p CallState %p: error creating "
            "stream on subchannel (%s); will retry",
            subchannel_stream_client_.get(), this,
            StatusToString(error).c_str());
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  payload_.context = context_;
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  send_initial_metadata_.Set(
      HttpPathMetadata(),
      subchannel_stream_client_->event_handler_->GetPathLocked());
  GPR_ASSERT(error.ok());
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  batch_.send_initial_metadata = true;
  // Add send_message op.
  send_message_.Append(Slice(
      subchannel_stream_client_->event_handler_->EncodeSendMessageLocked()));
  payload_.send_message.send_message = &send_message_;
  batch_.send_message = true;
  // Add send_trailing_metadata op.
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref.  When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}

void SubchannelStreamClient::CallState::StartBatchInCallCombiner(
    void* arg, grpc_error_handle /*error*/) {
  auto* batch = static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* call = static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  call->StartTransportStreamOpBatch(batch);
}

void SubchannelStreamClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           absl::OkStatus(), "start_subchannel_batch");
}

void SubchannelStreamClient::CallState::AfterCallStackDestruction(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  delete self;
}

void SubchannelStreamClient::CallState::OnCancelComplete(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
  self->call_->Unref(DEBUG_LOCATION, "cancel");
}

void SubchannelStreamClient::CallState::StartCancel(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = absl::CancelledError();
  self->call_->StartTransportStreamOpBatch(batch);
}

void SubchannelStreamClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.compare_exchange_strong(expected, true,
                                         std::memory_order_acq_rel,
                                         std::memory_order_acquire)) {
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        absl::OkStatus(), "health_cancel");
  }
}

void SubchannelStreamClient::CallState::OnComplete(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
  self->send_initial_metadata_.Clear();
  self->send_trailing_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "on_complete");
}

void SubchannelStreamClient::CallState::RecvInitialMetadataReady(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
  self->recv_initial_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}

void SubchannelStreamClient::CallState::RecvMessageReady() {
  if (!recv_message_.has_value()) {
    call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  // Report payload.
  {
    MutexLock lock(&subchannel_stream_client_->mu_);
    if (subchannel_stream_client_->event_handler_ != nullptr) {
      absl::Status status =
          subchannel_stream_client_->event_handler_->RecvMessageReadyLocked(
              subchannel_stream_client_.get(), recv_message_->JoinIntoString());
      if (!status.ok()) {
        if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
          gpr_log(GPR_INFO,
                  "%s %p: SubchannelStreamClient CallState %p: failed to "
                  "parse response message: %s",
                  subchannel_stream_client_->tracer_,
                  subchannel_stream_client_.get(), this,
                  status.ToString().c_str());
        }
        Cancel();
      }
    }
  }
  seen_response_.store(true, std::memory_order_release);
  recv_message_.reset();
  // Start another recv_message batch.
  // This re-uses the ref we're holding.
  // Note: Can't just reuse batch_ here, since we don't know that all
  // callbacks from the original batch have completed yet.
  recv_message_batch_.payload = &payload_;
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  recv_message_batch_.recv_message = true;
  StartBatch(&recv_message_batch_);
}

void SubchannelStreamClient::CallState::RecvMessageReady(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
  self->RecvMessageReady();
}

void SubchannelStreamClient::CallState::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_trailing_metadata_ready");
  // Get call status.
  grpc_status_code status =
      self->recv_trailing_metadata_.get(GrpcStatusMetadata())
          .value_or(GRPC_STATUS_UNKNOWN);
  if (!error.ok()) {
    grpc_error_get_status(error, Timestamp::InfFuture(), &status,
                          nullptr /* slice */, nullptr /* http_error */,
                          nullptr /* error_string */);
  }
  if (GPR_UNLIKELY(self->subchannel_stream_client_->tracer_ != nullptr)) {
    gpr_log(GPR_INFO,
            "%s %p: SubchannelStreamClient CallState %p: health watch failed "
            "with status %d",
            self->subchannel_stream_client_->tracer_,
            self->subchannel_stream_client_.get(), self, status);
  }
  // Clean up.
  self->recv_trailing_metadata_.Clear();
  // Report call end.
  MutexLock lock(&self->subchannel_stream_client_->mu_);
  if (self->subchannel_stream_client_->event_handler_ != nullptr) {
    self->subchannel_stream_client_->event_handler_
        ->RecvTrailingMetadataReadyLocked(self->subchannel_stream_client_.get(),
                                          status);
  }
  // For status UNIMPLEMENTED, give up and assume always healthy.
  self->CallEndedLocked(/*retry=*/status != GRPC_STATUS_UNIMPLEMENTED);
}

void SubchannelStreamClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == subchannel_stream_client_->call_state_.get()) {
    subchannel_stream_client_->call_state_.reset();
    if (retry) {
      GPR_ASSERT(subchannel_stream_client_->event_handler_ != nullptr);
      if (seen_response_.load(std::memory_order_acquire)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        subchannel_stream_client_->retry_backoff_.Reset();
        subchannel_stream_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        subchannel_stream_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}

}  // namespace grpc_core
