# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for create_common."""

import collections
import os
import shutil
import tempfile
import unittest

from unittest import mock

from acloud import errors
from acloud.create import create_common
from acloud.internal.lib import android_build_client
from acloud.internal.lib import auth
from acloud.internal.lib import driver_test_lib
from acloud.internal.lib import utils


ExtraFile = collections.namedtuple("ExtraFile", ["source", "target"])


class FakeZipFile:
    """Fake implementation of ZipFile()"""

    # pylint: disable=invalid-name,unused-argument,no-self-use
    def write(self, filename, arcname=None, compress_type=None):
        """Fake write method."""
        return

    # pylint: disable=invalid-name,no-self-use
    def close(self):
        """Fake close method."""
        return


# pylint: disable=invalid-name,protected-access
class CreateCommonTest(driver_test_lib.BaseDriverTest):
    """Test create_common functions."""

    # pylint: disable=protected-access
    def testProcessHWPropertyWithInvalidArgs(self):
        """Test ParseKeyValuePairArgs with invalid args."""
        # Checking wrong property value.
        args_str = "cpu:3,disk:"
        with self.assertRaises(errors.MalformedDictStringError):
            create_common.ParseKeyValuePairArgs(args_str)

        # Checking wrong property format.
        args_str = "cpu:3,disk"
        with self.assertRaises(errors.MalformedDictStringError):
            create_common.ParseKeyValuePairArgs(args_str)

    def testParseHWPropertyStr(self):
        """Test ParseKeyValuePairArgs."""
        expected_dict = {"cpu": "2", "resolution": "1080x1920", "dpi": "240",
                         "memory": "4g", "disk": "4g"}
        args_str = "cpu:2,resolution:1080x1920,dpi:240,memory:4g,disk:4g"
        result_dict = create_common.ParseKeyValuePairArgs(args_str)
        self.assertTrue(expected_dict == result_dict)

    def testGetNonEmptyEnvVars(self):
        """Test GetNonEmptyEnvVars."""
        with mock.patch.dict("acloud.internal.lib.utils.os.environ",
                             {"A": "", "B": "b"},
                             clear=True):
            self.assertEqual(
                ["b"], create_common.GetNonEmptyEnvVars("A", "B", "C"))

    def testParseExtraFilesArgs(self):
        """Test ParseExtraFilesArgs."""
        expected_result = [ExtraFile(source="local_path", target="gce_path")]
        files_info = ["local_path,gce_path"]
        self.assertEqual(expected_result,
                         create_common.ParseExtraFilesArgs(files_info))

        # Test multiple files
        expected_result = [ExtraFile(source="local_path1", target="gce_path1"),
                           ExtraFile(source="local_path2", target="gce_path2")]
        files_info = ["local_path1,gce_path1",
                      "local_path2,gce_path2"]
        self.assertEqual(expected_result,
                         create_common.ParseExtraFilesArgs(files_info))

        # Test wrong file info format.
        files_info = ["local_path"]
        with self.assertRaises(errors.MalformedDictStringError):
            create_common.ParseExtraFilesArgs(files_info)

    def testGetCvdHostPackage(self):
        """test GetCvdHostPackage."""
        # Can't find the cvd host package
        with mock.patch("os.path.exists") as exists:
            exists.return_value = False
            self.assertRaises(
                errors.GetCvdLocalHostPackageError,
                create_common.GetCvdHostPackage)

        self.Patch(os.environ, "get", return_value="/fake_dir2")
        self.Patch(utils, "GetDistDir", return_value="/fake_dir1")
        self.Patch(os.path, "exists",
                   side_effect=lambda path:
                       path == "/fake_dir1/cvd-host_package.tar.gz")

        # Find cvd host in dist dir.
        self.assertEqual(
            create_common.GetCvdHostPackage(),
            "/fake_dir1/cvd-host_package.tar.gz")

        # Find cvd host in host out dir.
        self.Patch(os.environ, "get", return_value="/fake_dir2")
        self.Patch(utils, "GetDistDir", return_value=None)
        with mock.patch("os.path.exists") as exists:
            exists.side_effect = lambda path: \
                path == "/fake_dir2/cvd-host_package.tar.gz"
            self.assertEqual(
                create_common.GetCvdHostPackage(),
                "/fake_dir2/cvd-host_package.tar.gz")
        with mock.patch("os.path.exists") as exists:
            exists.side_effect = lambda path: \
                path == "/fake_dir2/cvd-host_package"
            self.assertEqual(
                create_common.GetCvdHostPackage(),
                "/fake_dir2/cvd-host_package")

        # Find cvd host in specified path.
        package_path = "/tool_dir/cvd-host_package.tar.gz"
        self.Patch(utils, "GetDistDir", return_value=None)
        with mock.patch("os.path.exists") as exists:
            exists.return_value = True
            self.assertEqual(
                create_common.GetCvdHostPackage(package_path),
                "/tool_dir/cvd-host_package.tar.gz")

    @mock.patch("acloud.create.create_common.os.path.isfile",
                side_effect=lambda path: path == "/dir/name")
    @mock.patch("acloud.create.create_common.os.path.isdir",
                side_effect=lambda path: path == "/dir")
    @mock.patch("acloud.create.create_common.os.listdir",
                return_value=["name", "name2"])
    def testFindLocalImage(self, _mock_listdir, _mock_isdir, _mock_isfile):
        """Test FindLocalImage."""
        self.assertEqual(
            "/dir/name",
            create_common.FindLocalImage("/test/../dir/name", "not_exist"))

        self.assertEqual("/dir/name",
                         create_common.FindLocalImage("/dir/", "name"))


        self.assertIsNone(create_common.FindLocalImage("/dir", "not_exist",
                                                       raise_error=False))
        with self.assertRaises(errors.GetLocalImageError):
            create_common.FindLocalImage("/dir", "not_exist")

        with self.assertRaises(errors.GetLocalImageError):
            create_common.FindLocalImage("/dir", "name.?", raise_error=False)

    def testFindBootImage(self):
        """Test FindBootImage."""
        with tempfile.TemporaryDirectory() as temp_dir:
            with self.assertRaises(errors.GetLocalImageError):
                create_common.FindBootImage(temp_dir)

            boot_image_path = os.path.join(temp_dir, "boot.img")
            self.CreateFile(boot_image_path, b"invalid")
            with self.assertRaises(errors.GetLocalImageError):
                create_common.FindBootImage(temp_dir)
            os.remove(boot_image_path)

            boot_image_path = os.path.join(temp_dir, "boot.img")
            self.CreateFile(boot_image_path, b"ANDROID!")
            self.assertEqual(boot_image_path,
                             create_common.FindBootImage(temp_dir))
            os.remove(boot_image_path)

            boot_image_path = os.path.join(temp_dir, "boot-5.10.img")
            self.CreateFile(boot_image_path, b"ANDROID!")
            self.assertEqual(boot_image_path,
                             create_common.FindBootImage(temp_dir))

    def testFindSystemImages(self):
        """Test FindSystemImages."""
        with tempfile.TemporaryDirectory() as temp_dir:
            with self.assertRaises(errors.GetLocalImageError):
                create_common.FindSystemImages(temp_dir)

            image_dir = os.path.join(temp_dir, "IMAGES")
            system_image_path = os.path.join(image_dir, "system.img")
            self.CreateFile(system_image_path)
            self.assertEqual((system_image_path, None, None),
                             create_common.FindSystemImages(temp_dir))
            self.assertEqual((system_image_path, None, None),
                             create_common.FindSystemImages(image_dir))
            self.assertEqual((system_image_path, None, None),
                             create_common.FindSystemImages(system_image_path))

            system_ext_image_path = os.path.join(image_dir, "system_ext.img")
            self.CreateFile(system_ext_image_path)
            product_image_path = os.path.join(image_dir, "product.img")
            self.CreateFile(product_image_path)
            self.assertEqual(
                (system_image_path, system_ext_image_path, product_image_path),
                create_common.FindSystemImages(temp_dir))
            self.assertEqual(
                (system_image_path, system_ext_image_path, product_image_path),
                create_common.FindSystemImages(image_dir))


    def testFindVendorBootImage(self):
        """Test FindVendorBootImage."""
        with tempfile.TemporaryDirectory() as temp_dir:
            with self.assertRaises(errors.GetLocalImageError):
                create_common.FindVendorBootImage(temp_dir)

            boot_image_path = os.path.join(temp_dir, "vendor_boot.img")
            self.CreateFile(boot_image_path)
            self.assertEqual(boot_image_path,
                             create_common.FindVendorBootImage(boot_image_path))
            self.assertEqual(boot_image_path,
                             create_common.FindVendorBootImage(temp_dir))


    @mock.patch.object(utils, "Decompress")
    def testDownloadRemoteArtifact(self, mock_decompress):
        """Test Download cuttlefish package."""
        mock_build_client = mock.MagicMock()
        self.Patch(
            android_build_client,
            "AndroidBuildClient",
            return_value=mock_build_client)
        self.Patch(auth, "CreateCredentials", return_value=mock.MagicMock())
        avd_spec = mock.MagicMock()
        avd_spec.cfg = mock.MagicMock()
        avd_spec.remote_image = {"build_target" : "aosp_cf_x86_64_phone-userdebug",
                                 "build_id": "1234"}
        build_id = "1234"
        build_target = "aosp_cf_x86_64_phone-userdebug"
        checkfile1 = "aosp_cf_x86_phone-img-1234.zip"
        checkfile2 = "cvd-host_package.tar.gz"
        extract_path = "/tmp/1234"

        create_common.DownloadRemoteArtifact(
            avd_spec.cfg,
            avd_spec.remote_image["build_target"],
            avd_spec.remote_image["build_id"],
            checkfile1,
            extract_path,
            decompress=True)

        self.assertEqual(mock_build_client.DownloadArtifact.call_count, 1)
        mock_build_client.DownloadArtifact.assert_called_once_with(
            build_target,
            build_id,
            checkfile1,
            "%s/%s" % (extract_path, checkfile1))
        self.assertEqual(mock_decompress.call_count, 1)

        mock_decompress.call_count = 0
        mock_build_client.DownloadArtifact.call_count = 0
        create_common.DownloadRemoteArtifact(
            avd_spec.cfg,
            avd_spec.remote_image["build_target"],
            avd_spec.remote_image["build_id"],
            checkfile2,
            extract_path)

        self.assertEqual(mock_build_client.DownloadArtifact.call_count, 1)
        mock_build_client.DownloadArtifact.assert_called_once_with(
            build_target,
            build_id,
            checkfile2,
            "%s/%s" % (extract_path, checkfile2))
        self.assertEqual(mock_decompress.call_count, 0)

    def testPrepareLocalInstanceDir(self):
        """test PrepareLocalInstanceDir."""
        temp_dir = tempfile.mkdtemp()
        try:
            cvd_home_dir = os.path.join(temp_dir, "local-instance-1")
            mock_avd_spec = mock.Mock(local_instance_dir=None)
            create_common.PrepareLocalInstanceDir(cvd_home_dir, mock_avd_spec)
            self.assertTrue(os.path.isdir(cvd_home_dir) and
                            not os.path.islink(cvd_home_dir))

            link_target_dir = os.path.join(temp_dir, "cvd_home")
            os.mkdir(link_target_dir)
            mock_avd_spec.local_instance_dir = link_target_dir
            create_common.PrepareLocalInstanceDir(cvd_home_dir, mock_avd_spec)
            self.assertTrue(os.path.islink(cvd_home_dir) and
                            os.path.samefile(cvd_home_dir, link_target_dir))
        finally:
            shutil.rmtree(temp_dir)


if __name__ == "__main__":
    unittest.main()
