#!/usr/bin/env python3

import datetime
import json
import os
import pathlib
import shutil
from typing import Any, Callable, cast, Dict, List, Optional, Union
from urllib.request import urlopen

# Absolute path of the directory two levels above the folder that contains
# this file; the helpers below treat it as the repository root.
REPO_ROOT = pathlib.Path(__file__).resolve().parent.parent.parent


def get_disabled_issues() -> List[str]:
    """Return the issue numbers listed in the ``REENABLED_ISSUES`` env var.

    The variable is read as a comma-separated list. Whitespace around each
    entry is stripped and empty entries are dropped, so an unset or empty
    variable yields ``[]`` instead of ``[""]`` and ``"1, 2"`` yields
    ``["1", "2"]``.

    Returns:
        The issue numbers as strings, in the order they were listed.
    """
    reenabled_issues = os.getenv("REENABLED_ISSUES", "")
    stripped_entries = (entry.strip() for entry in reenabled_issues.split(","))
    issue_numbers = [entry for entry in stripped_entries if entry]
    print("Ignoring disabled issues: ", issue_numbers)
    return issue_numbers


# Default cache file names for the slow-test and disabled-test downloads.
SLOW_TESTS_FILE = ".pytorch-slow-tests.json"
DISABLED_TESTS_FILE = ".pytorch-disabled-tests.json"

# Folder, joined onto REPO_ROOT by the helpers in this file, that holds the
# additional CI files named below.
ADDITIONAL_CI_FILES_FOLDER = pathlib.Path(".additional_ci_files")
TEST_TIMES_FILE = "test-times.json"
TEST_CLASS_TIMES_FILE = "test-class-times.json"
TEST_FILE_RATINGS_FILE = "test-file-ratings.json"
TEST_CLASS_RATINGS_FILE = "test-class-ratings.json"
TD_HEURISTIC_PROFILING_FILE = "td_heuristic_profiling.json"
TD_HEURISTIC_HISTORICAL_EDITED_FILES = "td_heuristic_historical_edited_files.json"
TD_HEURISTIC_PREVIOUSLY_FAILED = "previous_failures.json"
TD_HEURISTIC_PREVIOUSLY_FAILED_ADDITIONAL = "previous_failures_additional.json"

# Maximum age, in seconds, of a cached download before it is fetched again.
# total_seconds() is used instead of the .seconds attribute because .seconds
# is only the seconds component of the timedelta and wraps around at one day,
# which would silently shorten the lifespan if it were ever raised past 24h.
FILE_CACHE_LIFESPAN_SECONDS = int(datetime.timedelta(hours=3).total_seconds())


def fetch_and_cache(
    dirpath: Union[str, pathlib.Path],
    name: str,
    url: str,
    process_fn: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> Dict[str, Any]:
    """Return the processed JSON found at ``url``, reusing an on-disk cache.

    The processed result is stored at ``dirpath/name`` so that several
    processes can share a single download. A cached file is reused when its
    modification time is less than ``FILE_CACHE_LIFESPAN_SECONDS`` old;
    otherwise the URL is fetched (up to three attempts), the decoded JSON is
    passed through ``process_fn`` and the result is written to the cache.

    Args:
        dirpath: Directory holding the cache file; created if missing,
            including any missing parent directories.
        name: File name of the cache entry inside ``dirpath``.
        url: Location of the JSON document to download.
        process_fn: Transformation applied to the downloaded JSON before it
            is cached and returned.

    Returns:
        The cached or freshly processed dictionary, or ``{}`` when all
        download attempts failed.
    """
    pathlib.Path(dirpath).mkdir(parents=True, exist_ok=True)

    path = os.path.join(dirpath, name)
    print(f"Downloading {url} to {path}")

    def is_cached_file_valid() -> bool:
        # Check if the file is new enough (see: FILE_CACHE_LIFESPAN_SECONDS). A real check
        # could make a HEAD request and check/store the file's ETag
        fname = pathlib.Path(path)
        now = datetime.datetime.now()
        mtime = datetime.datetime.fromtimestamp(fname.stat().st_mtime)
        diff = now - mtime
        return diff.total_seconds() < FILE_CACHE_LIFESPAN_SECONDS

    if os.path.exists(path) and is_cached_file_valid():
        # Another test process already downloaded the file, so don't re-do it
        with open(path) as f:
            return cast(Dict[str, Any], json.load(f))

    # Write to a per-process scratch file and move it into place afterwards,
    # so a concurrent reader never observes a partially written cache file.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    for _ in range(3):
        try:
            # The context manager closes the HTTP response even on failure.
            with urlopen(url, timeout=5) as response:
                contents = response.read().decode("utf-8")
            processed_contents = process_fn(json.loads(contents))
            serialized = json.dumps(processed_contents)
            with open(tmp_path, "w") as f:
                f.write(serialized)
            os.replace(tmp_path, path)
            return processed_contents
        except Exception as e:
            print(f"Could not download {url} because: {e}.")
            try:
                os.remove(tmp_path)
            except OSError:
                # Nothing to clean up: the scratch file was never created.
                pass
    print(f"All retries exhausted, downloading {url} failed.")
    return {}


def get_slow_tests(
    dirpath: str, filename: str = SLOW_TESTS_FILE
) -> Optional[Dict[str, float]]:
    """Fetch the slow-tests JSON, caching it as ``dirpath/filename``.

    The download goes through ``fetch_and_cache`` with no post-processing.
    If that call raises, a message is printed and ``{}`` is returned.
    """
    url = "https://ossci-metrics.s3.amazonaws.com/slow-tests.json?versionId=oKMp2dsjwgbtvuXJrL9fZbQiSJkiw91I"

    def keep_as_is(payload: Dict[str, Any]) -> Dict[str, Any]:
        # The downloaded document is cached and returned unchanged.
        return payload

    try:
        slow_tests = fetch_and_cache(dirpath, filename, url, keep_as_is)
    except Exception:
        print("Couldn't download slow test set, leaving all tests enabled...")
        return {}
    return slow_tests


def get_test_times() -> Dict[str, Dict[str, float]]:
    """Return the ``test-times.json`` stats, cached as ``TEST_TIMES_FILE``.

    Delegates to ``get_from_test_infra_generated_stats``, which prints the
    given explanation and returns ``{}`` if fetching raises.
    """
    test_times = get_from_test_infra_generated_stats(
        from_file="test-times.json",
        to_file=TEST_TIMES_FILE,
        failure_explanation="Couldn't download test times...",
    )
    return test_times


def get_test_class_times() -> Dict[str, Dict[str, float]]:
    """Return the ``test-class-times.json`` stats, cached as ``TEST_CLASS_TIMES_FILE``.

    Delegates to ``get_from_test_infra_generated_stats``, which prints the
    given explanation and returns ``{}`` if fetching raises. The explanation
    names the class-times file so that it can be told apart from a failure
    to fetch the plain test times.
    """
    return get_from_test_infra_generated_stats(
        "test-class-times.json",
        TEST_CLASS_TIMES_FILE,
        "Couldn't download test class times...",
    )


def get_disabled_tests(
    dirpath: str, filename: str = DISABLED_TESTS_FILE
) -> Optional[Dict[str, Any]]:
    """Fetch the condensed disabled-tests JSON, caching it as ``dirpath/filename``.

    Each downloaded entry is unpacked as ``(pr_num, link, platforms)``.
    Entries whose ``pr_num`` is listed by ``get_disabled_issues()`` are
    dropped, and the remaining ones are reduced to ``(link, platforms)``.
    If fetching raises, a message is printed and ``{}`` is returned.
    """
    url = "https://ossci-metrics.s3.amazonaws.com/disabled-tests-condensed.json?versionId=0zzD6gFqZ9l2Vs1SYtjHXSuVLz6BaLtE"

    def process_disabled_test(the_response: Dict[str, Any]) -> Dict[str, Any]:
        # Skip tests whose issue has been re-enabled and drop pr_num from the
        # entries that are kept.
        reenabled = get_disabled_issues()
        return {
            test_name: (link, platforms)
            for test_name, (pr_num, link, platforms) in the_response.items()
            if pr_num not in reenabled
        }

    try:
        disabled_tests = fetch_and_cache(dirpath, filename, url, process_disabled_test)
    except Exception:
        print("Couldn't download test skip set, leaving all tests enabled...")
        return {}
    return disabled_tests


def get_test_file_ratings() -> Dict[str, Any]:
    """Return the ``file_test_rating.json`` stats, cached as ``TEST_FILE_RATINGS_FILE``.

    Delegates to ``get_from_test_infra_generated_stats``, which prints the
    given explanation and returns ``{}`` if fetching raises.
    """
    file_ratings = get_from_test_infra_generated_stats(
        from_file="file_test_rating.json",
        to_file=TEST_FILE_RATINGS_FILE,
        failure_explanation="Couldn't download test file ratings file, not reordering...",
    )
    return file_ratings


def get_test_class_ratings() -> Dict[str, Any]:
    """Return the ``file_test_class_rating.json`` stats, cached as ``TEST_CLASS_RATINGS_FILE``.

    Delegates to ``get_from_test_infra_generated_stats``, which prints the
    given explanation and returns ``{}`` if fetching raises.
    """
    class_ratings = get_from_test_infra_generated_stats(
        from_file="file_test_class_rating.json",
        to_file=TEST_CLASS_RATINGS_FILE,
        failure_explanation="Couldn't download test class ratings file, not reordering...",
    )
    return class_ratings


def get_td_heuristic_historial_edited_files_json() -> Dict[str, Any]:
    """Return the ``td_heuristic_historical_edited_files.json`` stats.

    The result is cached as ``TD_HEURISTIC_HISTORICAL_EDITED_FILES``.
    Delegates to ``get_from_test_infra_generated_stats``, which prints the
    given explanation and returns ``{}`` if fetching raises.
    """
    edited_files = get_from_test_infra_generated_stats(
        from_file="td_heuristic_historical_edited_files.json",
        to_file=TD_HEURISTIC_HISTORICAL_EDITED_FILES,
        failure_explanation="Couldn't download td_heuristic_historical_edited_files.json, not reordering...",
    )
    return edited_files


def get_td_heuristic_profiling_json() -> Dict[str, Any]:
    """Return the ``td_heuristic_profiling.json`` stats.

    The result is cached as ``TD_HEURISTIC_PROFILING_FILE``. Delegates to
    ``get_from_test_infra_generated_stats``, which prints the given
    explanation and returns ``{}`` if fetching raises.
    """
    profiling = get_from_test_infra_generated_stats(
        from_file="td_heuristic_profiling.json",
        to_file=TD_HEURISTIC_PROFILING_FILE,
        failure_explanation="Couldn't download td_heuristic_profiling.json not reordering...",
    )
    return profiling


def copy_pytest_cache() -> None:
    """Copy pytest's ``lastfailed`` cache file into the additional CI files folder.

    The source is ``REPO_ROOT/.pytest_cache/v/cache/lastfailed``; nothing
    happens when it does not exist. The copy is stored under
    ``REPO_ROOT/ADDITIONAL_CI_FILES_FOLDER`` with the name
    ``TD_HEURISTIC_PREVIOUSLY_FAILED``.
    """
    original_path = REPO_ROOT / ".pytest_cache/v/cache/lastfailed"
    if not original_path.exists():
        return
    destination = (
        REPO_ROOT / ADDITIONAL_CI_FILES_FOLDER / TD_HEURISTIC_PREVIOUSLY_FAILED
    )
    # shutil.copyfile does not create missing directories, so make sure the
    # destination folder exists before copying.
    destination.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(original_path, destination)


def copy_additional_previous_failures() -> None:
    """Copy the additional previous-failures file into the additional CI files folder.

    The source is ``REPO_ROOT/.pytest_cache/<name>`` where ``<name>`` is
    ``TD_HEURISTIC_PREVIOUSLY_FAILED_ADDITIONAL``; nothing happens when it
    does not exist. The copy keeps the same file name under
    ``REPO_ROOT/ADDITIONAL_CI_FILES_FOLDER``.
    """
    original_path = (
        REPO_ROOT / ".pytest_cache" / TD_HEURISTIC_PREVIOUSLY_FAILED_ADDITIONAL
    )
    if not original_path.exists():
        return
    destination = (
        REPO_ROOT
        / ADDITIONAL_CI_FILES_FOLDER
        / TD_HEURISTIC_PREVIOUSLY_FAILED_ADDITIONAL
    )
    # shutil.copyfile does not create missing directories, so make sure the
    # destination folder exists before copying.
    destination.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(original_path, destination)


def get_from_test_infra_generated_stats(
    from_file: str, to_file: str, failure_explanation: str
) -> Dict[str, Any]:
    """Fetch a stats file from test-infra's ``generated-stats`` branch.

    Args:
        from_file: File name appended to the ``stats/`` URL to download.
        to_file: Name of the cache file inside
            ``REPO_ROOT/ADDITIONAL_CI_FILES_FOLDER``.
        failure_explanation: Message printed when fetching raises.

    Returns:
        Whatever ``fetch_and_cache`` returns for the file, or ``{}`` when it
        raises.
    """
    url = f"https://raw.githubusercontent.com/pytorch/test-infra/generated-stats/stats/{from_file}"
    try:
        cache_dir = REPO_ROOT / ADDITIONAL_CI_FILES_FOLDER
        stats = fetch_and_cache(cache_dir, to_file, url, lambda x: x)
    except Exception:
        print(failure_explanation)
        return {}
    return stats
