// Copyright 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "backend/grpc_server.h"

#include <google/protobuf/util/json_util.h>
#include <stdlib.h>

#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "google/protobuf/empty.pb.h"
#include "grpcpp/server_context.h"
#include "grpcpp/support/status.h"
#include "netsim-daemon/src/ffi.rs.h"
#include "netsim/common.pb.h"
#include "netsim/packet_streamer.grpc.pb.h"
#include "netsim/packet_streamer.pb.h"
#include "util/log.h"

namespace netsim {
namespace backend {
namespace {

using netsim::common::ChipKind;

// Server-side handle of one bi-directional packet stream: PacketRequest
// messages are read from the peer and PacketResponse messages are written
// back to it.
using Stream =
    ::grpc::ServerReaderWriter<packet::PacketResponse, packet::PacketRequest>;

using netsim::startup::Chip;

// Mapping from chip_id to streams.
//
// StreamPackets inserts an entry once a chip has been added and erases it
// when the stream ends; HandleResponse looks the stream up by chip_id. The
// map stores plain pointers and does not own the streams.
//
// NOTE(review): nothing in this file locks this map - confirm that the
// insert/erase/lookup paths can never run concurrently.
std::unordered_map<uint32_t, Stream *> chip_id_to_stream;

// Libslirp is not thread safe. Use a lock to prevent concurrent access to
// libslirp.
//
// Held around the HandleRequestCxx call for WIFI chips in ProcessRequests.
std::mutex gSlirpMutex;

// Service handles the gRPC StreamPackets requests.

class ServiceImpl final : public packet::PacketStreamer::Service {
 public:
  ::grpc::Status StreamPackets(::grpc::ServerContext *context,
                               Stream *stream) override {
    // Now connected to a peer issuing a bi-directional streaming grpc
    auto peer = context->peer();
    BtsLogInfo("grpc_server new packet_stream for peer %s", peer.c_str());

    packet::PacketRequest request;

    // First packet must have initial_info describing the peer
    bool success = stream->Read(&request);
    if (!success || !request.has_initial_info()) {
      BtsLogError("ServiceImpl no initial information or stream closed");
      return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT,
                            "Missing initial_info in first packet.");
    }

    auto device_name = request.initial_info().name();
    auto chip_kind = request.initial_info().chip().kind();
    // multiple chips of the same chip_kind for a device have a name
    auto chip_name = request.initial_info().chip().id();
    auto manufacturer = request.initial_info().chip().manufacturer();
    auto product_name = request.initial_info().chip().product_name();
    auto chip_address = request.initial_info().chip().address();
    auto bt_properties = request.initial_info().chip().bt_properties();
    // Add a new chip to the device
    std::string chip_kind_string;
    switch (chip_kind) {
      case common::ChipKind::BLUETOOTH:
        chip_kind_string = "BLUETOOTH";
        break;
      case common::ChipKind::WIFI:
        chip_kind_string = "WIFI";
        break;
      case common::ChipKind::UWB:
        chip_kind_string = "UWB";
        break;
      default:
        chip_kind_string = "UNSPECIFIED";
        break;
    }

    std::vector<unsigned char> message_vec(bt_properties.ByteSizeLong());
    if (!bt_properties.SerializeToArray(message_vec.data(),
                                        message_vec.size())) {
      BtsLogError("Failed to serialize bt_properties to bytes");
    }
    auto device_info = request.initial_info().device_info();
    auto kind = device_info.kind();
    auto version = device_info.version();
    auto sdk_version = device_info.sdk_version();
    auto build_id = device_info.build_id();
    auto variant = device_info.variant();
    auto arch = device_info.arch();
    auto result = netsim::device::AddChipCxx(
        peer, device_name, chip_kind_string, chip_address, chip_name,
        manufacturer, product_name, message_vec, kind, version, sdk_version,
        build_id, variant, arch);
    if (result->IsError()) {
      return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT,
                            "AddChipCxx failed to add chip into netsim");
    }
    uint32_t device_id = result->GetDeviceId();
    uint32_t chip_id = result->GetChipId();

    BtsLogInfo(
        "grpc_server: adding chip - chip_id: %d, "
        "device_name: "
        "%s",
        chip_id, device_name.c_str());
    // connect packet responses from chip facade to the peer
    chip_id_to_stream[chip_id] = stream;
    netsim::transport::RegisterGrpcTransport(chip_id);
    this->ProcessRequests(stream, chip_id, chip_kind);

    // no longer able to send responses to peer
    netsim::transport::UnregisterGrpcTransport(chip_id);
    chip_id_to_stream.erase(chip_id);

    // Remove the chip from the device
    netsim::device::RemoveChipCxx(device_id, chip_id);

    BtsLogInfo(
        "grpc_server: removing chip - chip_id: %d, "
        "device_name: "
        "%s",
        chip_id, device_name.c_str());

    return ::grpc::Status::OK;
  }

  // Convert a protobuf bytes field into shared_ptr<<vec<uint8_t>>.
  //
  // Release ownership of the bytes field and convert it to a vector using move
  // iterators. No copy when called with a mutable reference.
  std::shared_ptr<std::vector<uint8_t>> ToSharedVec(std::string *bytes_field) {
    return std::make_shared<std::vector<uint8_t>>(
        std::make_move_iterator(bytes_field->begin()),
        std::make_move_iterator(bytes_field->end()));
  }

  // Process requests in a loop forwarding packets to the packet_hub and
  // returning when the channel is closed.
  void ProcessRequests(Stream *stream, uint32_t chip_id,
                       common::ChipKind chip_kind) {
    packet::PacketRequest request;
    while (true) {
      if (!stream->Read(&request)) {
        BtsLogWarn("grpc_server: reading stopped - chip_id: %d", chip_id);
        break;
      }
      // All kinds possible (bt, uwb, wifi), but each rpc only streames one.
      if (chip_kind == common::ChipKind::BLUETOOTH) {
        if (!request.has_hci_packet()) {
          BtsLogWarn("grpc_server: unknown packet type from chip_id: %d",
                     chip_id);
          continue;
        }
        auto packet_type = request.hci_packet().packet_type();
        auto packet =
            ToSharedVec(request.mutable_hci_packet()->mutable_packet());
        wireless::HandleRequestCxx(chip_id, *packet, packet_type);
      } else if (chip_kind == common::ChipKind::WIFI) {
        if (!request.has_packet()) {
          BtsLogWarn("grpc_server: unknown packet type from chip_id: %d",
                     chip_id);
          continue;
        }
        auto packet = ToSharedVec(request.mutable_packet());
        {
          std::lock_guard<std::mutex> guard(gSlirpMutex);
          wireless::HandleRequestCxx(chip_id, *packet,
                                     packet::HCIPacket::HCI_PACKET_UNSPECIFIED);
        }
      } else if (chip_kind == common::ChipKind::UWB) {
        if (!request.has_packet()) {
          BtsLogWarn("grpc_server: unknown packet from chip_id: %d", chip_id);
          continue;
        }
        auto packet = ToSharedVec(request.mutable_packet());
        wireless::HandleRequestCxx(chip_id, *packet,
                                   packet::HCIPacket::HCI_PACKET_UNSPECIFIED);

      } else {
        BtsLogWarn("grpc_server: unknown chip_kind");
      }
    }
  }
};
}  // namespace

// HandleResponse is called by packet_hub to forward a response to the gRPC
// stream associated with chip_id.
//
// A packet_type other than HCI_PACKET_UNSPECIFIED produces an hci_packet
// response carrying that type; otherwise the bytes are sent as a plain packet.
// Unknown chip ids and failed writes are logged and otherwise ignored.
//
// When writing, the packet is copied because it is borrowed from the caller
// and the protobuf response has to own its bytes.
void HandleResponse(uint32_t chip_id, const std::vector<uint8_t> &packet,
                    packet::HCIPacket_PacketType packet_type) {
  // find() instead of operator[]: operator[] would default-insert a null
  // entry for every unknown chip_id and so modify the map on the read path.
  const auto entry = chip_id_to_stream.find(chip_id);
  if (entry == chip_id_to_stream.end() || entry->second == nullptr) {
    BtsLogWarn("grpc_server: no stream for chip_id: %u", chip_id);
    return;
  }
  auto *const stream = entry->second;

  // TODO: lock or caller here because gRPC does not allow overlapping writes.
  packet::PacketResponse response;
  // Copies the borrowed packet once; the copy is then moved into the response.
  std::string payload(packet.begin(), packet.end());
  if (packet_type != packet::HCIPacket_PacketType_HCI_PACKET_UNSPECIFIED) {
    auto *hci_packet = response.mutable_hci_packet();
    hci_packet->set_packet_type(packet_type);
    hci_packet->set_packet(std::move(payload));
  } else {
    response.set_packet(std::move(payload));
  }
  if (!stream->Write(response)) {
    BtsLogWarn("grpc_server: write failed for chip_id: %u", chip_id);
  }
}

// Entry point for cxx callers of HandleResponse.
//
// Copies the rust::Vec bytes into a std::vector and converts the raw
// packet_type value to the protobuf enum before delegating.
void HandleResponseCxx(uint32_t chip_id, const rust::Vec<rust::u8> &packet,
                       /* optional */ uint8_t packet_type) {
  const auto hci_type = static_cast<packet::HCIPacket_PacketType>(packet_type);
  const std::vector<uint8_t> bytes(packet.begin(), packet.end());
  HandleResponse(chip_id, bytes, hci_type);
}

}  // namespace backend

// Creates the packet streamer service implemented in this file and hands
// ownership of it to the caller.
std::unique_ptr<packet::PacketStreamer::Service> GetBackendService() {
  std::unique_ptr<packet::PacketStreamer::Service> service =
      std::make_unique<backend::ServiceImpl>();
  return service;
}
}  // namespace netsim
