# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json

import mock
import pytest
from six.moves import http_client
from six.moves import urllib

from google.auth import exceptions
from google.auth import transport
from google.oauth2 import sts
from google.oauth2 import utils

CLIENT_ID = "username"
CLIENT_SECRET = "password"
# Base64 encoding of "username:password"
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="


class TestStsClient(object):
    GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
    RESOURCE = "https://api.example.com/"
    AUDIENCE = "urn:example:cooperation-context"
    SCOPES = ["scope1", "scope2"]
    REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
    SUBJECT_TOKEN = "HEADER.SUBJECT_TOKEN_PAYLOAD.SIGNATURE"
    SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
    ACTOR_TOKEN = "HEADER.ACTOR_TOKEN_PAYLOAD.SIGNATURE"
    ACTOR_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
    TOKEN_EXCHANGE_ENDPOINT = "https://example.com/token.oauth2"
    ADDON_HEADERS = {"x-client-version": "0.1.2"}
    ADDON_OPTIONS = {"additional": {"non-standard": ["options"], "other": "some-value"}}
    SUCCESS_RESPONSE = {
        "access_token": "ACCESS_TOKEN",
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "token_type": "Bearer",
        "expires_in": 3600,
        "scope": "scope1 scope2",
    }
    ERROR_RESPONSE = {
        "error": "invalid_request",
        "error_description": "Invalid subject token",
        "error_uri": "https://tools.ietf.org/html/rfc6749",
    }
    CLIENT_AUTH_BASIC = utils.ClientAuthentication(
        utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET
    )
    CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication(
        utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET
    )

    @classmethod
    def make_client(cls, client_auth=None):
        return sts.Client(cls.TOKEN_EXCHANGE_ENDPOINT, client_auth)

    @classmethod
    def make_mock_request(cls, data, status=http_client.OK):
        response = mock.create_autospec(transport.Response, instance=True)
        response.status = status
        response.data = json.dumps(data).encode("utf-8")

        request = mock.create_autospec(transport.Request)
        request.return_value = response

        return request

    @classmethod
    def assert_request_kwargs(cls, request_kwargs, headers, request_data):
        """Asserts the request was called with the expected parameters.
        """
        assert request_kwargs["url"] == cls.TOKEN_EXCHANGE_ENDPOINT
        assert request_kwargs["method"] == "POST"
        assert request_kwargs["headers"] == headers
        assert request_kwargs["body"] is not None
        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
        for (k, v) in body_tuples:
            assert v.decode("utf-8") == request_data[k.decode("utf-8")]
        assert len(body_tuples) == len(request_data.keys())

    def test_exchange_token_full_success_without_auth(self):
        """Test token exchange success without client authentication using full
        parameters.
        """
        client = self.make_client()
        headers = self.ADDON_HEADERS.copy()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        request_data = {
            "grant_type": self.GRANT_TYPE,
            "resource": self.RESOURCE,
            "audience": self.AUDIENCE,
            "scope": " ".join(self.SCOPES),
            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
            "subject_token": self.SUBJECT_TOKEN,
            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
            "actor_token": self.ACTOR_TOKEN,
            "actor_token_type": self.ACTOR_TOKEN_TYPE,
            "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
        }
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = client.exchange_token(
            request,
            self.GRANT_TYPE,
            self.SUBJECT_TOKEN,
            self.SUBJECT_TOKEN_TYPE,
            self.RESOURCE,
            self.AUDIENCE,
            self.SCOPES,
            self.REQUESTED_TOKEN_TYPE,
            self.ACTOR_TOKEN,
            self.ACTOR_TOKEN_TYPE,
            self.ADDON_OPTIONS,
            self.ADDON_HEADERS,
        )

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_partial_success_without_auth(self):
        """Test token exchange success without client authentication using
        partial (required only) parameters.
        """
        client = self.make_client()
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        request_data = {
            "grant_type": self.GRANT_TYPE,
            "audience": self.AUDIENCE,
            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
            "subject_token": self.SUBJECT_TOKEN,
            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
        }
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = client.exchange_token(
            request,
            grant_type=self.GRANT_TYPE,
            subject_token=self.SUBJECT_TOKEN,
            subject_token_type=self.SUBJECT_TOKEN_TYPE,
            audience=self.AUDIENCE,
            requested_token_type=self.REQUESTED_TOKEN_TYPE,
        )

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_non200_without_auth(self):
        """Test token exchange without client auth responding with non-200 status.
        """
        client = self.make_client()
        request = self.make_mock_request(
            status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
        )

        with pytest.raises(exceptions.OAuthError) as excinfo:
            client.exchange_token(
                request,
                self.GRANT_TYPE,
                self.SUBJECT_TOKEN,
                self.SUBJECT_TOKEN_TYPE,
                self.RESOURCE,
                self.AUDIENCE,
                self.SCOPES,
                self.REQUESTED_TOKEN_TYPE,
                self.ACTOR_TOKEN,
                self.ACTOR_TOKEN_TYPE,
                self.ADDON_OPTIONS,
                self.ADDON_HEADERS,
            )

        assert excinfo.match(
            r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
        )

    def test_exchange_token_full_success_with_basic_auth(self):
        """Test token exchange success with basic client authentication using full
        parameters.
        """
        client = self.make_client(self.CLIENT_AUTH_BASIC)
        headers = self.ADDON_HEADERS.copy()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        headers["Authorization"] = "Basic {}".format(BASIC_AUTH_ENCODING)
        request_data = {
            "grant_type": self.GRANT_TYPE,
            "resource": self.RESOURCE,
            "audience": self.AUDIENCE,
            "scope": " ".join(self.SCOPES),
            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
            "subject_token": self.SUBJECT_TOKEN,
            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
            "actor_token": self.ACTOR_TOKEN,
            "actor_token_type": self.ACTOR_TOKEN_TYPE,
            "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
        }
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = client.exchange_token(
            request,
            self.GRANT_TYPE,
            self.SUBJECT_TOKEN,
            self.SUBJECT_TOKEN_TYPE,
            self.RESOURCE,
            self.AUDIENCE,
            self.SCOPES,
            self.REQUESTED_TOKEN_TYPE,
            self.ACTOR_TOKEN,
            self.ACTOR_TOKEN_TYPE,
            self.ADDON_OPTIONS,
            self.ADDON_HEADERS,
        )

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_partial_success_with_basic_auth(self):
        """Test token exchange success with basic client authentication using
        partial (required only) parameters.
        """
        client = self.make_client(self.CLIENT_AUTH_BASIC)
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
        }
        request_data = {
            "grant_type": self.GRANT_TYPE,
            "audience": self.AUDIENCE,
            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
            "subject_token": self.SUBJECT_TOKEN,
            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
        }
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = client.exchange_token(
            request,
            grant_type=self.GRANT_TYPE,
            subject_token=self.SUBJECT_TOKEN,
            subject_token_type=self.SUBJECT_TOKEN_TYPE,
            audience=self.AUDIENCE,
            requested_token_type=self.REQUESTED_TOKEN_TYPE,
        )

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_non200_with_basic_auth(self):
        """Test token exchange with basic client auth responding with non-200
        status.
        """
        client = self.make_client(self.CLIENT_AUTH_BASIC)
        request = self.make_mock_request(
            status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
        )

        with pytest.raises(exceptions.OAuthError) as excinfo:
            client.exchange_token(
                request,
                self.GRANT_TYPE,
                self.SUBJECT_TOKEN,
                self.SUBJECT_TOKEN_TYPE,
                self.RESOURCE,
                self.AUDIENCE,
                self.SCOPES,
                self.REQUESTED_TOKEN_TYPE,
                self.ACTOR_TOKEN,
                self.ACTOR_TOKEN_TYPE,
                self.ADDON_OPTIONS,
                self.ADDON_HEADERS,
            )

        assert excinfo.match(
            r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
        )

    def test_exchange_token_full_success_with_reqbody_auth(self):
        """Test token exchange success with request body client authenticaiton
        using full parameters.
        """
        client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
        headers = self.ADDON_HEADERS.copy()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        request_data = {
            "grant_type": self.GRANT_TYPE,
            "resource": self.RESOURCE,
            "audience": self.AUDIENCE,
            "scope": " ".join(self.SCOPES),
            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
            "subject_token": self.SUBJECT_TOKEN,
            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
            "actor_token": self.ACTOR_TOKEN,
            "actor_token_type": self.ACTOR_TOKEN_TYPE,
            "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        }
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = client.exchange_token(
            request,
            self.GRANT_TYPE,
            self.SUBJECT_TOKEN,
            self.SUBJECT_TOKEN_TYPE,
            self.RESOURCE,
            self.AUDIENCE,
            self.SCOPES,
            self.REQUESTED_TOKEN_TYPE,
            self.ACTOR_TOKEN,
            self.ACTOR_TOKEN_TYPE,
            self.ADDON_OPTIONS,
            self.ADDON_HEADERS,
        )

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_partial_success_with_reqbody_auth(self):
        """Test token exchange success with request body client authentication
        using partial (required only) parameters.
        """
        client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        request_data = {
            "grant_type": self.GRANT_TYPE,
            "audience": self.AUDIENCE,
            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
            "subject_token": self.SUBJECT_TOKEN,
            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        }
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = client.exchange_token(
            request,
            grant_type=self.GRANT_TYPE,
            subject_token=self.SUBJECT_TOKEN,
            subject_token_type=self.SUBJECT_TOKEN_TYPE,
            audience=self.AUDIENCE,
            requested_token_type=self.REQUESTED_TOKEN_TYPE,
        )

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_non200_with_reqbody_auth(self):
        """Test token exchange with POST request body client auth responding
        with non-200 status.
        """
        client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
        request = self.make_mock_request(
            status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
        )

        with pytest.raises(exceptions.OAuthError) as excinfo:
            client.exchange_token(
                request,
                self.GRANT_TYPE,
                self.SUBJECT_TOKEN,
                self.SUBJECT_TOKEN_TYPE,
                self.RESOURCE,
                self.AUDIENCE,
                self.SCOPES,
                self.REQUESTED_TOKEN_TYPE,
                self.ACTOR_TOKEN,
                self.ACTOR_TOKEN_TYPE,
                self.ADDON_OPTIONS,
                self.ADDON_HEADERS,
            )

        assert excinfo.match(
            r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
        )
