#!/usr/bin/env python3
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import logging
import os
import random
import re
import string
import threading
import time
try:
    import pandas as pd
except ModuleNotFoundError:
    pass
from queue import Empty
from subprocess import call
from acts import asserts
from acts_contrib.test_utils.bt.bt_constants import adv_fail
from acts_contrib.test_utils.bt.bt_constants import adv_succ
from acts_contrib.test_utils.bt.bt_constants import batch_scan_not_supported_list
from acts_contrib.test_utils.bt.bt_constants import batch_scan_result
from acts_contrib.test_utils.bt.bt_constants import bits_per_samples
from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes
from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers
from acts_contrib.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed
from acts_contrib.test_utils.bt.bt_constants import bluetooth_off
from acts_contrib.test_utils.bt.bt_constants import bluetooth_on
from acts_contrib.test_utils.bt.bt_constants import \
    bluetooth_profile_connection_state_changed
from acts_contrib.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid
from acts_contrib.test_utils.bt.bt_constants import bt_default_timeout
from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants
from acts_contrib.test_utils.bt.bt_constants import bt_profile_states
from acts_contrib.test_utils.bt.bt_constants import bt_rfcomm_uuids
from acts_contrib.test_utils.bt.bt_constants import bt_scan_mode_types
from acts_contrib.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device
from acts_contrib.test_utils.bt.bt_constants import btsnoop_log_path_on_device
from acts_contrib.test_utils.bt.bt_constants import channel_modes
from acts_contrib.test_utils.bt.bt_constants import codec_types
from acts_contrib.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms
from acts_contrib.test_utils.bt.bt_constants import default_rfcomm_timeout_ms
from acts_contrib.test_utils.bt.bt_constants import hid_id_keyboard
from acts_contrib.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation
from acts_contrib.test_utils.bt.bt_constants import pan_connect_timeout
from acts_contrib.test_utils.bt.bt_constants import sample_rates
from acts_contrib.test_utils.bt.bt_constants import scan_result
from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants
from acts_contrib.test_utils.bt.bt_constants import small_timeout
from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection
from acts.utils import exe_cmd

from acts import utils

log = logging

advertisements_to_devices = {}


class BtTestUtilsError(Exception):
    pass


def _add_android_device_to_dictionary(android_device, profile_list,
                                      selector_dict):
    """Adds the AndroidDevice and supported features to the selector dictionary

    Args:
        android_device: The Android device.
        profile_list: The list of profiles the Android device supports.
    """
    for profile in profile_list:
        if profile in selector_dict and android_device not in selector_dict[
                profile]:
            selector_dict[profile].append(android_device)
        else:
            selector_dict[profile] = [android_device]


def bluetooth_enabled_check(ad, timeout_sec=5):
    """Checks if the Bluetooth state is enabled, if not it will attempt to
    enable it.

    Args:
        ad: The Android device list to enable Bluetooth on.
        timeout_sec: number of seconds to wait for toggle to take effect.

    Returns:
        True if successful, false if unsuccessful.
    """
    if not ad.droid.bluetoothCheckState():
        ad.droid.bluetoothToggleState(True)
        expected_bluetooth_on_event_name = bluetooth_on
        try:
            ad.ed.pop_event(expected_bluetooth_on_event_name,
                            bt_default_timeout)
        except Empty:
            ad.log.info("Failed to toggle Bluetooth on(no broadcast received).")
            # Try one more time to poke at the actual state.
            if ad.droid.bluetoothCheckState():
                ad.log.info(".. actual state is ON")
                return True
            ad.log.error(".. actual state is OFF")
            return False
    end_time = time.time() + timeout_sec
    while not ad.droid.bluetoothCheckState() and time.time() < end_time:
        time.sleep(1)
    return ad.droid.bluetoothCheckState()


def check_device_supported_profiles(droid):
    """Checks for Android device supported profiles.

    Args:
        droid: The droid object to query.

    Returns:
        A dictionary of supported profiles.
    """
    profile_dict = {}
    profile_dict['hid'] = droid.bluetoothHidIsReady()
    profile_dict['hsp'] = droid.bluetoothHspIsReady()
    profile_dict['a2dp'] = droid.bluetoothA2dpIsReady()
    profile_dict['avrcp'] = droid.bluetoothAvrcpIsReady()
    profile_dict['a2dp_sink'] = droid.bluetoothA2dpSinkIsReady()
    profile_dict['hfp_client'] = droid.bluetoothHfpClientIsReady()
    profile_dict['pbap_client'] = droid.bluetoothPbapClientIsReady()
    return profile_dict


def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list,
                                     adv_android_device, adv_callback_list):
    """Try to gracefully stop all scanning and advertising instances.

    Args:
        scn_android_device: The Android device that is actively scanning.
        scn_callback_list: The scan callback id list that needs to be stopped.
        adv_android_device: The Android device that is actively advertising.
        adv_callback_list: The advertise callback id list that needs to be
            stopped.
    """
    scan_droid, scan_ed = scn_android_device.droid, scn_android_device.ed
    adv_droid = adv_android_device.droid
    try:
        for scan_callback in scn_callback_list:
            scan_droid.bleStopBleScan(scan_callback)
    except Exception as err:
        scn_android_device.log.debug(
            "Failed to stop LE scan... reseting Bluetooth. Error {}".format(
                err))
        reset_bluetooth([scn_android_device])
    try:
        for adv_callback in adv_callback_list:
            adv_droid.bleStopBleAdvertising(adv_callback)
    except Exception as err:
        adv_android_device.log.debug(
            "Failed to stop LE advertisement... reseting Bluetooth. Error {}".
            format(err))
        reset_bluetooth([adv_android_device])


def clear_bonded_devices(ad):
    """Clear bonded devices from the input Android device.

    Args:
        ad: the Android device performing the connection.
    Returns:
        True if clearing bonded devices was successful, false if unsuccessful.
    """
    bonded_device_list = ad.droid.bluetoothGetBondedDevices()
    while bonded_device_list:
        device_address = bonded_device_list[0]['address']
        if not ad.droid.bluetoothUnbond(device_address):
            log.error("Failed to unbond {} from {}".format(
                device_address, ad.serial))
            return False
        log.info("Successfully unbonded {} from {}".format(
            device_address, ad.serial))
        #TODO: wait for BOND_STATE_CHANGED intent instead of waiting
        time.sleep(1)

        # If device was first connected using LE transport, after bonding it is
        # accessible through it's LE address, and through it classic address.
        # Unbonding it will unbond two devices representing different
        # "addresses". Attempt to unbond such already unbonded devices will
        # result in bluetoothUnbond returning false.
        bonded_device_list = ad.droid.bluetoothGetBondedDevices()
    return True


def connect_phone_to_headset(android,
                             headset,
                             timeout=bt_default_timeout,
                             connection_check_period=10):
    """Connects android phone to bluetooth headset.
    Headset object must have methods power_on and enter_pairing_mode,
    and attribute mac_address.

    Args:
        android: AndroidDevice object with SL4A installed.
        headset: Object with attribute mac_address and methods power_on and
            enter_pairing_mode.
        timeout: Seconds to wait for devices to connect.
        connection_check_period: how often to check for connection once the
            SL4A connect RPC has been sent.
    Returns:
        connected (bool): True if devices are paired and connected by end of
        method. False otherwise.
    """
    headset_mac_address = headset.mac_address
    connected = android.droid.audioIsBluetoothA2dpOn()
    log.info('Devices connected before pair attempt: %s' % connected)
    if not connected:
        # Turn on headset and initiate pairing mode.
        headset.enter_pairing_mode()
        android.droid.bluetoothStartPairingHelper()
    start_time = time.time()
    # If already connected, skip pair and connect attempt.
    while not connected and (time.time() - start_time < timeout):
        bonded_info = android.droid.bluetoothGetBondedDevices()
        connected_info = android.droid.bluetoothGetConnectedDevices()
        if headset.mac_address not in [info["address"] for info in bonded_info]:
            # Use SL4A to pair and connect with headset.
            headset.enter_pairing_mode()
            android.droid.bluetoothDiscoverAndBond(headset_mac_address)
        elif headset.mac_address not in [
                info["address"] for info in connected_info
        ]:
            #Device is bonded but not connected
            android.droid.bluetoothConnectBonded(headset_mac_address)
        else:
            #Headset is connected, but A2DP profile is not
            android.droid.bluetoothA2dpConnect(headset_mac_address)
        log.info('Waiting for connection...')
        time.sleep(connection_check_period)
        # Check for connection.
        connected = android.droid.audioIsBluetoothA2dpOn()
    log.info('Devices connected after pair attempt: %s' % connected)
    return connected


def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2):
    """Connects pri droid to secondary droid.

    Args:
        pri_ad: AndroidDroid initiating connection
        sec_ad: AndroidDroid accepting connection
        profiles_set: Set of profiles to be connected
        attempts: Number of attempts to try until failure.

    Returns:
        Pass if True
        Fail if False
    """
    device_addr = sec_ad.droid.bluetoothGetLocalAddress()
    # Allows extra time for the SDP records to be updated.
    time.sleep(2)
    curr_attempts = 0
    while curr_attempts < attempts:
        log.info("connect_pri_to_sec curr attempt {} total {}".format(
            curr_attempts, attempts))
        if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
            return True
        curr_attempts += 1
    log.error("connect_pri_to_sec failed to connect after {} attempts".format(
        attempts))
    return False


def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
    """Connects pri droid to secondary droid.

    Args:
        pri_ad: AndroidDroid initiating connection.
        sec_ad: AndroidDroid accepting connection.
        profiles_set: Set of profiles to be connected.

    Returns:
        True of connection is successful, false if unsuccessful.
    """
    # Check if we support all profiles.
    supported_profiles = bt_profile_constants.values()
    for profile in profiles_set:
        if profile not in supported_profiles:
            pri_ad.log.info("Profile {} is not supported list {}".format(
                profile, supported_profiles))
            return False

    # First check that devices are bonded.
    paired = False
    for paired_device in pri_ad.droid.bluetoothGetBondedDevices():
        if paired_device['address'] == \
                sec_ad.droid.bluetoothGetLocalAddress():
            paired = True
            break

    if not paired:
        pri_ad.log.error("Not paired to {}".format(sec_ad.serial))
        return False

    # Now try to connect them, the following call will try to initiate all
    # connections.
    pri_ad.droid.bluetoothConnectBonded(sec_ad.droid.bluetoothGetLocalAddress())

    end_time = time.time() + 10
    profile_connected = set()
    sec_addr = sec_ad.droid.bluetoothGetLocalAddress()
    pri_ad.log.info("Profiles to be connected {}".format(profiles_set))
    # First use APIs to check profile connection state
    while (time.time() < end_time and
           not profile_connected.issuperset(profiles_set)):
        if (bt_profile_constants['headset_client'] not in profile_connected and
                bt_profile_constants['headset_client'] in profiles_set):
            if is_hfp_client_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['headset_client'])
        if (bt_profile_constants['a2dp'] not in profile_connected and
                bt_profile_constants['a2dp'] in profiles_set):
            if is_a2dp_src_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['a2dp'])
        if (bt_profile_constants['a2dp_sink'] not in profile_connected and
                bt_profile_constants['a2dp_sink'] in profiles_set):
            if is_a2dp_snk_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['a2dp_sink'])
        if (bt_profile_constants['map_mce'] not in profile_connected and
                bt_profile_constants['map_mce'] in profiles_set):
            if is_map_mce_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['map_mce'])
        if (bt_profile_constants['map'] not in profile_connected and
                bt_profile_constants['map'] in profiles_set):
            if is_map_mse_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['map'])
        time.sleep(0.1)
    # If APIs fail, try to find the connection broadcast receiver.
    while not profile_connected.issuperset(profiles_set):
        try:
            profile_event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed,
                bt_default_timeout + 10)
            pri_ad.log.info("Got event {}".format(profile_event))
        except Exception:
            pri_ad.log.error("Did not get {} profiles left {}".format(
                bluetooth_profile_connection_state_changed, profile_connected))
            return False

        profile = profile_event['data']['profile']
        state = profile_event['data']['state']
        device_addr = profile_event['data']['addr']
        if state == bt_profile_states['connected'] and \
                device_addr == sec_ad.droid.bluetoothGetLocalAddress():
            profile_connected.add(profile)
        pri_ad.log.info(
            "Profiles connected until now {}".format(profile_connected))
    # Failure happens inside the while loop. If we came here then we already
    # connected.
    return True


def determine_max_advertisements(android_device):
    """Determines programatically how many advertisements the Android device
    supports.

    Args:
        android_device: The Android device to determine max advertisements of.

    Returns:
        The maximum advertisement count.
    """
    android_device.log.info(
        "Determining number of maximum concurrent advertisements...")
    advertisement_count = 0
    bt_enabled = False
    expected_bluetooth_on_event_name = bluetooth_on
    if not android_device.droid.bluetoothCheckState():
        android_device.droid.bluetoothToggleState(True)
    try:
        android_device.ed.pop_event(expected_bluetooth_on_event_name,
                                    bt_default_timeout)
    except Exception:
        android_device.log.info(
            "Failed to toggle Bluetooth on(no broadcast received).")
        # Try one more time to poke at the actual state.
        if android_device.droid.bluetoothCheckState() is True:
            android_device.log.info(".. actual state is ON")
        else:
            android_device.log.error(
                "Failed to turn Bluetooth on. Setting default advertisements to 1"
            )
            advertisement_count = -1
            return advertisement_count
    advertise_callback_list = []
    advertise_data = android_device.droid.bleBuildAdvertiseData()
    advertise_settings = android_device.droid.bleBuildAdvertiseSettings()
    while (True):
        advertise_callback = android_device.droid.bleGenBleAdvertiseCallback()
        advertise_callback_list.append(advertise_callback)

        android_device.droid.bleStartBleAdvertising(advertise_callback,
                                                    advertise_data,
                                                    advertise_settings)

        regex = "(" + adv_succ.format(
            advertise_callback) + "|" + adv_fail.format(
                advertise_callback) + ")"
        # wait for either success or failure event
        evt = android_device.ed.pop_events(regex, bt_default_timeout,
                                           small_timeout)
        if evt[0]["name"] == adv_succ.format(advertise_callback):
            advertisement_count += 1
            android_device.log.info(
                "Advertisement {} started.".format(advertisement_count))
        else:
            error = evt[0]["data"]["Error"]
            if error == "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS":
                android_device.log.info(
                    "Advertisement failed to start. Reached max " +
                    "advertisements at {}".format(advertisement_count))
                break
            else:
                raise BtTestUtilsError(
                    "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," +
                    " but received bad error code {}".format(error))
    try:
        for adv in advertise_callback_list:
            android_device.droid.bleStopBleAdvertising(adv)
    except Exception:
        android_device.log.error(
            "Failed to stop advertisingment, resetting Bluetooth.")
        reset_bluetooth([android_device])
    return advertisement_count


def disable_bluetooth(droid):
    """Disable Bluetooth on input Droid object.

    Args:
        droid: The droid object to disable Bluetooth on.

    Returns:
        True if successful, false if unsuccessful.
    """
    if droid.bluetoothCheckState() is True:
        droid.bluetoothToggleState(False)
        if droid.bluetoothCheckState() is True:
            log.error("Failed to toggle Bluetooth off.")
            return False
    return True


def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list):
    """
    Disconnect primary from secondary on a specific set of profiles
    Args:
        pri_ad - Primary android_device initiating disconnection
        sec_ad - Secondary android droid (sl4a interface to keep the
          method signature the same connect_pri_to_sec above)
        profiles_list - List of profiles we want to disconnect from

    Returns:
        True on Success
        False on Failure
    """
    # Sanity check to see if all the profiles in the given set is supported
    supported_profiles = bt_profile_constants.values()
    for profile in profiles_list:
        if profile not in supported_profiles:
            pri_ad.log.info("Profile {} is not in supported list {}".format(
                profile, supported_profiles))
            return False

    pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices())
    # Disconnecting on a already disconnected profile is a nop,
    # so not checking for the connection state
    try:
        pri_ad.droid.bluetoothDisconnectConnectedProfile(
            sec_ad.droid.bluetoothGetLocalAddress(), profiles_list)
    except Exception as err:
        pri_ad.log.error(
            "Exception while trying to disconnect profile(s) {}: {}".format(
                profiles_list, err))
        return False

    profile_disconnected = set()
    pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list))

    while not profile_disconnected.issuperset(profiles_list):
        try:
            profile_event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed, bt_default_timeout)
            pri_ad.log.info("Got event {}".format(profile_event))
        except Exception as e:
            pri_ad.log.error(
                "Did not disconnect from Profiles. Reason {}".format(e))
            return False

        profile = profile_event['data']['profile']
        state = profile_event['data']['state']
        device_addr = profile_event['data']['addr']

        if state == bt_profile_states['disconnected'] and \
                device_addr == sec_ad.droid.bluetoothGetLocalAddress():
            profile_disconnected.add(profile)
        pri_ad.log.info(
            "Profiles disconnected so far {}".format(profile_disconnected))

    return True


def enable_bluetooth(droid, ed):
    if droid.bluetoothCheckState() is True:
        return True

    droid.bluetoothToggleState(True)
    expected_bluetooth_on_event_name = bluetooth_on
    try:
        ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout)
    except Exception:
        log.info("Failed to toggle Bluetooth on (no broadcast received)")
        if droid.bluetoothCheckState() is True:
            log.info(".. actual state is ON")
            return True
        log.info(".. actual state is OFF")
        return False

    return True


def factory_reset_bluetooth(android_devices):
    """Clears Bluetooth stack of input Android device list.

        Args:
            android_devices: The Android device list to reset Bluetooth

        Returns:
            True if successful, false if unsuccessful.
        """
    for a in android_devices:
        droid, ed = a.droid, a.ed
        a.log.info("Reset state of bluetooth on device.")
        if not bluetooth_enabled_check(a):
            return False
        # TODO: remove device unbond b/79418045
        # Temporary solution to ensure all devices are unbonded
        bonded_devices = droid.bluetoothGetBondedDevices()
        for b in bonded_devices:
            a.log.info("Removing bond for device {}".format(b['address']))
            droid.bluetoothUnbond(b['address'])

        droid.bluetoothFactoryReset()
        wait_for_bluetooth_manager_state(droid)
        if not enable_bluetooth(droid, ed):
            return False
    return True


def generate_ble_advertise_objects(droid):
    """Generate generic LE advertise objects.

    Args:
        droid: The droid object to generate advertise LE objects from.

    Returns:
        advertise_callback: The generated advertise callback id.
        advertise_data: The generated advertise data id.
        advertise_settings: The generated advertise settings id.
    """
    advertise_callback = droid.bleGenBleAdvertiseCallback()
    advertise_data = droid.bleBuildAdvertiseData()
    advertise_settings = droid.bleBuildAdvertiseSettings()
    return advertise_callback, advertise_data, advertise_settings


def generate_ble_scan_objects(droid):
    """Generate generic LE scan objects.

    Args:
        droid: The droid object to generate LE scan objects from.

    Returns:
        filter_list: The generated scan filter list id.
        scan_settings: The generated scan settings id.
        scan_callback: The generated scan callback id.
    """
    filter_list = droid.bleGenFilterList()
    scan_settings = droid.bleBuildScanSetting()
    scan_callback = droid.bleGenScanCallback()
    return filter_list, scan_settings, scan_callback


def generate_id_by_size(size,
                        chars=(string.ascii_lowercase + string.ascii_uppercase +
                               string.digits)):
    """Generate random ascii characters of input size and input char types

    Args:
        size: Input size of string.
        chars: (Optional) Chars to use in generating a random string.

    Returns:
        String of random input chars at the input size.
    """
    return ''.join(random.choice(chars) for _ in range(size))


def get_advanced_droid_list(android_devices):
    """Add max_advertisement and batch_scan_supported attributes to input
    Android devices

    This will programatically determine maximum LE advertisements of each
    input Android device.

    Args:
        android_devices: The Android devices to setup.

    Returns:
        List of Android devices with new attribtues.
    """
    droid_list = []
    for a in android_devices:
        d, e = a.droid, a.ed
        model = d.getBuildModel()
        max_advertisements = 1
        batch_scan_supported = True
        if model in advertisements_to_devices.keys():
            max_advertisements = advertisements_to_devices[model]
        else:
            max_advertisements = determine_max_advertisements(a)
            max_tries = 3
            # Retry to calculate max advertisements
            while max_advertisements == -1 and max_tries > 0:
                a.log.info(
                    "Attempts left to determine max advertisements: {}".format(
                        max_tries))
                max_advertisements = determine_max_advertisements(a)
                max_tries -= 1
            advertisements_to_devices[model] = max_advertisements
        if model in batch_scan_not_supported_list:
            batch_scan_supported = False
        role = {
            'droid': d,
            'ed': e,
            'max_advertisements': max_advertisements,
            'batch_scan_supported': batch_scan_supported
        }
        droid_list.append(role)
    return droid_list


def get_bluetooth_crash_count(android_device):
    out = android_device.adb.shell("dumpsys bluetooth_manager")
    return int(re.search("crashed(.*\d)", out).group(1))


def read_otp(ad):
    """Reads and parses the OTP output to return TX power backoff

    Reads the OTP registers from the phone, parses them to return a
    dict of TX power backoffs for different power levels

    Args:
        ad : android device object

    Returns :
        otp_dict : power backoff dict
    """

    ad.adb.shell('svc bluetooth disable')
    time.sleep(2)
    otp_output = ad.adb.shell('bluetooth_sar_test -r')
    ad.adb.shell('svc bluetooth enable')
    time.sleep(2)
    otp_dict = {
        "BR": {
            "10": 0,
            "9": 0,
            "8": 0
        },
        "EDR": {
            "10": 0,
            "9": 0,
            "8": 0
        },
        "BLE": {
            "10": 0,
            "9": 0,
            "8": 0
        }
    }

    otp_regex = '\s+\[\s+PL10:\s+(\d+)\s+PL9:\s+(\d+)*\s+PL8:\s+(\d+)\s+\]'

    for key in otp_dict:
        bank_list = re.findall("{}{}".format(key, otp_regex), otp_output)
        for bank_tuple in bank_list:
            if ('0', '0', '0') != bank_tuple:
                [otp_dict[key]["10"], otp_dict[key]["9"],
                 otp_dict[key]["8"]] = bank_tuple
    return otp_dict


def get_bt_metric(ad_list,
                  duration=1,
                  bqr_tag='Monitoring , Handle:',
                  tag='',
                  log_path=False):
    """ Function to get the bt metric from logcat.

    Captures logcat for the specified duration and returns the bqr results.
    Takes list of android objects as input. If a single android object is given,
    converts it into a list.

    Args:
        ad_list: list of android_device objects
        duration: time duration (seconds) for which the logcat is parsed
        bqr_tag: tag of bt metrics
        tag: tag to be appended to the metrics raw data
        log_path: path of metrics raw data

    Returns:
        process_data: dict of process raw data for each android devices
    """

    # Defining bqr quantites and their regex to extract
    regex_dict = {
        "pwlv": "PwLv:\s(\S+)",
        "rssi": "RSSI:\s[-](\d+)",
        "rssi_c0": "RSSI_C0:\s[-](\d+)",
        "rssi_c1": "RSSI_C1:\s[-](\d+)",
        "txpw_c0": "\sTxPw_C0:\s(-?\d+)",
        "txpw_c1": "\sTxPw_C1:\s(-?\d+)",
        "bftx": "BFTx:\s(\w+)",
        "divtx": "DivTx:\s(\w+)"
    }
    metrics_dict = {
        "rssi": {},
        "pwlv": {},
        "rssi_c0": {},
        "rssi_c1": {},
        "txpw_c0": {},
        "txpw_c1": {},
        "bftx": {},
        "divtx": {}
    }

    # Converting a single android device object to list
    if not isinstance(ad_list, list):
        ad_list = [ad_list]

    #Time sync with the test machine
    for ad in ad_list:
        ad.droid.setTime(int(round(time.time() * 1000)))
        time.sleep(0.5)

    begin_time = utils.get_current_epoch_time()
    time.sleep(duration)
    end_time = utils.get_current_epoch_time()

    for ad in ad_list:
        bt_rssi_log = ad.cat_adb_log(tag + "_bt_metric", begin_time, end_time)

        # Extracting supporting bqr quantities
        for metric, regex in regex_dict.items():
            bqr_metric = []
            file_bt_log = open(bt_rssi_log, "r")
            for line in file_bt_log:
                if bqr_tag in line:
                    if re.findall(regex, line):
                        m = re.findall(regex, line)[0].strip(",")
                        bqr_metric.append(m)
            metrics_dict[metric][ad.serial] = bqr_metric
            file_bt_log.close()

        # Formatting and saving the raw data
        metrics_to_be_formatted = [{
            "name": "rssi",
            "averagble": "y"
        }, {
            "name": "rssi_c0",
            "averagble": "y"
        }, {
            "name": "rssi_c1",
            "averagble": "y"
        }, {
            "name": "pwlv",
            "averagble": "n"
        }, {
            "name": "txpw_c0",
            "averagble": "n"
        }, {
            "name": "txpw_c1",
            "averagble": "n"
        }, {
            "name": "bftx",
            "averagble": "n"
        }, {
            "name": "divtx",
            "averagble": "n"
        }]
        for metric in metrics_to_be_formatted:
            if metric["averagble"] == "y":
                metrics_dict[metric["name"]][ad.serial] = [
                    (-1) * int(x)
                    for x in metrics_dict[metric["name"]][ad.serial]
                ]
            else:
                metrics_dict[metric["name"]][ad.serial] = [
                    int(x, 16) if '0x' in x else int(x, 10)
                    for x in metrics_dict[metric["name"]][ad.serial]
                ]
        # Saving metrics raw data for each attenuation
        if log_path:
            output_file_name = ad.serial + "_metrics_raw_data_" + tag + ".csv"
            output_file = os.path.join(log_path, output_file_name)
            os.makedirs(log_path, exist_ok=True)
            df_save_metrics = {}
            for item in metrics_dict.items():
                df_save_metrics[item[0]] = next(iter(item[1].items()))[1]
            MetricsDict_df = pd.DataFrame({key:pd.Series(value) for key, value in df_save_metrics.items()})
            MetricsDict_df.to_csv(output_file)
        # Defining the process_data_dict
        process_data = {
            "rssi": {},
            "pwlv": {},
            "rssi_c0": {},
            "rssi_c1": {},
            "txpw_c0": {},
            "txpw_c1": {},
            "bftx": {},
            "divtx": {}
        }

        # Computing and returning the raw data
        for metric in metrics_to_be_formatted:
            if metric["averagble"] == "y":
                process_data[metric["name"]][ad.serial] = [
                    x for x in metrics_dict[metric["name"]][ad.serial]
                    if x != 0 and x != -127
                ]

                try:
                    #DOING AVERAGE
                    process_data[metric["name"]][ad.serial] = round(
                        sum(metrics_dict[metric["name"]][ad.serial]) /
                        len(metrics_dict[metric["name"]][ad.serial]), 2)
                except ZeroDivisionError:
                    #SETTING VALUE TO 'n/a'
                    process_data[metric["name"]][ad.serial] = "n/a"
            else:
                try:
                    #GETTING MOST_COMMON_VALUE
                    process_data[metric["name"]][ad.serial] = max(
                        metrics_dict[metric["name"]][ad.serial],
                        key=metrics_dict[metric["name"]][ad.serial].count)
                except ValueError:
                    #SETTING VALUE TO 'n/a'
                    process_data[metric["name"]][ad.serial] = "n/a"

    return process_data


def get_bt_rssi(ad, duration=1, processed=True, tag='', log_path=False):
    """Function to get average bt rssi from logcat.

    This function returns the average RSSI for the given duration. RSSI values are
    extracted from BQR.

    Args:
        ad: (list of) android_device object.
        duration: time duration(seconds) for which logcat is parsed.

    Returns:
        avg_rssi: average RSSI on each android device for the given duration.
    """
    bqr_results = get_bt_metric(ad, duration, tag=tag, log_path=log_path)
    return bqr_results["rssi"]


def enable_bqr(
    ad_list,
    bqr_interval=10,
    bqr_event_mask=15,
):
    """Sets up BQR reporting.

       Sets up BQR to report BT metrics at the requested frequency and toggles
       airplane mode for the bqr settings to take effect.

    Args:
        ad_list: an android_device or list of android devices.
    """
    # Converting a single android device object to list
    if not isinstance(ad_list, list):
        ad_list = [ad_list]

    for ad in ad_list:
        #Setting BQR parameters
        ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format(
            bqr_event_mask))
        ad.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms {}".format(
            bqr_interval))

        ## Toggle airplane mode
        ad.droid.connectivityToggleAirplaneMode(True)
        ad.droid.connectivityToggleAirplaneMode(False)


def disable_bqr(ad_list):
    """Disables BQR reporting.

    Args:
        ad_list: an android_device or list of android devices.
    """
    # Converting a single android device object to list
    if not isinstance(ad_list, list):
        ad_list = [ad_list]

    DISABLE_BQR_MASK = 0

    for ad in ad_list:
        #Disabling BQR
        ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format(
            DISABLE_BQR_MASK))

        ## Toggle airplane mode
        ad.droid.connectivityToggleAirplaneMode(True)
        ad.droid.connectivityToggleAirplaneMode(False)


def get_device_selector_dictionary(android_device_list):
    """Create a dictionary of Bluetooth features vs Android devices.

    Args:
        android_device_list: The list of Android devices.
    Returns:
        A dictionary of profiles/features to Android devices.
    """
    selector_dict = {}
    for ad in android_device_list:
        uuids = ad.droid.bluetoothGetLocalUuids()

        for profile, uuid_const in sig_uuid_constants.items():
            uuid_check = sig_uuid_constants['BASE_UUID'].format(
                uuid_const).lower()
            if uuids and uuid_check in uuids:
                if profile in selector_dict:
                    selector_dict[profile].append(ad)
                else:
                    selector_dict[profile] = [ad]

        # Various services may not be active during BT startup.
        # If the device can be identified through adb shell pm list features
        # then try to add them to the appropriate profiles / features.

        # Android TV.
        if "feature:android.hardware.type.television" in ad.features:
            ad.log.info("Android TV device found.")
            supported_profiles = ['AudioSink']
            _add_android_device_to_dictionary(ad, supported_profiles,
                                              selector_dict)

        # Android Auto
        elif "feature:android.hardware.type.automotive" in ad.features:
            ad.log.info("Android Auto device found.")
            # Add: AudioSink , A/V_RemoteControl,
            supported_profiles = [
                'AudioSink', 'A/V_RemoteControl', 'Message Notification Server'
            ]
            _add_android_device_to_dictionary(ad, supported_profiles,
                                              selector_dict)
        # Android Wear
        elif "feature:android.hardware.type.watch" in ad.features:
            ad.log.info("Android Wear device found.")
            supported_profiles = []
            _add_android_device_to_dictionary(ad, supported_profiles,
                                              selector_dict)
        # Android Phone
        elif "feature:android.hardware.telephony" in ad.features:
            ad.log.info("Android Phone device found.")
            # Add: AudioSink
            supported_profiles = [
                'AudioSource', 'A/V_RemoteControlTarget',
                'Message Access Server'
            ]
            _add_android_device_to_dictionary(ad, supported_profiles,
                                              selector_dict)
    return selector_dict


def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
    """Start generic advertisement and get it's mac address by LE scanning.

    Args:
        scan_ad: The Android device to use as the scanner.
        adv_ad: The Android device to use as the advertiser.

    Returns:
        mac_address: The mac address of the advertisement.
        advertise_callback: The advertise callback id of the active
            advertisement.
    """
    adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
    adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True)
    adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel(
        ble_advertise_settings_tx_powers['high'])
    advertise_callback, advertise_data, advertise_settings = (
        generate_ble_advertise_objects(adv_ad.droid))
    adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                        advertise_settings)
    try:
        adv_ad.ed.pop_event(adv_succ.format(advertise_callback),
                            bt_default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Advertiser did not start successfully {}".format(err))
    filter_list = scan_ad.droid.bleGenFilterList()
    scan_settings = scan_ad.droid.bleBuildScanSetting()
    scan_callback = scan_ad.droid.bleGenScanCallback()
    scan_ad.droid.bleSetScanFilterDeviceName(
        adv_ad.droid.bluetoothGetLocalName())
    scan_ad.droid.bleBuildScanFilter(filter_list)
    scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
    try:
        event = scan_ad.ed.pop_event(
            "BleScan{}onScanResults".format(scan_callback), bt_default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Scanner did not find advertisement {}".format(err))
    mac_address = event['data']['Result']['deviceInfo']['address']
    return mac_address, advertise_callback, scan_callback


def hid_device_send_key_data_report(host_id, device_ad, key, interval=1):
    """Send a HID report simulating a 1-second keyboard press from host_ad to
    device_ad

    Args:
        host_id: the Bluetooth MAC address or name of the HID host
        device_ad: HID device
        key: the key we want to send
        interval: the interval between key press and key release
    """
    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
                                                 hid_keyboard_report(key))
    time.sleep(interval)
    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
                                                 hid_keyboard_report("00"))


def hid_keyboard_report(key, modifier="00"):
    """Get the HID keyboard report for the given key

    Args:
        key: the key we want
        modifier: HID keyboard modifier bytes
    Returns:
        The byte array for the HID report.
    """
    return str(
        bytearray.fromhex(" ".join(
            [modifier, "00", key, "00", "00", "00", "00", "00"])), "utf-8")


def is_a2dp_connected(sink, source):
    """
    Convenience Function to see if the 2 devices are connected on
    A2dp.
    Args:
        sink:       Audio Sink
        source:     Audio Source
    Returns:
        True if Connected
        False if Not connected
    """

    devices = sink.droid.bluetoothA2dpSinkGetConnectedDevices()
    for device in devices:
        sink.log.info("A2dp Connected device {}".format(device["name"]))
        if (device["address"] == source.droid.bluetoothGetLocalAddress()):
            return True
    return False


def is_a2dp_snk_device_connected(ad, addr):
    """Determines if an AndroidDevice has A2DP snk connectivity to input address

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    devices = ad.droid.bluetoothA2dpSinkGetConnectedDevices()
    ad.log.info("Connected A2DP Sink devices: {}".format(devices))
    if addr in {d['address'] for d in devices}:
        return True
    return False


def is_a2dp_src_device_connected(ad, addr):
    """Determines if an AndroidDevice has A2DP connectivity to input address

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    devices = ad.droid.bluetoothA2dpGetConnectedDevices()
    ad.log.info("Connected A2DP Source devices: {}".format(devices))
    if addr in {d['address'] for d in devices}:
        return True
    return False


def is_hfp_client_device_connected(ad, addr):
    """Determines if an AndroidDevice has HFP connectivity to input address

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    devices = ad.droid.bluetoothHfpClientGetConnectedDevices()
    ad.log.info("Connected HFP Client devices: {}".format(devices))
    if addr in {d['address'] for d in devices}:
        return True
    return False


def is_map_mce_device_connected(ad, addr):
    """Determines if an AndroidDevice has MAP MCE connectivity to input address

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    devices = ad.droid.bluetoothMapClientGetConnectedDevices()
    ad.log.info("Connected MAP MCE devices: {}".format(devices))
    if addr in {d['address'] for d in devices}:
        return True
    return False


def is_map_mse_device_connected(ad, addr):
    """Determines if an AndroidDevice has MAP MSE connectivity to input address

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    devices = ad.droid.bluetoothMapGetConnectedDevices()
    ad.log.info("Connected MAP MSE devices: {}".format(devices))
    if addr in {d['address'] for d in devices}:
        return True
    return False


def kill_bluetooth_process(ad):
    """Kill Bluetooth process on Android device.

    Args:
        ad: Android device to kill BT process on.
    """
    ad.log.info("Killing Bluetooth process.")
    pid = ad.adb.shell(
        "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii')
    call(["adb -s " + ad.serial + " shell kill " + pid], shell=True)


def log_energy_info(android_devices, state):
    """Logs energy info of input Android devices.

    Args:
        android_devices: input Android device list to log energy info from.
        state: the input state to log. Usually 'Start' or 'Stop' for logging.

    Returns:
        A logging string of the Bluetooth energy info reported.
    """
    return_string = "{} Energy info collection:\n".format(state)
    # Bug: b/31966929
    return return_string


def orchestrate_and_verify_pan_connection(pan_dut, panu_dut):
    """Setups up a PAN conenction between two android devices.

    Args:
        pan_dut: the Android device providing tethering services
        panu_dut: the Android device using the internet connection from the
            pan_dut
    Returns:
        True if PAN connection and verification is successful,
        false if unsuccessful.
    """
    if not toggle_airplane_mode_by_adb(log, panu_dut, True):
        panu_dut.log.error("Failed to toggle airplane mode on")
        return False
    if not toggle_airplane_mode_by_adb(log, panu_dut, False):
        pan_dut.log.error("Failed to toggle airplane mode off")
        return False
    pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    if not bluetooth_enabled_check(panu_dut):
        return False
    if not bluetooth_enabled_check(pan_dut):
        return False
    pan_dut.droid.bluetoothPanSetBluetoothTethering(True)
    if not (pair_pri_to_sec(pan_dut, panu_dut)):
        return False
    if not pan_dut.droid.bluetoothPanIsTetheringOn():
        pan_dut.log.error("Failed to enable Bluetooth tethering.")
        return False
    # Magic sleep needed to give the stack time in between bonding and
    # connecting the PAN profile.
    time.sleep(pan_connect_timeout)
    panu_dut.droid.bluetoothConnectBonded(
        pan_dut.droid.bluetoothGetLocalAddress())
    if not verify_http_connection(log, panu_dut):
        panu_dut.log.error("Can't verify http connection on PANU device.")
        if not verify_http_connection(log, pan_dut):
            pan_dut.log.info(
                "Can't verify http connection on PAN service device")
        return False
    return True


def orchestrate_bluetooth_socket_connection(
        client_ad,
        server_ad,
        accept_timeout_ms=default_bluetooth_socket_timeout_ms,
        uuid=None):
    """Sets up the Bluetooth Socket connection between two Android devices.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    server_ad.droid.bluetoothStartPairingHelper()
    client_ad.droid.bluetoothStartPairingHelper()

    server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid(
        (bluetooth_socket_conn_test_uuid if uuid is None else uuid),
        accept_timeout_ms)
    client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid(
        server_ad.droid.bluetoothGetLocalAddress(),
        (bluetooth_socket_conn_test_uuid if uuid is None else uuid))

    end_time = time.time() + bt_default_timeout
    result = False
    test_result = True
    while time.time() < end_time:
        if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0:
            test_result = True
            client_ad.log.info("Bluetooth socket Client Connection Active")
            break
        else:
            test_result = False
        time.sleep(1)
    if not test_result:
        client_ad.log.error("Failed to establish a Bluetooth socket connection")
        return False
    return True


def orchestrate_rfcomm_connection(client_ad,
                                  server_ad,
                                  accept_timeout_ms=default_rfcomm_timeout_ms,
                                  uuid=None):
    """Sets up the RFCOMM connection between two Android devices.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    result = orchestrate_bluetooth_socket_connection(
        client_ad, server_ad, accept_timeout_ms,
        (bt_rfcomm_uuids['default_uuid'] if uuid is None else uuid))

    return result


def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True):
    """Pairs pri droid to secondary droid.

    Args:
        pri_ad: Android device initiating connection
        sec_ad: Android device accepting connection
        attempts: Number of attempts to try until failure.
        auto_confirm: Auto confirm passkey match for both devices

    Returns:
        Pass if True
        Fail if False
    """
    pri_ad.droid.bluetoothStartConnectionStateChangeMonitor(
        sec_ad.droid.bluetoothGetLocalAddress())
    curr_attempts = 0
    while curr_attempts < attempts:
        if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
            return True
        # Wait 2 seconds before unbound
        time.sleep(2)
        if not clear_bonded_devices(pri_ad):
            log.error(
                "Failed to clear bond for primary device at attempt {}".format(
                    str(curr_attempts)))
            return False
        if not clear_bonded_devices(sec_ad):
            log.error("Failed to clear bond for secondary device at attempt {}".
                      format(str(curr_attempts)))
            return False
        # Wait 2 seconds after unbound
        time.sleep(2)
        curr_attempts += 1
    log.error("pair_pri_to_sec failed to connect after {} attempts".format(
        str(attempts)))
    return False


def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
    # Enable discovery on sec_ad so that pri_ad can find it.
    # The timeout here is based on how much time it would take for two devices
    # to pair with each other once pri_ad starts seeing devices.
    pri_droid = pri_ad.droid
    sec_droid = sec_ad.droid
    pri_ad.ed.clear_all_events()
    sec_ad.ed.clear_all_events()
    log.info("Bonding device {} to {}".format(
        pri_droid.bluetoothGetLocalAddress(),
        sec_droid.bluetoothGetLocalAddress()))
    sec_droid.bluetoothMakeDiscoverable(bt_default_timeout)
    target_address = sec_droid.bluetoothGetLocalAddress()
    log.debug("Starting paring helper on each device")
    pri_droid.bluetoothStartPairingHelper(auto_confirm)
    sec_droid.bluetoothStartPairingHelper(auto_confirm)
    pri_ad.log.info("Primary device starting discovery and executing bond")
    result = pri_droid.bluetoothDiscoverAndBond(target_address)
    if not auto_confirm:
        if not _wait_for_passkey_match(pri_ad, sec_ad):
            return False
    # Loop until we have bonded successfully or timeout.
    end_time = time.time() + bt_default_timeout
    pri_ad.log.info("Verifying devices are bonded")
    while time.time() < end_time:
        bonded_devices = pri_droid.bluetoothGetBondedDevices()
        bonded = False
        for d in bonded_devices:
            if d['address'] == target_address:
                pri_ad.log.info("Successfully bonded to device")
                return True
        time.sleep(0.1)
    # Timed out trying to bond.
    pri_ad.log.info("Failed to bond devices.")
    return False


def reset_bluetooth(android_devices):
    """Resets Bluetooth state of input Android device list.

    Args:
        android_devices: The Android device list to reset Bluetooth state on.

    Returns:
        True if successful, false if unsuccessful.
    """
    for a in android_devices:
        droid, ed = a.droid, a.ed
        a.log.info("Reset state of bluetooth on device.")
        if droid.bluetoothCheckState() is True:
            droid.bluetoothToggleState(False)
            expected_bluetooth_off_event_name = bluetooth_off
            try:
                ed.pop_event(expected_bluetooth_off_event_name,
                             bt_default_timeout)
            except Exception:
                a.log.error("Failed to toggle Bluetooth off.")
                return False
        # temp sleep for b/17723234
        time.sleep(3)
        if not bluetooth_enabled_check(a):
            return False
    return True


def scan_and_verify_n_advertisements(scn_ad, max_advertisements):
    """Verify that input number of advertisements can be found from the scanning
    Android device.

    Args:
        scn_ad: The Android device to start LE scanning on.
        max_advertisements: The number of advertisements the scanner expects to
        find.

    Returns:
        True if successful, false if unsuccessful.
    """
    test_result = False
    address_list = []
    filter_list = scn_ad.droid.bleGenFilterList()
    scn_ad.droid.bleBuildScanFilter(filter_list)
    scan_settings = scn_ad.droid.bleBuildScanSetting()
    scan_callback = scn_ad.droid.bleGenScanCallback()
    scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
    start_time = time.time()
    while (start_time + bt_default_timeout) > time.time():
        event = None
        try:
            event = scn_ad.ed.pop_event(scan_result.format(scan_callback),
                                        bt_default_timeout)
        except Empty as error:
            raise BtTestUtilsError(
                "Failed to find scan event: {}".format(error))
        address = event['data']['Result']['deviceInfo']['address']
        if address not in address_list:
            address_list.append(address)
        if len(address_list) == max_advertisements:
            test_result = True
            break
    scn_ad.droid.bleStopBleScan(scan_callback)
    return test_result


def set_bluetooth_codec(android_device,
                        codec_type,
                        sample_rate,
                        bits_per_sample,
                        channel_mode,
                        codec_specific_1=0):
    """Sets the A2DP codec configuration on the AndroidDevice.

    Args:
        android_device (acts.controllers.android_device.AndroidDevice): the
            android device for which to switch the codec.
        codec_type (str): the desired codec type. Must be a key in
            bt_constants.codec_types.
        sample_rate (str): the desired sample rate. Must be a key in
            bt_constants.sample_rates.
        bits_per_sample (str): the desired bits per sample. Must be a key in
            bt_constants.bits_per_samples.
        channel_mode (str): the desired channel mode. Must be a key in
            bt_constants.channel_modes.
        codec_specific_1 (int): the desired bit rate (quality) for LDAC codec.
    Returns:
        bool: True if the codec config was successfully changed to the desired
            values. Else False.
    """
    message = ("Set Android Device A2DP Bluetooth codec configuration:\n"
               "\tCodec: {codec_type}\n"
               "\tSample Rate: {sample_rate}\n"
               "\tBits per Sample: {bits_per_sample}\n"
               "\tChannel Mode: {channel_mode}".format(
                   codec_type=codec_type,
                   sample_rate=sample_rate,
                   bits_per_sample=bits_per_sample,
                   channel_mode=channel_mode))
    android_device.log.info(message)

    # Send SL4A command
    droid, ed = android_device.droid, android_device.ed
    if not droid.bluetoothA2dpSetCodecConfigPreference(
            codec_types[codec_type], sample_rates[str(sample_rate)],
            bits_per_samples[str(bits_per_sample)], channel_modes[channel_mode],
            codec_specific_1):
        android_device.log.warning("SL4A command returned False. Codec was not "
                                   "changed.")
    else:
        try:
            ed.pop_event(bluetooth_a2dp_codec_config_changed,
                         bt_default_timeout)
        except Exception:
            android_device.log.warning("SL4A event not registered. Codec "
                                       "may not have been changed.")

    # Validate codec value through ADB
    # TODO (aidanhb): validate codec more robustly using SL4A
    command = "dumpsys bluetooth_manager | grep -i 'current codec'"
    out = android_device.adb.shell(command)
    split_out = out.split(": ")
    if len(split_out) != 2:
        android_device.log.warning("Could not verify codec config change "
                                   "through ADB.")
    elif split_out[1].strip().upper() != codec_type:
        android_device.log.error("Codec config was not changed.\n"
                                 "\tExpected codec: {exp}\n"
                                 "\tActual codec: {act}".format(
                                     exp=codec_type, act=split_out[1].strip()))
        return False
    android_device.log.info("Bluetooth codec successfully changed.")
    return True


def set_bt_scan_mode(ad, scan_mode_value):
    """Set Android device's Bluetooth scan mode.

    Args:
        ad: The Android device to set the scan mode on.
        scan_mode_value: The value to set the scan mode to.

    Returns:
        True if successful, false if unsuccessful.
    """
    droid, ed = ad.droid, ad.ed
    if scan_mode_value == bt_scan_mode_types['state_off']:
        disable_bluetooth(droid)
        scan_mode = droid.bluetoothGetScanMode()
        reset_bluetooth([ad])
        if scan_mode != scan_mode_value:
            return False
    elif scan_mode_value == bt_scan_mode_types['none']:
        droid.bluetoothMakeUndiscoverable()
        scan_mode = droid.bluetoothGetScanMode()
        if scan_mode != scan_mode_value:
            return False
    elif scan_mode_value == bt_scan_mode_types['connectable']:
        droid.bluetoothMakeUndiscoverable()
        droid.bluetoothMakeConnectable()
        scan_mode = droid.bluetoothGetScanMode()
        if scan_mode != scan_mode_value:
            return False
    elif (scan_mode_value == bt_scan_mode_types['connectable_discoverable']):
        droid.bluetoothMakeDiscoverable()
        scan_mode = droid.bluetoothGetScanMode()
        if scan_mode != scan_mode_value:
            return False
    else:
        # invalid scan mode
        return False
    return True


def set_device_name(droid, name):
    """Set and check Bluetooth local name on input droid object.

    Args:
        droid: Droid object to set local name on.
        name: the Bluetooth local name to set.

    Returns:
        True if successful, false if unsuccessful.
    """
    droid.bluetoothSetLocalName(name)
    time.sleep(2)
    droid_name = droid.bluetoothGetLocalName()
    if droid_name != name:
        return False
    return True


def set_profile_priority(host_ad, client_ad, profiles, priority):
    """Sets the priority of said profile(s) on host_ad for client_ad"""
    for profile in profiles:
        host_ad.log.info("Profile {} on {} for {} set to priority {}".format(
            profile, host_ad.droid.bluetoothGetLocalName(),
            client_ad.droid.bluetoothGetLocalAddress(), priority.value))
        if bt_profile_constants['a2dp_sink'] == profile:
            host_ad.droid.bluetoothA2dpSinkSetPriority(
                client_ad.droid.bluetoothGetLocalAddress(), priority.value)
        elif bt_profile_constants['headset_client'] == profile:
            host_ad.droid.bluetoothHfpClientSetPriority(
                client_ad.droid.bluetoothGetLocalAddress(), priority.value)
        elif bt_profile_constants['pbap_client'] == profile:
            host_ad.droid.bluetoothPbapClientSetPriority(
                client_ad.droid.bluetoothGetLocalAddress(), priority.value)
        else:
            host_ad.log.error(
                "Profile {} not yet supported for priority settings".format(
                    profile))


def setup_multiple_devices_for_bt_test(android_devices):
    """A common setup routine for Bluetooth on input Android device list.

    Things this function sets up:
    1. Resets Bluetooth
    2. Set Bluetooth local name to random string of size 4
    3. Disable BLE background scanning.
    4. Enable Bluetooth snoop logging.

    Args:
        android_devices: Android device list to setup Bluetooth on.

    Returns:
        True if successful, false if unsuccessful.
    """
    log.info("Setting up Android Devices")
    # TODO: Temp fix for an selinux error.
    for ad in android_devices:
        ad.adb.shell("setenforce 0")
    threads = []
    try:
        for a in android_devices:
            thread = threading.Thread(target=factory_reset_bluetooth,
                                      args=([[a]]))
            threads.append(thread)
            thread.start()
        for t in threads:
            t.join()

        for a in android_devices:
            d = a.droid
            # TODO: Create specific RPC command to instantiate
            # BluetoothConnectionFacade. This is just a workaround.
            d.bluetoothStartConnectionStateChangeMonitor("")
            setup_result = d.bluetoothSetLocalName(generate_id_by_size(4))
            if not setup_result:
                a.log.error("Failed to set device name.")
                return setup_result
            d.bluetoothDisableBLE()
            utils.set_location_service(a, True)
            bonded_devices = d.bluetoothGetBondedDevices()
            for b in bonded_devices:
                a.log.info("Removing bond for device {}".format(b['address']))
                d.bluetoothUnbond(b['address'])
        for a in android_devices:
            a.adb.shell("setprop persist.bluetooth.btsnooplogmode full")
            getprop_result = a.adb.shell(
                "getprop persist.bluetooth.btsnooplogmode") == "full"
            if not getprop_result:
                a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.")
    except Exception as err:
        log.error("Something went wrong in multi device setup: {}".format(err))
        return False
    return setup_result


def setup_n_advertisements(adv_ad, num_advertisements):
    """Setup input number of advertisements on input Android device.

    Args:
        adv_ad: The Android device to start LE advertisements on.
        num_advertisements: The number of advertisements to start.

    Returns:
        advertise_callback_list: List of advertisement callback ids.
    """
    adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    advertise_data = adv_ad.droid.bleBuildAdvertiseData()
    advertise_settings = adv_ad.droid.bleBuildAdvertiseSettings()
    advertise_callback_list = []
    for i in range(num_advertisements):
        advertise_callback = adv_ad.droid.bleGenBleAdvertiseCallback()
        advertise_callback_list.append(advertise_callback)
        adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                            advertise_settings)
        try:
            adv_ad.ed.pop_event(adv_succ.format(advertise_callback),
                                bt_default_timeout)
            adv_ad.log.info("Advertisement {} started.".format(i + 1))
        except Empty as error:
            adv_ad.log.error("Advertisement {} failed to start.".format(i + 1))
            raise BtTestUtilsError(
                "Test failed with Empty error: {}".format(error))
    return advertise_callback_list


def take_btsnoop_log(ad, testcase, testname):
    """Grabs the btsnoop_hci log on a device and stores it in the log directory
    of the test class.

    If you want grab the btsnoop_hci log, call this function with android_device
    objects in on_fail. Bug report takes a relative long time to take, so use
    this cautiously.

    Args:
        ad: The android_device instance to take bugreport on.
        testcase: Name of the test calss that triggered this snoop log.
        testname: Name of the test case that triggered this bug report.
    """
    testname = "".join(x for x in testname if x.isalnum())
    serial = ad.serial
    device_model = ad.droid.getBuildModel()
    device_model = device_model.replace(" ", "")
    out_name = ','.join((testname, device_model, serial))
    snoop_path = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs')
    os.makedirs(snoop_path, exist_ok=True)
    cmd = ''.join(("adb -s ", serial, " pull ", btsnoop_log_path_on_device, " ",
                   snoop_path + '/' + out_name, ".btsnoop_hci.log"))
    exe_cmd(cmd)
    try:
        cmd = ''.join(
            ("adb -s ", serial, " pull ", btsnoop_last_log_path_on_device, " ",
             snoop_path + '/' + out_name, ".btsnoop_hci.log.last"))
        exe_cmd(cmd)
    except Exception as err:
        testcase.log.info(
            "File does not exist {}".format(btsnoop_last_log_path_on_device))


def take_btsnoop_logs(android_devices, testcase, testname):
    """Pull btsnoop logs from an input list of android devices.

    Args:
        android_devices: the list of Android devices to pull btsnoop logs from.
        testcase: Name of the test calss that triggered this snoop log.
        testname: Name of the test case that triggered this bug report.
    """
    for a in android_devices:
        take_btsnoop_log(a, testcase, testname)


def teardown_n_advertisements(adv_ad, num_advertisements,
                              advertise_callback_list):
    """Stop input number of advertisements on input Android device.

    Args:
        adv_ad: The Android device to stop LE advertisements on.
        num_advertisements: The number of advertisements to stop.
        advertise_callback_list: The list of advertisement callbacks to stop.

    Returns:
        True if successful, false if unsuccessful.
    """
    for n in range(num_advertisements):
        adv_ad.droid.bleStopBleAdvertising(advertise_callback_list[n])
    return True


def verify_server_and_client_connected(client_ad, server_ad, log=True):
    """Verify that input server and client Android devices are connected.

    This code is under the assumption that there will only be
    a single connection.

    Args:
        client_ad: the Android device to check number of active connections.
        server_ad: the Android device to check number of active connections.

    Returns:
        True both server and client have at least 1 active connection,
        false if unsuccessful.
    """
    test_result = True
    if len(server_ad.droid.bluetoothSocketConnActiveConnections()) == 0:
        if log:
            server_ad.log.error("No socket connections found on server.")
        test_result = False
    if len(client_ad.droid.bluetoothSocketConnActiveConnections()) == 0:
        if log:
            client_ad.log.error("No socket connections found on client.")
        test_result = False
    return test_result


def wait_for_bluetooth_manager_state(droid,
                                     state=None,
                                     timeout=10,
                                     threshold=5):
    """ Waits for BlueTooth normalized state or normalized explicit state
    args:
        droid: droid device object
        state: expected BlueTooth state
        timeout: max timeout threshold
        threshold: list len of bt state
    Returns:
        True if successful, false if unsuccessful.
    """
    all_states = []
    get_state = lambda: droid.bluetoothGetLeState()
    start_time = time.time()
    while time.time() < start_time + timeout:
        all_states.append(get_state())
        if len(all_states) >= threshold:
            # for any normalized state
            if state is None:
                if len(set(all_states[-threshold:])) == 1:
                    log.info("State normalized {}".format(
                        set(all_states[-threshold:])))
                    return True
            else:
                # explicit check against normalized state
                if set([state]).issubset(all_states[-threshold:]):
                    return True
        time.sleep(0.5)
    log.error(
        "Bluetooth state fails to normalize" if state is None else
        "Failed to match bluetooth state, current state {} expected state {}".
        format(get_state(), state))
    return False


def _wait_for_passkey_match(pri_ad, sec_ad):
    pri_pin, sec_pin = -1, 1
    pri_variant, sec_variant = -1, 1
    pri_pairing_req, sec_pairing_req = None, None
    try:
        pri_pairing_req = pri_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        pri_variant = pri_pairing_req["data"]["PairingVariant"]
        pri_pin = pri_pairing_req["data"]["Pin"]
        pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format(
            pri_pin, pri_variant))
        sec_pairing_req = sec_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        sec_variant = sec_pairing_req["data"]["PairingVariant"]
        sec_pin = sec_pairing_req["data"]["Pin"]
        sec_ad.log.info("Secondary device received Pin: {}, Variant: {}".format(
            sec_pin, sec_variant))
    except Empty as err:
        log.error("Wait for pin error: {}".format(err))
        log.error("Pairing request state, Primary: {}, Secondary: {}".format(
            pri_pairing_req, sec_pairing_req))
        return False
    if pri_variant == sec_variant == pairing_variant_passkey_confirmation:
        confirmation = pri_pin == sec_pin
        if confirmation:
            log.info("Pairing code matched, accepting connection")
        else:
            log.info("Pairing code mismatched, rejecting connection")
        pri_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
                               str(confirmation))
        sec_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
                               str(confirmation))
        if not confirmation:
            return False
    elif pri_variant != sec_variant:
        log.error("Pairing variant mismatched, abort connection")
        return False
    return True


def write_read_verify_data(client_ad, server_ad, msg, binary=False):
    """Verify that the client wrote data to the server Android device correctly.

    Args:
        client_ad: the Android device to perform the write.
        server_ad: the Android device to read the data written.
        msg: the message to write.
        binary: if the msg arg is binary or not.

    Returns:
        True if the data written matches the data read, false if not.
    """
    client_ad.log.info("Write message.")
    try:
        if binary:
            client_ad.droid.bluetoothSocketConnWriteBinary(msg)
        else:
            client_ad.droid.bluetoothSocketConnWrite(msg)
    except Exception as err:
        client_ad.log.error("Failed to write data: {}".format(err))
        return False
    server_ad.log.info("Read message.")
    try:
        if binary:
            read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip(
                "\r\n")
        else:
            read_msg = server_ad.droid.bluetoothSocketConnRead()
    except Exception as err:
        server_ad.log.error("Failed to read data: {}".format(err))
        return False
    log.info("Verify message.")
    if msg != read_msg:
        log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg))
        return False
    return True


class MediaControlOverSl4a(object):
    """Media control using sl4a facade for general purpose.

    """

    def __init__(self, android_device, music_file):
        """Initialize the media_control class.

        Args:
            android_dut: android_device object
            music_file: location of the music file
        """
        self.android_device = android_device
        self.music_file = music_file

    def play(self):
        """Play media.

        """
        self.android_device.droid.mediaPlayOpen('file://%s' % self.music_file,
                                                'default', True)
        playing = self.android_device.droid.mediaIsPlaying()
        asserts.assert_true(playing,
                            'Failed to play music %s' % self.music_file)

    def pause(self):
        """Pause media.

        """
        self.android_device.droid.mediaPlayPause('default')
        paused = not self.android_device.droid.mediaIsPlaying()
        asserts.assert_true(paused,
                            'Failed to pause music %s' % self.music_file)

    def resume(self):
        """Resume media.

        """
        self.android_device.droid.mediaPlayStart('default')
        playing = self.android_device.droid.mediaIsPlaying()
        asserts.assert_true(playing,
                            'Failed to play music %s' % self.music_file)

    def stop(self):
        """Stop media.

        """
        self.android_device.droid.mediaPlayStop('default')
        stopped = not self.android_device.droid.mediaIsPlaying()
        asserts.assert_true(stopped,
                            'Failed to stop music %s' % self.music_file)
