#!/usr/bin/env vpython3
# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import copy
import json
import subprocess
import sys
from typing import List, Tuple
import unittest

import unittest.mock as mock

from unexpected_passes_common import builders
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import expectations
from unexpected_passes_common import queries
from unexpected_passes_common import unittest_utils

queries.QUERY_DELAY = 0


class HelperMethodUnittest(unittest.TestCase):
  def testStripPrefixFromBuildIdValidId(self) -> None:
    self.assertEqual(queries._StripPrefixFromBuildId('build-1'), '1')

  def testStripPrefixFromBuildIdInvalidId(self) -> None:
    with self.assertRaises(AssertionError):
      queries._StripPrefixFromBuildId('build1')
    with self.assertRaises(AssertionError):
      queries._StripPrefixFromBuildId('build-1-2')

  def testConvertActualResultToExpectationFileFormatAbort(self) -> None:
    self.assertEqual(
        queries._ConvertActualResultToExpectationFileFormat('ABORT'), 'Timeout')


class QueryGeneratorUnittest(unittest.TestCase):
  def setUp(self):
    self._builder = data_types.BuilderEntry('ci', constants.BuilderTypes.CI,
                                            False)

  def testSplitQueryGeneratorInitialSplit(self) -> None:
    """Tests that initial query splitting works as expected."""
    test_filter = queries.SplitQueryGenerator(self._builder, ['1', '2', '3'], 2)
    self.assertEqual(test_filter._test_id_lists, [['1', '2'], ['3']])
    self.assertEqual(len(test_filter.GetClauses()), 2)
    test_filter = queries.SplitQueryGenerator(self._builder, ['1', '2', '3'], 3)
    self.assertEqual(test_filter._test_id_lists, [['1', '2', '3']])
    self.assertEqual(len(test_filter.GetClauses()), 1)

  def testSplitQueryGeneratorSplitQuery(self) -> None:
    """Tests that SplitQueryGenerator's query splitting works."""
    test_filter = queries.SplitQueryGenerator(self._builder, ['1', '2'], 10)
    self.assertEqual(len(test_filter.GetClauses()), 1)
    test_filter.SplitQuery()
    self.assertEqual(len(test_filter.GetClauses()), 2)

  def testSplitQueryGeneratorSplitQueryCannotSplitFurther(self) -> None:
    """Tests that SplitQueryGenerator's failure mode."""
    test_filter = queries.SplitQueryGenerator(self._builder, ['1'], 1)
    with self.assertRaises(queries.QuerySplitError):
      test_filter.SplitQuery()


class QueryBuilderUnittest(unittest.TestCase):
  def setUp(self) -> None:
    self._patcher = mock.patch.object(subprocess, 'Popen')
    self._popen_mock = self._patcher.start()
    self.addCleanup(self._patcher.stop)

    builders.ClearInstance()
    expectations.ClearInstance()
    unittest_utils.RegisterGenericBuildersImplementation()
    unittest_utils.RegisterGenericExpectationsImplementation()
    self._querier = unittest_utils.CreateGenericQuerier()

    self._relevant_file_patcher = mock.patch.object(
        self._querier,
        '_GetRelevantExpectationFilesForQueryResult',
        return_value=None)
    self._relevant_file_mock = self._relevant_file_patcher.start()
    self.addCleanup(self._relevant_file_patcher.stop)

    self._builder = data_types.BuilderEntry('builder',
                                            constants.BuilderTypes.CI, False)

  def testQueryFailureRaised(self) -> None:
    """Tests that a query failure is properly surfaced."""
    self._popen_mock.return_value = unittest_utils.FakeProcess(returncode=1)
    with self.assertRaises(RuntimeError):
      self._querier.QueryBuilder(
          data_types.BuilderEntry('builder', constants.BuilderTypes.CI, False))

  def testInvalidNumSamples(self) -> None:
    """Tests that the number of samples is validated."""
    with self.assertRaises(AssertionError):
      unittest_utils.CreateGenericQuerier(num_samples=-1)

  def testInvalidNumJobs(self) -> None:
    """Tests that the number of jobs is validated."""
    with self.assertRaises(AssertionError):
      unittest_utils.CreateGenericQuerier(num_jobs=0)

  def testNoResults(self) -> None:
    """Tests functionality if the query returns no results."""
    self._popen_mock.return_value = unittest_utils.FakeProcess(stdout='[]')
    results, expectation_files = self._querier.QueryBuilder(
        data_types.BuilderEntry('builder', constants.BuilderTypes.CI, False))
    self.assertEqual(results, [])
    self.assertIsNone(expectation_files, None)

  def testValidResults(self) -> None:
    """Tests functionality when valid results are returned."""
    self._relevant_file_mock.return_value = ['foo_expectations']
    query_results = [
        {
            'id':
            'build-1234',
            'test_id': ('ninja://chrome/test:telemetry_gpu_integration_test/'
                        'gpu_tests.pixel_integration_test.'
                        'PixelIntegrationTest.test_name'),
            'status':
            'FAIL',
            'typ_expectations': [
                'RetryOnFailure',
            ],
            'typ_tags': [
                'win',
                'intel',
                'unknown_tag',  # This is expected to be removed.
            ],
            'step_name':
            'step_name',
        },
    ]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_results))
    results, expectation_files = self._querier.QueryBuilder(
        data_types.BuilderEntry('builder', constants.BuilderTypes.CI, False))
    self.assertEqual(len(results), 1)
    self.assertEqual(
        results[0],
        data_types.Result('test_name', ['win', 'intel'], 'Failure', 'step_name',
                          '1234'))
    self.assertEqual(expectation_files, ['foo_expectations'])

  def testValidResultsNoneExpectations(self) -> None:
    """Tests when an implementation uses None for expectation files."""
    query_results = [
        {
            'id':
            'build-1234',
            'test_id': ('ninja://chrome/test:telemetry_gpu_integration_test/'
                        'gpu_tests.pixel_integration_test.'
                        'PixelIntegrationTest.test_name'),
            'status':
            'FAIL',
            'typ_expectations': [
                'RetryOnFailure',
            ],
            'typ_tags': [
                'win',
                'intel',
            ],
            'step_name':
            'step_name',
        },
        {
            'id':
            'build-1234',
            'test_id': ('ninja://chrome/test:telemetry_gpu_integration_test/'
                        'gpu_tests.pixel_integration_test.'
                        'PixelIntegrationTest.test_name'),
            'status':
            'FAIL',
            'typ_expectations': [
                'RetryOnFailure',
            ],
            'typ_tags': [
                'win',
                'nvidia',
            ],
            'step_name':
            'step_name',
        },
    ]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_results))
    with mock.patch.object(
        self._querier, '_GetRelevantExpectationFilesForQueryResult') as ef_mock:
      ef_mock.return_value = None
      results, expectation_files = self._querier.QueryBuilder(
          data_types.BuilderEntry('builder', constants.BuilderTypes.CI, False))
      self.assertEqual(len(results), 2)
      self.assertIn(
          data_types.Result('test_name', ['win', 'intel'], 'Failure',
                            'step_name', '1234'), results)
      self.assertIn(
          data_types.Result('test_name', ['win', 'nvidia'], 'Failure',
                            'step_name', '1234'), results)
      self.assertIsNone(expectation_files)
      ef_mock.assert_called_once()

  def testValidResultsMultipleSteps(self) -> None:
    """Tests functionality when results from multiple steps are present."""

    def SideEffect(result: queries.QueryResult) -> List[str]:
      if result['step_name'] == 'a step name':
        return ['foo_expectations']
      if result['step_name'] == 'another step name':
        return ['bar_expectations']
      raise RuntimeError('Unknown step %s' % result['step_name'])

    self._relevant_file_mock.side_effect = SideEffect
    query_results = [
        {
            'id': 'build-1234',
            'test_id': 'ninja://:blink_web_tests/some/test/with.test_name',
            'status': 'FAIL',
            'typ_expectations': [
                'Failure',
            ],
            'typ_tags': [
                'linux',
                'intel',
            ],
            'step_name': 'a step name',
        },
        {
            'id': 'build-1234',
            'test_id': 'ninja://:blink_web_tests/some/test/with.test_name',
            'status': 'FAIL',
            'typ_expectations': [
                'Crash',
            ],
            'typ_tags': [
                'linux',
                'amd',
            ],
            'step_name': 'another step name',
        },
    ]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_results))
    results, expectation_files = self._querier.QueryBuilder(
        data_types.BuilderEntry('builder', constants.BuilderTypes.CI, False))
    self.assertEqual(len(results), 2)
    self.assertIn(
        data_types.Result('test_name', ['linux', 'intel'], 'Failure',
                          'a step name', '1234'), results)
    self.assertIn(
        data_types.Result('test_name', ['linux', 'amd'], 'Failure',
                          'another step name', '1234'), results)
    self.assertEqual(len(expectation_files), 2)
    self.assertEqual(set(expectation_files),
                     set(['foo_expectations', 'bar_expectations']))

  def testFilterInsertion(self) -> None:
    """Tests that test filters are properly inserted into the query."""
    with mock.patch.object(
        self._querier,
        '_GetQueryGeneratorForBuilder',
        return_value=unittest_utils.SimpleFixedQueryGenerator(
            self._builder, 'a real filter')), mock.patch.object(
                self._querier,
                '_RunBigQueryCommandsForJsonOutput',
                return_value=[]) as query_mock:
      self._querier.QueryBuilder(self._builder)
      query_mock.assert_called_once()
      query = query_mock.call_args[0][0][0]
      self.assertIn('a real filter', query)

  def testEarlyReturnOnNoFilter(self) -> None:
    """Tests that the absence of a test filter results in an early return."""
    with mock.patch.object(
        self._querier, '_GetQueryGeneratorForBuilder',
        return_value=None), mock.patch.object(
            self._querier, '_RunBigQueryCommandsForJsonOutput') as query_mock:
      results, expectation_files = self._querier.QueryBuilder(self._builder)
      query_mock.assert_not_called()
      self.assertEqual(results, [])
      self.assertEqual(expectation_files, None)

  def testRetryOnMemoryLimit(self) -> None:
    """Tests that queries are split and retried if the memory limit is hit."""

    def SideEffect(*_, **__) -> list:
      SideEffect.call_count += 1
      if SideEffect.call_count == 1:
        raise queries.MemoryLimitError()
      return []

    SideEffect.call_count = 0

    with mock.patch.object(
        self._querier,
        '_GetQueryGeneratorForBuilder',
        return_value=unittest_utils.SimpleSplitQueryGenerator(
            self._builder, ['filter_a', 'filter_b'], 10)), mock.patch.object(
                self._querier,
                '_RunBigQueryCommandsForJsonOutput') as query_mock:
      query_mock.side_effect = SideEffect
      self._querier.QueryBuilder(self._builder)
      self.assertEqual(query_mock.call_count, 2)

      args, _ = unittest_utils.GetArgsForMockCall(query_mock.call_args_list, 0)
      first_query = args[0][0]
      self.assertIn('filter_a', first_query)
      self.assertIn('filter_b', first_query)

      args, _ = unittest_utils.GetArgsForMockCall(query_mock.call_args_list, 1)
      second_query_first_half = args[0][0]
      self.assertIn('filter_a', second_query_first_half)
      self.assertNotIn('filter_b', second_query_first_half)

      second_query_second_half = args[0][1]
      self.assertIn('filter_b', second_query_second_half)
      self.assertNotIn('filter_a', second_query_second_half)


class FillExpectationMapForBuildersUnittest(unittest.TestCase):
  def setUp(self) -> None:
    self._querier = unittest_utils.CreateGenericQuerier()

    self._query_patcher = mock.patch.object(self._querier, 'QueryBuilder')
    self._query_mock = self._query_patcher.start()
    self.addCleanup(self._query_patcher.stop)
    self._filter_patcher = mock.patch.object(self._querier,
                                             '_FilterOutInactiveBuilders')
    self._filter_mock = self._filter_patcher.start()
    self._filter_mock.side_effect = lambda b, _: b
    self.addCleanup(self._filter_patcher.stop)

  def testErrorOnMixedBuilders(self) -> None:
    """Tests that providing builders of mixed type is an error."""
    builders_to_fill = [
        data_types.BuilderEntry('ci_builder', constants.BuilderTypes.CI, False),
        data_types.BuilderEntry('try_builder', constants.BuilderTypes.TRY,
                                False)
    ]
    with self.assertRaises(AssertionError):
      self._querier.FillExpectationMapForBuilders(
          data_types.TestExpectationMap({}), builders_to_fill)

  def testValidResults(self) -> None:
    """Tests functionality when valid results are returned by the query."""

    def SideEffect(builder: data_types.BuilderEntry,
                   *args) -> Tuple[data_types.ResultListType, None]:
      del args
      if builder.name == 'matched_builder':
        return ([
            data_types.Result('foo', ['win'], 'Pass', 'step_name', 'build_id')
        ], None)
      if builder.name == 'matched_internal':
        return ([
            data_types.Result('foo', ['win'], 'Pass', 'step_name_internal',
                              'build_id')
        ], None)
      if builder.name == 'unmatched_internal':
        return ([
            data_types.Result('bar', [], 'Pass', 'step_name_internal',
                              'build_id')
        ], None)
      return ([data_types.Result('bar', [], 'Pass', 'step_name',
                                 'build_id')], None)

    self._query_mock.side_effect = SideEffect

    expectation = data_types.Expectation('foo', ['win'], 'RetryOnFailure')
    expectation_map = data_types.TestExpectationMap({
        'foo':
        data_types.ExpectationBuilderMap({
            expectation:
            data_types.BuilderStepMap(),
        }),
    })
    builders_to_fill = [
        data_types.BuilderEntry('matched_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('unmatched_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('matched_internal', constants.BuilderTypes.CI,
                                True),
        data_types.BuilderEntry('unmatched_internal', constants.BuilderTypes.CI,
                                True),
    ]
    unmatched_results = self._querier.FillExpectationMapForBuilders(
        expectation_map, builders_to_fill)
    stats = data_types.BuildStats()
    stats.AddPassedBuild(frozenset(['win']))
    expected_expectation_map = {
        'foo': {
            expectation: {
                'chromium/ci:matched_builder': {
                    'step_name': stats,
                },
                'chrome/ci:matched_internal': {
                    'step_name_internal': stats,
                },
            },
        },
    }
    self.assertEqual(expectation_map, expected_expectation_map)
    self.assertEqual(
        unmatched_results, {
            'chromium/ci:unmatched_builder': [
                data_types.Result('bar', [], 'Pass', 'step_name', 'build_id'),
            ],
            'chrome/ci:unmatched_internal': [
                data_types.Result('bar', [], 'Pass', 'step_name_internal',
                                  'build_id'),
            ],
        })

  def testQueryFailureIsSurfaced(self) -> None:
    """Tests that a query failure is properly surfaced despite being async."""
    self._query_mock.side_effect = IndexError('failure')
    with self.assertRaises(IndexError):
      self._querier.FillExpectationMapForBuilders(
          data_types.TestExpectationMap(), [
              data_types.BuilderEntry('matched_builder',
                                      constants.BuilderTypes.CI, False)
          ])


class FilterOutInactiveBuildersUnittest(unittest.TestCase):
  def setUp(self) -> None:
    self._subprocess_patcher = mock.patch(
        'unexpected_passes_common.queries.subprocess.Popen')
    self._subprocess_mock = self._subprocess_patcher.start()
    self.addCleanup(self._subprocess_patcher.stop)

    self._querier = unittest_utils.CreateGenericQuerier()

  def testAllActiveBuilders(self) -> None:
    """Tests that no builders are removed if no inactive builders are found."""
    results = [{
        'builder_name': 'foo_builder',
    }, {
        'builder_name': 'bar_builder',
    }]
    fake_process = unittest_utils.FakeProcess(stdout=json.dumps(results))
    self._subprocess_mock.return_value = fake_process
    initial_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('bar_builder', constants.BuilderTypes.CI,
                                False),
    ]
    expected_builders = copy.copy(initial_builders)
    filtered_builders = self._querier._FilterOutInactiveBuilders(
        initial_builders, constants.BuilderTypes.CI)
    self.assertEqual(filtered_builders, expected_builders)

  def testInactiveBuilders(self) -> None:
    """Tests that inactive builders are removed."""
    results = [{
        'builder_name': 'foo_builder',
    }]
    fake_process = unittest_utils.FakeProcess(stdout=json.dumps(results))
    self._subprocess_mock.return_value = fake_process
    initial_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('bar_builder', constants.BuilderTypes.CI,
                                False),
    ]
    expected_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI, False)
    ]
    filtered_builders = self._querier._FilterOutInactiveBuilders(
        initial_builders, constants.BuilderTypes.CI)
    self.assertEqual(filtered_builders, expected_builders)

  def testByteConversion(self) -> None:
    """Tests that bytes are properly handled if returned."""
    results = [{
        'builder_name': 'foo_builder',
    }]
    fake_process = unittest_utils.FakeProcess(stdout=json.dumps(results))
    self._subprocess_mock.return_value = fake_process
    initial_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('bar_builder', constants.BuilderTypes.CI,
                                False),
    ]
    expected_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI, False)
    ]
    filtered_builders = self._querier._FilterOutInactiveBuilders(
        initial_builders, constants.BuilderTypes.CI)
    self.assertEqual(filtered_builders, expected_builders)


class RunBigQueryCommandsForJsonOutputUnittest(unittest.TestCase):
  def setUp(self) -> None:
    self._popen_patcher = mock.patch.object(subprocess, 'Popen')
    self._popen_mock = self._popen_patcher.start()
    self.addCleanup(self._popen_patcher.stop)

    self._querier = unittest_utils.CreateGenericQuerier()

  def testJsonReturned(self) -> None:
    """Tests that valid JSON parsed from stdout is returned."""
    query_output = [{'foo': 'bar'}]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_output))
    result = self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self.assertEqual(result, query_output)
    self._popen_mock.assert_called_once()

  def testJsonReturnedSplitQuery(self) -> None:
    """Tests that valid JSON is returned when a split query is used."""

    def SideEffect(*_, **__) -> unittest_utils.FakeProcess:
      SideEffect.call_count += 1
      if SideEffect.call_count == 1:
        return unittest_utils.FakeProcess(stdout=json.dumps([{'foo': 'bar'}]))
      return unittest_utils.FakeProcess(stdout=json.dumps([{'bar': 'baz'}]))

    SideEffect.call_count = 0

    self._popen_mock.side_effect = SideEffect
    result = self._querier._RunBigQueryCommandsForJsonOutput(['1', '2'], {})

    self.assertEqual(len(result), 2)
    self.assertIn({'foo': 'bar'}, result)
    self.assertIn({'bar': 'baz'}, result)

  def testExceptionRaisedOnFailure(self) -> None:
    """Tests that an exception is raised if the query fails."""
    self._popen_mock.return_value = unittest_utils.FakeProcess(returncode=1)
    with self.assertRaises(RuntimeError):
      self._querier._RunBigQueryCommandsForJsonOutput([''], {})

  def testRateLimitRetrySuccess(self) -> None:
    """Tests that rate limit errors result in retries."""

    def SideEffect(*_, **__) -> unittest_utils.FakeProcess:
      SideEffect.call_count += 1
      if SideEffect.call_count == 1:
        return unittest_utils.FakeProcess(
            returncode=1, stdout='Exceeded rate limits for foo.')
      return unittest_utils.FakeProcess(stdout='[]')

    SideEffect.call_count = 0

    self._popen_mock.side_effect = SideEffect
    self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self.assertEqual(self._popen_mock.call_count, 2)

  def testRateLimitRetryFailure(self) -> None:
    """Tests that rate limit errors stop retrying after enough iterations."""
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        returncode=1, stdout='Exceeded rate limits for foo.')
    with self.assertRaises(RuntimeError):
      self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self.assertEqual(self._popen_mock.call_count, queries.MAX_QUERY_TRIES)

  def testBatching(self) -> None:
    """Tests that batching preferences are properly forwarded."""
    query_output = [{'foo': 'bar'}]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_output))

    self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self._popen_mock.assert_called_once()
    args, _ = unittest_utils.GetArgsForMockCall(self._popen_mock.call_args_list,
                                                0)
    cmd = args[0]
    self.assertIn('--batch', cmd)

    self._querier = unittest_utils.CreateGenericQuerier(use_batching=False)
    self._popen_mock.reset_mock()
    self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self._popen_mock.assert_called_once()
    args, _ = unittest_utils.GetArgsForMockCall(self._popen_mock.call_args_list,
                                                0)
    cmd = args[0]
    self.assertNotIn('--batch', cmd)


class GenerateBigQueryCommandUnittest(unittest.TestCase):
  def testNoParametersSpecified(self) -> None:
    """Tests that no parameters are added if none are specified."""
    cmd = queries.GenerateBigQueryCommand('project', {})
    for element in cmd:
      self.assertFalse(element.startswith('--parameter'))

  def testParameterAddition(self) -> None:
    """Tests that specified parameters are added appropriately."""
    cmd = queries.GenerateBigQueryCommand('project', {
        '': {
            'string': 'string_value'
        },
        'INT64': {
            'int': 1
        }
    })
    self.assertIn('--parameter=string::string_value', cmd)
    self.assertIn('--parameter=int:INT64:1', cmd)

  def testBatchMode(self) -> None:
    """Tests that batch mode adds the necessary arg."""
    cmd = queries.GenerateBigQueryCommand('project', {}, batch=True)
    self.assertIn('--batch', cmd)


if __name__ == '__main__':
  unittest.main(verbosity=2)
