//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/http/httpcli.h"

#include <limits.h>

#include <string>
#include <utility>

#include "absl/functional/bind_front.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"

#include <grpc/grpc.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/handshaker_registry.h"
#include "src/core/lib/transport/tcp_connect_handshaker.h"

namespace grpc_core {

namespace {

grpc_httpcli_get_override g_get_override;
grpc_httpcli_post_override g_post_override;
grpc_httpcli_put_override g_put_override;
void (*g_test_only_on_handshake_done_intercept)(HttpRequest* req);

}  // namespace

OrphanablePtr<HttpRequest> HttpRequest::Get(
    URI uri, const grpc_channel_args* channel_args,
    grpc_polling_entity* pollent, const grpc_http_request* request,
    Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
    RefCountedPtr<grpc_channel_credentials> channel_creds) {
  absl::optional<std::function<void()>> test_only_generate_response;
  if (g_get_override != nullptr) {
    test_only_generate_response = [request, uri, deadline, on_done,
                                   response]() {
      // Note that capturing request here assumes it will remain alive
      // until after Start is called. This avoids making a copy as this
      // code path is only used for test mocks.
      g_get_override(request, uri.authority().c_str(), uri.path().c_str(),
                     deadline, on_done, response);
    };
  }
  std::string name =
      absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path());
  const grpc_slice request_text = grpc_httpcli_format_get_request(
      request, uri.authority().c_str(), uri.path().c_str());
  return MakeOrphanable<HttpRequest>(
      std::move(uri), request_text, response, deadline, channel_args, on_done,
      pollent, name.c_str(), std::move(test_only_generate_response),
      std::move(channel_creds));
}

OrphanablePtr<HttpRequest> HttpRequest::Post(
    URI uri, const grpc_channel_args* channel_args,
    grpc_polling_entity* pollent, const grpc_http_request* request,
    Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
    RefCountedPtr<grpc_channel_credentials> channel_creds) {
  absl::optional<std::function<void()>> test_only_generate_response;
  if (g_post_override != nullptr) {
    test_only_generate_response = [request, uri, deadline, on_done,
                                   response]() {
      g_post_override(request, uri.authority().c_str(), uri.path().c_str(),
                      request->body, request->body_length, deadline, on_done,
                      response);
    };
  }
  std::string name =
      absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path());
  const grpc_slice request_text = grpc_httpcli_format_post_request(
      request, uri.authority().c_str(), uri.path().c_str());
  return MakeOrphanable<HttpRequest>(
      std::move(uri), request_text, response, deadline, channel_args, on_done,
      pollent, name.c_str(), std::move(test_only_generate_response),
      std::move(channel_creds));
}

OrphanablePtr<HttpRequest> HttpRequest::Put(
    URI uri, const grpc_channel_args* channel_args,
    grpc_polling_entity* pollent, const grpc_http_request* request,
    Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
    RefCountedPtr<grpc_channel_credentials> channel_creds) {
  absl::optional<std::function<void()>> test_only_generate_response;
  if (g_put_override != nullptr) {
    test_only_generate_response = [request, uri, deadline, on_done,
                                   response]() {
      g_put_override(request, uri.authority().c_str(), uri.path().c_str(),
                     request->body, request->body_length, deadline, on_done,
                     response);
    };
  }
  std::string name =
      absl::StrFormat("HTTP:PUT:%s:%s", uri.authority(), uri.path());
  const grpc_slice request_text = grpc_httpcli_format_put_request(
      request, uri.authority().c_str(), uri.path().c_str());
  return MakeOrphanable<HttpRequest>(
      std::move(uri), request_text, response, deadline, channel_args, on_done,
      pollent, name.c_str(), std::move(test_only_generate_response),
      std::move(channel_creds));
}

void HttpRequest::SetOverride(grpc_httpcli_get_override get,
                              grpc_httpcli_post_override post,
                              grpc_httpcli_put_override put) {
  g_get_override = get;
  g_post_override = post;
  g_put_override = put;
}

void HttpRequest::TestOnlySetOnHandshakeDoneIntercept(
    void (*intercept)(HttpRequest* req)) {
  g_test_only_on_handshake_done_intercept = intercept;
}

HttpRequest::HttpRequest(
    URI uri, const grpc_slice& request_text, grpc_http_response* response,
    Timestamp deadline, const grpc_channel_args* channel_args,
    grpc_closure* on_done, grpc_polling_entity* pollent, const char* name,
    absl::optional<std::function<void()>> test_only_generate_response,
    RefCountedPtr<grpc_channel_credentials> channel_creds)
    : uri_(std::move(uri)),
      request_text_(request_text),
      deadline_(deadline),
      channel_args_(CoreConfiguration::Get()
                        .channel_args_preconditioning()
                        .PreconditionChannelArgs(channel_args)
                        .ToC()
                        .release()),
      channel_creds_(std::move(channel_creds)),
      on_done_(on_done),
      resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)),
      pollent_(pollent),
      pollset_set_(grpc_pollset_set_create()),
      test_only_generate_response_(std::move(test_only_generate_response)),
      resolver_(GetDNSResolver()) {
  grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
  grpc_slice_buffer_init(&incoming_);
  grpc_slice_buffer_init(&outgoing_);
  grpc_iomgr_register_object(&iomgr_obj_, name);
  GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&continue_on_read_after_schedule_on_exec_ctx_,
                    ContinueOnReadAfterScheduleOnExecCtx, this,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&continue_done_write_after_schedule_on_exec_ctx_,
                    ContinueDoneWriteAfterScheduleOnExecCtx, this,
                    grpc_schedule_on_exec_ctx);
  GPR_ASSERT(pollent);
  grpc_polling_entity_add_to_pollset_set(pollent, pollset_set_);
}

HttpRequest::~HttpRequest() {
  grpc_channel_args_destroy(channel_args_);
  grpc_http_parser_destroy(&parser_);
  if (own_endpoint_ && ep_ != nullptr) {
    grpc_endpoint_destroy(ep_);
  }
  CSliceUnref(request_text_);
  grpc_iomgr_unregister_object(&iomgr_obj_);
  grpc_slice_buffer_destroy(&incoming_);
  grpc_slice_buffer_destroy(&outgoing_);
  grpc_pollset_set_destroy(pollset_set_);
}

void HttpRequest::Start() {
  MutexLock lock(&mu_);
  if (test_only_generate_response_.has_value()) {
    test_only_generate_response_.value()();
    return;
  }
  Ref().release();  // ref held by pending DNS resolution
  dns_request_handle_ = resolver_->LookupHostname(
      absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(),
      uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_,
      /*name_server=*/"");
}

void HttpRequest::Orphan() {
  {
    MutexLock lock(&mu_);
    GPR_ASSERT(!cancelled_);
    cancelled_ = true;
    // cancel potentially pending DNS resolution.
    if (dns_request_handle_.has_value() &&
        resolver_->Cancel(dns_request_handle_.value())) {
      Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
      Unref();
    }
    if (handshake_mgr_ != nullptr) {
      // Shutdown will cancel any ongoing tcp connect.
      handshake_mgr_->Shutdown(
          GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
    }
    if (own_endpoint_ && ep_ != nullptr) {
      grpc_endpoint_shutdown(ep_, GRPC_ERROR_CREATE("HTTP request cancelled"));
    }
  }
  Unref();
}

void HttpRequest::AppendError(grpc_error_handle error) {
  if (overall_error_.ok()) {
    overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
  }
  const grpc_resolved_address* addr = &addresses_[next_address_ - 1];
  auto addr_text = grpc_sockaddr_to_uri(addr);
  overall_error_ = grpc_error_add_child(
      overall_error_,
      grpc_error_set_str(
          error, StatusStrProperty::kTargetAddress,
          addr_text.ok() ? addr_text.value() : addr_text.status().ToString()));
}

void HttpRequest::OnReadInternal(grpc_error_handle error) {
  for (size_t i = 0; i < incoming_.count; i++) {
    if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
      have_read_byte_ = 1;
      grpc_error_handle err =
          grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr);
      if (!err.ok()) {
        Finish(err);
        return;
      }
    }
  }
  if (cancelled_) {
    Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP1 request cancelled during read",
                                         &overall_error_, 1));
  } else if (error.ok()) {
    DoRead();
  } else if (!have_read_byte_) {
    NextAddress(error);
  } else {
    Finish(grpc_http_parser_eof(&parser_));
  }
}

void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(arg));
  MutexLock lock(&req->mu_);
  if (error.ok() && !req->cancelled_) {
    req->OnWritten();
  } else {
    req->NextAddress(error);
  }
}

void HttpRequest::StartWrite() {
  CSliceRef(request_text_);
  grpc_slice_buffer_add(&outgoing_, request_text_);
  Ref().release();  // ref held by pending write
  grpc_endpoint_write(ep_, &outgoing_, &done_write_, nullptr,
                      /*max_frame_size=*/INT_MAX);
}

void HttpRequest::OnHandshakeDone(void* arg, grpc_error_handle error) {
  auto* args = static_cast<HandshakerArgs*>(arg);
  RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(args->user_data));
  if (g_test_only_on_handshake_done_intercept != nullptr) {
    // Run this testing intercept before the lock so that it has a chance to
    // do things like calling Orphan on the request
    g_test_only_on_handshake_done_intercept(req.get());
  }
  MutexLock lock(&req->mu_);
  req->own_endpoint_ = true;
  if (!error.ok()) {
    req->handshake_mgr_.reset();
    req->NextAddress(error);
    return;
  }
  // Handshake completed, so we own fields in args
  grpc_slice_buffer_destroy(args->read_buffer);
  gpr_free(args->read_buffer);
  req->ep_ = args->endpoint;
  req->handshake_mgr_.reset();
  if (req->cancelled_) {
    req->NextAddress(
        GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
    return;
  }
  req->StartWrite();
}

void HttpRequest::DoHandshake(const grpc_resolved_address* addr) {
  // Create the security connector using the credentials and target name.
  ChannelArgs args = ChannelArgs::FromC(channel_args_);
  RefCountedPtr<grpc_channel_security_connector> sc =
      channel_creds_->create_security_connector(
          nullptr /*call_creds*/, uri_.authority().c_str(), &args);
  if (sc == nullptr) {
    Finish(GRPC_ERROR_CREATE_REFERENCING("failed to create security connector",
                                         &overall_error_, 1));
    return;
  }
  absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(addr);
  if (!address.ok()) {
    Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
                                         &overall_error_, 1));
    return;
  }
  args = args.SetObject(std::move(sc))
             .Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value());
  // Start the handshake
  handshake_mgr_ = MakeRefCounted<HandshakeManager>();
  CoreConfiguration::Get().handshaker_registry().AddHandshakers(
      HANDSHAKER_CLIENT, args, pollset_set_, handshake_mgr_.get());
  Ref().release();  // ref held by pending handshake
  grpc_endpoint* ep = ep_;
  ep_ = nullptr;
  own_endpoint_ = false;
  handshake_mgr_->DoHandshake(ep, args, deadline_,
                              /*acceptor=*/nullptr, OnHandshakeDone,
                              /*user_data=*/this);
}

void HttpRequest::NextAddress(grpc_error_handle error) {
  if (!error.ok()) {
    AppendError(error);
  }
  if (cancelled_) {
    Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP request was cancelled",
                                         &overall_error_, 1));
    return;
  }
  if (next_address_ == addresses_.size()) {
    Finish(GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets",
                                         &overall_error_, 1));
    return;
  }
  const grpc_resolved_address* addr = &addresses_[next_address_++];
  DoHandshake(addr);
}

void HttpRequest::OnResolved(
    absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) {
  RefCountedPtr<HttpRequest> unreffer(this);
  MutexLock lock(&mu_);
  dns_request_handle_.reset();
  if (cancelled_) {
    Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
    return;
  }
  if (!addresses_or.ok()) {
    Finish(absl_status_to_grpc_error(addresses_or.status()));
    return;
  }
  addresses_ = std::move(*addresses_or);
  next_address_ = 0;
  NextAddress(absl::OkStatus());
}

}  // namespace grpc_core
