/*
 *    Copyright (c) 2021, The OpenThread Authors.
 *    All rights reserved.
 *
 *    Redistribution and use in source and binary forms, with or without
 *    modification, are permitted provided that the following conditions are met:
 *    1. Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *    2. Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *    3. Neither the name of the copyright holder nor the
 *       names of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written permission.
 *
 *    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 *    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 *    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 *    ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 *    LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 *    CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 *    SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 *    INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 *    CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 *    ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 *    POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * @file
 * This file implements the Task Runner that executes tasks on the mainloop.
 */

#include "common/task_runner.hpp"

#include <algorithm>

#include <fcntl.h>
#include <unistd.h>

#include "common/code_utils.hpp"

namespace otbr {

/**
 * Creates the wake-up pipe and switches both of its ends to non-blocking mode.
 *
 * Every failure in here is treated as fatal (see VerifyOrDie): there is no
 * fallback if the pipe cannot be created or configured.
 */
TaskRunner::TaskRunner(void)
    : mTaskQueue(DelayedTask::Comparator{})
{
    int flags;

    // We do not handle failures when creating a pipe, simply die.
    VerifyOrDie(pipe(mEventFd) != -1, strerror(errno));

    // F_GETFL reports failure as -1. OR-ing that value with O_NONBLOCK and
    // handing it to F_SETFL would request every status flag at once, so the
    // result is verified before it is used.
    flags = fcntl(mEventFd[kRead], F_GETFL, 0);
    VerifyOrDie(flags != -1, strerror(errno));
    VerifyOrDie(fcntl(mEventFd[kRead], F_SETFL, flags | O_NONBLOCK) != -1, strerror(errno));

    flags = fcntl(mEventFd[kWrite], F_GETFL, 0);
    VerifyOrDie(flags != -1, strerror(errno));
    VerifyOrDie(fcntl(mEventFd[kWrite], F_SETFL, flags | O_NONBLOCK) != -1, strerror(errno));
}

/**
 * Closes both ends of the wake-up pipe (read end first, then write end) and
 * marks each descriptor as invalid afterwards.
 */
TaskRunner::~TaskRunner(void)
{
    // Close a descriptor only if it is still valid, then invalidate it.
    const auto closeEnd = [](int &aFd) {
        if (aFd == -1)
        {
            return;
        }

        close(aFd);
        aFd = -1;
    };

    closeEnd(mEventFd[kRead]);
    closeEnd(mEventFd[kWrite]);
}

/**
 * Posts a task with a zero delay.
 *
 * @param[in] aTask  The task to run.
 */
void TaskRunner::Post(Task<void> aTask)
{
    // An immediate task is simply a delayed task with a zero delay. The task
    // id returned by the delayed overload is deliberately dropped.
    static_cast<void>(Post(Milliseconds::zero(), std::move(aTask)));
}

/**
 * Posts a task that becomes due after the given delay.
 *
 * @param[in] aDelay  The delay before the task becomes due.
 * @param[in] aTask   The task to run.
 *
 * @returns The id assigned to the task by PushTask(); it can be passed to Cancel().
 */
TaskRunner::TaskId TaskRunner::Post(Milliseconds aDelay, Task<void> aTask)
{
    const TaskId assignedId = PushTask(aDelay, std::move(aTask));

    return assignedId;
}

/**
 * Prepares the mainloop context for the next iteration.
 *
 * Adds the read end of the wake-up pipe to the mainloop read set and, when a
 * task is queued, shortens the mainloop timeout so that it does not exceed
 * the time remaining until the earliest task is due.
 *
 * @param[in,out] aMainloop  The mainloop context to update.
 */
void TaskRunner::Update(MainloopContext &aMainloop)
{
    constexpr int kMicrosecondsPerSecond = 1000000;

    aMainloop.AddFdToReadSet(mEventFd[kRead]);

    // The queue is shared with PushTask()/Cancel(), so inspect it under the mutex.
    std::lock_guard<std::mutex> guard(mTaskQueueMutex);

    if (mTaskQueue.empty())
    {
        return;
    }

    const auto now     = Clock::now();
    const auto dueTime = mTaskQueue.top().GetTimeExecute();
    const auto limit   = FromTimeval<Microseconds>(aMainloop.mTimeout);

    // A task that is already overdue must not produce a negative wait time.
    Microseconds wait = Microseconds::zero();

    if (!(dueTime < now))
    {
        wait = std::chrono::duration_cast<Microseconds>(dueTime - now);
    }

    // Only ever shorten the mainloop timeout, never extend it.
    if (wait <= limit)
    {
        aMainloop.mTimeout.tv_sec  = wait.count() / kMicrosecondsPerSecond;
        aMainloop.mTimeout.tv_usec = wait.count() % kMicrosecondsPerSecond;
    }
}

/**
 * Drains the wake-up pipe and then runs every task that is due.
 *
 * @param[in] aMainloop  The mainloop context (unused).
 */
void TaskRunner::Process(const MainloopContext &aMainloop)
{
    OTBR_UNUSED_VARIABLE(aMainloop);

    // Consume the pipe one byte at a time. Keep going while bytes arrive or
    // the read was merely interrupted by a signal; stop on anything else.
    for (;;)
    {
        uint8_t       drained;
        const ssize_t bytesRead = read(mEventFd[kRead], &drained, sizeof(drained));

        if (bytesRead > 0)
        {
            continue;
        }

        if (bytesRead == -1 && errno == EINTR)
        {
            continue;
        }

        break;
    }

    // The only acceptable reason to stop is "nothing left to read" on the
    // non-blocking descriptor; any other error is critical, simply die.
    VerifyOrDie(errno == EAGAIN || errno == EWOULDBLOCK, strerror(errno));

    PopTasks();
}

/**
 * Queues a task and pokes the wake-up pipe so that the read end becomes readable.
 *
 * @param[in] aDelay  The delay before the task becomes due.
 * @param[in] aTask   The task to run.
 *
 * @returns The id assigned to the queued task.
 */
TaskRunner::TaskId TaskRunner::PushTask(Milliseconds aDelay, Task<void> aTask)
{
    TaskId newId;

    // Allocate the id, mark it active and enqueue the task under the mutex;
    // the lock is released before touching the pipe.
    {
        std::lock_guard<std::mutex> guard(mTaskQueueMutex);

        newId = mNextTaskId++;

        mActiveTaskIds.insert(newId);
        mTaskQueue.emplace(newId, aDelay, std::move(aTask));
    }

    const uint8_t wakeByte = 1;
    ssize_t       written;

    // Retry the single-byte write for as long as it is interrupted by a signal.
    do
    {
        written = write(mEventFd[kWrite], &wakeByte, sizeof(wakeByte));
    } while (written == -1 && errno == EINTR);

    if (written == -1)
    {
        // Critical error happens, simply die.
        VerifyOrDie(errno == EAGAIN || errno == EWOULDBLOCK, strerror(errno));

        // The write would block because the pipe already holds data (written by
        // other concurrent callers in different threads), so mEventFd[kRead]
        // should be readable now and the task will still be picked up.
        otbrLogWarning("Failed to write fd %d: %s", mEventFd[kWrite], strerror(errno));
    }

    return newId;
}

/**
 * Cancels a previously posted task.
 *
 * The task is not removed from the queue here; its id is only dropped from the
 * set of active ids, which PopTasks() consults before running a task.
 *
 * @param[in] aTaskId  The id of the task to cancel.
 */
void TaskRunner::Cancel(TaskRunner::TaskId aTaskId)
{
    std::lock_guard<std::mutex> guard(mTaskQueueMutex);

    mActiveTaskIds.erase(aTaskId);
}

void TaskRunner::PopTasks(void)
{
    while (true)
    {
        Task<void> task;
        bool       canceled;

        // The braces here are necessary for auto-releasing of the mutex.
        {
            std::lock_guard<std::mutex> _(mTaskQueueMutex);

            if (!mTaskQueue.empty() && mTaskQueue.top().GetTimeExecute() <= Clock::now())
            {
                const DelayedTask &top    = mTaskQueue.top();
                TaskId             taskId = top.mTaskId;

                task = std::move(top.mTask);
                mTaskQueue.pop();
                canceled = (mActiveTaskIds.erase(taskId) == 0);
            }
            else
            {
                break;
            }
        }

        if (!canceled)
        {
            task();
        }
    }
}

} // namespace otbr
