//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include "test/cpp/interop/pre_stop_hook_server.h"

#include <thread>

#include "absl/strings/str_format.h"

#include <grpcpp/grpcpp.h>

#include "src/core/lib/gprpp/sync.h"
#include "src/proto/grpc/testing/messages.pb.h"

namespace grpc {
namespace testing {
namespace {

enum class State : std::uint8_t { kNew, kWaiting, kDone, kShuttingDown };

std::unique_ptr<Server> BuildHookServer(HookServiceImpl* service, int port) {
  ServerBuilder builder;
  builder.AddListeningPort(absl::StrFormat("0.0.0.0:%d", port),
                           grpc::InsecureServerCredentials());
  builder.RegisterService(service);
  return builder.BuildAndStart();
}

}  // namespace

class PreStopHookServer {
 public:
  explicit PreStopHookServer(int port, const absl::Duration& startup_timeout)
      : server_(BuildHookServer(&hook_service_, port)),
        server_thread_(PreStopHookServer::ServerThread, this) {
    WaitForState(State::kWaiting, startup_timeout);
  }

  ~PreStopHookServer() {
    hook_service_.Stop();
    SetState(State::kShuttingDown);
    server_->Shutdown();
    WaitForState(State::kDone, absl::Seconds(5));
    server_thread_.detach();
  }

  State GetState() {
    grpc_core::MutexLock lock(&mu_);
    return state_;
  }

  void SetState(State state) {
    grpc_core::MutexLock lock(&mu_);
    state_ = state;
    condition_.SignalAll();
  }

  void SetReturnStatus(const Status& status) {
    hook_service_.AddReturnStatus(status);
  }

  bool TestOnlyExpectRequests(size_t expected_requests_count,
                              absl::Duration timeout) {
    return hook_service_.TestOnlyExpectRequests(expected_requests_count,
                                                timeout);
  }

 private:
  bool WaitForState(State state, const absl::Duration& timeout) {
    grpc_core::MutexLock lock(&mu_);
    auto deadline = absl::Now() + timeout;
    while (state_ != state && !condition_.WaitWithDeadline(&mu_, deadline)) {
    }
    return state_ == state;
  }

  static void ServerThread(PreStopHookServer* server) {
    server->SetState(State::kWaiting);
    server->server_->Wait();
    server->SetState(State::kDone);
  }

  HookServiceImpl hook_service_;
  grpc_core::Mutex mu_;
  grpc_core::CondVar condition_ ABSL_GUARDED_BY(mu_);
  State state_ ABSL_GUARDED_BY(mu_) = State::kNew;
  std::unique_ptr<Server> server_;
  std::thread server_thread_;
};

Status PreStopHookServerManager::Start(int port, size_t timeout_s) {
  if (server_) {
    return Status(StatusCode::ALREADY_EXISTS,
                  "Pre hook server is already running");
  }
  server_ = std::unique_ptr<PreStopHookServer, PreStopHookServerDeleter>(
      new PreStopHookServer(port, absl::Seconds(timeout_s)),
      PreStopHookServerDeleter());
  return server_->GetState() == State::kWaiting
             ? Status::OK
             : Status(StatusCode::DEADLINE_EXCEEDED, "Server have not started");
}

Status PreStopHookServerManager::Stop() {
  if (!server_) {
    return Status(StatusCode::UNAVAILABLE, "Pre hook server is not running");
  }
  server_.reset();
  return Status::OK;
}

void PreStopHookServerManager::Return(StatusCode code,
                                      absl::string_view description) {
  server_->SetReturnStatus(Status(code, std::string(description)));
}

bool PreStopHookServerManager::TestOnlyExpectRequests(
    size_t expected_requests_count, const absl::Duration& timeout) {
  return server_->TestOnlyExpectRequests(expected_requests_count, timeout);
}

void PreStopHookServerManager::PreStopHookServerDeleter::operator()(
    PreStopHookServer* server) {
  delete server;
}

//
// HookServiceImpl
//

ServerUnaryReactor* HookServiceImpl::Hook(CallbackServerContext* context,
                                          const Empty* /* request */,
                                          Empty* /* reply */) {
  auto reactor = context->DefaultReactor();
  grpc_core::MutexLock lock(&mu_);
  pending_requests_.emplace_back(reactor);
  MatchRequestsAndStatuses();
  return reactor;
}

ServerUnaryReactor* HookServiceImpl::SetReturnStatus(
    CallbackServerContext* context, const SetReturnStatusRequest* request,
    Empty* /* reply */) {
  auto reactor = context->DefaultReactor();
  reactor->Finish(Status::OK);
  grpc_core::MutexLock lock(&mu_);
  respond_all_status_.emplace(
      static_cast<StatusCode>(request->grpc_code_to_return()),
      request->grpc_status_description());
  MatchRequestsAndStatuses();
  return reactor;
}

ServerUnaryReactor* HookServiceImpl::ClearReturnStatus(
    CallbackServerContext* context, const Empty* /* request */,
    Empty* /* reply */) {
  auto reactor = context->DefaultReactor();
  reactor->Finish(Status::OK);
  grpc_core::MutexLock lock(&mu_);
  respond_all_status_.reset();
  MatchRequestsAndStatuses();
  return reactor;
}

void HookServiceImpl::AddReturnStatus(const Status& status) {
  grpc_core::MutexLock lock(&mu_);
  pending_statuses_.push_back(status);
  MatchRequestsAndStatuses();
}

bool HookServiceImpl::TestOnlyExpectRequests(size_t expected_requests_count,
                                             const absl::Duration& timeout) {
  grpc_core::MutexLock lock(&mu_);
  auto deadline = absl::Now() + timeout;
  while (pending_requests_.size() < expected_requests_count &&
         !request_var_.WaitWithDeadline(&mu_, deadline)) {
  }
  return pending_requests_.size() >= expected_requests_count;
}

void HookServiceImpl::Stop() {
  grpc_core::MutexLock lock(&mu_);
  if (!respond_all_status_.has_value()) {
    respond_all_status_.emplace(StatusCode::ABORTED, "Shutting down");
  }
  MatchRequestsAndStatuses();
}

void HookServiceImpl::MatchRequestsAndStatuses() {
  while (!pending_requests_.empty() && !pending_statuses_.empty()) {
    pending_requests_.front()->Finish(std::move(pending_statuses_.front()));
    pending_requests_.erase(pending_requests_.begin());
    pending_statuses_.erase(pending_statuses_.begin());
  }
  if (respond_all_status_.has_value()) {
    for (const auto& request : pending_requests_) {
      request->Finish(*respond_all_status_);
    }
    pending_requests_.clear();
  }
  request_var_.SignalAll();
}

}  // namespace testing
}  // namespace grpc
