/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "lights_observer.h"

#include <android-base/logging.h>
#include <chrono>
#include "cuttlefish/common/libs/utils/vsock_connection.h"

#include <json/json.h>

namespace cuttlefish {
namespace webrtc_streaming {

// Builds an observer in the stopped state: no connection is attempted and no
// thread is spawned until Start() is called.
//
// `port`, `cid` and `vhost_user_vsock` are only stored here; they are consumed
// later by the connection attempt made from the connection thread.
LightsObserver::LightsObserver(unsigned int port, unsigned int cid,
                               bool vhost_user_vsock)
    : cid_(cid),
      port_(port),
      vhost_user_vsock_(vhost_user_vsock),
      is_running_(false),
      session_active_(false),
      // Pre-incremented by Subscribe(), so the first id handed out is 0.
      last_client_channel_id_(-1) {}

// Tears the observer down by delegating to Stop(), which joins the connection
// thread if one was started.
LightsObserver::~LightsObserver() {
  Stop();
}

// Spawns the background connection thread.
//
// The thread keeps looping until is_running_ is cleared (see Stop()). While
// the connection is up it repeatedly calls ReadServerMessages(); whenever the
// connection is down it tries to reconnect, pausing for one second after each
// failed attempt.
//
// Returns false (and logs an error) if the thread is already joinable, true
// once the thread has been launched.
bool LightsObserver::Start() {
  if (connection_thread_.joinable()) {
    LOG(ERROR) << "Connection thread is already running.";
    return false;
  }

  is_running_ = true;

  auto connection_loop = [this]() {
    constexpr std::chrono::milliseconds kReconnectDelay(1000);

    while (is_running_) {
      // Consume messages for as long as the current connection holds.
      while (cvd_connection_.IsConnected()) {
        ReadServerMessages();
      }

      // A stop request may have arrived while reading; let the loop condition
      // observe it instead of opening a fresh connection.
      if (!is_running_) {
        continue;
      }

      // Only populated in vhost-user mode; any value is okay for client.
      std::optional<int> vhost_user_param;
      if (vhost_user_vsock_) {
        vhost_user_param = 0;
      }

      // Try to start a new connection. If this fails, delay retrying a bit.
      if (!cvd_connection_.Connect(port_, cid_, vhost_user_param)) {
        std::this_thread::sleep_for(kReconnectDelay);
      }
    }

    LOG(INFO) << "Exiting connection thread";
  };
  connection_thread_ = std::thread(connection_loop);

  LOG(INFO) << "Connection thread running";
  return true;
}

// Requests the connection thread to exit and waits for it.
//
// Clearing is_running_ ends the thread's outer loop, and disconnecting ends
// its inner read loop, which only runs while the connection reports itself as
// connected. Safe to call when no thread was ever started.
void LightsObserver::Stop() {
  is_running_ = false;
  cvd_connection_.Disconnect();

  // Nothing to wait for if Start() never launched the thread.
  if (!connection_thread_.joinable()) {
    return;
  }

  // The connection_thread_ should finish at any point now.
  connection_thread_.join();
}

// Reads one JSON message from the server connection and reacts to its "event"
// field:
//   - VIRTUAL_DEVICE_START_LIGHTS_SESSION marks the session active.
//   - VIRTUAL_DEVICE_STOP_LIGHTS_SESSION marks the session inactive.
//   - VIRTUAL_DEVICE_LIGHTS_UPDATE, while a session is active, is cached as
//     the latest update and forwarded to every subscribed client.
// Any other message, including a payload that is not a JSON object, is
// ignored. The senders' boolean results are not inspected.
void LightsObserver::ReadServerMessages() {
  static constexpr auto kEventKey = "event";
  static constexpr auto kMessageStart = "VIRTUAL_DEVICE_START_LIGHTS_SESSION";
  static constexpr auto kMessageStop = "VIRTUAL_DEVICE_STOP_LIGHTS_SESSION";
  static constexpr auto kMessageUpdate = "VIRTUAL_DEVICE_LIGHTS_UPDATE";

  const auto json_value = cvd_connection_.ReadJsonMessage();

  // Json::Value's string-key operator[] asserts unless the value is an object
  // (or null), so a well-formed but non-object payload such as `[]` or `42`
  // must be dropped before the lookup. A null value carries no event either.
  if (!json_value.isObject()) {
    return;
  }

  // Look the event up once; on a const value a missing key yields null
  // instead of inserting a member.
  const auto& event = json_value[kEventKey];

  if (event == kMessageStart) {
    session_active_ = true;
  } else if (event == kMessageStop) {
    session_active_ = false;
  } else if (event == kMessageUpdate && session_active_) {
    // Cache the latest update for when new clients register. The lock also
    // covers the fan-out so the sender table cannot change underneath it.
    std::lock_guard<std::mutex> lock(clients_lock_);
    cached_latest_update_ = json_value;

    // Send update to all subscribed clients
    for (auto& entry : client_message_senders_) {
      entry.second(json_value);
    }
  }
}

// Registers `lights_message_sender` to receive lights updates and returns the
// id under which it was stored; pass that id to Unsubscribe() to remove it.
//
// If an update has already been cached, it is delivered to the new sender
// right away, while clients_lock_ is still held. The sender's return value is
// ignored.
int LightsObserver::Subscribe(
    std::function<bool(const Json::Value&)> lights_message_sender) {
  std::lock_guard<std::mutex> guard(clients_lock_);

  // Ids are handed out sequentially from the running counter.
  const int assigned_id = ++last_client_channel_id_;
  client_message_senders_[assigned_id] = lights_message_sender;

  // Bring the newcomer up to date with the most recent state, if any.
  const bool has_cached_update = !cached_latest_update_.empty();
  if (has_cached_update) {
    lights_message_sender(cached_latest_update_);
  }

  return assigned_id;
}

// Removes the sender stored under `id` (as returned by Subscribe()) while
// holding clients_lock_.
void LightsObserver::Unsubscribe(int id) {
  std::lock_guard<std::mutex> guard(clients_lock_);
  client_message_senders_.erase(id);
}

}  // namespace webrtc_streaming
}  // namespace cuttlefish
