#!/usr/bin/env python3
#
#  Copyright (c) 2021, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#
import ipaddress
import typing
import unittest

import config
import thread_cert

# Test description:
#
#   This test verifies DNS-SD server and DNS client behavior
#   (browsing for services and/or subtype services, resolving an
#   address, or resolving a service). It also indirectly covers the SRP
#   client and server behavior and the interactions of DNS-SD server
#   with SRP server).
#
# Topology:
#    Four nodes, leader acting as SRP and DNS-SD servers, with 3 router
#    nodes acting as SRP and DNS clients.
#

SERVER = 1
CLIENT1 = 2
CLIENT2 = 3
CLIENT3 = 4

DOMAIN = 'default.service.arpa.'
SERVICE = '_ipps._tcp'


class TestDnssd(thread_cert.TestCase):
    SUPPORT_NCP = False
    USE_MESSAGE_FACTORY = False

    TOPOLOGY = {
        SERVER: {
            'mode': 'rdn',
        },
        CLIENT1: {
            'mode': 'rdn',
        },
        CLIENT2: {
            'mode': 'rdn',
        },
        CLIENT3: {
            'mode': 'rdn',
        }
    }

    def test(self):
        server = self.nodes[SERVER]
        client1 = self.nodes[CLIENT1]
        client2 = self.nodes[CLIENT2]
        client3 = self.nodes[CLIENT3]

        #---------------------------------------------------------------
        # Start the server & client devices.

        server.start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(server.get_state(), 'leader')
        server.srp_server_set_enabled(True)

        client1.start()
        client2.start()
        client3.start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(client1.get_state(), 'router')
        self.assertEqual(client2.get_state(), 'router')
        self.assertEqual(client3.get_state(), 'router')

        #---------------------------------------------------------------
        # Register services on clients

        client1_addrs = [client1.get_mleid(), client1.get_rloc()]
        client2_addrs = [client2.get_mleid(), client2.get_rloc()]
        client3_addrs = [client3.get_mleid(), client2.get_rloc()]

        self._config_srp_client_services(client1, server, 'ins1', 'host1', 11111, 1, 1, client1_addrs, ",_s1,_s2")
        self._config_srp_client_services(client2, server, 'ins2', 'HOST2', 22222, 2, 2, client2_addrs)
        self._config_srp_client_services(client3, server, 'ins3', 'host3', 33333, 3, 3, client3_addrs, ",_S1")

        #---------------------------------------------------------------
        # Resolve address (AAAA records)

        answers = client1.dns_resolve(f"host1.{DOMAIN}".upper(), server.get_mleid(), 53)
        self.assertEqual(set(ipaddress.IPv6Address(ip) for ip, _ in answers),
                         set(map(ipaddress.IPv6Address, client1_addrs)))

        answers = client1.dns_resolve(f"host2.{DOMAIN}", server.get_mleid(), 53)
        self.assertEqual(set(ipaddress.IPv6Address(ip) for ip, _ in answers),
                         set(map(ipaddress.IPv6Address, client2_addrs)))

        #---------------------------------------------------------------
        # Browsing for services

        instance1_verify_info = {
            'port': 11111,
            'priority': 1,
            'weight': 1,
            'host': 'host1.default.service.arpa.',
            'address': client1_addrs,
            'txt_data': '',
            'srv_ttl': lambda x: x > 0,
            'txt_ttl': lambda x: x > 0,
            'aaaa_ttl': lambda x: x > 0,
        }

        instance2_verify_info = {
            'port': 22222,
            'priority': 2,
            'weight': 2,
            'host': 'host2.default.service.arpa.',
            'address': client2_addrs,
            'txt_data': '',
            'srv_ttl': lambda x: x > 0,
            'txt_ttl': lambda x: x > 0,
            'aaaa_ttl': lambda x: x > 0,
        }

        instance3_verify_info = {
            'port': 33333,
            'priority': 3,
            'weight': 3,
            'host': 'host3.default.service.arpa.',
            'address': client3_addrs,
            'txt_data': '',
            'srv_ttl': lambda x: x > 0,
            'txt_ttl': lambda x: x > 0,
            'aaaa_ttl': lambda x: x > 0,
        }

        instance4_verify_info = {
            'port': 44444,
            'priority': 4,
            'weight': 4,
            'host': 'host3.default.service.arpa.',
            'address': client3_addrs,
            'txt_data': 'KEY=414243',  # KEY=ABC
            'srv_ttl': lambda x: x > 0,
            'txt_ttl': lambda x: x > 0,
            'aaaa_ttl': lambda x: x > 0,
        }

        # Browse for main service
        service_instances = client1.dns_browse(f'{SERVICE}.{DOMAIN}'.upper(), server.get_mleid(), 53)
        self.assertEqual({'ins1', 'ins2', 'ins3'}, set(service_instances.keys()))

        # Browse for service sub-type _s1.
        service_instances = client1.dns_browse(f'_s1._sub.{SERVICE}.{DOMAIN}'.upper(), server.get_mleid(), 53)
        self.assertEqual({'ins1', 'ins3'}, set(service_instances.keys()))

        # Browse for service sub-type _s2.
        # Since there is only one matching instance, validate that
        # server included the service info in additional section.
        service_instances = client1.dns_browse(f'_s2._sub.{SERVICE}.{DOMAIN}'.upper(), server.get_mleid(), 53)
        self.assertEqual({'ins1'}, set(service_instances.keys()))
        self._assert_service_instance_equal(service_instances['ins1'], instance1_verify_info)

        #---------------------------------------------------------------
        # Resolve service

        service_instance = client1.dns_resolve_service('ins1', f'{SERVICE}.{DOMAIN}'.upper(), server.get_mleid(), 53)
        self._assert_service_instance_equal(service_instance, instance1_verify_info)

        service_instance = client1.dns_resolve_service('ins2', f'{SERVICE}.{DOMAIN}'.upper(), server.get_mleid(), 53)
        self._assert_service_instance_equal(service_instance, instance2_verify_info)

        service_instance = client1.dns_resolve_service('ins3', f'{SERVICE}.{DOMAIN}'.upper(), server.get_mleid(), 53)
        self._assert_service_instance_equal(service_instance, instance3_verify_info)

        #---------------------------------------------------------------
        # Add another service with TXT entries to the existing host and
        # verify that it is properly merged.

        client3.srp_client_add_service('ins4', (SERVICE + ",_s1").upper(), 44444, 4, 4, txt_entries=['KEY=ABC'])
        self.simulator.go(5)

        service_instances = client1.dns_browse(f'{SERVICE}.{DOMAIN}', server.get_mleid(), 53)
        self.assertEqual({'ins1', 'ins2', 'ins3', 'ins4'}, set(service_instances.keys()))

        service_instance = client1.dns_resolve_service('ins4', f'{SERVICE}.{DOMAIN}'.upper(), server.get_mleid(), 53)
        self._assert_service_instance_equal(service_instance, instance4_verify_info)

    def _assert_service_instance_equal(self, instance, info):
        self.assertEqual(instance['host'].lower(), info['host'].lower(), instance)
        for f in ('port', 'priority', 'weight', 'txt_data'):
            self.assertEqual(instance[f], info[f], instance)

        verify_addresses = info['address']
        if not isinstance(verify_addresses, typing.Collection):
            verify_addresses = [verify_addresses]
        self.assertIn(ipaddress.IPv6Address(instance['address']), map(ipaddress.IPv6Address, verify_addresses),
                      instance)

        for ttl_f in ('srv_ttl', 'txt_ttl', 'aaaa_ttl'):
            check_ttl = info[ttl_f]
            if not callable(check_ttl):
                check_ttl = lambda x: x == check_ttl

            self.assertTrue(check_ttl(instance[ttl_f]), instance)

    def _config_srp_client_services(self,
                                    client,
                                    server,
                                    instancename,
                                    hostname,
                                    port,
                                    priority,
                                    weight,
                                    addrs,
                                    subtypes=''):
        client.srp_client_set_host_name(hostname)
        client.srp_client_set_host_address(*addrs)
        client.srp_client_add_service(instancename, SERVICE + subtypes, port, priority, weight)

        self.simulator.go(5)
        self.assertEqual(client.srp_client_get_host_state(), 'Registered')


if __name__ == '__main__':
    unittest.main()
