|  | # Copyright 2020 Google LLC | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #      http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  |  | 
|  | """External Account Credentials. | 
|  |  | 
|  | This module provides credentials that exchange workload identity pool external | 
|  | credentials for Google access tokens. This facilitates accessing Google Cloud | 
|  | Platform resources from on-prem and non-Google Cloud platforms (e.g. AWS, | 
|  | Microsoft Azure, OIDC identity providers), using native credentials retrieved | 
|  | from the current environment without the need to copy, save and manage | 
|  | long-lived service account credentials. | 
|  |  | 
|  | Specifically, this is intended to use access tokens acquired using the GCP STS | 
|  | token exchange endpoint following the `OAuth 2.0 Token Exchange`_ spec. | 
|  |  | 
|  | .. _OAuth 2.0 Token Exchange: https://tools.ietf.org/html/rfc8693 | 
|  | """ | 
|  |  | 
|  | import abc | 
|  | import copy | 
|  | import datetime | 
|  | import json | 
|  | import re | 
|  |  | 
|  | import six | 
|  |  | 
|  | from google.auth import _helpers | 
|  | from google.auth import credentials | 
|  | from google.auth import exceptions | 
|  | from google.auth import impersonated_credentials | 
|  | from google.oauth2 import sts | 
|  | from google.oauth2 import utils | 
|  |  | 
|  | # External account JSON type identifier. | 
|  | _EXTERNAL_ACCOUNT_JSON_TYPE = "external_account" | 
|  | # The token exchange grant_type used for exchanging credentials. | 
|  | _STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange" | 
|  | # The token exchange requested_token_type. This is always an access_token. | 
|  | _STS_REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" | 
|  | # Cloud resource manager URL used to retrieve project information. | 
|  | _CLOUD_RESOURCE_MANAGER = "https://cloudresourcemanager.googleapis.com/v1/projects/" | 
|  |  | 
|  |  | 
|  | @six.add_metaclass(abc.ABCMeta) | 
|  | class Credentials(credentials.Scoped, credentials.CredentialsWithQuotaProject): | 
|  | """Base class for all external account credentials. | 
|  |  | 
|  | This is used to instantiate Credentials for exchanging external account | 
|  | credentials for Google access token and authorizing requests to Google APIs. | 
|  | The base class implements the common logic for exchanging external account | 
|  | credentials for Google access tokens. | 
|  | """ | 
|  |  | 
|  | def __init__( | 
|  | self, | 
|  | audience, | 
|  | subject_token_type, | 
|  | token_url, | 
|  | credential_source, | 
|  | service_account_impersonation_url=None, | 
|  | client_id=None, | 
|  | client_secret=None, | 
|  | quota_project_id=None, | 
|  | scopes=None, | 
|  | default_scopes=None, | 
|  | workforce_pool_user_project=None, | 
|  | ): | 
|  | """Instantiates an external account credentials object. | 
|  |  | 
|  | Args: | 
|  | audience (str): The STS audience field. | 
|  | subject_token_type (str): The subject token type. | 
|  | token_url (str): The STS endpoint URL. | 
|  | credential_source (Mapping): The credential source dictionary. | 
|  | service_account_impersonation_url (Optional[str]): The optional service account | 
|  | impersonation generateAccessToken URL. | 
|  | client_id (Optional[str]): The optional client ID. | 
|  | client_secret (Optional[str]): The optional client secret. | 
|  | quota_project_id (Optional[str]): The optional quota project ID. | 
|  | scopes (Optional[Sequence[str]]): Optional scopes to request during the | 
|  | authorization grant. | 
|  | default_scopes (Optional[Sequence[str]]): Default scopes passed by a | 
|  | Google client library. Use 'scopes' for user-defined scopes. | 
|  | workforce_pool_user_project (Optona[str]): The optional workforce pool user | 
|  | project number when the credential corresponds to a workforce pool and not | 
|  | a workload identity pool. The underlying principal must still have | 
|  | serviceusage.services.use IAM permission to use the project for | 
|  | billing/quota. | 
|  | Raises: | 
|  | google.auth.exceptions.RefreshError: If the generateAccessToken | 
|  | endpoint returned an error. | 
|  | """ | 
|  | super(Credentials, self).__init__() | 
|  | self._audience = audience | 
|  | self._subject_token_type = subject_token_type | 
|  | self._token_url = token_url | 
|  | self._credential_source = credential_source | 
|  | self._service_account_impersonation_url = service_account_impersonation_url | 
|  | self._client_id = client_id | 
|  | self._client_secret = client_secret | 
|  | self._quota_project_id = quota_project_id | 
|  | self._scopes = scopes | 
|  | self._default_scopes = default_scopes | 
|  | self._workforce_pool_user_project = workforce_pool_user_project | 
|  |  | 
|  | if self._client_id: | 
|  | self._client_auth = utils.ClientAuthentication( | 
|  | utils.ClientAuthType.basic, self._client_id, self._client_secret | 
|  | ) | 
|  | else: | 
|  | self._client_auth = None | 
|  | self._sts_client = sts.Client(self._token_url, self._client_auth) | 
|  |  | 
|  | if self._service_account_impersonation_url: | 
|  | self._impersonated_credentials = self._initialize_impersonated_credentials() | 
|  | else: | 
|  | self._impersonated_credentials = None | 
|  | self._project_id = None | 
|  |  | 
|  | if not self.is_workforce_pool and self._workforce_pool_user_project: | 
|  | # Workload identity pools do not support workforce pool user projects. | 
|  | raise ValueError( | 
|  | "workforce_pool_user_project should not be set for non-workforce pool " | 
|  | "credentials" | 
|  | ) | 
|  |  | 
|  | @property | 
|  | def info(self): | 
|  | """Generates the dictionary representation of the current credentials. | 
|  |  | 
|  | Returns: | 
|  | Mapping: The dictionary representation of the credentials. This is the | 
|  | reverse of "from_info" defined on the subclasses of this class. It is | 
|  | useful for serializing the current credentials so it can deserialized | 
|  | later. | 
|  | """ | 
|  | config_info = { | 
|  | "type": _EXTERNAL_ACCOUNT_JSON_TYPE, | 
|  | "audience": self._audience, | 
|  | "subject_token_type": self._subject_token_type, | 
|  | "token_url": self._token_url, | 
|  | "service_account_impersonation_url": self._service_account_impersonation_url, | 
|  | "credential_source": copy.deepcopy(self._credential_source), | 
|  | "quota_project_id": self._quota_project_id, | 
|  | "client_id": self._client_id, | 
|  | "client_secret": self._client_secret, | 
|  | "workforce_pool_user_project": self._workforce_pool_user_project, | 
|  | } | 
|  | return {key: value for key, value in config_info.items() if value is not None} | 
|  |  | 
|  | @property | 
|  | def service_account_email(self): | 
|  | """Returns the service account email if service account impersonation is used. | 
|  |  | 
|  | Returns: | 
|  | Optional[str]: The service account email if impersonation is used. Otherwise | 
|  | None is returned. | 
|  | """ | 
|  | if self._service_account_impersonation_url: | 
|  | # Parse email from URL. The formal looks as follows: | 
|  | # https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/name@project-id.iam.gserviceaccount.com:generateAccessToken | 
|  | url = self._service_account_impersonation_url | 
|  | start_index = url.rfind("/") | 
|  | end_index = url.find(":generateAccessToken") | 
|  | if start_index != -1 and end_index != -1 and start_index < end_index: | 
|  | start_index = start_index + 1 | 
|  | return url[start_index:end_index] | 
|  | return None | 
|  |  | 
|  | @property | 
|  | def is_user(self): | 
|  | """Returns whether the credentials represent a user (True) or workload (False). | 
|  | Workloads behave similarly to service accounts. Currently workloads will use | 
|  | service account impersonation but will eventually not require impersonation. | 
|  | As a result, this property is more reliable than the service account email | 
|  | property in determining if the credentials represent a user or workload. | 
|  |  | 
|  | Returns: | 
|  | bool: True if the credentials represent a user. False if they represent a | 
|  | workload. | 
|  | """ | 
|  | # If service account impersonation is used, the credentials will always represent a | 
|  | # service account. | 
|  | if self._service_account_impersonation_url: | 
|  | return False | 
|  | return self.is_workforce_pool | 
|  |  | 
|  | @property | 
|  | def is_workforce_pool(self): | 
|  | """Returns whether the credentials represent a workforce pool (True) or | 
|  | workload (False) based on the credentials' audience. | 
|  |  | 
|  | This will also return True for impersonated workforce pool credentials. | 
|  |  | 
|  | Returns: | 
|  | bool: True if the credentials represent a workforce pool. False if they | 
|  | represent a workload. | 
|  | """ | 
|  | # Workforce pools representing users have the following audience format: | 
|  | # //iam.googleapis.com/locations/$location/workforcePools/$poolId/providers/$providerId | 
|  | p = re.compile(r"//iam\.googleapis\.com/locations/[^/]+/workforcePools/") | 
|  | return p.match(self._audience or "") is not None | 
|  |  | 
|  | @property | 
|  | def requires_scopes(self): | 
|  | """Checks if the credentials requires scopes. | 
|  |  | 
|  | Returns: | 
|  | bool: True if there are no scopes set otherwise False. | 
|  | """ | 
|  | return not self._scopes and not self._default_scopes | 
|  |  | 
|  | @property | 
|  | def project_number(self): | 
|  | """Optional[str]: The project number corresponding to the workload identity pool.""" | 
|  |  | 
|  | # STS audience pattern: | 
|  | # //iam.googleapis.com/projects/$PROJECT_NUMBER/locations/... | 
|  | components = self._audience.split("/") | 
|  | try: | 
|  | project_index = components.index("projects") | 
|  | if project_index + 1 < len(components): | 
|  | return components[project_index + 1] or None | 
|  | except ValueError: | 
|  | return None | 
|  |  | 
|  | @_helpers.copy_docstring(credentials.Scoped) | 
|  | def with_scopes(self, scopes, default_scopes=None): | 
|  | d = dict( | 
|  | audience=self._audience, | 
|  | subject_token_type=self._subject_token_type, | 
|  | token_url=self._token_url, | 
|  | credential_source=self._credential_source, | 
|  | service_account_impersonation_url=self._service_account_impersonation_url, | 
|  | client_id=self._client_id, | 
|  | client_secret=self._client_secret, | 
|  | quota_project_id=self._quota_project_id, | 
|  | scopes=scopes, | 
|  | default_scopes=default_scopes, | 
|  | workforce_pool_user_project=self._workforce_pool_user_project, | 
|  | ) | 
|  | if not self.is_workforce_pool: | 
|  | d.pop("workforce_pool_user_project") | 
|  | return self.__class__(**d) | 
|  |  | 
|  | @abc.abstractmethod | 
|  | def retrieve_subject_token(self, request): | 
|  | """Retrieves the subject token using the credential_source object. | 
|  |  | 
|  | Args: | 
|  | request (google.auth.transport.Request): A callable used to make | 
|  | HTTP requests. | 
|  | Returns: | 
|  | str: The retrieved subject token. | 
|  | """ | 
|  | # pylint: disable=missing-raises-doc | 
|  | # (pylint doesn't recognize that this is abstract) | 
|  | raise NotImplementedError("retrieve_subject_token must be implemented") | 
|  |  | 
|  | def get_project_id(self, request): | 
|  | """Retrieves the project ID corresponding to the workload identity or workforce pool. | 
|  | For workforce pool credentials, it returns the project ID corresponding to | 
|  | the workforce_pool_user_project. | 
|  |  | 
|  | When not determinable, None is returned. | 
|  |  | 
|  | This is introduced to support the current pattern of using the Auth library: | 
|  |  | 
|  | credentials, project_id = google.auth.default() | 
|  |  | 
|  | The resource may not have permission (resourcemanager.projects.get) to | 
|  | call this API or the required scopes may not be selected: | 
|  | https://cloud.google.com/resource-manager/reference/rest/v1/projects/get#authorization-scopes | 
|  |  | 
|  | Args: | 
|  | request (google.auth.transport.Request): A callable used to make | 
|  | HTTP requests. | 
|  | Returns: | 
|  | Optional[str]: The project ID corresponding to the workload identity pool | 
|  | or workforce pool if determinable. | 
|  | """ | 
|  | if self._project_id: | 
|  | # If already retrieved, return the cached project ID value. | 
|  | return self._project_id | 
|  | scopes = self._scopes if self._scopes is not None else self._default_scopes | 
|  | # Scopes are required in order to retrieve a valid access token. | 
|  | project_number = self.project_number or self._workforce_pool_user_project | 
|  | if project_number and scopes: | 
|  | headers = {} | 
|  | url = _CLOUD_RESOURCE_MANAGER + project_number | 
|  | self.before_request(request, "GET", url, headers) | 
|  | response = request(url=url, method="GET", headers=headers) | 
|  |  | 
|  | response_body = ( | 
|  | response.data.decode("utf-8") | 
|  | if hasattr(response.data, "decode") | 
|  | else response.data | 
|  | ) | 
|  | response_data = json.loads(response_body) | 
|  |  | 
|  | if response.status == 200: | 
|  | # Cache result as this field is immutable. | 
|  | self._project_id = response_data.get("projectId") | 
|  | return self._project_id | 
|  |  | 
|  | return None | 
|  |  | 
|  | @_helpers.copy_docstring(credentials.Credentials) | 
|  | def refresh(self, request): | 
|  | scopes = self._scopes if self._scopes is not None else self._default_scopes | 
|  | if self._impersonated_credentials: | 
|  | self._impersonated_credentials.refresh(request) | 
|  | self.token = self._impersonated_credentials.token | 
|  | self.expiry = self._impersonated_credentials.expiry | 
|  | else: | 
|  | now = _helpers.utcnow() | 
|  | additional_options = None | 
|  | # Do not pass workforce_pool_user_project when client authentication | 
|  | # is used. The client ID is sufficient for determining the user project. | 
|  | if self._workforce_pool_user_project and not self._client_id: | 
|  | additional_options = {"userProject": self._workforce_pool_user_project} | 
|  | response_data = self._sts_client.exchange_token( | 
|  | request=request, | 
|  | grant_type=_STS_GRANT_TYPE, | 
|  | subject_token=self.retrieve_subject_token(request), | 
|  | subject_token_type=self._subject_token_type, | 
|  | audience=self._audience, | 
|  | scopes=scopes, | 
|  | requested_token_type=_STS_REQUESTED_TOKEN_TYPE, | 
|  | additional_options=additional_options, | 
|  | ) | 
|  | self.token = response_data.get("access_token") | 
|  | lifetime = datetime.timedelta(seconds=response_data.get("expires_in")) | 
|  | self.expiry = now + lifetime | 
|  |  | 
|  | @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) | 
|  | def with_quota_project(self, quota_project_id): | 
|  | # Return copy of instance with the provided quota project ID. | 
|  | d = dict( | 
|  | audience=self._audience, | 
|  | subject_token_type=self._subject_token_type, | 
|  | token_url=self._token_url, | 
|  | credential_source=self._credential_source, | 
|  | service_account_impersonation_url=self._service_account_impersonation_url, | 
|  | client_id=self._client_id, | 
|  | client_secret=self._client_secret, | 
|  | quota_project_id=quota_project_id, | 
|  | scopes=self._scopes, | 
|  | default_scopes=self._default_scopes, | 
|  | workforce_pool_user_project=self._workforce_pool_user_project, | 
|  | ) | 
|  | if not self.is_workforce_pool: | 
|  | d.pop("workforce_pool_user_project") | 
|  | return self.__class__(**d) | 
|  |  | 
|  | def _initialize_impersonated_credentials(self): | 
|  | """Generates an impersonated credentials. | 
|  |  | 
|  | For more details, see `projects.serviceAccounts.generateAccessToken`_. | 
|  |  | 
|  | .. _projects.serviceAccounts.generateAccessToken: https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/generateAccessToken | 
|  |  | 
|  | Returns: | 
|  | impersonated_credentials.Credential: The impersonated credentials | 
|  | object. | 
|  |  | 
|  | Raises: | 
|  | google.auth.exceptions.RefreshError: If the generateAccessToken | 
|  | endpoint returned an error. | 
|  | """ | 
|  | # Return copy of instance with no service account impersonation. | 
|  | d = dict( | 
|  | audience=self._audience, | 
|  | subject_token_type=self._subject_token_type, | 
|  | token_url=self._token_url, | 
|  | credential_source=self._credential_source, | 
|  | service_account_impersonation_url=None, | 
|  | client_id=self._client_id, | 
|  | client_secret=self._client_secret, | 
|  | quota_project_id=self._quota_project_id, | 
|  | scopes=self._scopes, | 
|  | default_scopes=self._default_scopes, | 
|  | workforce_pool_user_project=self._workforce_pool_user_project, | 
|  | ) | 
|  | if not self.is_workforce_pool: | 
|  | d.pop("workforce_pool_user_project") | 
|  | source_credentials = self.__class__(**d) | 
|  |  | 
|  | # Determine target_principal. | 
|  | target_principal = self.service_account_email | 
|  | if not target_principal: | 
|  | raise exceptions.RefreshError( | 
|  | "Unable to determine target principal from service account impersonation URL." | 
|  | ) | 
|  |  | 
|  | scopes = self._scopes if self._scopes is not None else self._default_scopes | 
|  | # Initialize and return impersonated credentials. | 
|  | return impersonated_credentials.Credentials( | 
|  | source_credentials=source_credentials, | 
|  | target_principal=target_principal, | 
|  | target_scopes=scopes, | 
|  | quota_project_id=self._quota_project_id, | 
|  | iam_endpoint_override=self._service_account_impersonation_url, | 
|  | ) |