# Lint as: python2, python3
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import dbus
import logging
import os
import re
import subprocess
import time

from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import upstart
from autotest_lib.client.cros.audio import audio_helper
from autotest_lib.client.cros.audio import cras_utils

_STREAM_TYPE_INPUT_APM = 0
_STREAM_TYPE_OUTPUT = 1

class audio_CrasDevSwitchStress(test.test):
    """
    Checks if output buffer will drift to super high level when
    audio devices switch repeatedly.
    """
    version = 1
    _LOOP_COUNT = 150
    _INPUT_BUFFER_LEVEL = '.*?READ_AUDIO.*?dev:(\d+).*hw_level.*?(\d+).*?'
    _OUTPUT_BUFFER_LEVEL = '.*?FILL_AUDIO.*?dev:(\d+).*hw_level.*?(\d+).*?'
    _CHECK_PERIOD_TIME_SECS = 0.5
    _SILENT_OUTPUT_DEV_ID = 2
    _STREAM_BLOCK_SIZE = 480
    _AUDIO_LOG_TIME_FMT = '%Y-%m-%dT%H:%M:%S.%f'
    _last_audio_log = ''

    """
    Buffer level of input device should stay between 0 and block size.
    Buffer level of output device should between 1 to 2 times of block size.
    Device switching sometimes cause the buffer level to grow more then
    the ideal range, so we use tripple block size here.
    """
    _INPUT_BUFFER_DRIFT_CRITERIA = 3 * _STREAM_BLOCK_SIZE
    _OUTPUT_BUFFER_DRIFT_CRITERIA = 3 * _STREAM_BLOCK_SIZE

    def initialize(self):
        """Initialize the test"""
        upstart.stop_job('ui')

    def cleanup(self):
        """Remove all streams for testing."""
        if self._streams:
            # Clean up all streams.
            while len(self._streams) > 0:
                self._streams[0].kill()
                self._streams.remove(self._streams[0])
        upstart.restart_job('ui')
        super(audio_CrasDevSwitchStress, self).cleanup()

    def _new_stream(self, stream_type, node_pinned):
        """
        Runs new stream by cras_test_client.
        Args:
            stream_type: _STREAM_TYPE_INPUT_APM or _STREAM_TYPE_OUTPUT
            node_pinned: if not None then create the new stream that
                pinned to this node.
        Returns:
            New process that runs a CRAS client stream.
        """
        if stream_type == _STREAM_TYPE_INPUT_APM:
            cmd = ['cras_test_client', '--capture_file', '/dev/null',
                   '--effects', 'aec']
        else:
            cmd = ['cras_test_client', '--playback_file', '/dev/zero']

        cmd += ['--block_size', str(self._STREAM_BLOCK_SIZE)]

        if node_pinned and node_pinned['Id']:
            dev_id = node_pinned['Id'] >> 32
            if stream_type == _STREAM_TYPE_INPUT_APM:
                if node_pinned['IsInput']:
                    cmd += ['--pin_device', str(dev_id)]
            elif not node_pinned['IsInput']:
                cmd += ['--pin_device', str(dev_id)]

        return subprocess.Popen(cmd)

    def _get_time(self, s):
        """
        Parse timespec from the audio log. The format is like
        2020-07-16T00:36:40.819094632 cras ...

        Args:
            s: A string to parse.

        Returns:
            The datetime object created from the given string.
        """
        return datetime.datetime.strptime(
                s.split(' ')[0][:-3], self._AUDIO_LOG_TIME_FMT)

    def _update_last_log_time(self):
        """Save the time of the last audio thread logs."""

        proc = subprocess.Popen(['cras_test_client', '--dump_a'],
                                stdout=subprocess.PIPE)
        output, err = proc.communicate()
        self._last_log_time = self._get_time(output.decode().split('\n')[-2])

    def _get_buffer_level(self, match_str, dev_id):
        """
        Gets a rough number about current buffer level.

        Args:
            match_str: regular expression to match device buffer level.
            dev_id: audio device id in CRAS.

        Returns:
            The current buffer level.
        """
        proc = subprocess.Popen(['cras_test_client', '--dump_a'],
                                stdout=subprocess.PIPE)
        output, err = proc.communicate()
        buffer_level = 0
        lines = output.decode().split('\n')
        start = False
        for line in lines:
            if not line or not start:
                # The timestamp shows up in later lines.
                if 'start at' in line:
                    start = True
                continue
            time = self._get_time(line)
            # Filter logs which appeared earlier than the test run.
            if time <= self._last_log_time:
                continue
            search = re.match(match_str, line)
            if search:
                if dev_id != int(search.group(1)):
                    continue
                tmp = int(search.group(2))
                if tmp > buffer_level:
                    buffer_level = tmp
        return buffer_level

    def _check_buffer_level(self, node):
        """Checks if current buffer level of node stay in criteria."""
        if node['IsInput']:
            match_str = self._INPUT_BUFFER_LEVEL
            criteria = self._INPUT_BUFFER_DRIFT_CRITERIA
        else:
            match_str = self._OUTPUT_BUFFER_LEVEL
            criteria = self._OUTPUT_BUFFER_DRIFT_CRITERIA

        dev_id = node['Id'] >> 32
        buffer_level = self._get_buffer_level(match_str, dev_id)

        logging.debug("Max buffer level: %d on dev %d", buffer_level, dev_id)
        if buffer_level > criteria:
            audio_helper.dump_audio_diagnostics(
                    os.path.join(self.resultsdir, "audio_diagnostics.txt"))
            raise error.TestFail('Buffer level %d drift too high on %s node'
                                 ' with dev id %d' %
                                 (buffer_level, node['Type'], dev_id))

    def _get_cras_pid(self):
        """Gets the pid of CRAS, used to check if CRAS crashes and respawn."""
        pid = utils.system_output('pgrep cras$ 2>&1', retain_output=True,
                                  ignore_status=True).strip()
        try:
            pid = int(pid)
        except ValueError as e:  # empty or invalid string
            raise error.TestFail('CRAS not running')

    def _switch_to_node(self, node):
        """
        Switches CRAS input or output active node to target node.
        Args:
            node: if not None switch CRAS input/output active node to it
        """
        if node == None:
            return
        if node['IsInput']:
            cras_utils.set_active_input_node(node['Id'])
        else:
            cras_utils.set_active_output_node(node['Id'])

    def run_once(self, type_a=None, type_b=None, pinned_type=None):
        """
        Setup the typical streams used for hangout, one input stream with APM
        plus an output stream. If pinned_type is not None, set up an additional
        stream that pinned to the first node of pinned_type.
        The test repeatedly switch active nodes between the first nodes of
        type_a and type_b to verify there's no crash and buffer level stay
        in reasonable range.

        Args:
            type_a: node type to match a CRAS node
            type_b: node type to match a CRAS node
            pinned_type: node type to match a CRAS node
        """
        node_a = None
        node_b = None
        node_pinned = None
        self._streams = []

        """Store the selected nodes at the start of the test."""
        (output_type, input_type) = cras_utils.get_selected_node_types()

        cras_pid = self._get_cras_pid()

        try:
            nodes = cras_utils.get_cras_nodes()
        except dbus.DBusException as e:
            logging.error("Get CRAS nodes failure.");
            raise error.TestFail("No reply from CRAS for get nodes request.")

        for node in nodes:
            if type_a and node['Type'] == type_a:
                node_a = node
            elif type_b and node['Type'] == type_b:
                node_b = node

            if pinned_type and node['Type'] == pinned_type:
                node_pinned = node

        if not (node_a and node_b):
            raise error.TestNAError("No output nodes pair to switch.")

        self._update_last_log_time();
        if node_pinned:
            if node_pinned['IsInput']:
                self._streams.append(
                        self._new_stream(_STREAM_TYPE_INPUT_APM, node_pinned))
            else:
                self._streams.append(
                        self._new_stream(_STREAM_TYPE_OUTPUT, node_pinned))

        self._streams.append(self._new_stream(_STREAM_TYPE_OUTPUT, None))
        self._streams.append(self._new_stream(_STREAM_TYPE_INPUT_APM, None))

        loop_count = 0
        past_time = time.time()

        try:
            while loop_count < self._LOOP_COUNT:
                # Switch active input/output node in each loop.
                if loop_count % 2 == 0:
                    self._switch_to_node(node_a)
                else:
                    self._switch_to_node(node_b)

                time.sleep(0.2)
                now = time.time()

                # Check buffer level.
                if now - past_time > self._CHECK_PERIOD_TIME_SECS:
                    past_time = now
                    self._check_buffer_level(node_a)
                    self._check_buffer_level(node_b)

                loop_count += 1
                if cras_pid != self._get_cras_pid():
                    raise error.TestFail("CRAS crashed and respawned.")
        except dbus.DBusException as e:
            logging.exception(e)
            raise error.TestFail("CRAS may have crashed.")
        finally:
            """Restore the nodes at the end of the test."""
            cras_utils.set_selected_node_types(output_type, input_type)
