"""Network simulator Python gRPC client."""

import logging
import os
import platform
from typing import Dict, Optional

from google.protobuf import empty_pb2
import grpc
from netsim_grpc.proto.netsim import (
    common_pb2 as common,
    frontend_pb2_grpc as frontend_grpc,
    frontend_pb2 as frontend,
    model_pb2 as model,
)

_Empty = empty_pb2.Empty
_Channel = grpc.Channel

NETSIM_INI = 'netsim.ini'


class SetupError(Exception):
  """Class for exceptions related to netsim setup."""


class NetsimClient(object):
  """Network simulator client."""

  def __init__(self):
    """Create a NetsimClient.

    Args:
      local_creds: Use local credentials for gRPC channel.
    """
    self._server_addr = _get_grpc_server_addr()
    self._channel = _create_frontend_grpc_channel(self._server_addr)
    self._stub = frontend_grpc.FrontendServiceStub(self._channel)

  def get_version(self) -> str:
    """Get the version of the netsim daemon.

    Returns:
      The netsim daemon version.
    """
    return self._stub.GetVersion(_Empty()).version

  def get_devices(self) -> Dict[str, model.Device]:
    """Get info for all devices connected to netsim.

    Returns:
      A dict mapping each connected device to its netsim properties.
    """
    response = self._stub.ListDevice(_Empty())
    return {device.name: device for device in response.devices}

  def set_position(
      self,
      device_name: str,
      position: Optional[model.Position] = None,
      orientation: Optional[model.Orientation] = None,
  ) -> bool:
    """Set the position and/or orientation of the specified device.

    NOTE: Leaving the position/orientation unset would reset the device's
      position/orientation to zero.

    Args:
      device_name: The avd name of the specified device.
      position: The desired (x, y, z) position of the device.
      orientation: The desired (yaw, pitch, roll) orientation of the device.

    Returns:
      bool indicating whether device position was successfully set.
    """
    request = frontend.PatchDeviceRequest()
    request.device.name = device_name
    if position:
      logging.info(
          'Setting new position for device %s: %s', device_name, position
      )
      request.device.position.x = position.x
      request.device.position.y = position.y
      request.device.position.z = position.z
    if orientation:
      logging.info(
          'Setting new orientation for device %s: %s', device_name, orientation
      )
      request.device.orientation.yaw = orientation.yaw
      request.device.orientation.pitch = orientation.pitch
      request.device.orientation.roll = orientation.roll
    self._stub.PatchDevice(request)
    device_info = self.get_devices()[device_name]
    success = True
    if position and device_info.position != position:
      logging.error(
          'Device %s position not set as expected. Current position: %s',
          device_name,
          device_info.position,
      )
      success = False
    if orientation and device_info.orientation != orientation:
      logging.error(
          'Device %s orientation not set as expected. Current orientation: %s',
          device_name,
          device_info.orientation,
      )
      success = False
    return success

  def set_radio(
      self, device_name: str, radio: model.PhyKind, state: bool
  ) -> None:
    """Set the radio state of the specified device.

    Args:
      device_name: The avd name of the specified device.
      radio: The specified radio, e.g. BLUETOOTH_LOW_ENERGY, WIFI
      state: Set radio state UP if True, DOWN if False.
    """
    chip = model.Chip()

    if radio == model.PhyKind.WIFI:
      chip.wifi.state = state
      chip.kind = common.ChipKind.WIFI
    elif radio == model.PhyKind.UWB:
      chip.uwb.state = state
      chip.kind = common.ChipKind.UWB
    else:
      if radio == model.PhyKind.BLUETOOTH_LOW_ENERGY:
        chip.bt.low_energy.state = state
      elif radio == model.PhyKind.BLUETOOTH_CLASSIC:
        chip.bt.classic.state = state
      chip.kind = common.ChipKind.BLUETOOTH

    request = frontend.PatchDeviceRequest()
    request.device.name = device_name
    request.device.chips.append(chip)
    self._stub.PatchDevice(request)

  def get_captures(self) -> list[model.Capture]:
    """Get info for all capture information in netsim.

    Returns:
      A List of all captures where capture is netsim.model.Capture.
    """
    return self._stub.ListCapture(_Empty()).captures

  def set_capture(
      self, device_name: str, radio: common.ChipKind, state: bool
  ) -> None:
    """Set the capture state of the specific device and radio.

    Args:
      device_name: The avd name of the specified device.
      radio: The specified radio ChipKind, e.g. BLUETOOTH, WIFI, UWB
      state: Set capture state UP if True, Down if False.
    """
    for capture in self.get_captures():
      if capture.chip_kind == radio and capture.device_name == device_name:
        request = frontend.PatchCaptureRequest()
        request.id = capture.id
        request.patch.state = state
        logging.info(
            'Setting capture state of radio %s for device %s to %s',
            common.ChipKind.Name(radio),
            device_name,
            state,
        )
        self._stub.PatchCapture(request)

  def set_capture_all(self, state: bool) -> None:
    logging.info('Setting capture state for all devices: %s', state)
    for capture in self.get_captures():
      request = frontend.PatchCaptureRequest()
      request.id = capture.id
      request.patch.state = state
      self._stub.PatchCapture(request)

  def reset(self) -> None:
    """Reset all devices."""
    self._stub.Reset(_Empty())

  def close(self) -> None:
    """Close the netsim client connection."""
    if hasattr(self, '_channel'):
      self._channel.close()

  def __del__(self) -> None:
    self.close()


def _get_grpc_server_addr() -> str:
  """Locate the grpc server address from netsim's .ini file."""
  # TMPDIR is set on buildbots
  file_path = os.path.join('/tmp', NETSIM_INI)
  if 'TMPDIR' in os.environ and os.path.exists(
      os.path.join(os.environ['TMPDIR'], NETSIM_INI)
  ):
    file_path = os.path.join(os.environ['TMPDIR'], NETSIM_INI)
  # XDG_RUNTIME_DIR for Linux local discovery env
  elif platform.system() == 'Linux' and 'XDG_RUNTIME_DIR' in os.environ:
    file_path = os.path.join(os.environ['XDG_RUNTIME_DIR'], NETSIM_INI)
  # HOME for Mac local discovery
  elif platform.system() == 'Darwin' and 'HOME' in os.environ:
    file_path = os.path.join(
        os.environ['HOME'], 'Library/Caches/TemporaryItems', NETSIM_INI
    )
  # LOCALAPPDATA for Windows local discovery
  elif platform.system() == 'Windows' and 'LOCALAPPDATA' in os.environ:
    file_path = os.path.join(os.environ['LOCALAPPDATA'], 'Temp', NETSIM_INI)
  else:
    logging.warning(
        'TMPDIR, XDG_RUNTIME_DIR, HOME, or LOCALAPPDATA environment variable'
        ' not set.Using /tmp. Is netsimd running?'
    )
  if not os.path.exists(file_path):
    raise SetupError(
        f'Unable to find the netsim.ini file at {file_path}. Is netsimd'
        ' running?',
    )
  with open(file_path, 'r') as f:
    for line in f:
      key, value = line.strip().split('=')
      if key == 'grpc.port':
        logging.info('Found netsim server gRPC port: %s.', value)
        return f'localhost:{value}'
  raise SetupError(
      'Unable to find the netsim server address from the .ini file.'
  )


def _create_frontend_grpc_channel(
    server_addr: str,
) -> _Channel:
  """Creates a gRPC channel to communicate with netsim FE service.

  Args:
    server_addr: Endpoint address of the netsim server.

  Returns:
    gRPC channel
  """
  logging.info(
      'Creating gRPC channel for netsim frontend service at %s.', server_addr
  )
  return grpc.insecure_channel(server_addr)
