/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd_readahead.h"

#include <pthread.h>

#include "android-base/properties.h"
#include "snapuserd_core.h"
#include "utility.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

/*
 * Stores the configuration for one read-ahead worker. Nothing is opened or
 * mapped here; that happens later in RunThread().
 *
 * cow_device:        path of the COW device (opened in InitializeFds()).
 * backing_device:    path of the backing store device (opened in InitializeFds()).
 * misc_name:         stored in misc_name_.
 * snapuserd:         handler this worker reports to; ownership is shared.
 * cow_op_merge_size: cap on the number of ops PrepareNextReadAhead() puts in a
 *                    single run; 0 means no cap.
 */
ReadAhead::ReadAhead(const std::string& cow_device, const std::string& backing_device,
                     const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd,
                     uint32_t cow_op_merge_size) {
    cow_device_ = cow_device;
    backing_store_device_ = backing_device;
    misc_name_ = misc_name;
    // |snapuserd| is already our own copy (taken by value), so hand it over
    // instead of paying for another atomic ref-count increment/decrement.
    snapuserd_ = std::move(snapuserd);
    cow_op_merge_size_ = cow_op_merge_size;
}

void ReadAhead::CheckOverlap(const CowOperation* cow_op) {
    uint64_t source_offset;
    if (!reader_->GetSourceOffset(cow_op, &source_offset)) {
        SNAP_LOG(ERROR) << "ReadAhead operation has no source offset: " << *cow_op;
        return;
    }

    uint64_t source_block = GetBlockFromOffset(header_, source_offset);
    bool misaligned = (GetBlockRelativeOffset(header_, source_offset) != 0);

    if (dest_blocks_.count(cow_op->new_block) || source_blocks_.count(source_block) ||
        (misaligned && source_blocks_.count(source_block + 1))) {
        overlap_ = true;
    }

    dest_blocks_.insert(source_block);
    if (source_offset > 0) {
        dest_blocks_.insert(source_block + 1);
    }
    source_blocks_.insert(cow_op->new_block);
}

int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
                                    std::vector<uint64_t>& blocks,
                                    std::vector<const CowOperation*>& xor_op_vec) {
    int num_ops = *pending_ops;
    if (cow_op_merge_size_ != 0) {
        num_ops = std::min(static_cast<int>(cow_op_merge_size_), *pending_ops);
    }

    int nr_consecutive = 0;
    bool is_ops_present = (!RAIterDone() && num_ops);

    if (!is_ops_present) {
        return nr_consecutive;
    }

    // Get the first block with offset
    const CowOperation* cow_op = GetRAOpIter();

    if (!reader_->GetSourceOffset(cow_op, source_offset)) {
        SNAP_LOG(ERROR) << "PrepareNextReadAhead operation has no source offset: " << *cow_op;
        return nr_consecutive;
    }
    if (cow_op->type() == kCowXorOp) {
        xor_op_vec.push_back(cow_op);
    }

    RAIterNext();
    num_ops -= 1;
    nr_consecutive = 1;
    blocks.push_back(cow_op->new_block);

    if (!overlap_) {
        CheckOverlap(cow_op);
    }

    /*
     * Find number of consecutive blocks
     */
    while (!RAIterDone() && num_ops) {
        const CowOperation* op = GetRAOpIter();
        uint64_t next_offset;
        if (!reader_->GetSourceOffset(op, &next_offset)) {
            SNAP_LOG(ERROR) << "PrepareNextReadAhead operation has no source offset: " << *cow_op;
            break;
        }

        // Check for consecutive blocks
        if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
            break;
        }

        if (op->type() == kCowXorOp) {
            xor_op_vec.push_back(op);
        }

        nr_consecutive += 1;
        num_ops -= 1;
        blocks.push_back(op->new_block);
        RAIterNext();

        if (!overlap_) {
            CheckOverlap(op);
        }
    }

    return nr_consecutive;
}

// Scope guard: unless Cancel() was called, the destructor reports a read-ahead
// failure to the handler via ReadAheadIOFailed(). Create it before the first
// early-return that must notify, and Cancel() it on the success path.
class [[nodiscard]] AutoNotifyReadAheadFailed {
  public:
    // explicit: a handler must never silently convert into a guard.
    explicit AutoNotifyReadAheadFailed(std::shared_ptr<SnapshotHandler> snapuserd)
        : snapuserd_(std::move(snapuserd)) {}

    // Non-copyable: a copy would report the same failure a second time.
    AutoNotifyReadAheadFailed(const AutoNotifyReadAheadFailed&) = delete;
    AutoNotifyReadAheadFailed& operator=(const AutoNotifyReadAheadFailed&) = delete;

    ~AutoNotifyReadAheadFailed() {
        if (cancelled_) {
            return;
        }
        snapuserd_->ReadAheadIOFailed();
    }

    // Disarms the guard; the destructor then does nothing.
    void Cancel() { cancelled_ = true; }

  private:
    std::shared_ptr<SnapshotHandler> snapuserd_;
    bool cancelled_ = false;
};

bool ReadAhead::ReconstructDataFromCow() {
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    loff_t metadata_offset = 0;
    loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
    int num_ops = 0;
    int total_blocks_merged = 0;

    // This memcpy is important as metadata_buffer_ will be an unaligned address and will fault
    // on 32-bit systems
    std::unique_ptr<uint8_t[]> metadata_buffer =
            std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
    memcpy(metadata_buffer.get(), metadata_buffer_, snapuserd_->GetBufferMetadataSize());

    while (true) {
        struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
                (char*)metadata_buffer.get() + metadata_offset);

        // Done reading metadata
        if (bm->new_block == 0 && bm->file_offset == 0) {
            break;
        }

        loff_t buffer_offset = bm->file_offset - start_data_offset;
        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + buffer_offset);
        read_ahead_buffer_map[bm->new_block] = bufptr;
        num_ops += 1;
        total_blocks_merged += 1;

        metadata_offset += sizeof(struct ScratchMetadata);
    }

    AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);

    // We are done re-constructing the mapping; however, we need to make sure
    // all the COW operations to-be merged are present in the re-constructed
    // mapping.
    while (!RAIterDone()) {
        const CowOperation* op = GetRAOpIter();
        if (read_ahead_buffer_map.find(op->new_block) != read_ahead_buffer_map.end()) {
            num_ops -= 1;
            RAIterNext();
            continue;
        }

        // Verify that we have covered all the ops which were re-constructed
        // from COW device - These are the ops which are being
        // re-constructed after crash.
        if (!(num_ops == 0)) {
            SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recoverd "
                            << " Pending ops: " << num_ops;
            return false;
        }

        break;
    }

    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);

    snapuserd_->FinishReconstructDataFromCow();

    if (!snapuserd_->ReadAheadIOCompleted(true)) {
        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
        return false;
    }

    snapuserd_->RaThreadStarted();
    SNAP_LOG(INFO) << "ReconstructDataFromCow success";
    notify_read_ahead_failed.Cancel();
    return true;
}

/*
 * With io_uring, the data flow is slightly different.
 *
 * The data flow is as follows:
 *
 * 1: Queue the I/O requests to be read from backing source device.
 * This is done by retrieving the SQE entry from ring and populating
 * the SQE entry. Note that the I/O is not submitted yet.
 *
 * 2: Once the ring is full (aka queue_depth), we will submit all
 * the queued I/O request with a single system call. This essentially
 * cuts down "queue_depth" number of system calls to a single system call.
 *
 * 3: Once the I/O is submitted, user-space thread will now work
 * on processing the XOR Operations. This happens in parallel when
 * I/O requests are submitted to the kernel. This is ok because, for XOR
 * operations, we first need to retrieve the compressed data from COW block
 * device. Thus, we have offloaded the backing source I/O to the kernel
 * and user-space is working in parallel on fetching the data for XOR operations.
 *
 * 4: After the XOR operations are read from COW device, poll the completion
 * queue for all the I/O submitted. If the I/O's were already completed,
 * then user-space thread will just read the CQE requests from the ring
 * without doing any system call. If none of the I/O were completed yet,
 * user-space thread will do a system call and wait for I/O completions.
 *
 * Flow diagram:
 *                                                    SQ-RING
 *  SQE1 <----------- Fetch SQE1 Entry ---------- |SQE1||SQE2|SQE3|
 *
 *  SQE1  ------------ Populate SQE1 Entry ------> |SQE1-X||SQE2|SQE3|
 *
 *  SQE2 <----------- Fetch SQE2 Entry ---------- |SQE1-X||SQE2|SQE3|
 *
 *  SQE2  ------------ Populate SQE2 Entry ------> |SQE1-X||SQE2-X|SQE3|
 *
 *  SQE3 <----------- Fetch SQE3 Entry ---------- |SQE1-X||SQE2-X|SQE3|
 *
 *  SQE3  ------------ Populate SQE3 Entry ------> |SQE1-X||SQE2-X|SQE3-X|
 *
 *  Submit-IO ---------------------------------> |SQE1-X||SQE2-X|SQE3-X|
 *     |                                                  |
 *     |                                        Process I/O entries in kernel
 *     |                                                  |
 *  Retrieve XOR                                          |
 *  data from COW                                         |
 *     |                                                  |
 *     |                                                  |
 *  Fetch CQ completions
 *     |                                              CQ-RING
 *                                               |CQE1-X||CQE2-X|CQE3-X|
 *                                                        |
 *   CQE1 <------------Fetch CQE1 Entry          |CQE1||CQE2-X|CQE3-X|
 *   CQE2 <------------Fetch CQE2 Entry          |CQE1||CQE2-|CQE3-X|
 *   CQE3 <------------Fetch CQE3 Entry          |CQE1||CQE2-|CQE3-|
 *    |
 *    |
 *  Continue Next set of operations in the RING
 */

// Fills ra_temp_buffer_ for one read-ahead window using io_uring.
//
// Runs of consecutive ops (see PrepareNextReadAhead) are queued as one read
// each; the queue is submitted when the ring is full, when the window is
// exhausted, or when no more ops are available. While the reads are in flight
// the XOR payloads are fetched from the COW, and once the reads are reaped the
// XOR is applied in place. On success the scratch metadata for the window is
// written to ra_temp_meta_buffer_ via UpdateScratchMetadata().
//
// Returns false on any io_uring failure (no SQE, short submit, failed
// completion) or when reading XOR data fails; the caller is expected to fall
// back to synchronous I/O. Returns true otherwise, including the case where
// there was nothing left to read.
bool ReadAhead::ReadAheadAsyncIO() {
    // Window size in blocks; also the budget of ops for this call.
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    // Reset all per-window state.
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    // Free SQE slots left in the ring, and reads queued but not yet submitted.
    int pending_sqe = queue_depth_;
    int pending_ios_to_submit = 0;

    // Cursors into xor_op_vec / blocks_ marking how far XOR processing has
    // got; they are advanced by ProcessXorData() and persist across submits.
    size_t xor_op_index = 0;
    size_t block_index = 0;

    // Byte offset into ra_temp_buffer_ matching block_index; advanced by
    // ProcessXorData().
    loff_t offset = 0;

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;
        struct io_uring_sqe* sqe;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);

        if (linear_blocks != 0) {
            size_t io_size = (linear_blocks * BLOCK_SZ);

            // Get an SQE entry from the ring and populate the I/O variables
            sqe = io_uring_get_sqe(ring_.get());
            if (!sqe) {
                SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
                return false;
            }

            io_uring_prep_read(sqe, backing_store_fd_.get(),
                               (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                               source_offset);

            buffer_offset += io_size;
            num_ops -= linear_blocks;
            total_blocks_merged_ += linear_blocks;

            pending_sqe -= 1;
            pending_ios_to_submit += 1;
            sqe->flags |= IOSQE_ASYNC;
        }

        // pending_sqe == 0 : Ring is full
        //
        // num_ops == 0 : All the COW ops in this batch are processed - Submit
        // pending I/O requests in the ring
        //
        // linear_blocks == 0 : All the COW ops processing is done. Submit
        // pending I/O requests in the ring
        if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
            // Submit the IO for all the COW ops in a single syscall
            int ret = io_uring_submit(ring_.get());
            if (ret != pending_ios_to_submit) {
                SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
                                 << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                return false;
            }

            int pending_ios_to_complete = pending_ios_to_submit;
            pending_ios_to_submit = 0;

            bool xor_processing_required = (xor_op_vec.size() > 0);

            // Read XOR data from COW file in parallel when I/O's are in-flight
            // (the indices are passed by value here, so they are not advanced
            // until ProcessXorData() below).
            if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
                SNAP_LOG(ERROR) << "ReadXorData failed";
                return false;
            }

            // Fetch I/O completions
            if (!ReapIoCompletions(pending_ios_to_complete)) {
                SNAP_LOG(ERROR) << "ReapIoCompletions failed";
                return false;
            }

            // Retrieve XOR'ed data
            if (xor_processing_required) {
                ProcessXorData(block_index, xor_op_index, xor_op_vec, ra_temp_buffer_.get(),
                               offset);
            }

            // All the I/O in the ring is processed.
            pending_sqe = queue_depth_;
        }

        // Nothing more to queue; anything pending was submitted above.
        if (linear_blocks == 0) {
            break;
        }
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        return true;
    }

    CHECK(blocks_.size() == total_blocks_merged_);

    UpdateScratchMetadata();

    return true;
}

void ReadAhead::UpdateScratchMetadata() {
    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        uint64_t new_block = blocks_[block_index];
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after crash. This indicates end of
    // reading metadata contents when re-constructing the data
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;
}

// Waits for |pending_ios_to_complete| completions on the ring.
//
// Returns true only when every completion was reaped and none of them carried
// a negative result. A failed completion no longer aborts the loop: the CQE is
// still marked as seen and the remaining completions are drained, so that no
// submitted read is left outstanding against ra_temp_buffer_ when the caller
// falls back to synchronous I/O into that same buffer.
// NOTE(review): short reads (0 <= res < requested size) are not detected
// here; the requested size is not attached to the CQE.
bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
    bool status = true;

    // Reap I/O completions
    while (pending_ios_to_complete) {
        struct io_uring_cqe* cqe;

        // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
        // these error codes are not truly I/O errors; we can retry them
        // by re-populating the SQE entries and submitting the I/O
        // request back. However, we don't do that now; instead we
        // will fallback to synchronous I/O.
        int ret = io_uring_wait_cqe(ring_.get(), &cqe);
        if (ret) {
            // No CQE was handed out, so there is nothing to mark as seen and
            // no reliable way to keep draining.
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << strerror(-ret);
            status = false;
            break;
        }

        if (cqe->res < 0) {
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
            status = false;
        }

        // Always consume the entry, successful or not.
        io_uring_cqe_seen(ring_.get(), cqe);
        pending_ios_to_complete -= 1;
    }

    return status;
}

/*
 * Applies the XOR payloads staged in bufsink_ to the data read from the
 * backing device, in place.
 *
 * block_xor_index: in/out - first entry of blocks_ not yet processed; equals
 *                  blocks_.size() on return.
 * xor_index:       in/out - next entry of xor_op_vec to apply; advanced once
 *                  per XOR op consumed.
 * xor_op_vec:      XOR ops of the window (not modified).
 * buffer:          base of the buffer holding the backing-device data.
 * buffer_offset:   in/out - byte offset into |buffer| matching
 *                  block_xor_index; advanced by BLOCK_SZ per block.
 *
 * The payloads are consumed from the start of bufsink_'s payload buffer, one
 * BLOCK_SZ slot per XOR op, and the sink's offset is reset afterwards.
 */
void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
                               std::vector<const CowOperation*>& xor_op_vec, void* buffer,
                               loff_t& buffer_offset) {
    // XOR one machine word at a time.
    using WordType = std::conditional_t<sizeof(void*) == sizeof(uint64_t), uint64_t, uint32_t>;
    const size_t num_words = BLOCK_SZ / sizeof(WordType);
    loff_t xor_buf_offset = 0;

    while (block_xor_index < blocks_.size()) {
        void* bufptr = static_cast<void*>((char*)buffer + buffer_offset);
        uint64_t new_block = blocks_[block_xor_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Pointer to the data read from base device
                auto buffer_words = reinterpret_cast<WordType*>(bufptr);
                // Get the xor'ed data read from COW device
                auto xor_data_words = reinterpret_cast<WordType*>(
                        (char*)bufsink_.GetPayloadBufPtr() + xor_buf_offset);

                // size_t index: num_words is unsigned, avoid a mixed-sign compare.
                for (size_t i = 0; i < num_words; i++) {
                    buffer_words[i] ^= xor_data_words[i];
                }

                // Move to next XOR op
                xor_index += 1;
                xor_buf_offset += BLOCK_SZ;
            }
        }

        buffer_offset += BLOCK_SZ;
        block_xor_index += 1;
    }

    bufsink_.ResetBufferOffset();
}

bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
                            std::vector<const CowOperation*>& xor_op_vec) {
    // Process the XOR ops in parallel - We will be reading data
    // from COW file for XOR ops processing.
    while (block_index < blocks_.size()) {
        uint64_t new_block = blocks_[block_index];

        if (xor_op_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_op_index];
            if (xor_op->new_block == new_block) {
                void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer for block: "
                                    << xor_op->new_block;
                    return false;
                }
                if (ssize_t rv = reader_->ReadData(xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
                            << ", return value: " << rv;
                    return false;
                }

                xor_op_index += 1;
            }
        }
        block_index += 1;
    }
    return true;
}

bool ReadAhead::ReadAheadSyncIO() {
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);
        if (linear_blocks == 0) {
            // No more blocks to read
            SNAP_LOG(DEBUG) << " Read-ahead completed....";
            break;
        }

        size_t io_size = (linear_blocks * BLOCK_SZ);

        // Read from the base device consecutive set of blocks in one shot
        if (!android::base::ReadFullyAtOffset(backing_store_fd_,
                                              (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                                              source_offset)) {
            SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
                             << backing_store_device_ << "at block :" << source_offset / BLOCK_SZ
                             << " offset :" << source_offset % BLOCK_SZ
                             << " buffer_offset : " << buffer_offset << " io_size : " << io_size
                             << " buf-addr : " << read_ahead_buffer_;
            return false;
        }

        buffer_offset += io_size;
        total_blocks_merged_ += linear_blocks;
        num_ops -= linear_blocks;
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        notify_read_ahead_failed.Cancel();
        return true;
    }

    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    loff_t offset = 0;
    CHECK(blocks_.size() == total_blocks_merged_);

    size_t xor_index = 0;
    BufferSink bufsink;
    bufsink.Initialize(BLOCK_SZ * 2);

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        void* bufptr = static_cast<void*>((char*)ra_temp_buffer_.get() + offset);
        uint64_t new_block = blocks_[block_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Read the xor'ed data from COW
                void* buffer = bufsink.GetPayloadBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer";
                    return false;
                }
                if (ssize_t rv = reader_->ReadData(xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
                            << ", return value: " << rv;
                    return false;
                }
                // Pointer to the data read from base device
                uint8_t* read_buffer = reinterpret_cast<uint8_t*>(bufptr);
                // Get the xor'ed data read from COW device
                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr());

                // Retrieve the original data
                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
                    read_buffer[byte_offset] ^= xor_data[byte_offset];
                }

                // Move to next XOR op
                xor_index += 1;
            }
        }

        offset += BLOCK_SZ;
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // Verify if all the xor blocks were scanned to retrieve the original data
    CHECK(xor_index == xor_op_vec.size());

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after crash. This indicates end of
    // reading metadata contents when re-constructing the data
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;

    notify_read_ahead_failed.Cancel();
    return true;
}

// Produces one read-ahead window and hands it to the merge thread.
//
// The window is first assembled in the temporary buffers (async I/O when
// available, synchronous I/O otherwise or as a fallback). Once the merge of
// the previous window has finished, it is copied into the scratch space and
// the block -> address map is rebuilt. Returns false when the window could
// not be produced or the hand-off failed.
bool ReadAhead::ReadAheadIOStart() {
    // After a crash during merge the very first call only has to re-construct
    // the state from the COW file; this happens once during boot up.
    if (snapuserd_->ShouldReconstructDataFromCow()) {
        return ReconstructDataFromCow();
    }

    bool fell_back_to_sync = false;

    // Try async read-ahead first. On failure tear down the ring, rewind the
    // iterator by the blocks consumed so far and never try async again.
    if (read_ahead_async_ && !ReadAheadAsyncIO()) {
        SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - Falling back synchronous I/O";
        FinalizeIouring();
        RAResetIter(total_blocks_merged_);
        fell_back_to_sync = true;
        read_ahead_async_ = false;
    }

    // Synchronous path: either the retry after a failed async attempt, or the
    // only path on devices without async support (aka 4.x kernels).
    if ((fell_back_to_sync || !read_ahead_async_) && !ReadAheadSyncIO()) {
        SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
        return false;
    }

    SNAP_LOG(DEBUG) << "Read-ahead: total_ra_blocks_merged: " << total_ra_blocks_completed_;

    // The scratch space still belongs to the previous RA window until its
    // merge is complete; if we crash in between, merge must be able to resume
    // from what the scratch space currently holds. So wait before touching it.
    if (!snapuserd_->WaitForMergeReady()) {
        SNAP_LOG(VERBOSE) << "ReadAhead failed to wait for merge ready";
        return false;
    }

    // The merge thread should no longer be using the scratch buffer at this
    // point, but hold the buffer lock for the copy anyway as an additional
    // safeguard. see b/377819507
    {
        std::lock_guard<std::mutex> buffer_lock(snapuserd_->GetBufferLock());

        // Publish the window: metadata first, then the data blocks.
        memcpy(metadata_buffer_, ra_temp_meta_buffer_.get(), snapuserd_->GetBufferMetadataSize());
        memcpy(read_ahead_buffer_, ra_temp_buffer_.get(), total_blocks_merged_ * BLOCK_SZ);

        std::unordered_map<uint64_t, void*>& block_to_scratch = snapuserd_->GetReadAheadMap();
        block_to_scratch.clear();

        // Block i of the window lives in the i-th BLOCK_SZ slot of the data
        // region.
        char* scratch_slot = static_cast<char*>(read_ahead_buffer_);
        for (const uint64_t new_block : blocks_) {
            block_to_scratch[new_block] = static_cast<void*>(scratch_slot);
            scratch_slot += BLOCK_SZ;
        }

        total_ra_blocks_completed_ += total_blocks_merged_;
        snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);
    }

    // Flush the scratch data - Technically, we should flush only for overlapping
    // blocks; However, since this region is mmap'ed, the dirty pages can still
    // get flushed to disk at any random point in time. Instead, make sure
    // the data in scratch is in the correct state before merge thread resumes.
    //
    // Notify the Merge thread to resume merging this window
    if (snapuserd_->ReadAheadIOCompleted(true)) {
        return true;
    }

    SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
    snapuserd_->ReadAheadIOFailed();
    return false;
}

// Sets up the io_uring ring used for async read-ahead.
//
// Returns false (leaving read_ahead_async_ untouched) when the handler reports
// that io_uring is unsupported or the ring cannot be created. On success the
// XOR staging buffer is sized and read_ahead_async_ is switched on.
bool ReadAhead::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    if (const int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0); ret != 0) {
        SNAP_LOG(ERROR) << "io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    // Staging area for the XOR payloads read from the COW.
    bufsink_.Initialize(PAYLOAD_BUFFER_SZ * 2);
    read_ahead_async_ = true;

    SNAP_LOG(INFO) << "Read-ahead: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

// Tears down the io_uring ring. Does nothing unless async read-ahead is
// currently enabled, which is only the case after a successful
// InitializeIouring().
void ReadAhead::FinalizeIouring() {
    if (!read_ahead_async_) {
        return;
    }

    io_uring_queue_exit(ring_.get());
}

// Entry point of the read-ahead thread: sets everything up, produces RA
// windows until the ordered ops are exhausted or a window fails, then cleans
// up.
//
// Returns false only when the fds or the COW reader could not be initialized.
// A failing ReadAheadIOStart() just ends the loop; the function still returns
// true in that case.
bool ReadAhead::RunThread() {
    SNAP_LOG(INFO) << "ReadAhead thread started.";

    pthread_setname_np(pthread_self(), "ReadAhead");

    if (!InitializeFds()) {
        return false;
    }

    InitializeBuffer();

    if (!InitReader()) {
        return false;
    }

    InitializeRAIter();

    // The result is deliberately ignored: without io_uring,
    // read_ahead_async_ stays off and the synchronous path is used.
    InitializeIouring();

    // Failing to lower the priority or to assign the profile is logged but
    // not fatal.
    if (!SetThreadPriority(ANDROID_PRIORITY_BACKGROUND)) {
        SNAP_PLOG(ERROR) << "Failed to set thread priority";
    }

    if (!SetProfiles({"CPUSET_SP_BACKGROUND"})) {
        SNAP_PLOG(ERROR) << "Failed to assign task profile to readahead thread";
    }

    SNAP_LOG(INFO) << "ReadAhead processing.";
    // One RA window per iteration; stop at the first failure.
    while (!RAIterDone() && ReadAheadIOStart()) {
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << " ReadAhead thread terminating.";
    return true;
}

// Initialization
// Opens the backing store (read-only) and the COW device (read-write).
// Returns false, after logging the failing path, as soon as one open fails.
bool ReadAhead::InitializeFds() {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    const bool backing_opened = !(backing_store_fd_ < 0);
    if (!backing_opened) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
    const bool cow_opened = !(cow_fd_ < 0);
    if (!cow_opened) {
        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
        return false;
    }

    return true;
}

// Clones a COW reader for this worker and prepares it for merge, handing the
// COW fd over to it. The header is cached only when that succeeds.
bool ReadAhead::InitReader() {
    reader_ = snapuserd_->CloneReaderForWorker();

    // cow_fd_ is given up here regardless of the outcome.
    if (reader_->InitForMerge(std::move(cow_fd_))) {
        header_ = reader_->GetHeader();
        return true;
    }

    return false;
}

// Obtains from the reader the COW-op iterator that read-ahead walks.
// NOTE(review): the meaning of the `true` argument to GetOpIter() is not
// visible in this file - presumably it selects merge ordering; confirm.
void ReadAhead::InitializeRAIter() {
    cowop_iter_ = reader_->GetOpIter(true);
}

bool ReadAhead::RAIterDone() {
    if (cowop_iter_->AtEnd()) {
        return true;
    }

    const CowOperation* cow_op = GetRAOpIter();

    if (!IsOrderedOp(*cow_op)) {
        return true;
    }

    return false;
}

// Advances the read-ahead op iterator by one op.
void ReadAhead::RAIterNext() {
    cowop_iter_->Next();
}

// Rewinds the op iterator by up to |num_blocks| ops, stopping early when the
// beginning is reached. Used to replay a window after async I/O failed.
void ReadAhead::RAResetIter(uint64_t num_blocks) {
    for (; num_blocks && !cowop_iter_->AtBegin(); num_blocks -= 1) {
        cowop_iter_->Prev();
    }
}

// Returns the op the read-ahead iterator currently points at. Callers in this
// file check AtEnd()/RAIterDone() first.
const CowOperation* ReadAhead::GetRAOpIter() {
    return cowop_iter_->Get();
}

// Locates the metadata and data regions of the scratch space inside the
// handler's mapping, and allocates the temporary buffers in which a window is
// assembled before it is copied into the scratch space.
void ReadAhead::InitializeBuffer() {
    char* const scratch_base = static_cast<char*>(snapuserd_->GetMappedAddr());

    // Both regions are addressed relative to the start of the mapping.
    metadata_buffer_ = static_cast<void*>(scratch_base + snapuserd_->GetBufferMetadataOffset());
    read_ahead_buffer_ = static_cast<void*>(scratch_base + snapuserd_->GetBufferDataOffset());

    // The temporary buffers mirror the sizes of the two scratch regions.
    ra_temp_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
    ra_temp_meta_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
}

}  // namespace snapshot
}  // namespace android
