# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for server/cros/network/rf_switch/rf_switch.py."""

import unittest

from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.server.cros.network.rf_switch import rf_mocks
from autotest_lib.server.cros.network.rf_switch import rf_switch
from autotest_lib.server.cros.network.rf_switch import scpi


class RfSwitchTestCases(unittest.TestCase):
    """Unit tests for RfSwitch Methods."""

    _HOST = '1.2.3.4'

    def setUp(self):
        self.god = mock.mock_god(debug=False, fail_fast=True)
        self.mock_rf_switch = rf_mocks.RfSwitchMock(self._HOST)
        self.code = 0
        self.msg = ''

    def tearDown(self):
        self.god.unstub_all()

    def _populate_stack_for_cmd(self, cmd):
        """Helper to add call stacks to verify in mock.

        @param cmd: command to verify
        """
        # monitor send for correct command.
        self.god.stub_function(self.mock_rf_switch.socket, 'send')
        self.mock_rf_switch.socket.send.expect_call('%s' % cmd)
        self.mock_rf_switch.socket.send.expect_call(
            '%s\n' % scpi.Scpi.CMD_ERROR_CHECK)
        self.mock_rf_switch.socket.send.expect_call(
            '%s\n' % rf_switch.RfSwitch._CMD_WAIT)

        self.god.stub_function_to_return(
            self.mock_rf_switch.socket, 'recv', '%d, "%s"' % (
                self.code, self.msg))

    def _populate_stack_for_query_commands(self, cmd, response):
        """Helper to add call stacks for query commands

        @param cmd: command to verify
        @param response: response to mock return
        """
        self.god.stub_function(self.mock_rf_switch.socket, 'send')
        self.mock_rf_switch.socket.send.expect_call('%s' % cmd)

        self.god.stub_function_to_return(
            self.mock_rf_switch.socket, 'recv', response)

    def test_send_cmd_check_error(self):
        """Verify send_cmd_check_error."""
        test_command = 'This is a command\n'
        self._populate_stack_for_cmd(test_command)
        self.mock_rf_switch.send_cmd_check_error(test_command)
        self.god.check_playback()

    def test_send_cmd_check_error_throws_error(self):
        """Verify send_cmd_check_error throws error."""
        test_command = 'This is a command'
        code = 101
        msg = 'Error Message'

        self.god.stub_function_to_return(
            self.mock_rf_switch.socket, 'recv', '%d, "%s"' % (code, msg))
        with self.assertRaises(rf_switch.RfSwitchException):
            self.mock_rf_switch.send_cmd_check_error(test_command)

    def test_close_relays(self):
        """Verify close_relays."""
        relays = 'R1:R3'
        self._populate_stack_for_cmd(
            '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_CLOSE_RELAYS, relays))
        self.mock_rf_switch.close_relays(relays)
        self.god.check_playback()

    def test_relays_closed(self):
        """Verify relays_closed."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (
            rf_switch.RfSwitch._CMD_CHECK_RELAYS_CLOSED, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.relays_closed(relays)
        self.god.check_playback()

    def test_open_relays(self):
        """Verify open_relays."""
        relays = 'R1:R3'
        self._populate_stack_for_cmd(
            '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, relays))
        self.mock_rf_switch.open_relays(relays)
        self.god.check_playback()

    def test_open_all_relays(self):
        """Verify open_all_relays."""
        self._populate_stack_for_cmd(
            '%s\n' % rf_switch.RfSwitch._CMD_OPEN_ALL_RELAYS)
        self.mock_rf_switch.open_all_relays()
        self.god.check_playback()

    def test_set_verify_on(self):
        """Verify set_verify for on."""
        relays = 'R1:R3'
        status = True
        self._populate_stack_for_cmd(
            '%s 1,(@%s)\n' % (rf_switch.RfSwitch._CMD_SET_VERIFY, relays))
        self.mock_rf_switch.set_verify(relays, status)
        self.god.check_playback()

    def test_set_verify_off(self):
        """Verify set_verify for off."""
        relays = 'R1:R3'
        status = False
        self._populate_stack_for_cmd(
            '%s 0,(@%s)\n' % (rf_switch.RfSwitch._CMD_SET_VERIFY, relays))
        self.mock_rf_switch.set_verify(relays, status)
        self.god.check_playback()

    def test_get_verify(self):
        """Verify get_verify."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_GET_VERIFY, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.get_verify(relays)
        self.god.check_playback()

    def test_set_verify_inverted_true(self):
        """Verify set_verify_inverted."""
        relays = 'R1:R3'
        status = True
        cmd = '%s INV,(@%s)\n' % (
            rf_switch.RfSwitch._CMD_SET_VERIFY_INVERTED, relays)
        self._populate_stack_for_cmd(cmd)
        self.mock_rf_switch.set_verify_inverted(relays, status)
        self.god.check_playback()

    def test_set_verify_inverted_false(self):
        """Verify set_verify_inverted."""
        relays = 'R1:R3'
        status = False
        cmd = '%s NORM,(@%s)\n' % (
            rf_switch.RfSwitch._CMD_SET_VERIFY_INVERTED, relays)
        self._populate_stack_for_cmd(cmd)
        self.mock_rf_switch.set_verify_inverted(relays, status)
        self.god.check_playback()

    def test_get_verify_inverted(self):
        """Verify get_verify_inverted."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (
            rf_switch.RfSwitch._CMD_GET_VERIFY_INVERTED, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.get_verify_inverted(relays)
        self.god.check_playback()

    def test_get_verify_state(self):
        """Verify get_verify_state."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_GET_VERIFY_STATE, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.get_verify_state(relays)
        self.god.check_playback()

    def test_busy(self):
        """Verify busy."""
        status = '1'
        cmd = '%s\n' % rf_switch.RfSwitch._CMD_CHECK_BUSY
        self._populate_stack_for_query_commands(cmd, status)
        busy = self.mock_rf_switch.busy
        self.god.check_playback()
        self.assertTrue(busy)

    def test_not_busy(self):
        """Verify busy."""
        status = '0'
        cmd = '%s\n' % rf_switch.RfSwitch._CMD_CHECK_BUSY
        self._populate_stack_for_query_commands(cmd, status)
        busy = self.mock_rf_switch.busy
        self.god.check_playback()
        self.assertFalse(busy)

    def test_wait(self):
        """Verify wait."""
        self.god.stub_function(self.mock_rf_switch.socket, 'send')
        self.mock_rf_switch.socket.send.expect_call(
            '%s\n' % rf_switch.RfSwitch._CMD_WAIT)
        self.mock_rf_switch.wait()
        self.god.check_playback()

    def test_get_attenuation(self):
        """Verify get_attenuation."""
        ap = 1
        attenuation = 100
        relays = rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS_SHORT[ap]
        return_relays = '1,1,0,1,1,0,0'
        cmd = '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_CHECK_RELAYS_CLOSED,
                              relays)
        self._populate_stack_for_query_commands(cmd, return_relays)
        response = self.mock_rf_switch.get_attenuation(ap)
        self.god.check_playback()
        self.assertEquals(response, attenuation,
                          'get_attenuation returned %s, did not match %s' % (
                              response, attenuation))

    def test_set_attenuation(self):
        """Verify set_attenuation."""
        ap = 1
        ap_relays = 'k1_49,k1_50,k1_51,k1_52,k1_53,k1_54,R1_9'
        attenuation = 100
        relays_for_attenuation = 'k1_53,k1_52,k1_50,k1_49'

        self._populate_stack_for_cmd(
            '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, ap_relays))
        self._populate_stack_for_cmd('%s (@%s)\n' % (
            rf_switch.RfSwitch._CMD_CLOSE_RELAYS, relays_for_attenuation))
        self.mock_rf_switch.set_attenuation(ap, attenuation)
        self.god.check_playback()

    def test_set_attenuation_all(self):
        """Verify we can set same attenuation to all."""

        # 0 should close all relays
        for x in range(rf_switch.RfSwitch._MAX_ENCLOSURE):
            relays = ','.join(rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS[x + 1])
            reverse_relays = ','.join(
                rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS[x + 1][::-1])
            self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, relays))
            self._populate_stack_for_cmd('%s (@%s)\n' % (
                rf_switch.RfSwitch._CMD_CLOSE_RELAYS, reverse_relays))
        self.mock_rf_switch.set_attenuation(0, 0)

        # 127 should open all (close none)
        for x in range(rf_switch.RfSwitch._MAX_ENCLOSURE):
            relays = ','.join(rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS[x + 1])
            self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, relays))
            self._populate_stack_for_cmd('%s (@)\n' % (
                rf_switch.RfSwitch._CMD_CLOSE_RELAYS))
        self.mock_rf_switch.set_attenuation(0, 127)
        self.god.check_playback()

    def test_set_attenuation_raise_error(self):
        """Verify set_attenuation will raise error."""
        with self.assertRaises(ValueError):
            self.mock_rf_switch.set_attenuation(5, 100)
        with self.assertRaises(ValueError):
            self.mock_rf_switch.set_attenuation(-1, 100)

    def test_connect_ap_client(self):
        """Verify connect_ap_client sets correct relays."""
        relays = 'k1_1,k1_25'
        ap = 1
        client = 1
        self._populate_stack_for_cmd(
            '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_CLOSE_RELAYS, relays))
        self.mock_rf_switch.connect_ap_client(ap, client)
        self.god.check_playback()

if __name__ == '__main__':
    unittest.main()
