/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "stats_buffer_writer_queue.h"

#include <private/android_filesystem_config.h>
#include <unistd.h>

#include <chrono>
#include <queue>
#include <thread>

#include "stats_buffer_writer_impl.h"
#include "stats_buffer_writer_queue_impl.h"
#include "utils.h"

namespace {
// Atom id compared against in should_write_via_queue(): an event carrying this id is never
// routed through the queue.
constexpr int32_t kBootTimeEventElapsedTimeAtomId = 240;
}

// Starts the worker thread, which runs processCommands() on this instance, and then assigns
// a name to that thread.
BufferWriterQueue::BufferWriterQueue() : mWorkThread(&BufferWriterQueue::processCommands, this) {
    // pthread_setname_np() only accepts names of up to 15 characters plus the terminating
    // NUL and fails with ERANGE otherwise, leaving the thread unnamed - keep the name short
    // enough to actually be applied
    pthread_setname_np(mWorkThread.native_handle(), "socket_writer_q");
}

// Shuts the queue down: the worker thread is stopped and joined first, then every command
// still sitting in the queue is released.
BufferWriterQueue::~BufferWriterQueue() {
    // terminate() joins the worker thread (when joinable), so it is no longer running
    // once this call returns
    terminate();
    // at this stage there can be N elements in the queue for which memory needs to be freed
    // explicitly
    drainQueue();
}

// Copies |size| bytes from |buffer| into a newly allocated write command tagged with
// |atomId| and hands it to the queue.
//
// Returns true when the command was enqueued. Returns false when the copy could not be
// allocated or when the queue rejected the command; in both cases nothing stays allocated.
bool BufferWriterQueue::write(const uint8_t* buffer, size_t size, uint32_t atomId) {
    Cmd cmd = createWriteBufferCmd(buffer, size, atomId);
    if (cmd.buffer == nullptr) {
        return false;
    }
    if (!pushToQueue(cmd)) {
        // the queue did not store the command, so nobody else will ever release the copy -
        // free it here, otherwise every rejected write leaks its buffer
        free(cmd.buffer);
        return false;
    }
    return true;
}

// Reports how many commands the queue holds at the moment of the call; the count is read
// while holding the queue mutex.
size_t BufferWriterQueue::getQueueSize() const {
    std::unique_lock<std::mutex> guard(mMutex);
    const size_t pendingCount = mCmdQueue.size();
    return pendingCount;
}

// Appends a copy of |cmd| to the queue unless the queue already holds kQueueMaxSizeLimit
// entries. On success one waiter on the condition variable is notified after the mutex has
// been released. Returns whether the command was stored.
bool BufferWriterQueue::pushToQueue(const Cmd& cmd) {
    std::unique_lock<std::mutex> lock(mMutex);
    const bool hasRoom = mCmdQueue.size() < kQueueMaxSizeLimit;
    if (hasRoom) {
        mCmdQueue.push(cmd);
    }
    // TODO (b/258003151): add logging info about internal queue overflow with appropriate
    // error code
    lock.unlock();

    if (hasRoom) {
        mCondition.notify_one();
    }
    return hasRoom;
}

// Builds a write command that carries its own malloc()-ed copy of |buffer|.
//
// The returned command always has atomId set. When the allocation fails its buffer is null
// and its size field is left untouched; otherwise the buffer holds |size| copied bytes and
// the size field is set to |size|.
BufferWriterQueue::Cmd BufferWriterQueue::createWriteBufferCmd(const uint8_t* buffer, size_t size,
                                                               uint32_t atomId) {
    Cmd cmd;
    cmd.atomId = atomId;
    cmd.buffer = static_cast<uint8_t*>(malloc(size));
    if (cmd.buffer != nullptr) {
        memcpy(cmd.buffer, buffer, size);
        cmd.size = size;
    }
    return cmd;
}

void BufferWriterQueue::terminate() {
    if (mWorkThread.joinable()) {
        mDoTerminate = true;
        Cmd terminateCmd;
        terminateCmd.buffer = NULL;
        pushToQueue(terminateCmd);
        mWorkThread.join();
    }
}

// Empties the queue under the mutex, releasing the buffer of each command before the
// command itself is removed.
void BufferWriterQueue::drainQueue() {
    std::unique_lock<std::mutex> lock(mMutex);
    for (; !mCmdQueue.empty(); mCmdQueue.pop()) {
        free(mCmdQueue.front().buffer);
    }
}

void BufferWriterQueue::processCommands() {
    while (true) {
        // temporary local thread copy
        Cmd cmd;
        {
            std::unique_lock<std::mutex> lock(mMutex);
            if (mCmdQueue.empty()) {
                mCondition.wait(lock, [this] { return !this->mCmdQueue.empty(); });
            }
            cmd = mCmdQueue.front();
        }

        if (cmd.buffer == NULL) {
            // null buffer ptr used as a marker of the termination request
            return;
        }

        const bool writeSuccess = handleCommand(cmd);
        if (writeSuccess) {
            // no event drop is observed otherwise command remains in the queue
            // and worker thread will try to log later on

            // call free() explicitly here to free memory before the mutex lock
            free(cmd.buffer);
            {
                std::unique_lock<std::mutex> lock(mMutex);
                // this will lead to Cmd destructor call which will be no-op since now the
                // buffer is NULL
                mCmdQueue.pop();
            }
        }
        // TODO (b/258003151): add logging info about retry count

        if (mDoTerminate) {
            return;
        }

        // attempt to enforce the logging frequency constraints
        // in case of failed write due to socket overflow the sleep can be longer
        // to not overload socket continuously
        if (!writeSuccess) {
            std::this_thread::sleep_for(std::chrono::milliseconds(kDelayOnFailedWriteMs));
        }
    }
}

// Forwards the command's buffer, size and atom id to write_buffer_to_statsd_impl() and
// reports success when that call returns a positive value.
bool BufferWriterQueue::handleCommand(const Cmd& cmd) const {
    // drop noting is switched off here: a failed command stays queued and is retried by
    // the worker, so a failure at this point is not a dropped event yet
    const auto writeResult =
            write_buffer_to_statsd_impl(cmd.buffer, cmd.size, cmd.atomId, /*doNoteDrop*/ false);
    return writeResult > 0;
}

bool write_buffer_to_statsd_queue(const uint8_t* buffer, size_t size, uint32_t atomId) {
    static BufferWriterQueue queue;
    return queue.write(buffer, size, atomId);
}

// Decides whether an atom should go through the writer queue.
//
// Returns false for the boot time event atom, true for any other atom when the calling
// process runs as AID_SYSTEM, and otherwise true only for the benchmark atom when
// ENABLE_BENCHMARK_SUPPORT is defined.
bool should_write_via_queue(uint32_t atomId) {
    // bootstats lives too briefly for the queue to be fully drained, so its atom is
    // written straight to the socket
    if (atomId == kBootTimeEventElapsedTimeAtomId) {
        return false;
    }

    // every atom logged by system server is pushed to the queue
    if (static_cast<uint32_t>(getuid()) == AID_SYSTEM) {
        return true;
    }

#ifdef ENABLE_BENCHMARK_SUPPORT
    // hand-picked atom: 47 is APP_BREADCRUMB_REPORTED, queued for statsd_benchmark purpose
    return atomId == 47;
#else
    return false;
#endif  // ENABLE_BENCHMARK_SUPPORT
}
