// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import { Message } from 'google-protobuf';
import { Status } from 'pigweedjs/pw_status';

import { Call } from './call';
import { Channel, Method, Service } from './descriptors';
import * as packets from './packets';

/** Max number that can fit into a 2-byte varint */
const MAX_CALL_ID = 1 << 14;
/** Calls with ID of `kOpenCallId` were unrequested, and are updated to have the
    call ID of the first matching request. */
const LEGACY_OPEN_CALL_ID = 0;
const OPEN_CALL_ID = 2 ** 32 - 1;

/** Data class for a pending RPC call. */
export class Rpc {
  readonly channel: Channel;
  readonly service: Service;
  readonly method: Method;

  constructor(channel: Channel, service: Service, method: Method) {
    this.channel = channel;
    this.service = service;
    this.method = method;
  }

  /** Returns channel service method callId tuple */
  getIdSet(callId: number): [number, number, number, number] {
    return [this.channel.id, this.service.id, this.method.id, callId];
  }

  /**
   * Returns a string sequence to uniquely identify channel, service, method
   * and call ID. This can be used to hash the Rpc.
   *
   * For example: "12346789.23452345.12341234.34"
   */
  getIdString(callId: number): string {
    return `${this.channel.id}.${this.service.id}.${this.method.id}.${callId}`;
  }

  toString(): string {
    return (
      `${this.service.name}.${this.method.name} on channel ` +
      `${this.channel.id}`
    );
  }
}

/** Tracks pending RPCs and encodes outgoing RPC packets. */
export class PendingCalls {
  pending: Map<string, Call> = new Map();
  // We skip callId zero to avoid LEGACY_OPEN_CALL_ID.
  nextCallId: number = 1;

  /** Starts the provided RPC and returns the encoded packet to send. */
  request(rpc: Rpc, request: Message, call: Call): Uint8Array {
    this.open(rpc, call);
    console.log(`Starting ${rpc}`);
    return packets.encodeRequest(rpc.getIdSet(call.callId), request);
  }

  allocateCallId(): number {
    const callId = this.nextCallId;
    this.nextCallId = (this.nextCallId + 1) % MAX_CALL_ID;
    // We skip callId zero to avoid LEGACY_OPEN_CALL_ID.
    if (this.nextCallId == 0) {
      this.nextCallId = 1;
    }
    return callId;
  }

  /** Calls request and sends the resulting packet to the channel. */
  sendRequest(
    rpc: Rpc,
    call: Call,
    ignoreError: boolean,
    request?: Message,
  ): Call | undefined {
    const previous = this.open(rpc, call);
    const packet = packets.encodeRequest(
      rpc.getIdSet(call.callId),
      request,
      rpc.method.customRequestSerializer,
    );
    try {
      rpc.channel.send(packet);
    } catch (error) {
      if (!ignoreError) {
        throw error;
      }
    }
    return previous;
  }

  /**
   * Creates a call for an RPC, but does not invoke it.
   *
   * open() can be used to receive streaming responses to an RPC that was not
   * invoked by this client. For example, a server may stream logs with a
   * server streaming RPC prior to any clients invoking it.
   */
  open(rpc: Rpc, call: Call): Call | undefined {
    console.debug(`Starting ${rpc}`);
    const previous = this.pending.get(rpc.getIdString(call.callId));
    this.pending.set(rpc.getIdString(call.callId), call);
    return previous;
  }

  sendClientStream(rpc: Rpc, message: Message, callId: number) {
    if (this.getPending(rpc, callId) === undefined) {
      throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`);
    }
    rpc.channel.send(
      packets.encodeClientStream(
        rpc.getIdSet(callId),
        message,
        rpc.method.customRequestSerializer,
      ),
    );
  }

  sendClientStreamEnd(rpc: Rpc, callId: number) {
    if (this.getPending(rpc, callId) === undefined) {
      throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`);
    }
    rpc.channel.send(packets.encodeClientStreamEnd(rpc.getIdSet(callId)));
  }

  /** Cancels the RPC. Returns the CLIENT_ERROR packet to send. */
  cancel(rpc: Rpc, callId: number): Uint8Array {
    console.debug(`Cancelling ${rpc}`);
    this.pending.delete(rpc.getIdString(callId));
    return packets.encodeCancel(rpc.getIdSet(callId));
  }

  /** Calls cancel and sends the cancel packet, if any, to the channel. */
  sendCancel(rpc: Rpc, callId: number): boolean {
    let packet: Uint8Array | undefined;
    try {
      packet = this.cancel(rpc, callId);
    } catch (err) {
      return false;
    }

    if (packet !== undefined) {
      rpc.channel.send(packet);
    }
    return true;
  }

  /** Gets the pending RPC's call. If status is set, clears the RPC. */
  getPending(rpc: Rpc, callId: number, status?: Status): Call | undefined {
    let call: Call | undefined = this.pending.get(rpc.getIdString(callId));
    if (callId === LEGACY_OPEN_CALL_ID || callId === OPEN_CALL_ID) {
      // Calls with ID `OPEN_CALL_ID` were unrequested, and are updated to
      // have the call ID of the first matching request.
      const allPendingCalls = Array.from(this.pending.values());
      for (const pending in allPendingCalls) {
        const curCall = allPendingCalls[pending];
        if (curCall.rpc.getIdString(0) === rpc.getIdString(0)) {
          call = curCall;
          break;
        }
      }
    }
    if (status === undefined) {
      return call;
    }

    this.pending.delete(rpc.getIdString(callId));
    return call;
  }
}
