/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.internal;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.util.ArrayDeque;
import java.util.Deque;
import javax.annotation.Nullable;

/**
 * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
 * facade that allows multiple buffers to be treated as one.
 *
 * <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once
 * the composite has read past the end of a given buffer, that buffer is automatically closed and
 * removed from the composite.
 */
public class CompositeReadableBuffer extends AbstractReadableBuffer {

  /**
   * Sub-buffers that have not yet been removed from the composite, in read order. The head of the
   * deque is the buffer that reads are currently served from.
   */
  private final Deque<ReadableBuffer> readableBuffers;

  /**
   * Sub-buffers that were fully consumed after {@link #mark()} was called. They are kept open
   * (rather than closed) so that {@link #reset()} can put them back in front of
   * {@link #readableBuffers}. Lazily created by the first call to {@link #mark()}.
   */
  private Deque<ReadableBuffer> rewindableBuffers;

  /** Number of unread bytes summed over all buffers in {@link #readableBuffers}. */
  private int readableBytes;

  /** Set by {@link #mark()}; this class never clears it again. */
  private boolean marked;

  /**
   * Creates an empty composite whose internal deque is pre-sized.
   *
   * @param initialCapacity initial capacity passed to the backing {@link ArrayDeque}
   */
  public CompositeReadableBuffer(int initialCapacity) {
    readableBuffers = new ArrayDeque<>(initialCapacity);
  }

  /** Creates an empty composite with a default-sized internal deque. */
  public CompositeReadableBuffer() {
    readableBuffers = new ArrayDeque<>();
  }

  /**
   * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
   * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the
   * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
   * this {@code CompositeBuffer}.
   *
   * @param buffer the buffer to append; if it is itself a {@code CompositeReadableBuffer} its
   *     sub-buffers are moved into this composite and the emptied composite is closed
   */
  public void addBuffer(ReadableBuffer buffer) {
    // When a mark is active and nothing is queued, the buffer added now becomes the head, and the
    // head must always carry a mark so that reset() can rewind it.
    boolean markHead = marked && readableBuffers.isEmpty();
    enqueueBuffer(buffer);
    if (markHead) {
      readableBuffers.peek().mark();
    }
  }

  /**
   * Appends {@code buffer} to {@link #readableBuffers} and updates {@link #readableBytes}. A
   * composite argument is flattened: its sub-buffers are transferred one by one, its byte count is
   * taken over, and the now-empty composite is closed.
   */
  private void enqueueBuffer(ReadableBuffer buffer) {
    if (!(buffer instanceof CompositeReadableBuffer)) {
      readableBuffers.add(buffer);
      readableBytes += buffer.readableBytes();
      return;
    }

    CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
    while (!compositeBuffer.readableBuffers.isEmpty()) {
      ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove();
      readableBuffers.add(subBuffer);
    }
    readableBytes += compositeBuffer.readableBytes;
    compositeBuffer.readableBytes = 0;
    // Only buffers left in the source's rewindable deque (if any) are closed here; the readable
    // sub-buffers were moved out above and now belong to this composite.
    compositeBuffer.close();
  }

  /** Returns the number of unread bytes across all sub-buffers. */
  @Override
  public int readableBytes() {
    return readableBytes;
  }

  /** Reads one unsigned byte from the given sub-buffer; the byte is the operation's result. */
  private static final NoThrowReadOperation<Void> UBYTE_OP =
      new NoThrowReadOperation<Void>() {
        @Override
        public int read(ReadableBuffer buffer, int length, Void unused, int value) {
          return buffer.readUnsignedByte();
        }
      };

  /** Reads a single unsigned byte from the composite. */
  @Override
  public int readUnsignedByte() {
    return executeNoThrow(UBYTE_OP, 1, null, 0);
  }

  /** Skips {@code length} bytes of the given sub-buffer. */
  private static final NoThrowReadOperation<Void> SKIP_OP =
      new NoThrowReadOperation<Void>() {
        @Override
        public int read(ReadableBuffer buffer, int length, Void unused, int unused2) {
          buffer.skipBytes(length);
          return 0;
        }
      };

  /** Skips {@code length} bytes, crossing sub-buffer boundaries as needed. */
  @Override
  public void skipBytes(int length) {
    executeNoThrow(SKIP_OP, length, null, 0);
  }

  /**
   * Copies {@code length} bytes of the given sub-buffer into {@code dest} at {@code offset} and
   * returns the offset at which the next sub-buffer should continue writing.
   */
  private static final NoThrowReadOperation<byte[]> BYTE_ARRAY_OP =
      new NoThrowReadOperation<byte[]>() {
        @Override
        public int read(ReadableBuffer buffer, int length, byte[] dest, int offset) {
          buffer.readBytes(dest, offset, length);
          return offset + length;
        }
      };

  /** Reads {@code length} bytes into {@code dest}, starting at {@code destOffset}. */
  @Override
  public void readBytes(byte[] dest, int destOffset, int length) {
    executeNoThrow(BYTE_ARRAY_OP, length, dest, destOffset);
  }

  /** Copies {@code length} bytes of the given sub-buffer into the destination {@link ByteBuffer}. */
  private static final NoThrowReadOperation<ByteBuffer> BYTE_BUF_OP =
      new NoThrowReadOperation<ByteBuffer>() {
        @Override
        public int read(ReadableBuffer buffer, int length, ByteBuffer dest, int unused) {
          // Change the limit so that only lengthToCopy bytes are available.
          int prevLimit = dest.limit();
          ((Buffer) dest).limit(dest.position() + length);
          try {
            buffer.readBytes(dest);
          } finally {
            // Restore the caller's limit even if the sub-buffer read fails, so the destination is
            // never handed back with a silently narrowed limit.
            ((Buffer) dest).limit(prevLimit);
          }
          return 0;
        }
      };

  /** Fills the remaining space of {@code dest} with bytes read from this composite. */
  @Override
  public void readBytes(ByteBuffer dest) {
    executeNoThrow(BYTE_BUF_OP, dest.remaining(), dest, 0);
  }

  /** Writes {@code length} bytes of the given sub-buffer to the destination stream. */
  private static final ReadOperation<OutputStream> STREAM_OP =
      new ReadOperation<OutputStream>() {
        @Override
        public int read(ReadableBuffer buffer, int length, OutputStream dest, int unused)
            throws IOException {
          buffer.readBytes(dest, length);
          return 0;
        }
      };

  /**
   * Writes {@code length} bytes from this composite to {@code dest}.
   *
   * @throws IOException if a sub-buffer fails while writing to {@code dest}
   */
  @Override
  public void readBytes(OutputStream dest, int length) throws IOException {
    execute(STREAM_OP, length, dest, 0);
  }

  /**
   * Reads {@code length} bytes and returns them as a new {@link ReadableBuffer}. The result is a
   * single sub-buffer (or a slice of one) when the bytes come from one sub-buffer, and a new
   * {@code CompositeReadableBuffer} when they span several.
   *
   * @param length number of bytes to read; values {@code <= 0} yield an empty buffer
   */
  @Override
  public ReadableBuffer readBytes(int length) {
    if (length <= 0) {
      return ReadableBuffers.empty();
    }
    checkReadable(length);
    readableBytes -= length;

    ReadableBuffer newBuffer = null;
    CompositeReadableBuffer newComposite = null;
    do {
      ReadableBuffer buffer = readableBuffers.peek();
      int readable = buffer.readableBytes();
      ReadableBuffer readBuffer;
      if (readable > length) {
        // Only part of the head buffer is needed; it stays at the head with its remainder.
        readBuffer = buffer.readBytes(length);
        length = 0;
      } else {
        if (marked) {
          // The head must stay available for reset(), so hand out its remaining bytes via
          // readBytes() and move the original to the rewindable deque.
          readBuffer = buffer.readBytes(readable);
          advanceBuffer();
        } else {
          // No mark is active: the whole head buffer is handed to the caller as-is.
          readBuffer = readableBuffers.poll();
        }
        length -= readable;
      }
      if (newBuffer == null) {
        newBuffer = readBuffer;
      } else {
        if (newComposite == null) {
          // A second piece is needed, so wrap the pieces in a composite. Capacity is 2 when this
          // is the last piece, otherwise sized from the number of queued buffers, capped at 16.
          newComposite = new CompositeReadableBuffer(
              length == 0 ? 2 : Math.min(readableBuffers.size() + 2, 16));
          newComposite.addBuffer(newBuffer);
          newBuffer = newComposite;
        }
        newComposite.addBuffer(readBuffer);
      }
    } while (length > 0);
    return newBuffer;
  }

  /** Returns {@code true} only if every queued sub-buffer supports mark/reset. */
  @Override
  public boolean markSupported() {
    for (ReadableBuffer buffer : readableBuffers) {
      if (!buffer.markSupported()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Marks the current read position. Buffers retained for a previous mark are closed, because they
   * lie before the new mark and can no longer be rewound to.
   */
  @Override
  public void mark() {
    if (rewindableBuffers == null) {
      rewindableBuffers = new ArrayDeque<>(Math.min(readableBuffers.size(), 16));
    }
    while (!rewindableBuffers.isEmpty()) {
      rewindableBuffers.remove().close();
    }
    marked = true;
    ReadableBuffer buffer = readableBuffers.peek();
    if (buffer != null) {
      buffer.mark();
    }
  }

  /**
   * Rewinds the read position to the last {@link #mark()}.
   *
   * @throws InvalidMarkException if {@link #mark()} has never been called
   */
  @Override
  public void reset() {
    if (!marked) {
      throw new InvalidMarkException();
    }
    ReadableBuffer buffer;
    if ((buffer = readableBuffers.peek()) != null) {
      // Rewind the partially read head and credit back the bytes that became readable again.
      int currentRemain = buffer.readableBytes();
      buffer.reset();
      readableBytes += (buffer.readableBytes() - currentRemain);
    }
    // Re-insert the consumed buffers newest-first at the front, which restores the original order.
    while ((buffer = rewindableBuffers.pollLast()) != null) {
      buffer.reset();
      readableBuffers.addFirst(buffer);
      readableBytes += buffer.readableBytes();
    }
  }

  /** Returns {@code true} only if every queued sub-buffer supports {@link #getByteBuffer()}. */
  @Override
  public boolean byteBufferSupported() {
    for (ReadableBuffer buffer : readableBuffers) {
      if (!buffer.byteBufferSupported()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Returns the {@link ByteBuffer} of the sub-buffer at the head of the composite, or {@code null}
   * if the composite holds no sub-buffers.
   */
  @Nullable
  @Override
  public ByteBuffer getByteBuffer() {
    if (readableBuffers.isEmpty()) {
      return null;
    }
    return readableBuffers.peek().getByteBuffer();
  }

  /**
   * Closes and removes every sub-buffer, including those retained for {@link #reset()}, and
   * reports zero readable bytes afterwards.
   */
  @Override
  public void close() {
    // Nothing is readable once the sub-buffers are released; clear the counter so that
    // readableBytes() and checkReadable() do not report bytes that no longer exist.
    readableBytes = 0;
    while (!readableBuffers.isEmpty()) {
      readableBuffers.remove().close();
    }
    if (rewindableBuffers != null) {
      while (!rewindableBuffers.isEmpty()) {
        rewindableBuffers.remove().close();
      }
    }
  }

  /**
   * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to
   * satisfy the requested {@code length}.
   *
   * @param op the per-buffer operation to run
   * @param length total number of bytes the operation must consume
   * @param dest operation-specific destination, passed through to {@code op}
   * @param value initial value handed to {@code op} for the first buffer
   * @return the value returned by {@code op} for the last buffer, or {@code value} if {@code op}
   *     was never invoked
   * @throws IOException if {@code op} throws it
   */
  private <T> int execute(ReadOperation<T> op, int length, T dest, int value) throws IOException {
    checkReadable(length);

    // Skip every exhausted buffer at the head, not just the first. Otherwise a run of exhausted
    // buffers would make the loop invoke op with length 0 on an exhausted buffer, which breaks
    // operations such as UBYTE_OP that read regardless of the length argument.
    advanceExhaustedBuffers();

    while (length > 0 && !readableBuffers.isEmpty()) {
      ReadableBuffer buffer = readableBuffers.peek();
      int lengthToCopy = Math.min(length, buffer.readableBytes());

      // Perform the read operation for this buffer.
      value = op.read(buffer, lengthToCopy, dest, value);

      length -= lengthToCopy;
      readableBytes -= lengthToCopy;
      advanceExhaustedBuffers();
    }

    if (length > 0) {
      // Should never get here.
      throw new AssertionError("Failed executing read operation");
    }

    return value;
  }

  /**
   * Same as {@link #execute(ReadOperation, int, Object, int)} for operations that cannot throw
   * {@link IOException}; an unexpected {@link IOException} is rethrown as an
   * {@link AssertionError}.
   */
  private <T> int executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, int value) {
    try {
      return execute(op, length, dest, value);
    } catch (IOException e) {
      throw new AssertionError(e); // shouldn't happen
    }
  }

  /**
   * Advances past every exhausted buffer at the head of {@link #readableBuffers}, stopping at the
   * first buffer that still has readable bytes or when the deque is empty.
   */
  private void advanceExhaustedBuffers() {
    ReadableBuffer head;
    while ((head = readableBuffers.peek()) != null && head.readableBytes() == 0) {
      advanceBuffer();
    }
  }

  /**
   * Removes one buffer from the front. Without an active mark the buffer is closed. With a mark it
   * is moved to {@link #rewindableBuffers} instead, and the new head (if any) is marked so that
   * {@link #reset()} can rewind it too.
   */
  private void advanceBuffer() {
    if (marked) {
      rewindableBuffers.add(readableBuffers.remove());
      ReadableBuffer next = readableBuffers.peek();
      if (next != null) {
        next.mark();
      }
    } else {
      readableBuffers.remove().close();
    }
  }

  /**
   * A simple read operation to perform on a single {@link ReadableBuffer}.
   * All state management for the buffers is done by
   * {@link CompositeReadableBuffer#execute(ReadOperation, int, Object, int)}.
   */
  private interface ReadOperation<T> {
    /**
     * This method can also be used to simultaneously perform operation-specific int-valued
     * aggregation over the sequence of buffers in a {@link CompositeReadableBuffer}.
     * {@code value} is the return value from the prior buffer, or the "initial" value passed
     * to {@code execute()} in the case of the first buffer. {@code execute()} returns the value
     * returned by the operation called on the last buffer.
     */
    int read(ReadableBuffer buffer, int length, T dest, int value) throws IOException;
  }

  /** A {@link ReadOperation} whose {@code read} method does not throw {@link IOException}. */
  private interface NoThrowReadOperation<T> extends ReadOperation<T> {
    @Override
    int read(ReadableBuffer buffer, int length, T dest, int value);
  }
}
