// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import com.google.common.collect.ImmutableList;
import com.google.protobuf.TextFormat;
import dev.pigweed.pw_hdlc.Decoder;
import dev.pigweed.pw_hdlc.Encoder;
import dev.pigweed.pw_hdlc.Frame;
import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Channel;
import dev.pigweed.pw_rpc.ChannelOutputException;
import dev.pigweed.pw_rpc.Client;
import dev.pigweed.pw_rpc.Status;
import dev.pigweed.pw_transfer.ProtocolVersion;
import dev.pigweed.pw_transfer.TransferClient;
import dev.pigweed.pw_transfer.TransferError;
import dev.pigweed.pw_transfer.TransferService;
import dev.pigweed.pw_transfer.TransferTimeoutSettings;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import pw.transfer.ConfigProtos;
import pw.transfer.ConfigProtos.TransferAction;

public class JavaClient {
  private static final String SERVICE = "pw.transfer.Transfer";
  private static final Logger logger = Logger.forClass(Client.class);

  private static final int CHANNEL_ID = 1;
  private static final long RPC_HDLC_ADDRESS = 'R';
  private static final String HOSTNAME = "localhost";

  // This is the maximum size of the socket send buffers. Ideally, this is set
  // to the lowest allowed value to minimize buffering between the proxy and
  // clients so rate limiting causes the client to block and wait for the
  // integration test proxy to drain rather than allowing OS buffers to backlog
  // large quantities of data.
  //
  // Note that the OS may chose to not strictly follow this requested buffer
  // size. Still, setting this value to be as small as possible does reduce
  // bufer sizes significantly enough to better reflect typical inter-device
  // communication.
  //
  // For this to be effective, servers should also configure their sockets to a
  // smaller receive buffer size.
  private static final int MAX_SOCKET_SEND_BUFFER_SIZE = 1;

  private final HdlcRpcChannelOutput channelOutput;
  private final Client rpcClient;
  private final HdlcParseThread parseThread;

  public JavaClient(OutputStream writer, InputStream reader) {
    this.channelOutput = new HdlcRpcChannelOutput(writer, RPC_HDLC_ADDRESS);

    this.rpcClient = Client.create(ImmutableList.of(new Channel(CHANNEL_ID, this.channelOutput)),
        ImmutableList.of(TransferService.get()));

    this.parseThread = new HdlcParseThread(reader, this.rpcClient);
  }

  void startClient() {
    parseThread.start();
  }

  Client getRpcClient() {
    return this.rpcClient;
  }

  private class HdlcRpcChannelOutput implements Channel.Output {
    private final OutputStream writer;
    private final long address;

    public HdlcRpcChannelOutput(OutputStream writer, long address) {
      this.writer = writer;
      this.address = address;
    }

    public void send(byte[] packet) throws ChannelOutputException {
      try {
        Encoder.writeUiFrame(this.address, ByteBuffer.wrap(packet), this.writer);
      } catch (IOException e) {
        throw new ChannelOutputException("Failed to write HDLC UI Frame", e);
      }
    }
  }

  private class HdlcParseThread extends Thread {
    private final InputStream reader;
    private final RpcOnComplete frame_handler;
    private final Decoder decoder;

    private class RpcOnComplete implements Decoder.OnCompleteFrame {
      private final Client rpc_client;

      public RpcOnComplete(Client rpc_client) {
        this.rpc_client = rpc_client;
      }

      public void onCompleteFrame(Frame frame) {
        if (frame.getAddress() == RPC_HDLC_ADDRESS) {
          this.rpc_client.processPacket(frame.getPayload());
        }
      }
    }

    public HdlcParseThread(InputStream reader, Client rpc_client) {
      this.reader = reader;
      this.frame_handler = new RpcOnComplete(rpc_client);
      this.decoder = new Decoder(this.frame_handler);
    }

    public void run() {
      while (true) {
        int val = 0;
        try {
          val = this.reader.read();
        } catch (IOException e) {
          logger.atSevere().log("HDLC parse thread read failed");
          System.exit(1);
        }
        this.decoder.process((byte) val);
      }
    }
  }

  public static ConfigProtos.ClientConfig ParseConfigFrom(InputStream reader) throws IOException {
    byte[] buffer = new byte[reader.available()];
    reader.read(buffer);
    ConfigProtos.ClientConfig.Builder config_builder = ConfigProtos.ClientConfig.newBuilder();
    TextFormat.merge(new String(buffer, StandardCharsets.UTF_8), config_builder);
    if (config_builder.getChunkTimeoutMs() == 0) {
      throw new AssertionError("chunk_timeout_ms may not be 0");
    }
    if (config_builder.getInitialChunkTimeoutMs() == 0) {
      throw new AssertionError("initial_chunk_timeout_ms may not be 0");
    }
    if (config_builder.getMaxRetries() == 0) {
      throw new AssertionError("max_retries may not be 0");
    }
    if (config_builder.getMaxLifetimeRetries() == 0) {
      throw new AssertionError("max_lifetime_retries may not be 0");
    }
    return config_builder.build();
  }

  public static void ReadFromServer(int resourceId,
      Path fileName,
      TransferClient client,
      Status expected_status,
      int initial_offset) {
    byte[] data;
    try {
      data = client.read(resourceId, initial_offset).get();
    } catch (ExecutionException e) {
      if (((TransferError) e.getCause()).status() != expected_status) {
        throw new AssertionError("Unexpected transfer read failure", e);
      }
      // Expected failure occurred, skip trying to write the data knowing that
      // it is missing.
      return;
    } catch (InterruptedException e) {
      throw new AssertionError("Read from server failed", e);
    }

    if (expected_status != Status.OK) {
      throw new AssertionError("Transfer succeeded unexpectedly");
    }

    try {
      Files.write(fileName, data);
    } catch (IOException e) {
      logger.atSevere().log("Failed to write to output file `%s`", fileName);
      throw new AssertionError("Failed to write output file from server", e);
    }
  }

  public static void WriteToServer(int resourceId,
      Path fileName,
      TransferClient client,
      Status expected_status,
      int initial_offset) {
    if (Files.notExists(fileName)) {
      logger.atSevere().log("Input file `%s` does not exist", fileName);
    }

    byte[] data;
    try {
      data = Files.readAllBytes(fileName);
    } catch (IOException e) {
      logger.atSevere().log("Failed to read input file `%s`", fileName);
      throw new AssertionError("Failed to read input file on write to server", e);
    }

    try {
      client.write(resourceId, data, initial_offset).get();
    } catch (ExecutionException e) {
      if (((TransferError) e.getCause()).status() != expected_status) {
        throw new AssertionError("Unexpected transfer write failure", e);
      }
      return;
    } catch (InterruptedException e) {
      throw new AssertionError("Write to server failed", e);
    }

    if (expected_status != Status.OK) {
      throw new AssertionError("Transfer succeeded unexpectedly");
    }
  }

  public static void main(String[] args) {
    if (args.length != 1) {
      logger.atSevere().log("Usage: PORT");
      System.exit(1);
    }

    // The port is provided directly as a commandline argument.
    int port = Integer.parseInt(args[0]);

    ConfigProtos.ClientConfig config;
    try {
      config = ParseConfigFrom(System.in);
    } catch (IOException e) {
      throw new AssertionError("Failed to parse config file from stdin", e);
    }

    Socket socket;
    try {
      socket = new Socket(HOSTNAME, port);
    } catch (IOException e) {
      logger.atSevere().log("Failed to connect to %s:%d", HOSTNAME, port);
      throw new AssertionError("Failed to connect to server/proxy port", e);
    }
    try {
      socket.setSendBufferSize(MAX_SOCKET_SEND_BUFFER_SIZE);
    } catch (SocketException e) {
      logger.atSevere().log("Invalid socket buffer size %d", MAX_SOCKET_SEND_BUFFER_SIZE);
      throw new AssertionError("Invalid socket buffer size", e);
    }
    InputStream reader;
    OutputStream writer;

    try {
      writer = socket.getOutputStream();
      reader = socket.getInputStream();
    } catch (IOException e) {
      throw new AssertionError("Failed to open socket streams", e);
    }

    JavaClient hdlc_rpc_client = new JavaClient(writer, reader);

    hdlc_rpc_client.startClient();

    TransferClient client = new TransferClient(
        hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Read"),
        hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Write"),
        TransferTimeoutSettings.builder()
            .setTimeoutMillis(config.getChunkTimeoutMs())
            .setInitialTimeoutMillis(config.getInitialChunkTimeoutMs())
            .setMaxRetries(config.getMaxRetries())
            .setMaxLifetimeRetries(config.getMaxLifetimeRetries())
            .build());

    for (ConfigProtos.TransferAction action : config.getTransferActionsList()) {
      int resourceId = action.getResourceId();
      Path fileName = Paths.get(action.getFilePath());

      if (action.getProtocolVersion() != TransferAction.ProtocolVersion.UNKNOWN_VERSION) {
        client.setProtocolVersion(ProtocolVersion.values()[action.getProtocolVersionValue()]);
      } else {
        client.setProtocolVersion(ProtocolVersion.latest());
      }
      try {
        if (action.getTransferType() == ConfigProtos.TransferAction.TransferType.WRITE_TO_SERVER) {
          WriteToServer(resourceId,
              fileName,
              client,
              Status.fromCode(action.getExpectedStatus().getNumber()),
              action.getInitialOffset());
        } else if (action.getTransferType()
            == ConfigProtos.TransferAction.TransferType.READ_FROM_SERVER) {
          ReadFromServer(resourceId,
              fileName,
              client,
              Status.fromCode(action.getExpectedStatus().getNumber()),
              action.getInitialOffset());
        } else {
          throw new AssertionError("Unknown transfer action type");
        }
      } catch (AssertionError e) {
        System.exit(1);
      }
    }

    logger.atInfo().log("Transfer completed successfully");

    System.exit(0);
  }
}
