# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
import click

from bumble.colors import color
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver
from bumble.device import Advertisement
from bumble.hci import Address, HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY


# -----------------------------------------------------------------------------
def make_rssi_bar(rssi):
    DISPLAY_MIN_RSSI = -105
    DISPLAY_MAX_RSSI = -30
    DEFAULT_RSSI_BAR_WIDTH = 30

    blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
    bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
    bar_width = min(max(bar_width, 0), 1)
    bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
    return ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]


# -----------------------------------------------------------------------------
class AdvertisementPrinter:
    def __init__(self, min_rssi, resolver):
        self.min_rssi = min_rssi
        self.resolver = resolver

    def print_advertisement(self, advertisement):
        address = advertisement.address
        address_color = 'yellow' if advertisement.is_connectable else 'red'

        if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
            return

        address_qualifier = ''
        resolution_qualifier = ''
        if self.resolver and advertisement.address.is_resolvable:
            resolved = self.resolver.resolve(advertisement.address)
            if resolved is not None:
                resolution_qualifier = f'(resolved from {advertisement.address})'
                address = resolved

        address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
            address.address_type
        ]
        if address.address_type in (
            Address.RANDOM_IDENTITY_ADDRESS,
            Address.PUBLIC_IDENTITY_ADDRESS,
        ):
            type_color = 'yellow'
        else:
            if address.is_public:
                type_color = 'cyan'
            elif address.is_static:
                type_color = 'green'
                address_qualifier = '(static)'
            elif address.is_resolvable:
                type_color = 'magenta'
                address_qualifier = '(resolvable)'
            else:
                type_color = 'blue'
                address_qualifier = '(non-resolvable)'

        separator = '\n  '
        rssi_bar = make_rssi_bar(advertisement.rssi)
        if not advertisement.is_legacy:
            phy_info = (
                f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
                f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
                f'{separator}'
            )
        else:
            phy_info = ''

        print(
            f'>>> {color(address, address_color)} '
            f'[{color(address_type_string, type_color)}]{address_qualifier}'
            f'{resolution_qualifier}:{separator}'
            f'{phy_info}'
            f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
            f'{advertisement.data.to_string(separator)}\n'
        )

    def on_advertisement(self, advertisement):
        self.print_advertisement(advertisement)

    def on_advertising_report(self, report):
        print(f'{color("EVENT", "green")}: {report.event_type_string()}')
        self.print_advertisement(Advertisement.from_advertising_report(report))


# -----------------------------------------------------------------------------
async def scan(
    min_rssi,
    passive,
    scan_interval,
    scan_window,
    phy,
    filter_duplicates,
    raw,
    irks,
    keystore_file,
    device_config,
    transport,
):
    print('<<< connecting to HCI...')
    async with await open_transport_or_link(transport) as (hci_source, hci_sink):
        print('<<< connected')

        if device_config:
            device = Device.from_config_file_with_hci(
                device_config, hci_source, hci_sink
            )
        else:
            device = Device.with_hci(
                'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
            )

        await device.power_on()

        if keystore_file:
            device.keystore = JsonKeyStore.from_device(device, filename=keystore_file)

        if device.keystore:
            resolving_keys = await device.keystore.get_resolving_keys()
        else:
            resolving_keys = []

        for irk_and_address in irks:
            if ':' not in irk_and_address:
                raise ValueError('invalid IRK:ADDRESS value')
            irk_hex, address_str = irk_and_address.split(':', 1)
            resolving_keys.append(
                (
                    bytes.fromhex(irk_hex),
                    Address(address_str, Address.RANDOM_DEVICE_ADDRESS),
                )
            )

        resolver = AddressResolver(resolving_keys) if resolving_keys else None

        printer = AdvertisementPrinter(min_rssi, resolver)
        if raw:
            device.host.on('advertising_report', printer.on_advertising_report)
        else:
            device.on('advertisement', printer.on_advertisement)

        if phy is None:
            scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
        else:
            scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]

        await device.start_scanning(
            active=(not passive),
            scan_interval=scan_interval,
            scan_window=scan_window,
            filter_duplicates=filter_duplicates,
            scanning_phys=scanning_phys,
        )

        await hci_source.wait_for_termination()


# -----------------------------------------------------------------------------
@click.command()
@click.option('--min-rssi', type=int, help='Minimum RSSI value')
@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
@click.option('--scan-interval', type=int, default=60, help='Scan interval')
@click.option('--scan-window', type=int, default=60, help='Scan window')
@click.option(
    '--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY'
)
@click.option(
    '--filter-duplicates',
    type=bool,
    default=True,
    help='Filter duplicates at the controller level',
)
@click.option(
    '--raw',
    is_flag=True,
    default=False,
    help='Listen for raw advertising reports instead of processed ones',
)
@click.option(
    '--irk',
    metavar='<IRK_HEX>:<ADDRESS>',
    help=(
        'Use this IRK for resolving private addresses ' '(may be used more than once)'
    ),
    multiple=True,
)
@click.option(
    '--keystore-file',
    metavar='FILE_PATH',
    help='Keystore file to use when resolving addresses',
)
@click.option(
    '--device-config',
    metavar='FILE_PATH',
    help='Device config file for the scanning device',
)
@click.argument('transport')
def main(
    min_rssi,
    passive,
    scan_interval,
    scan_window,
    phy,
    filter_duplicates,
    raw,
    irk,
    keystore_file,
    device_config,
    transport,
):
    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
    asyncio.run(
        scan(
            min_rssi,
            passive,
            scan_interval,
            scan_window,
            phy,
            filter_duplicates,
            raw,
            irk,
            keystore_file,
            device_config,
            transport,
        )
    )


# -----------------------------------------------------------------------------
if __name__ == '__main__':
    main()
