// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/public/cpp/system/wait_set.h"

#include <set>
#include <vector>

#include "base/bind.h"
#include "base/callback.h"
#include "base/memory/ptr_util.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "mojo/public/cpp/system/message_pipe.h"
#include "mojo/public/cpp/system/wait.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace mojo {
namespace {

// Fixture name used by the TEST_F cases in this file. It is a plain alias of
// testing::Test and adds no state or setup of its own.
using WaitSetTest = testing::Test;

// Test helper: sends the bytes of |message| as one message on |handle|, passing
// a null handle array with a count of zero. The process CHECK-fails if the
// write does not return MOJO_RESULT_OK.
void WriteMessage(const ScopedMessagePipeHandle& handle,
                  const base::StringPiece& message) {
  const uint32_t num_bytes = static_cast<uint32_t>(message.size());
  const MojoResult result =
      WriteMessageRaw(handle.get(), message.data(), num_bytes, nullptr, 0,
                      MOJO_WRITE_MESSAGE_FLAG_NONE);
  CHECK_EQ(MOJO_RESULT_OK, result);
}

std::string ReadMessage(const ScopedMessagePipeHandle& handle) {
  std::vector<uint8_t> bytes;
  MojoResult rv = ReadMessageRaw(handle.get(), &bytes, nullptr,
                                 MOJO_READ_MESSAGE_FLAG_NONE);
  CHECK_EQ(MOJO_RESULT_OK, rv);
  return std::string(bytes.begin(), bytes.end());
}

class ThreadedRunner : public base::SimpleThread {
 public:
  explicit ThreadedRunner(const base::Closure& callback)
      : SimpleThread("ThreadedRunner"), callback_(callback) {}
  ~ThreadedRunner() override { Join(); }

  void Run() override { callback_.Run(); }

 private:
  const base::Closure callback_;

  DISALLOW_COPY_AND_ASSIGN(ThreadedRunner);
};

// Exercises Wait() with handles whose watched signal (readable) is already
// satisfied: each endpoint on its own, both endpoints together, both endpoints
// with room for only one result, and finally an empty set.
//
// Output buffers are cleared before every Wait() whose results are asserted so
// that no expectation can pass on values left behind by an earlier call.
TEST_F(WaitSetTest, Satisfied) {
  WaitSet wait_set;
  MessagePipe p;

  const char kTestMessage1[] = "hello wake up";

  // Watch only one handle and write to the other.

  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  WriteMessage(p.handle0, kTestMessage1);

  size_t num_ready_handles = 2;
  Handle ready_handles[2];
  MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  HandleSignalsState hss[2];
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);

  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(p.handle1.get(), ready_handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
  EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());

  // The handle is still in the set, so removing it must succeed. The later
  // removals in this test check their result the same way.
  EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(p.handle1.get()));

  // Now watch only the other handle and write to the first one.

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  WriteMessage(p.handle1, kTestMessage1);

  num_ready_handles = 2;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  ready_results[1] = MOJO_RESULT_UNKNOWN;
  hss[0] = HandleSignalsState();
  hss[1] = HandleSignalsState();
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);

  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(p.handle0.get(), ready_handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
  EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());

  // Now wait on both of them.
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

  num_ready_handles = 2;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  ready_results[1] = MOJO_RESULT_UNKNOWN;
  hss[0] = HandleSignalsState();
  hss[1] = HandleSignalsState();
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
  EXPECT_EQ(2u, num_ready_handles);
  // The two endpoints may be reported in either order.
  EXPECT_TRUE((ready_handles[0] == p.handle0.get() &&
               ready_handles[1] == p.handle1.get()) ||
              (ready_handles[0] == p.handle1.get() &&
               ready_handles[1] == p.handle0.get()));
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[1]);
  EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());
  EXPECT_TRUE(hss[1].readable() && hss[1].writable() && !hss[1].peer_closed());

  // Wait on both again, but with only enough output space for one result.
  num_ready_handles = 1;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handles[0] == p.handle0.get() ||
              ready_handles[0] == p.handle1.get());
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);

  // Remove the ready handle from the set and wait one more time.
  EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0]));

  num_ready_handles = 1;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handles[0] == p.handle0.get() ||
              ready_handles[0] == p.handle1.get());
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);

  EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0]));

  // The wait set should be empty now. Nothing to wait on.
  num_ready_handles = 2;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
  EXPECT_EQ(0u, num_ready_handles);
}

// Watches three endpoints for readability, then closes the peer of one of
// them. Expects Wait() to report exactly that endpoint, with
// MOJO_RESULT_FAILED_PRECONDITION, and none of the endpoints of the untouched
// pipe.
TEST_F(WaitSetTest, Unsatisfiable) {
  MessagePipe p;
  MessagePipe q;
  WaitSet wait_set;

  // Both ends of |q| stay open and never receive a message.
  wait_set.AddHandle(q.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(q.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  // Only one end of |p| is watched; its peer is closed below.
  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);

  p.handle1.reset();

  // Offer room for two results even though only one is expected.
  Handle reported_handles[2];
  MojoResult reported_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  size_t reported_count = 2;
  wait_set.Wait(nullptr, &reported_count, reported_handles, reported_results);

  EXPECT_EQ(1u, reported_count);
  EXPECT_EQ(p.handle0.get(), reported_handles[0]);
  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, reported_results[0]);
}

// Closes a watched handle from a second thread after a short delay, while the
// test thread calls Wait(). Expects the handle to be reported with
// MOJO_RESULT_CANCELLED and to be gone from the set afterwards.
TEST_F(WaitSetTest, CloseWhileWaiting) {
  MessagePipe pipe;
  WaitSet wait_set;

  wait_set.AddHandle(pipe.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);

  // Capture the raw handle value before the background thread is started,
  // because that thread resets the scoped handle.
  const Handle watched_handle = pipe.handle0.get();

  ThreadedRunner delayed_closer(base::Bind(
      [](ScopedMessagePipeHandle* endpoint) {
        // Pause briefly before closing the endpoint.
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        endpoint->reset();
      },
      &pipe.handle0));
  delayed_closer.Start();

  Handle reported_handles[2];
  MojoResult reported_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  size_t reported_count = 2;
  wait_set.Wait(nullptr, &reported_count, reported_handles, reported_results);

  EXPECT_EQ(1u, reported_count);
  EXPECT_EQ(watched_handle, reported_handles[0]);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, reported_results[0]);

  // The closed handle is expected to have left the set already.
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(watched_handle));
}

// Closes both watched endpoints before Wait() is ever called, then drains the
// cancellations one at a time through a single-slot output buffer.
//
// Expects two Wait() calls to report the two handles as cancelled (once each,
// in either order) and a third call to report nothing.
TEST_F(WaitSetTest, CloseBeforeWaiting) {
  MessagePipe p;
  WaitSet wait_set;

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

  // Keep the raw handle values; the scoped handles are reset just below.
  Handle handle0_value = p.handle0.get();
  Handle handle1_value = p.handle1.get();

  p.handle0.reset();
  p.handle1.reset();

  // Ensure that the WaitSet user is always made aware of all cancellations even
  // if they happen while not waiting, or they have to be returned over the span
  // of multiple Wait() calls due to insufficient output storage.

  size_t num_ready_handles = 1;
  Handle ready_handle;
  MojoResult ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result);
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value));
  const Handle first_ready_handle = ready_handle;

  // Reset the in/out arguments so the expectations below cannot be satisfied
  // by values left over from the first call.
  num_ready_handles = 1;
  ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value);
  // The second cancellation must be for the other endpoint, otherwise one of
  // the two cancellations was lost.
  EXPECT_FALSE(ready_handle == first_ready_handle);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result);
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle1_value));

  // Nothing more to wait on.
  num_ready_handles = 1;
  wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(0u, num_ready_handles);
}

// Makes one endpoint readable, consumes the message so it is no longer
// readable, then has a second thread write in the opposite direction after a
// delay. Expects the first Wait() to report only handle1 and the second
// Wait() to report only handle0.
TEST_F(WaitSetTest, SatisfiedThenUnsatisfied) {
  MessagePipe p;
  WaitSet wait_set;

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

  const char kTestMessage1[] = "testing testing testing";
  WriteMessage(p.handle0, kTestMessage1);

  size_t num_ready_handles = 2;
  Handle ready_handles[2];
  MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(p.handle1.get(), ready_handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);

  // Drain the message so handle1 has nothing left to read.
  EXPECT_EQ(kTestMessage1, ReadMessage(p.handle1));

  ThreadedRunner write_after_delay(base::Bind(
      [](ScopedMessagePipeHandle* handle) {
        // Wait a little while, then write a message.
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        WriteMessage(*handle, "wakey wakey");
      },
      &p.handle1));
  write_after_delay.Start();

  // Clear the results first: ready_results[0] still holds MOJO_RESULT_OK from
  // the previous Wait(), which would let the check below pass vacuously.
  num_ready_handles = 2;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  ready_results[1] = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(p.handle0.get(), ready_handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
}

// A set that contains only an already-signaled event: Wait() is expected to
// hand back that event and report zero ready handles.
TEST_F(WaitSetTest, EventOnly) {
  base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
                            base::WaitableEvent::InitialState::SIGNALED);
  WaitSet wait_set;
  wait_set.AddEvent(&event);

  // Handle outputs are supplied, but none of them should be filled in.
  Handle handle_slot;
  MojoResult result_slot = MOJO_RESULT_UNKNOWN;
  size_t handle_count = 1;
  base::WaitableEvent* signaled_event = nullptr;
  wait_set.Wait(&signaled_event, &handle_count, &handle_slot, &result_slot);

  EXPECT_EQ(0u, handle_count);
  EXPECT_EQ(&event, signaled_event);
}

// Mixes a handle and an event in one set. The first Wait() is expected to
// report only the readable handle (the event is not signaled yet). After the
// message is consumed, a second thread signals the event and the second Wait()
// is expected to report only the event.
TEST_F(WaitSetTest, EventAndHandle) {
  const char kTestMessage[] = "hello hello";

  MessagePipe p;
  WriteMessage(p.handle0, kTestMessage);

  base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
                            base::WaitableEvent::InitialState::NOT_SIGNALED);

  WaitSet wait_set;
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddEvent(&event);

  base::WaitableEvent* ready_event = nullptr;
  size_t num_ready_handles = 1;
  Handle ready_handle;
  MojoResult ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(nullptr, ready_event);
  EXPECT_EQ(p.handle1.get(), ready_handle);
  EXPECT_EQ(MOJO_RESULT_OK, ready_result);

  // Drain the message so the handle is no longer readable.
  EXPECT_EQ(kTestMessage, ReadMessage(p.handle1));

  ThreadedRunner signal_after_delay(base::Bind(
      [](base::WaitableEvent* event) {
        // Wait a little while, then signal the event.
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        event->Signal();
      },
      &event));
  signal_after_delay.Start();

  // Reset the in/out arguments explicitly instead of relying on the values the
  // first Wait() happened to leave behind.
  ready_event = nullptr;
  num_ready_handles = 1;
  wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(0u, num_ready_handles);
  EXPECT_EQ(&event, ready_event);
}

// Fills a WaitSet with many handles and events that are all permanently ready,
// then repeatedly calls Wait() with room for a single handle result. The test
// passes once every handle and every event has been reported at least once;
// the whole sweep is repeated three times.
//
// Note that the inner loop has no iteration bound: if some source were never
// reported, the test would not terminate on its own.
TEST_F(WaitSetTest, NoStarvation) {
  const char kTestMessage[] = "wait for it";
  const size_t kNumTestPipes = 50;
  const size_t kNumTestEvents = 10;

  // Create a bunch of handles and events which are always ready and add them
  // to a shared WaitSet.

  WaitSet wait_set;

  MessagePipe pipes[kNumTestPipes];
  for (size_t i = 0; i < kNumTestPipes; ++i) {
    // Make each endpoint readable before adding it to the set. Abort the test
    // if that precondition cannot be established, rather than letting the
    // loop further down spin on a handle that never becomes ready.
    WriteMessage(pipes[i].handle0, kTestMessage);
    ASSERT_EQ(MOJO_RESULT_OK,
              Wait(pipes[i].handle1.get(), MOJO_HANDLE_SIGNAL_READABLE));

    WriteMessage(pipes[i].handle1, kTestMessage);
    ASSERT_EQ(MOJO_RESULT_OK,
              Wait(pipes[i].handle0.get(), MOJO_HANDLE_SIGNAL_READABLE));

    wait_set.AddHandle(pipes[i].handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
    wait_set.AddHandle(pipes[i].handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  }

  std::vector<std::unique_ptr<base::WaitableEvent>> events(kNumTestEvents);
  for (auto& event_ptr : events) {
    event_ptr = std::make_unique<base::WaitableEvent>(
        base::WaitableEvent::ResetPolicy::MANUAL,
        base::WaitableEvent::InitialState::NOT_SIGNALED);
    event_ptr->Signal();
    wait_set.AddEvent(event_ptr.get());
  }

  // Now verify that all handle and event signals are detected within a finite
  // number of consecutive Wait() calls. Do it a few times for good measure.
  for (size_t i = 0; i < 3; ++i) {
    std::set<base::WaitableEvent*> ready_events;
    std::set<Handle> ready_handles;
    while (ready_events.size() < kNumTestEvents ||
           ready_handles.size() < kNumTestPipes * 2) {
      base::WaitableEvent* ready_event = nullptr;
      size_t num_ready_handles = 1;
      Handle ready_handle;
      MojoResult ready_result = MOJO_RESULT_UNKNOWN;
      wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle,
                    &ready_result);
      if (ready_event)
        ready_events.insert(ready_event);

      if (num_ready_handles) {
        EXPECT_EQ(1u, num_ready_handles);
        EXPECT_EQ(MOJO_RESULT_OK, ready_result);
        ready_handles.insert(ready_handle);
      }
    }
  }
}

}  // namespace
}  // namespace mojo
