#!/usr/bin/python3.4
#
#   Copyright 2017 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue
import time

from acts import asserts
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.wifi.aware import aware_const as aconsts
from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils
from acts_contrib.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
from acts_contrib.test_utils.wifi.rtt import rtt_const as rconsts
from acts_contrib.test_utils.wifi.rtt import rtt_test_utils as rutils
from acts_contrib.test_utils.wifi.rtt.RttBaseTest import RttBaseTest


class RangeAwareTest(AwareBaseTest, RttBaseTest):
    """Test class for RTT ranging to Wi-Fi Aware peers"""
    SERVICE_NAME = "GoogleTestServiceXY"

    # Number of RTT iterations
    NUM_ITER = 10

    # Time gap (in seconds) between iterations
    TIME_BETWEEN_ITERATIONS = 0

    # Time gap (in seconds) when switching between Initiator and Responder
    TIME_BETWEEN_ROLES = 4

    # Min and max of the test subscribe range.
    DISTANCE_MIN = 0
    DISTANCE_MAX = 1000000

    def setup_test(self):
        """Manual setup here due to multiple inheritance: explicitly execute the
        setup method from both parents.
        """
        AwareBaseTest.setup_test(self)
        RttBaseTest.setup_test(self)

    def teardown_test(self):
        """Manual teardown here due to multiple inheritance: explicitly execute the
        teardown method from both parents.
        """
        AwareBaseTest.teardown_test(self)
        RttBaseTest.teardown_test(self)

    #############################################################################

    def run_rtt_discovery(self, init_dut, resp_mac=None, resp_peer_id=None):
        """Perform single RTT measurement, using Aware, from the Initiator DUT to
        a Responder. The RTT Responder can be specified using its MAC address
        (obtained using out- of-band discovery) or its Peer ID (using Aware
        discovery).

        Args:
              init_dut: RTT Initiator device
              resp_mac: MAC address of the RTT Responder device
              resp_peer_id: Peer ID of the RTT Responder device
        """
        asserts.assert_true(
            resp_mac is not None or resp_peer_id is not None,
            "One of the Responder specifications (MAC or Peer ID)"
            " must be provided!")
        if resp_mac is not None:
            id = init_dut.droid.wifiRttStartRangingToAwarePeerMac(resp_mac)
        else:
            id = init_dut.droid.wifiRttStartRangingToAwarePeerId(resp_peer_id)
        event_name = rutils.decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT,
                                           id)
        try:
            event = init_dut.ed.pop_event(event_name, rutils.EVENT_TIMEOUT)
            result = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS][0]
            if resp_mac is not None:
                rutils.validate_aware_mac_result(result, resp_mac, "DUT")
            else:
                rutils.validate_aware_peer_id_result(result, resp_peer_id,
                                                     "DUT")
            return result
        except queue.Empty:
            self.log.warning("Timed-out waiting for %s", event_name)
            return None

    def run_rtt_ib_discovery_set(self, do_both_directions, iter_count,
                                 time_between_iterations, time_between_roles):
        """Perform a set of RTT measurements, using in-band (Aware) discovery.

        Args:
              do_both_directions: False - perform all measurements in one direction,
                                  True - perform 2 measurements one in both directions.
              iter_count: Number of measurements to perform.
              time_between_iterations: Number of seconds to wait between iterations.
              time_between_roles: Number of seconds to wait when switching between
                                  Initiator and Responder roles (only matters if
                                  do_both_directions=True).

        Returns: a list of the events containing the RTT results (or None for a
        failed measurement). If both directions are tested then returns a list of
        2 elements: one set for each direction.
        """
        p_dut = self.android_devices[0]
        s_dut = self.android_devices[1]

        (p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub,
         peer_id_on_pub) = autils.create_discovery_pair(
             p_dut,
             s_dut,
             p_config=autils.add_ranging_to_pub(
                 autils.create_discovery_config(
                     self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
                 True),
             s_config=autils.add_ranging_to_sub(
                 autils.create_discovery_config(
                     self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
                 self.DISTANCE_MIN, self.DISTANCE_MAX),
             device_startup_offset=self.device_startup_offset,
             msg_id=self.get_next_msg_id())

        resultsPS = []
        resultsSP = []
        for i in range(iter_count):
            if i != 0 and time_between_iterations != 0:
                time.sleep(time_between_iterations)

            # perform RTT from pub -> sub
            resultsPS.append(
                self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub))

            if do_both_directions:
                if time_between_roles != 0:
                    time.sleep(time_between_roles)

                # perform RTT from sub -> pub
                resultsSP.append(
                    self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub))

        return resultsPS if not do_both_directions else [resultsPS, resultsSP]

    def run_rtt_oob_discovery_set(self, do_both_directions, iter_count,
                                  time_between_iterations, time_between_roles):
        """Perform a set of RTT measurements, using out-of-band discovery.

        Args:
              do_both_directions: False - perform all measurements in one direction,
                                  True - perform 2 measurements one in both directions.
              iter_count: Number of measurements to perform.
              time_between_iterations: Number of seconds to wait between iterations.
              time_between_roles: Number of seconds to wait when switching between
                                  Initiator and Responder roles (only matters if
                                  do_both_directions=True).
              enable_ranging: True to enable Ranging, False to disable.

        Returns: a list of the events containing the RTT results (or None for a
        failed measurement). If both directions are tested then returns a list of
        2 elements: one set for each direction.
        """
        dut0 = self.android_devices[0]
        dut1 = self.android_devices[1]

        id0, mac0 = autils.attach_with_identity(dut0)
        id1, mac1 = autils.attach_with_identity(dut1)

        # wait for for devices to synchronize with each other - there are no other
        # mechanisms to make sure this happens for OOB discovery (except retrying
        # to execute the data-path request)
        time.sleep(autils.WAIT_FOR_CLUSTER)

        # start publisher(s) on the Responder(s) with ranging enabled
        p_config = autils.add_ranging_to_pub(
            autils.create_discovery_config(self.SERVICE_NAME,
                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
            enable_ranging=True)
        dut1.droid.wifiAwarePublish(id1, p_config)
        autils.wait_for_event(dut1, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
        if do_both_directions:
            dut0.droid.wifiAwarePublish(id0, p_config)
            autils.wait_for_event(dut0, aconsts.SESSION_CB_ON_PUBLISH_STARTED)

        results01 = []
        results10 = []
        for i in range(iter_count):
            if i != 0 and time_between_iterations != 0:
                time.sleep(time_between_iterations)

            # perform RTT from dut0 -> dut1
            results01.append(self.run_rtt_discovery(dut0, resp_mac=mac1))

            if do_both_directions:
                if time_between_roles != 0:
                    time.sleep(time_between_roles)

                # perform RTT from dut1 -> dut0
                results10.append(self.run_rtt_discovery(dut1, resp_mac=mac0))

        return results01 if not do_both_directions else [results01, results10]

    def verify_results(self, results, results_reverse_direction=None, accuracy_evaluation=False):
        """Verifies the results of the RTT experiment.

        Args:
              results: List of RTT results.
              results_reverse_direction: List of RTT results executed in the
                                        reverse direction. Optional.
              accuracy_evaluation: False - only evaluate success rate.
                                   True - evaluate both success rate and accuracy
                                   default is False.
        """
        stats = rutils.extract_stats(results, self.rtt_reference_distance_mm,
                                     self.rtt_reference_distance_margin_mm,
                                     self.rtt_min_expected_rssi_dbm)
        stats_reverse_direction = None
        if results_reverse_direction is not None:
            stats_reverse_direction = rutils.extract_stats(
                results_reverse_direction, self.rtt_reference_distance_mm,
                self.rtt_reference_distance_margin_mm,
                self.rtt_min_expected_rssi_dbm)
        self.log.debug("Stats: %s", stats)
        if stats_reverse_direction is not None:
            self.log.debug("Stats in reverse direction: %s",
                           stats_reverse_direction)

        extras = stats if stats_reverse_direction is None else {
            "forward": stats,
            "reverse": stats_reverse_direction
        }

        asserts.assert_true(
            stats['num_no_results'] == 0,
            "Missing (timed-out) results",
            extras=extras)
        asserts.assert_false(
            stats['any_lci_mismatch'], "LCI mismatch", extras=extras)
        asserts.assert_false(
            stats['any_lcr_mismatch'], "LCR mismatch", extras=extras)
        asserts.assert_false(
            stats['invalid_num_attempted'],
            "Invalid (0) number of attempts",
            extras=stats)
        asserts.assert_false(
            stats['invalid_num_successful'],
            "Invalid (0) number of successes",
            extras=stats)
        asserts.assert_equal(
            stats['num_invalid_rssi'], 0, "Invalid RSSI", extras=extras)
        asserts.assert_true(
            stats['num_failures'] <=
            self.rtt_max_failure_rate_two_sided_rtt_percentage *
            stats['num_results'] / 100,
            "Failure rate is too high",
            extras=extras)
        if accuracy_evaluation:
            asserts.assert_true(
                stats['num_range_out_of_margin'] <=
                self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage *
                stats['num_success_results'] / 100,
                "Results exceeding error margin rate is too high",
                extras=extras)

        if stats_reverse_direction is not None:
            asserts.assert_true(
                stats_reverse_direction['num_no_results'] == 0,
                "Missing (timed-out) results",
                extras=extras)
            asserts.assert_false(
                stats_reverse_direction['any_lci_mismatch'], "LCI mismatch", extras=extras)
            asserts.assert_false(
                stats_reverse_direction['any_lcr_mismatch'], "LCR mismatch", extras=extras)
            asserts.assert_equal(
                stats_reverse_direction['num_invalid_rssi'], 0, "Invalid RSSI", extras=extras)
            asserts.assert_true(
                stats_reverse_direction['num_failures'] <=
                self.rtt_max_failure_rate_two_sided_rtt_percentage *
                stats_reverse_direction['num_results'] / 100,
                "Failure rate is too high",
                extras=extras)
            if accuracy_evaluation:
                asserts.assert_true(
                    stats_reverse_direction['num_range_out_of_margin'] <=
                    self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage *
                    stats_reverse_direction['num_success_results'] / 100,
                    "Results exceeding error margin rate is too high",
                    extras=extras)

        asserts.explicit_pass("RTT Aware test done", extras=extras)

    def run_rtt_with_aware_session_disabled_ranging(self, disable_publish):
        """Try to perform RTT operation when there publish or subscribe disabled ranging.

        Args:
            disable_publish: if true disable ranging on publish config, otherwise disable ranging on
                            subscribe config
        """
        p_dut = self.android_devices[0]
        s_dut = self.android_devices[1]
        pub_config = autils.create_discovery_config(
            self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED)
        sub_config = autils.create_discovery_config(
            self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE)
        if disable_publish:
            sub_config = autils.add_ranging_to_sub(sub_config, self.DISTANCE_MIN, self.DISTANCE_MAX)
        else:
            pub_config = autils.add_ranging_to_pub(pub_config, True)
        (p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub,
         peer_id_on_pub) = autils.create_discovery_pair(
            p_dut,
            s_dut,
            p_config=pub_config,
            s_config=sub_config,
            device_startup_offset=self.device_startup_offset,
            msg_id=self.get_next_msg_id())
        for i in range(self.NUM_ITER):
            result_sub = self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub)
            asserts.assert_equal(rconsts.EVENT_CB_RANGING_STATUS_FAIL,
                                 result_sub[rconsts.EVENT_CB_RANGING_KEY_STATUS],
                                 "Ranging to publisher should fail.",
                                 extras={"data": result_sub})

        time.sleep(self.TIME_BETWEEN_ROLES)

        for i in range(self.NUM_ITER):
            result_pub = self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub)
            asserts.assert_equal(rconsts.EVENT_CB_RANGING_STATUS_FAIL,
                                 result_pub[rconsts.EVENT_CB_RANGING_KEY_STATUS],
                                 "Ranging to subscriber should fail.",
                                 extras={"data": result_pub})

    #############################################################################

    @test_tracker_info(uuid="22edba77-eeb2-43ee-875a-84437550ad84")
    def test_rtt_oob_discovery_both_ways(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
        to communicate the MAC addresses to the peer. Test RTT both-ways:
        switching rapidly between Initiator and Responder.
        Functionality test: Only evaluate success rate.
        """
        rtt_results1, rtt_results2 = self.run_rtt_oob_discovery_set(
            do_both_directions=True,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results1, rtt_results2)

    @test_tracker_info(uuid="18cef4be-95b4-4f7d-a140-5165874e7d1c")
    def test_rtt_ib_discovery_one_way(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
        to communicate the MAC addresses to the peer. Test one-direction RTT only.
        Functionality test: Only evaluate success rate.
        """
        rtt_results = self.run_rtt_ib_discovery_set(
            do_both_directions=False,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results)

    @test_tracker_info(uuid="c67c8e70-c417-42d9-9bca-af3a89f1ddd9")
    def test_rtt_ib_discovery_both_ways(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
        to communicate the MAC addresses to the peer. Test RTT both-ways:
        switching rapidly between Initiator and Responder.
        Functionality test: Only evaluate success rate.
        """
        rtt_results1, rtt_results2 = self.run_rtt_ib_discovery_set(
            do_both_directions=True,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results1, rtt_results2)

    @test_tracker_info(uuid="82f954a5-c0ec-4bc6-8940-3b72199328ac")
    def test_rtt_oob_discovery_both_ways_with_accuracy_evaluation(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
        to communicate the MAC addresses to the peer. Test RTT both-ways:
        switching rapidly between Initiator and Responder.
        Performance test: evaluate success rate and accuracy.
        """
        rtt_results1, rtt_results2 = self.run_rtt_oob_discovery_set(
            do_both_directions=True,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results1, rtt_results2, accuracy_evaluation=True)

    @test_tracker_info(uuid="4194e00c-ea49-488e-b67f-ad9360daa5fa")
    def test_rtt_ib_discovery_one_way_with_accuracy_evaluation(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
        to communicate the MAC addresses to the peer. Test one-direction RTT only.
        Performance test: evaluate success rate and accuracy.
        """
        rtt_results = self.run_rtt_ib_discovery_set(
            do_both_directions=False,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results, accuracy_evaluation=True)

    @test_tracker_info(uuid="bea3ac8b-756d-4cd8-8936-b8bfe64676c9")
    def test_rtt_ib_discovery_both_ways_with_accuracy_evaluation(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
        to communicate the MAC addresses to the peer. Test RTT both-ways:
        switching rapidly between Initiator and Responder.
        Performance test: evaluate success rate and accuracy.
        """
        rtt_results1, rtt_results2 = self.run_rtt_ib_discovery_set(
            do_both_directions=True,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results1, rtt_results2, accuracy_evaluation=True)

    @test_tracker_info(uuid="54f9693d-45e5-4979-adbb-1b875d217c0c")
    def test_rtt_without_initiator_aware(self):
        """Try to perform RTT operation when there is no local Aware session (on the
        Initiator). The Responder is configured normally: Aware on and a Publisher
        with Ranging enable. Should FAIL.
        """
        init_dut = self.android_devices[0]
        resp_dut = self.android_devices[1]

        # Enable a Responder and start a Publisher
        resp_id = resp_dut.droid.wifiAwareAttach(True)
        autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
        resp_ident_event = autils.wait_for_event(
            resp_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
        resp_mac = resp_ident_event['data']['mac']

        resp_config = autils.add_ranging_to_pub(
            autils.create_discovery_config(self.SERVICE_NAME,
                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
            enable_ranging=True)
        resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
        autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)

        # Initiate an RTT to Responder (no Aware started on Initiator!)
        results = []
        num_no_responses = 0
        num_successes = 0
        for i in range(self.NUM_ITER):
            result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
            self.log.debug("result: %s", result)
            results.append(result)
            if result is None:
                num_no_responses = num_no_responses + 1
            elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS] ==
                  rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
                num_successes = num_successes + 1

        asserts.assert_equal(
            num_no_responses, 0, "No RTT response?", extras={"data": results})
        asserts.assert_equal(
            num_successes,
            0,
            "Aware RTT w/o Aware should FAIL!",
            extras={"data": results})
        asserts.explicit_pass("RTT Aware test done", extras={"data": results})

    @test_tracker_info(uuid="87a69053-8261-4928-8ec1-c93aac7f3a8d")
    def test_rtt_without_responder_aware(self):
        """Try to perform RTT operation when there is no peer Aware session (on the
        Responder). Should FAIL."""
        init_dut = self.android_devices[0]
        resp_dut = self.android_devices[1]

        # Enable a Responder and start a Publisher
        resp_id = resp_dut.droid.wifiAwareAttach(True)
        autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
        resp_ident_event = autils.wait_for_event(
            resp_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
        resp_mac = resp_ident_event['data']['mac']

        resp_config = autils.add_ranging_to_pub(
            autils.create_discovery_config(self.SERVICE_NAME,
                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
            enable_ranging=True)
        resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
        autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)

        # Disable Responder
        resp_dut.droid.wifiAwareDestroy(resp_id)

        # Enable the Initiator
        init_id = init_dut.droid.wifiAwareAttach()
        autils.wait_for_event(init_dut, aconsts.EVENT_CB_ON_ATTACHED)

        # Initiate an RTT to Responder (no Aware started on Initiator!)
        results = []
        num_no_responses = 0
        num_successes = 0
        for i in range(self.NUM_ITER):
            result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
            self.log.debug("result: %s", result)
            results.append(result)
            if result is None:
                num_no_responses = num_no_responses + 1
            elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS] ==
                  rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
                num_successes = num_successes + 1

        asserts.assert_equal(
            num_no_responses, 0, "No RTT response?", extras={"data": results})
        asserts.assert_equal(
            num_successes,
            0,
            "Aware RTT w/o Aware should FAIL!",
            extras={"data": results})
        asserts.explicit_pass("RTT Aware test done", extras={"data": results})

    @test_tracker_info(uuid="80b0f96e-f87d-4dc9-a2b9-fae48558c8d8")
    def test_rtt_with_publish_ranging_disabled(self):
        """
        Try to perform RTT operation when publish ranging disabled, should fail.
        """
        self.run_rtt_with_aware_session_disabled_ranging(True)

    @test_tracker_info(uuid="cb93a902-9b7a-4734-ba81-864878f9fa55")
    def test_rtt_with_subscribe_ranging_disabled(self):
        """
        Try to perform RTT operation when subscribe ranging disabled, should fail.
        """
        self.run_rtt_with_aware_session_disabled_ranging(False)
