# Lint as: python2, python3
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging, os, tempfile, shutil, stat, time
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error

# TODO:
#  - mock out TPM and check all error conditions
#  - test failure when things aren't mounted correctly

class test_checker(object):
    """Collects the outcome of individual checks made during a test.

    Satisfied checks are only logged. Unsatisfied non-fatal checks are
    logged and remembered so test_raise() can report them together;
    unsatisfied fatal checks raise error.TestError immediately.
    """

    def __init__(self):
        logging.info("test_checker.__init__")
        # Messages of the non-fatal checks that failed so far. An empty
        # list means the test passes.
        self._failures = []

    def _passed(self, msg):
        """Log a satisfied check."""
        logging.info('ok: %s', msg)

    def _failed(self, msg):
        """Log an unsatisfied check and remember it for test_raise()."""
        logging.error('FAIL: %s', msg)
        self._failures.append(msg)

    def _fatal(self, msg):
        """Log an unsatisfied check and abort with error.TestError."""
        logging.error('FATAL: %s', msg)
        raise error.TestError(msg)

    def check(self, boolean, msg, fatal=False):
        """Record the result of one check.

        boolean: outcome of the check; it passes only if it equals True.
        msg: description of the condition that was expected to hold.
        fatal: when set, an unsatisfied check raises error.TestError right
            away instead of being queued for test_raise().
        """
        # Equality with True (not mere truthiness) is what decides a pass,
        # so e.g. 2 counts as a failure while 1 counts as a pass.
        if boolean == True:
            self._passed(msg)
            return

        complaint = "could not satisfy '%s'" % (msg)
        report = self._fatal if fatal else self._failed
        report(complaint)

    def test_raise(self):
        """Raise error.TestFail listing every queued failure, if any."""
        if self._failures:
            raise error.TestFail(", ".join(self._failures))

# Module-level checker shared by EncryptedStateful and the test class below.
# Recorded failures accumulate in it until test_raise() reports them.
chk = test_checker()


class EncryptedStateful(object):
    """ A helper to operate the encrypted stateful. """

    def _prepare_simulated_root(self):
        """Populate the scratch root with directories and a fake stateful.

        Creates the var, home/chronos and mnt/stateful_partition
        directories, then backs the stateful mount point with a freshly
        formatted, loop-mounted ext4 image stored in the scratch root.
        """
        for directory in (self.var, self.chronos, self.stateful):
            os.makedirs(directory)

        # Build fake stateful block device (emulate 10G sda1).
        image = os.path.join(self.root, 'stateful.block')
        self.stateful_block = image
        utils.system("truncate -s 10G %s" % (image))
        utils.system("mkfs.ext4 -F %s" % (image))

        mount_cmd = ("mount -n -t ext4 -o loop,noatime,commit=600 %s %s" %
                     (image, self.stateful))
        utils.system(mount_cmd)

    def __init__(self, root=None):
        """Record the paths of interest and, if needed, build a scratch root.

        root: directory to treat as the system root. When None, a scratch
            root is created under /mnt/stateful_partition and populated
            with a fake stateful partition ("simulated" mode). Otherwise
            the given root is used as-is and is treated as already mounted.

        Any exception raised while building the simulated root is re-raised
        after the scratch directory has been removed.
        """
        if root is None:
            self.root = tempfile.mkdtemp(dir='/mnt/stateful_partition',
                                         prefix='.test-enc-stateful-')
            self.simulated = True
        else:
            self.root = root
            self.simulated = False

        # Mount points inside the root.
        self.var = os.path.join(self.root, 'var')
        self.chronos = os.path.join(self.root, 'home', 'chronos')
        self.stateful = os.path.join(self.root, 'mnt', 'stateful_partition')
        # Files and directories kept on the stateful partition.
        self.mount_log = os.path.join(self.stateful, 'mount.log')
        self.key = os.path.join(self.stateful, 'encrypted.key')
        self.needs_finalization = os.path.join(self.stateful,
                                               'encrypted.needs-finalization')
        self.block = os.path.join(self.stateful, 'encrypted.block')
        self.encrypted = os.path.join(self.stateful, 'encrypted')

        # Assigned before the fallible setup below: Python still runs
        # __del__ on an object whose __init__ raised, and the teardown path
        # reads this flag.
        self.mounted = not self.simulated

        if self.simulated:
            try:
                self._prepare_simulated_root()
            except BaseException:
                # Whatever went wrong (including interrupts), do not leave
                # the scratch directory behind; then propagate the failure.
                shutil.rmtree(self.root)
                raise

    def mount(self, args=""):
        """Mount the simulated encrypted stateful with mount-encrypted.

        Does nothing when it is already mounted or when this object is not
        in simulated mode.

        args: extra command-line arguments passed to mount-encrypted.
        """
        if not self.mounted and self.simulated:
            # TODO(keescook): figure out what is killing the resizer and
            # remove the explicit use of "tee" here.
            # Without the pipe to "tee", mount-encrypted's forked resizing
            # process gets killed, even though it is using daemon(). (Is
            # autotest doing something odd here?) This leaves the filesystem
            # unresized. It would be better to have the resizer running in
            # the background, as it is designed, so we can examine its
            # behavior during testing (e.g. "does the filesystem actually
            # grow?").
            command = ("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted --unsafe "
                       "%s 2>&1 | tee %s" %
                       (self.root, args, self.mount_log))
            utils.system(command)
            self.mounted = True

    def umount(self):
        """Unmount the simulated encrypted stateful with mount-encrypted.

        Does nothing unless this object is in simulated mode and currently
        marked as mounted.
        """
        if not (self.mounted and self.simulated):
            return
        command = "MOUNT_ENCRYPTED_ROOT=%s mount-encrypted umount" % (
                self.root)
        utils.system(command)
        self.mounted = False

    def __del__(self):
        """Tear down the simulated root when the object is destroyed.

        Only acts in simulated mode: unmounts the encrypted stateful (if it
        is marked mounted), unmounts the fake stateful partition and removes
        the scratch root. Objects that are not simulated are left alone.
        """
        # Python runs __del__ even when __init__ raised part-way through,
        # so the attributes read here may not exist yet.
        if not getattr(self, 'simulated', False):
            return
        root = getattr(self, 'root', None)
        if root is None or not os.path.isdir(root):
            # Nothing left on disk, e.g. __init__ already removed the
            # scratch root after a failed setup.
            return

        try:
            if getattr(self, 'mounted', False):
                self.umount()
        finally:
            # Still attempt the rest of the teardown when the encrypted
            # umount failed, rather than leaving the loop mount and the
            # scratch root behind without even trying.
            utils.system("umount -n %s" % (self.stateful))
            shutil.rmtree(root)

    def check_sizes(self, finalized=True):
        """
        Run the common post-mount checks: presence, type, ownership,
        permissions and size of the backing files, plus the size and inode
        count of the encrypted filesystem.

        finalized selects which key file must be present: encrypted.key
        when True, encrypted.needs-finalization when False. The other one
        must be absent. Failures recorded in the shared checker are raised
        at the end.
        """
        owner_rw = stat.S_IRUSR | stat.S_IWUSR

        def check_root_private_file(path, info):
            # Regular file, owned by root:root, readable/writable by the
            # owner only.
            chk.check(stat.S_ISREG(info.st_mode),
                      "%s is regular file" % (path))
            chk.check(info.st_uid == 0, "%s is owned by root" % (path))
            chk.check(info.st_gid == 0, "%s has group root" % (path))
            chk.check(stat.S_IMODE(info.st_mode) == owner_rw,
                      "%s is S_IRUSR | S_IWUSR" % (path))

        # The backing block file and exactly one of the two key files must
        # be present.
        chk.check(os.path.exists(self.block), "%s exists" % (self.block))
        if finalized:
            expected_key, unexpected_key = self.key, self.needs_finalization
        else:
            expected_key, unexpected_key = self.needs_finalization, self.key
        chk.check(os.path.exists(expected_key), "%s exists" % (expected_key))
        chk.check(not os.path.exists(unexpected_key),
                  "%s does not exist" % (unexpected_key))

        # Key file: common checks plus its exact size.
        key_info = os.stat(expected_key)
        check_root_private_file(expected_key, key_info)
        chk.check(key_info.st_size == 48, "%s is 48 bytes" % (expected_key))

        # Block file: common checks.
        block_info = os.stat(self.block)
        check_root_private_file(self.block, block_info)

        # The block file must be about 30% of the size of the filesystem
        # holding it (within 5% of that target).
        stateful_fs = os.statvfs(self.stateful)
        backing_size = float(block_info.st_size)
        target = stateful_fs.f_blocks * stateful_fs.f_frsize * .3
        chk.check(target * .95 < backing_size < target * 1.05,
                  "%s is near %d bytes (was %d)" % (self.block, target,
                                                    block_info.st_size))

        # Wait until no mount-encrypted process is left, i.e. the resize
        # task has finished.
        def resizer_gone():
            return utils.system("pgrep mount-encrypted",
                                ignore_status=True) != 0

        utils.poll_for_condition(resizer_gone,
                                 error.TestError('resizer still running'))

        # The encrypted filesystem must span at least 95% of the block file.
        encrypted_fs = os.statvfs(self.encrypted)
        encrypted_size = (float(encrypted_fs.f_frsize) *
                          float(encrypted_fs.f_blocks))
        chk.check(encrypted_size / backing_size > 0.95,
                  "%s fs (%d) is nearly the backing device size (%d)" %
                  (self.encrypted, encrypted_size, backing_size))

        # The inodes-to-blocks ratio must be reasonable (near 25%).
        inode_ratio = (float(encrypted_fs.f_files) /
                       float(encrypted_fs.f_blocks))
        chk.check(0.20 < inode_ratio < 0.30,
                  "%s has close to 25%% ratio of inodes-to-blocks (%.2f%%)" %
                  (self.encrypted, inode_ratio*100))

        # Raise non-fatal failures now, if they were encountered.
        chk.test_raise()

    # Wait for kernel background writing to finish.
    def _backing_stabilize(self):
        """Block until the backing file's disk usage stops changing.

        Samples `du -sk` on the backing block file, sleeping 10 seconds and
        running `sync` between samples. Returns as soon as a sample equals
        the previous one (the previous value starts at 0, so an initial
        sample of 0 returns immediately). There is no upper bound on the
        wait.
        """
        start = None
        size = 0
        while True:
            k = int(
                    utils.system_output("du -sk %s" % (self.block),
                                        retain_output=True).split()[0])
            if start is None:
                # Remember the first sample for the log message.
                start = k
            if k == size:
                # Backing file has remained the same size for 10 seconds.
                # Assume the kernel is done with background initialization.
                break
            time.sleep(10)
            utils.system("sync")
            size = k
        # The values go in as separate logging arguments. Wrapping them in
        # one tuple supplies a single argument for three placeholders, so
        # formatting fails and the message is never logged.
        logging.info("%s stabilized at %dK (was %dK)",
                     self.block, size, start)

    def check_reclamation(self):
        """
        Verify that space used by the backing file is given back to the
        stateful filesystem once data is deleted again.

        Writes a 200M file under var, removes it, and compares the free
        block counts of the stateful filesystem before, in between and
        after. Failures recorded in the shared checker are raised at the
        end.
        """
        # Other activity on the filesystem skews the numbers, so let the
        # background initialization settle before taking the baseline.
        self._backing_stabilize()

        megs = 200
        scratch_file = os.path.join(self.var, "check_reclamation")
        before = os.statvfs(self.stateful)

        # Add the 200M file (presumably landing on the encrypted filesystem
        # behind var) and wait for background allocations to finish.
        utils.system("dd if=/dev/zero of=%s bs=1M count=%s; sync" %
                     (scratch_file, megs))
        self._backing_stabilize()
        after_write = os.statvfs(self.stateful)

        # Remove the file again and wait for background hole-punching to
        # finish.
        utils.system("rm %s; sync" % (scratch_file))
        self._backing_stabilize()
        after_delete = os.statvfs(self.stateful)

        free_before = before.f_bfree
        free_filled = after_write.f_bfree
        free_done = after_delete.f_bfree

        # Growth: the test file must account for most of the blocks that
        # the underlying filesystem lost.
        file_blocks_used = float((megs * 1024 * 1024) / before.f_frsize)
        fs_blocks_used = float(free_before - free_filled)
        chk.check(file_blocks_used / fs_blocks_used > 0.95,
                  "%d file blocks account for most of %d fs blocks" %
                  (file_blocks_used, fs_blocks_used))

        # Shrink: after removal, fewer than 5% of the file's blocks may
        # still be missing from the free count.
        fs_blocks_done = float(free_before - free_done)
        chk.check(fs_blocks_done / file_blocks_used < 0.05,
                  "most of %d fs blocks reclaimed (%d fs blocks left over, "
                  "free: %d -> %d -> %d)" %
                  (fs_blocks_used, fs_blocks_done,
                   free_before, free_filled, free_done))

        # Raise non-fatal failures now, if they were encountered.
        chk.test_raise()


class platform_EncryptedStateful(test.test):
    """Autotest exercising the encrypted stateful partition."""
    version = 1

    def is_punch_hole_supported(self):
        """
        Tell whether the running kernel is new enough for the reclamation
        check.

        With b/80549098, PUNCH_HOLE was disabled for all kernel trees
        before v4.4, so the reclamation check only works on kernels at
        version 4.4 or later.
        """
        running_kernel = os.uname()[2]
        too_old = utils.compare_versions(running_kernel, "4.4") < 0
        return not too_old

    def existing_partition(self):
        """Check the system's existing encrypted partition in place."""
        # Operate on the real root; nothing is set up or mounted here.
        system_state = EncryptedStateful("/")

        # A device that still lacks encrypted.key is checked as
        # unfinalized.
        is_finalized = os.path.exists(system_state.key)
        system_state.check_sizes(finalized=is_finalized)

    def no_tpm(self):
        """
        Mount a simulated encrypted stateful while the TPM device node is
        moved out of the way, then run the size checks and, on kernels that
        support PUNCH_HOLE, the reclamation check.
        """
        sandbox = EncryptedStateful()

        # A fresh simulated root must not contain leftovers of an earlier
        # run.
        for leftover in (sandbox.key, sandbox.block):
            chk.check(not os.path.exists(leftover),
                      "%s does not exist" % (leftover))

        # Park the TPM device node for the duration of the mount, and put
        # it back no matter how the mount went.
        tpm_node = "/dev/tpm0"
        parked_node = "%s.off" % (tpm_node)
        try:
            if os.path.exists(tpm_node):
                utils.system("mv %s %s" % (tpm_node, parked_node))
            # Mount without a TPM.
            sandbox.mount()
        finally:
            if os.path.exists(parked_node):
                utils.system("mv %s %s" % (parked_node, tpm_node))

        # Post-mount confidence checks.
        sandbox.check_sizes(finalized=True)

        # Disk reclamation can only be verified where PUNCH_HOLE works.
        if self.is_punch_hole_supported():
            sandbox.check_reclamation()

        # Exercise the explicit umount path.
        sandbox.umount()

    def run_once(self):
        """Primary autotest entry point: run both scenarios in order."""
        # First the read-only look at the system's own partition ...
        self.existing_partition()

        # ... then the simulated no-TPM mount with its confidence and
        # reclamation checks.
        self.no_tpm()
