#!/usr/bin/env python
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common code used by acloud create methods/classes."""

import collections
import logging
import os
import re
import shutil

from acloud import errors
from acloud.internal import constants
from acloud.internal.lib import android_build_client
from acloud.internal.lib import auth
from acloud.internal.lib import utils


logger = logging.getLogger(__name__)

# The boot image name pattern supports the following cases:
# - Cuttlefish ANDROID_PRODUCT_OUT directory conatins boot.img.
# - In Android 12, the officially released GKI (Generic Kernel Image) name is
#   boot-<kernel version>.img.
# - In Android 13, the name is boot.img.
_BOOT_IMAGE_NAME_PATTERN = r"boot(-[\d.]+)?\.img"
_TARGET_FILES_IMAGES_DIR_NAME = "IMAGES"
_SYSTEM_IMAGE_NAME = "system.img"
_SYSTEM_EXT_IMAGE_NAME = "system_ext.img"
_PRODUCT_IMAGE_NAME = "product.img"
_VENDOR_BOOT_IMAGE_NAME_PATTERN = r"vendor_boot\.img"

_ANDROID_BOOT_IMAGE_MAGIC = b"ANDROID!"

# Store the file path to upload to the remote instance.
ExtraFile = collections.namedtuple("ExtraFile", ["source", "target"])

SystemImagePaths = collections.namedtuple(
    "SystemImagePaths",
    ["system", "system_ext", "product"])


def ParseExtraFilesArgs(files_info, path_separator=","):
    """Parse extra-files argument.

    e.g.
    ["local_path,gce_path"]
    -> ExtraFile(source='local_path', target='gce_path')

    Args:
        files_info: List of strings to be converted to namedtuple ExtraFile.
        item_separator: String character to separate file info.

    Returns:
        A list of namedtuple ExtraFile.

    Raises:
        error.MalformedDictStringError: If files_info is malformed.
    """
    extra_files = []
    if files_info:
        for file_info in files_info:
            if path_separator not in file_info:
                raise errors.MalformedDictStringError(
                    "Expecting '%s' in '%s'." % (path_separator, file_info))
            source, target = file_info.split(path_separator)
            extra_files.append(ExtraFile(source, target))
    return extra_files


def ParseKeyValuePairArgs(dict_str, item_separator=",", key_value_separator=":"):
    """Helper function to initialize a dict object from string.

    e.g.
    cpu:2,dpi:240,resolution:1280x800
    -> {"cpu":"2", "dpi":"240", "resolution":"1280x800"}

    Args:
        dict_str: A String to be converted to dict object.
        item_separator: String character to separate items.
        key_value_separator: String character to separate key and value.

    Returns:
        Dict created from key:val pairs in dict_str.

    Raises:
        error.MalformedDictStringError: If dict_str is malformed.
    """
    args_dict = {}
    if not dict_str:
        return args_dict

    for item in dict_str.split(item_separator):
        if key_value_separator not in item:
            raise errors.MalformedDictStringError(
                "Expecting ':' in '%s' to make a key-val pair" % item)
        key, value = item.split(key_value_separator)
        if not value or not key:
            raise errors.MalformedDictStringError(
                "Missing key or value in %s, expecting form of 'a:b'" % item)
        args_dict[key.strip()] = value.strip()

    return args_dict


def GetNonEmptyEnvVars(*variable_names):
    """Get non-empty environment variables.

    Args:
        variable_names: Strings, the variable names.

    Returns:
        List of strings, the variable values that are defined and not empty.
    """
    return list(filter(None, (os.environ.get(v) for v in variable_names)))


def GetCvdHostPackage(package_path=None):
    """Get cvd host package path.

    Look for the host package in specified path or $ANDROID_HOST_OUT and dist
    dir then verify existence and get cvd host package path.

    Args:
        package_path: String of cvd host package path.

    Return:
        A string, the path to the host package.

    Raises:
        errors.GetCvdLocalHostPackageError: Can't find cvd host package.
    """
    if package_path:
        if os.path.exists(package_path):
            return package_path
        raise errors.GetCvdLocalHostPackageError(
            "The cvd host package path (%s) doesn't exist." % package_path)
    dirs_to_check = GetNonEmptyEnvVars(constants.ENV_ANDROID_SOONG_HOST_OUT,
                                       constants.ENV_ANDROID_HOST_OUT)
    dist_dir = utils.GetDistDir()
    if dist_dir:
        dirs_to_check.append(dist_dir)

    for path in dirs_to_check:
        for name in [constants.CVD_HOST_TARBALL, constants.CVD_HOST_PACKAGE]:
            cvd_host_package = os.path.join(path, name)
            if os.path.exists(cvd_host_package):
                logger.debug("cvd host package: %s", cvd_host_package)
                return cvd_host_package
    raise errors.GetCvdLocalHostPackageError(
        "Can't find the cvd host package (Try lunching a cuttlefish target"
        " like aosp_cf_x86_64_phone-userdebug and running 'm'): \n%s" %
        '\n'.join(dirs_to_check))


def FindLocalImage(path, default_name_pattern, raise_error=True):
    """Find an image file in the given path.

    Args:
        path: The path to the file or the parent directory.
        default_name_pattern: A regex string, the file to look for if the path
                              is a directory.

    Returns:
        The absolute path to the image file.

    Raises:
        errors.GetLocalImageError if this method cannot find exactly one image.
    """
    path = os.path.abspath(path)
    if os.path.isdir(path):
        names = [name for name in os.listdir(path) if
                 re.fullmatch(default_name_pattern, name)]
        if not names:
            if raise_error:
                raise errors.GetLocalImageError(f"No image in {path}.")
            return None
        if len(names) != 1:
            raise errors.GetLocalImageError(
                f"More than one image in {path}: {' '.join(names)}")
        path = os.path.join(path, names[0])
    if os.path.isfile(path):
        return path
    raise errors.GetLocalImageError(f"{path} is not a file.")


def _IsBootImage(image_path):
    """Check if a file is an Android boot image by reading the magic bytes.

    Args:
        image_path: The file path.

    Returns:
        A boolean, whether the file is a boot image.
    """
    if not os.path.isfile(image_path):
        return False
    with open(image_path, "rb") as image_file:
        return image_file.read(8) == _ANDROID_BOOT_IMAGE_MAGIC


def FindBootImage(path, raise_error=True):
    """Find a boot image file in the given path."""
    boot_image_path = FindLocalImage(path, _BOOT_IMAGE_NAME_PATTERN,
                                     raise_error)
    if boot_image_path and not _IsBootImage(boot_image_path):
        raise errors.GetLocalImageError(
            f"{boot_image_path} is not a boot image.")
    return boot_image_path


def FindVendorBootImage(path, raise_error=True):
    """Find a vendor boot image file in the given path."""
    return FindLocalImage(path, _VENDOR_BOOT_IMAGE_NAME_PATTERN, raise_error)


def FindSystemImages(path):
    """Find system, system_ext, and product image files in a given path.

    Args:
        path: A string, the search path.

    Returns:
        The absolute paths to system, system_ext and product images.
        The paths to system_ext and product can be None.

    Raises:
        GetLocalImageError if this method cannot find the system image.
    """
    path = os.path.abspath(path)
    if os.path.isfile(path):
        return SystemImagePaths(path, None, None)

    image_dir = path
    system_image_path = os.path.join(image_dir, _SYSTEM_IMAGE_NAME)
    if not os.path.isfile(system_image_path):
        image_dir = os.path.join(path, _TARGET_FILES_IMAGES_DIR_NAME)
        system_image_path = os.path.join(image_dir, _SYSTEM_IMAGE_NAME)
        if not os.path.isfile(system_image_path):
            raise errors.GetLocalImageError(
                f"No {_SYSTEM_IMAGE_NAME} in {path}.")

    system_ext_image_path = os.path.join(image_dir, _SYSTEM_EXT_IMAGE_NAME)
    product_image_path = os.path.join(image_dir, _PRODUCT_IMAGE_NAME)
    return SystemImagePaths(
        system_image_path,
        (system_ext_image_path if os.path.isfile(system_ext_image_path) else
         None),
        (product_image_path if os.path.isfile(product_image_path) else None))


def DownloadRemoteArtifact(cfg, build_target, build_id, artifact, extract_path,
                           decompress=False):
    """Download remote artifact.

    Args:
        cfg: An AcloudConfig instance.
        build_target: String, the build target, e.g. cf_x86_phone-userdebug.
        build_id: String, Build id, e.g. "2263051", "P2804227"
        artifact: String, zip image or cvd host package artifact.
        extract_path: String, a path include extracted files.
        decompress: Boolean, if true decompress the artifact.
    """
    build_client = android_build_client.AndroidBuildClient(
        auth.CreateCredentials(cfg))
    temp_file = os.path.join(extract_path, artifact)
    build_client.DownloadArtifact(
        build_target,
        build_id,
        artifact,
        temp_file)
    if decompress:
        utils.Decompress(temp_file, extract_path)
        try:
            os.remove(temp_file)
            logger.debug("Deleted temporary file %s", temp_file)
        except OSError as e:
            logger.error("Failed to delete temporary file: %s", str(e))


def PrepareLocalInstanceDir(instance_dir, avd_spec):
    """Create a directory for a local cuttlefish or goldfish instance.

    If avd_spec has the local instance directory, this method creates a
    symbolic link from instance_dir to the directory. Otherwise, it creates an
    empty directory at instance_dir.

    Args:
        instance_dir: The absolute path to the default instance directory.
        avd_spec: AVDSpec object that provides the instance directory.
    """
    if os.path.islink(instance_dir):
        os.remove(instance_dir)
    else:
        shutil.rmtree(instance_dir, ignore_errors=True)

    if avd_spec.local_instance_dir:
        abs_instance_dir = os.path.abspath(avd_spec.local_instance_dir)
        if instance_dir != abs_instance_dir:
            os.symlink(abs_instance_dir, instance_dir)
            return
    if not os.path.exists(instance_dir):
        os.makedirs(instance_dir)
