# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from google.api_core.iam import _DICT_ACCESS_MSG, InvalidOperationException


class TestPolicy:
    @staticmethod
    def _get_target_class():
        from google.api_core.iam import Policy

        return Policy

    def _make_one(self, *args, **kw):
        return self._get_target_class()(*args, **kw)

    def test_ctor_defaults(self):
        empty = frozenset()
        policy = self._make_one()
        assert policy.etag is None
        assert policy.version is None
        assert policy.owners == empty
        assert policy.editors == empty
        assert policy.viewers == empty
        assert len(policy) == 0
        assert dict(policy) == {}

    def test_ctor_explicit(self):
        VERSION = 1
        ETAG = "ETAG"
        empty = frozenset()
        policy = self._make_one(ETAG, VERSION)
        assert policy.etag == ETAG
        assert policy.version == VERSION
        assert policy.owners == empty
        assert policy.editors == empty
        assert policy.viewers == empty
        assert len(policy) == 0
        assert dict(policy) == {}

    def test___getitem___miss(self):
        policy = self._make_one()
        assert policy["nonesuch"] == set()

    def test__getitem___and_set(self):
        from google.api_core.iam import OWNER_ROLE

        policy = self._make_one()

        # get the policy using the getter and then modify it
        policy[OWNER_ROLE].add("user:phred@example.com")
        assert dict(policy) == {OWNER_ROLE: {"user:phred@example.com"}}

    def test___getitem___version3(self):
        policy = self._make_one("DEADBEEF", 3)
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role"]

    def test___getitem___with_conditions(self):
        USER = "user:phred@example.com"
        CONDITION = {"expression": "2 > 1"}
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = [
            {"role": "role/reader", "members": [USER], "condition": CONDITION}
        ]
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role/reader"]

    def test___setitem__(self):
        USER = "user:phred@example.com"
        PRINCIPALS = set([USER])
        policy = self._make_one()
        policy["rolename"] = [USER]
        assert policy["rolename"] == PRINCIPALS
        assert len(policy) == 1
        assert dict(policy) == {"rolename": PRINCIPALS}

    def test__set_item__overwrite(self):
        GROUP = "group:test@group.com"
        USER = "user:phred@example.com"
        ALL_USERS = "allUsers"
        MEMBERS = set([ALL_USERS])
        GROUPS = set([GROUP])
        policy = self._make_one()
        policy["first"] = [GROUP]
        policy["second"] = [USER]
        policy["second"] = [ALL_USERS]
        assert policy["second"] == MEMBERS
        assert len(policy) == 2
        assert dict(policy) == {"first": GROUPS, "second": MEMBERS}

    def test___setitem___version3(self):
        policy = self._make_one("DEADBEEF", 3)
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role/reader"] = ["user:phred@example.com"]

    def test___setitem___with_conditions(self):
        USER = "user:phred@example.com"
        CONDITION = {"expression": "2 > 1"}
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = [
            {"role": "role/reader", "members": set([USER]), "condition": CONDITION}
        ]
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            policy["role/reader"] = ["user:phred@example.com"]

    def test___delitem___hit(self):
        policy = self._make_one()
        policy.bindings = [
            {"role": "to/keep", "members": set(["phred@example.com"])},
            {"role": "to/remove", "members": set(["phred@example.com"])},
        ]
        del policy["to/remove"]
        assert len(policy) == 1
        assert dict(policy) == {"to/keep": set(["phred@example.com"])}

    def test___delitem___miss(self):
        policy = self._make_one()
        with pytest.raises(KeyError):
            del policy["nonesuch"]

    def test___delitem___version3(self):
        policy = self._make_one("DEADBEEF", 3)
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            del policy["role/reader"]

    def test___delitem___with_conditions(self):
        USER = "user:phred@example.com"
        CONDITION = {"expression": "2 > 1"}
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = [
            {"role": "role/reader", "members": set([USER]), "condition": CONDITION}
        ]
        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
            del policy["role/reader"]

    def test_bindings_property(self):
        USER = "user:phred@example.com"
        CONDITION = {"expression": "2 > 1"}
        policy = self._make_one()
        BINDINGS = [
            {"role": "role/reader", "members": set([USER]), "condition": CONDITION}
        ]
        policy.bindings = BINDINGS
        assert policy.bindings == BINDINGS

    def test_owners_getter(self):
        from google.api_core.iam import OWNER_ROLE

        MEMBER = "user:phred@example.com"
        expected = frozenset([MEMBER])
        policy = self._make_one()
        policy[OWNER_ROLE] = [MEMBER]
        assert policy.owners == expected

    def test_owners_setter(self):
        import warnings
        from google.api_core.iam import OWNER_ROLE

        MEMBER = "user:phred@example.com"
        expected = set([MEMBER])
        policy = self._make_one()

        with warnings.catch_warnings(record=True) as warned:
            policy.owners = [MEMBER]

        (warning,) = warned
        assert warning.category is DeprecationWarning
        assert policy[OWNER_ROLE] == expected

    def test_editors_getter(self):
        from google.api_core.iam import EDITOR_ROLE

        MEMBER = "user:phred@example.com"
        expected = frozenset([MEMBER])
        policy = self._make_one()
        policy[EDITOR_ROLE] = [MEMBER]
        assert policy.editors == expected

    def test_editors_setter(self):
        import warnings
        from google.api_core.iam import EDITOR_ROLE

        MEMBER = "user:phred@example.com"
        expected = set([MEMBER])
        policy = self._make_one()

        with warnings.catch_warnings(record=True) as warned:
            policy.editors = [MEMBER]

        (warning,) = warned
        assert warning.category is DeprecationWarning
        assert policy[EDITOR_ROLE] == expected

    def test_viewers_getter(self):
        from google.api_core.iam import VIEWER_ROLE

        MEMBER = "user:phred@example.com"
        expected = frozenset([MEMBER])
        policy = self._make_one()
        policy[VIEWER_ROLE] = [MEMBER]
        assert policy.viewers == expected

    def test_viewers_setter(self):
        import warnings
        from google.api_core.iam import VIEWER_ROLE

        MEMBER = "user:phred@example.com"
        expected = set([MEMBER])
        policy = self._make_one()

        with warnings.catch_warnings(record=True) as warned:
            policy.viewers = [MEMBER]

        (warning,) = warned
        assert warning.category is DeprecationWarning
        assert policy[VIEWER_ROLE] == expected

    def test_user(self):
        EMAIL = "phred@example.com"
        MEMBER = "user:%s" % (EMAIL,)
        policy = self._make_one()
        assert policy.user(EMAIL) == MEMBER

    def test_service_account(self):
        EMAIL = "phred@example.com"
        MEMBER = "serviceAccount:%s" % (EMAIL,)
        policy = self._make_one()
        assert policy.service_account(EMAIL) == MEMBER

    def test_group(self):
        EMAIL = "phred@example.com"
        MEMBER = "group:%s" % (EMAIL,)
        policy = self._make_one()
        assert policy.group(EMAIL) == MEMBER

    def test_domain(self):
        DOMAIN = "example.com"
        MEMBER = "domain:%s" % (DOMAIN,)
        policy = self._make_one()
        assert policy.domain(DOMAIN) == MEMBER

    def test_all_users(self):
        policy = self._make_one()
        assert policy.all_users() == "allUsers"

    def test_authenticated_users(self):
        policy = self._make_one()
        assert policy.authenticated_users() == "allAuthenticatedUsers"

    def test_from_api_repr_only_etag(self):
        empty = frozenset()
        RESOURCE = {"etag": "ACAB"}
        klass = self._get_target_class()
        policy = klass.from_api_repr(RESOURCE)
        assert policy.etag == "ACAB"
        assert policy.version is None
        assert policy.owners == empty
        assert policy.editors == empty
        assert policy.viewers == empty
        assert dict(policy) == {}

    def test_from_api_repr_complete(self):
        from google.api_core.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE

        OWNER1 = "group:cloud-logs@google.com"
        OWNER2 = "user:phred@example.com"
        EDITOR1 = "domain:google.com"
        EDITOR2 = "user:phred@example.com"
        VIEWER1 = "serviceAccount:1234-abcdef@service.example.com"
        VIEWER2 = "user:phred@example.com"
        RESOURCE = {
            "etag": "DEADBEEF",
            "version": 1,
            "bindings": [
                {"role": OWNER_ROLE, "members": [OWNER1, OWNER2]},
                {"role": EDITOR_ROLE, "members": [EDITOR1, EDITOR2]},
                {"role": VIEWER_ROLE, "members": [VIEWER1, VIEWER2]},
            ],
        }
        klass = self._get_target_class()
        policy = klass.from_api_repr(RESOURCE)
        assert policy.etag == "DEADBEEF"
        assert policy.version == 1
        assert policy.owners, frozenset([OWNER1 == OWNER2])
        assert policy.editors, frozenset([EDITOR1 == EDITOR2])
        assert policy.viewers, frozenset([VIEWER1 == VIEWER2])
        assert dict(policy) == {
            OWNER_ROLE: set([OWNER1, OWNER2]),
            EDITOR_ROLE: set([EDITOR1, EDITOR2]),
            VIEWER_ROLE: set([VIEWER1, VIEWER2]),
        }
        assert policy.bindings == [
            {"role": OWNER_ROLE, "members": set([OWNER1, OWNER2])},
            {"role": EDITOR_ROLE, "members": set([EDITOR1, EDITOR2])},
            {"role": VIEWER_ROLE, "members": set([VIEWER1, VIEWER2])},
        ]

    def test_from_api_repr_unknown_role(self):
        USER = "user:phred@example.com"
        GROUP = "group:cloud-logs@google.com"
        RESOURCE = {
            "etag": "DEADBEEF",
            "version": 1,
            "bindings": [{"role": "unknown", "members": [USER, GROUP]}],
        }
        klass = self._get_target_class()
        policy = klass.from_api_repr(RESOURCE)
        assert policy.etag == "DEADBEEF"
        assert policy.version == 1
        assert dict(policy), {"unknown": set([GROUP == USER])}

    def test_to_api_repr_defaults(self):
        policy = self._make_one()
        assert policy.to_api_repr() == {}

    def test_to_api_repr_only_etag(self):
        policy = self._make_one("DEADBEEF")
        assert policy.to_api_repr() == {"etag": "DEADBEEF"}

    def test_to_api_repr_binding_wo_members(self):
        policy = self._make_one()
        policy["empty"] = []
        assert policy.to_api_repr() == {}

    def test_to_api_repr_binding_w_duplicates(self):
        import warnings
        from google.api_core.iam import OWNER_ROLE

        OWNER = "group:cloud-logs@google.com"
        policy = self._make_one()
        with warnings.catch_warnings(record=True):
            policy.owners = [OWNER, OWNER]
        assert policy.to_api_repr() == {
            "bindings": [{"role": OWNER_ROLE, "members": [OWNER]}]
        }

    def test_to_api_repr_full(self):
        import operator
        from google.api_core.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE

        OWNER1 = "group:cloud-logs@google.com"
        OWNER2 = "user:phred@example.com"
        EDITOR1 = "domain:google.com"
        EDITOR2 = "user:phred@example.com"
        VIEWER1 = "serviceAccount:1234-abcdef@service.example.com"
        VIEWER2 = "user:phred@example.com"
        CONDITION = {
            "title": "title",
            "description": "description",
            "expression": "true",
        }
        BINDINGS = [
            {"role": OWNER_ROLE, "members": [OWNER1, OWNER2]},
            {"role": EDITOR_ROLE, "members": [EDITOR1, EDITOR2]},
            {"role": VIEWER_ROLE, "members": [VIEWER1, VIEWER2]},
            {
                "role": VIEWER_ROLE,
                "members": [VIEWER1, VIEWER2],
                "condition": CONDITION,
            },
        ]
        policy = self._make_one("DEADBEEF", 1)
        policy.bindings = BINDINGS
        resource = policy.to_api_repr()
        assert resource["etag"] == "DEADBEEF"
        assert resource["version"] == 1
        key = operator.itemgetter("role")
        assert sorted(resource["bindings"], key=key) == sorted(BINDINGS, key=key)
