/*
 * Copyright 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "os/queue.h"

#include <sys/eventfd.h>

#include <atomic>
#include <chrono>
#include <future>
#include <unordered_map>

#include "common/bind.h"
#include "gtest/gtest.h"
#include "os/reactor.h"

using namespace std::chrono_literals;

namespace bluetooth {
namespace os {
namespace {

constexpr int kQueueSize = 10;
constexpr int kHalfOfQueueSize = kQueueSize / 2;
constexpr int kDoubleOfQueueSize = kQueueSize * 2;
constexpr int kQueueSizeOne = 1;

class QueueTest : public ::testing::Test {
protected:
  void SetUp() override {
    enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
    enqueue_handler_ = new Handler(enqueue_thread_);
    dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
    dequeue_handler_ = new Handler(dequeue_thread_);
  }
  void TearDown() override {
    enqueue_handler_->Clear();
    delete enqueue_handler_;
    delete enqueue_thread_;
    dequeue_handler_->Clear();
    delete dequeue_handler_;
    delete dequeue_thread_;
    enqueue_handler_ = nullptr;
    enqueue_thread_ = nullptr;
    dequeue_handler_ = nullptr;
    dequeue_thread_ = nullptr;
  }

  Thread* enqueue_thread_;
  Handler* enqueue_handler_;
  Thread* dequeue_thread_;
  Handler* dequeue_handler_;

  void sync_enqueue_handler() {
    log::assert_that(enqueue_thread_ != nullptr, "assert failed: enqueue_thread_ != nullptr");
    log::assert_that(enqueue_thread_->GetReactor()->WaitForIdle(2s),
                     "assert failed: enqueue_thread_->GetReactor()->WaitForIdle(2s)");
  }
};

class TestEnqueueEnd {
public:
  explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
      : count(0), handler_(handler), queue_(queue), delay_(0) {}

  ~TestEnqueueEnd() {}

  void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
    promise_map_ = promise_map;
    handler_->Post(
            common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this)));
  }

  void UnregisterEnqueue() {
    std::promise<void> promise;
    auto future = promise.get_future();

    handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue,
                                    common::Unretained(this), std::move(promise)));
    future.wait();
  }

  std::unique_ptr<std::string> EnqueueCallbackForTest() {
    if (delay_ != 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
    }

    count++;
    std::unique_ptr<std::string> data = std::move(buffer_.front());
    buffer_.pop();
    std::string copy = *data;
    if (buffer_.empty()) {
      queue_->UnregisterEnqueue();
    }

    auto key = buffer_.size();
    auto node = promise_map_->extract(key);
    if (node) {
      node.mapped().set_value(key);
    }

    return data;
  }

  void setDelay(int value) { delay_ = value; }

  std::queue<std::unique_ptr<std::string>> buffer_;
  int count;

private:
  Handler* handler_;
  Queue<std::string>* queue_;
  std::unordered_map<int, std::promise<int>>* promise_map_;
  int delay_;

  void handle_register_enqueue() {
    queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest,
                                                   common::Unretained(this)));
  }

  void handle_unregister_enqueue(std::promise<void> promise) {
    queue_->UnregisterEnqueue();
    promise.set_value();
  }
};

class TestDequeueEnd {
public:
  explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
      : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}

  ~TestDequeueEnd() {}

  void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
    promise_map_ = promise_map;
    handler_->Post(
            common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this)));
  }

  void UnregisterDequeue() {
    std::promise<void> promise;
    auto future = promise.get_future();

    handler_->Post(common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue,
                                    common::Unretained(this), std::move(promise)));
    future.wait();
  }

  void DequeueCallbackForTest() {
    if (delay_ != 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
    }

    count++;
    std::unique_ptr<std::string> data = queue_->TryDequeue();
    buffer_.push(std::move(data));

    if (buffer_.size() == (size_t)capacity_) {
      queue_->UnregisterDequeue();
    }

    auto key = buffer_.size();
    auto node = promise_map_->extract(key);
    if (node) {
      node.mapped().set_value(key);
    }
  }

  void setDelay(int value) { delay_ = value; }

  std::queue<std::unique_ptr<std::string>> buffer_;
  int count;

private:
  Handler* handler_;
  Queue<std::string>* queue_;
  std::unordered_map<int, std::promise<int>>* promise_map_;
  int capacity_;
  int delay_;

  void handle_register_dequeue() {
    queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest,
                                                   common::Unretained(this)));
  }

  void handle_unregister_dequeue(std::promise<void> promise) {
    queue_->UnregisterDequeue();
    promise.set_value();
  }
};

// Enqueue end level : 0 -> queue is full, 1 - >  queue isn't full
// Dequeue end level : 0 -> queue is empty, 1 - >  queue isn't empty

// Test 1 : Queue is empty

// Enqueue end level : 1
// Dequeue end level : 0
// Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
TEST_F(QueueTest, register_enqueue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // Push kQueueSize data to enqueue_end buffer
  for (int i = 0; i < kQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);

  // Register enqueue and expect data move to Queue
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
}

// Enqueue end level : 1
// Dequeue end level : 0
// Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
TEST_F(QueueTest, register_dequeue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Register dequeue, DequeueCallback shouldn't be invoked
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_dequeue_end.count, 0);

  test_dequeue_end.UnregisterDequeue();
}

// Test 2 : Queue is full

// Enqueue end level : 0
// Dequeue end level : 1
// Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
TEST_F(QueueTest, register_enqueue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // make Queue full
  for (int i = 0; i < kQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);

  // push some data to enqueue_end buffer and register enqueue;
  for (int i = 0; i < kHalfOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // EnqueueCallback shouldn't be invoked
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
  EXPECT_EQ(test_enqueue_end.count, kQueueSize);

  test_enqueue_end.UnregisterEnqueue();
}

// Enqueue end level : 0
// Dequeue end level : 1
// Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
TEST_F(QueueTest, register_dequeue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // make Queue full
  for (int i = 0; i < kQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);

  // Register dequeue and expect data move to dequeue end buffer
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
                              std::forward_as_tuple());
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}

// Test 3 : Queue is non-empty and non-full

// Enqueue end level : 1
// Dequeue end level : 1
// Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // make Queue half empty
  for (int i = 0; i < kHalfOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);

  // push some data to enqueue_end buffer and register enqueue;
  for (int i = 0; i < kHalfOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }

  // Register enqueue and expect data move to Queue
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);
  sync_enqueue_handler();
}

// Enqueue end level : 1
// Dequeue end level : 1
// Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // make Queue half empty
  for (int i = 0; i < kHalfOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);

  // Register dequeue and expect data move to dequeue end buffer
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
                              std::forward_as_tuple());
  auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);

  test_dequeue_end.UnregisterDequeue();
}

// Dynamic level test

// Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked

// Enqueue end level : 1 -> 0
// Dequeue end level : 1
// Test 4-1 Queue becomes full due to only register EnqueueCallback
TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // push double of kQueueSize to enqueue end buffer
  for (int i = 0; i < kDoubleOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }

  // Register enqueue and expect kQueueSize data move to Queue
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), kQueueSize);

  // EnqueueCallback shouldn't be invoked and buffer size stay in kQueueSize
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
  EXPECT_EQ(test_enqueue_end.count, kQueueSize);

  test_enqueue_end.UnregisterEnqueue();
}

// Enqueue end level : 1 -> 0
// Dequeue end level : 1
// Test 4-2 Queue becomes full due to DequeueCallback unregister during test
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // push double of kQueueSize to enqueue end buffer
  for (int i = 0; i < kDoubleOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }

  // Register dequeue
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
                              std::forward_as_tuple());
  auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register enqueue
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[kHalfOfQueueSize].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Dequeue end will unregister when buffer size is kHalfOfQueueSize
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);

  // EnqueueCallback shouldn't be invoked and buffer size stay in kHalfOfQueueSize
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), kHalfOfQueueSize);
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
  EXPECT_EQ(test_enqueue_end.count, kQueueSize + kHalfOfQueueSize);

  test_enqueue_end.UnregisterEnqueue();
}

// Enqueue end level : 1 -> 0
// Dequeue end level : 1
// Test 4-3 Queue becomes full due to DequeueCallback is slower
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // push double of kDoubleOfQueueSize to enqueue end buffer
  for (int i = 0; i < kDoubleOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }

  // Set 20 ms delay for callback and register dequeue
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.setDelay(20);
  auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register enqueue
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Wait for enqueue buffer empty and expect queue is full
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);
  EXPECT_GE(test_dequeue_end.buffer_.size(), (size_t)(kQueueSize - 1));

  test_dequeue_end.UnregisterDequeue();
}

// Enqueue end level : 0 -> 1
// Dequeue end level : 1 -> 0
// Test 5 Queue becomes full and non empty at same time.
TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // push double of kQueueSize to enqueue end buffer
  for (int i = 0; i < kQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }

  // Register dequeue
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
                              std::forward_as_tuple());
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register enqueue
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Wait for all data move from enqueue end buffer to dequeue end buffer
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}

// Enqueue end level : 1 -> 0
// Dequeue end level : 1
// Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_full_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);

  // make Queue full
  for (int i = 0; i < kDoubleOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
                              std::forward_as_tuple());
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), kQueueSize);

  // Expect kQueueSize data block in enqueue end buffer
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);

  // Register dequeue
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Expect enqueue end will empty
  enqueue_future = enqueue_promise_map[0].get_future();
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);

  test_dequeue_end.UnregisterDequeue();
}

// Enqueue end level : 0 -> 1
// Dequeue end level : 1 -> 0
// Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // push double of kQueueSize to enqueue end buffer
  for (int i = 0; i < kQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }

  // Register dequeue
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
                              std::forward_as_tuple());
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register enqueue
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Wait for all data move from enqueue end buffer to dequeue end buffer
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}

// Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked

// Enqueue end level : 1
// Dequeue end level : 1 -> 0
// Test 8-1 Queue becomes empty due to only register DequeueCallback
TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // make Queue half empty
  for (int i = 0; i < kHalfOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);

  // Register dequeue, expect kHalfOfQueueSize data move to dequeue end buffer
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
                              std::forward_as_tuple());
  auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);

  // Expect DequeueCallback should stop to be invoked
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_dequeue_end.count, kHalfOfQueueSize);
}

// Enqueue end level : 1
// Dequeue end level : 1 -> 0
// Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // make Queue half empty
  for (int i = 0; i < kHalfOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
                              std::forward_as_tuple());
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);

  // push kHalfOfQueueSize to enqueue end buffer and register enqueue.
  for (int i = 0; i < kHalfOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Register dequeue, expect kQueueSize move to dequeue end buffer
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
                              std::forward_as_tuple());
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);

  // Expect DequeueCallback should stop to be invoked
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_dequeue_end.count, kQueueSize);
}

// Enqueue end level : 1
// Dequeue end level : 0 -> 1
// Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Register dequeue
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
                              std::forward_as_tuple());
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // push kQueueSize data to enqueue end buffer and register enqueue
  for (int i = 0; i < kQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Expect kQueueSize data move to dequeue end buffer
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);
}

TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
  Queue<std::string>* queue = new Queue<std::string>(kQueueSize);

  // Enqueue a string
  std::string valid = "Valid String";
  std::shared_ptr<std::string> shared = std::make_shared<std::string>(valid);
  queue->RegisterEnqueue(enqueue_handler_, common::Bind(
                                                   [](Queue<std::string>* queue,
                                                      std::shared_ptr<std::string> shared) {
                                                     queue->UnregisterEnqueue();
                                                     return std::make_unique<std::string>(*shared);
                                                   },
                                                   common::Unretained(queue), shared));

  // Dequeue the string
  queue->RegisterDequeue(dequeue_handler_,
                         common::Bind(
                                 [](Queue<std::string>* queue, std::string valid) {
                                   queue->UnregisterDequeue();
                                   auto answer = *queue->TryDequeue();
                                   ASSERT_EQ(answer, valid);
                                 },
                                 common::Unretained(queue), valid));

  // Wait for both handlers to finish and delete the Queue
  std::promise<void> promise;
  auto future = promise.get_future();

  enqueue_handler_->Post(common::BindOnce(
          [](os::Handler* dequeue_handler, Queue<std::string>* queue, std::promise<void>* promise) {
            dequeue_handler->Post(common::BindOnce(
                    [](Queue<std::string>* queue, std::promise<void>* promise) {
                      delete queue;
                      promise->set_value();
                    },
                    common::Unretained(queue), common::Unretained(promise)));
          },
          common::Unretained(dequeue_handler_), common::Unretained(queue),
          common::Unretained(&promise)));
  future.wait();
}

std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  (*to_increase)++;
  return std::make_unique<std::string>("Hello");
}

TEST_F(QueueTest, unregister_enqueue_and_wait) {
  Queue<std::string> queue(10);
  int* indicator = new int(100);
  queue.RegisterEnqueue(enqueue_handler_,
                        common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterEnqueue();
  EXPECT_EQ(*indicator, 101);
  delete indicator;
}

std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(
        int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) {
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  (*to_increase)++;
  if (is_registered->exchange(false)) {
    queue->UnregisterEnqueue();
  }
  return std::make_unique<std::string>("Hello");
}

TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
  Queue<std::string> queue(10);
  int* indicator = new int(100);
  std::atomic_bool is_registered = true;
  queue.RegisterEnqueue(
          enqueue_handler_,
          common::Bind(&sleep_and_enqueue_callback_and_unregister, common::Unretained(indicator),
                       common::Unretained(&queue), common::Unretained(&is_registered)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  if (is_registered.exchange(false)) {
    queue.UnregisterEnqueue();
  }
  EXPECT_EQ(*indicator, 101);
  delete indicator;
}

void sleep_and_dequeue_callback(int* to_increase) {
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  (*to_increase)++;
}

TEST_F(QueueTest, unregister_dequeue_and_wait) {
  int* indicator = new int(100);
  Queue<std::string> queue(10);
  queue.RegisterEnqueue(enqueue_handler_, common::Bind(
                                                  [](Queue<std::string>* queue) {
                                                    queue->UnregisterEnqueue();
                                                    return std::make_unique<std::string>("Hello");
                                                  },
                                                  common::Unretained(&queue)));
  queue.RegisterDequeue(enqueue_handler_,
                        common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterDequeue();
  EXPECT_EQ(*indicator, 101);
  delete indicator;
}

// Create all threads for death tests in the function that dies
class QueueDeathTest : public ::testing::Test {
public:
  void RegisterEnqueueAndDelete() {
    Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL);
    Handler* enqueue_handler = new Handler(enqueue_thread);
    Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
    queue->RegisterEnqueue(enqueue_handler, common::Bind([]() {
                             return std::make_unique<std::string>("A string to fill the queue");
                           }));
    delete queue;
  }

  void RegisterDequeueAndDelete() {
    Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL);
    Handler* dequeue_handler = new Handler(dequeue_thread);
    Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
    queue->RegisterDequeue(dequeue_handler,
                           common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); },
                                        common::Unretained(queue)));
    delete queue;
  }
};

TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
  EXPECT_DEATH(RegisterEnqueueAndDelete(), "nqueue");
}

TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
  EXPECT_DEATH(RegisterDequeueAndDelete(), "equeue");
}

class MockIQueueEnqueue : public IQueueEnqueue<int> {
public:
  void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
    EXPECT_FALSE(registered_);
    registered_ = true;
    handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue,
                                   common::Unretained(this), callback));
  }

  void handle_register_enqueue(EnqueueCallback callback) {
    if (dont_handle_register_enqueue_) {
      return;
    }
    while (registered_) {
      std::unique_ptr<int> front = callback.Run();
      queue_.push(*front);
    }
  }

  void UnregisterEnqueue() override {
    EXPECT_TRUE(registered_);
    registered_ = false;
  }

  bool dont_handle_register_enqueue_ = false;
  bool registered_ = false;
  std::queue<int> queue_;
};

class EnqueueBufferTest : public ::testing::Test {
protected:
  void SetUp() override {
    thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
    handler_ = new Handler(thread_);
  }

  void TearDown() override {
    handler_->Clear();
    delete handler_;
    delete thread_;
  }

  void SynchronizeHandler() {
    std::promise<void> promise;
    auto future = promise.get_future();
    handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); },
                                    std::move(promise)));
    future.wait();
  }

  MockIQueueEnqueue enqueue_;
  EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
  Thread* thread_;
  Handler* handler_;
};

TEST_F(EnqueueBufferTest, enqueue) {
  int num_items = 10;
  for (int i = 0; i < num_items; i++) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
  }
  SynchronizeHandler();
  for (int i = 0; i < num_items; i++) {
    ASSERT_EQ(enqueue_.queue_.front(), i);
    enqueue_.queue_.pop();
  }
  ASSERT_FALSE(enqueue_.registered_);
}

TEST_F(EnqueueBufferTest, clear) {
  enqueue_.dont_handle_register_enqueue_ = true;
  int num_items = 10;
  for (int i = 0; i < num_items; i++) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
  }
  ASSERT_TRUE(enqueue_.registered_);
  enqueue_buffer_.Clear();
  ASSERT_FALSE(enqueue_.registered_);
}

TEST_F(EnqueueBufferTest, delete_when_in_callback) {
  Queue<int>* queue = new Queue<int>(kQueueSize);
  EnqueueBuffer<int>* enqueue_buffer = new EnqueueBuffer<int>(queue);
  int num_items = 10;
  for (int i = 0; i < num_items; i++) {
    enqueue_buffer->Enqueue(std::make_unique<int>(i), handler_);
  }

  delete enqueue_buffer;
  delete queue;
}

}  // namespace
}  // namespace os
}  // namespace bluetooth
