|  | # Copyright 2021 Google LLC | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #      http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  |  | 
|  | """ Challenges for reauthentication. | 
|  | """ | 
|  |  | 
|  | import abc | 
|  | import base64 | 
|  | import getpass | 
|  | import sys | 
|  |  | 
|  | import six | 
|  |  | 
|  | from google.auth import _helpers | 
|  | from google.auth import exceptions | 
|  |  | 
|  |  | 
|  | REAUTH_ORIGIN = "https://accounts.google.com" | 
|  | SAML_CHALLENGE_MESSAGE = ( | 
|  | "Please run `gcloud auth login` to complete reauthentication with SAML." | 
|  | ) | 
|  |  | 
|  |  | 
|  | def get_user_password(text): | 
|  | """Get password from user. | 
|  |  | 
|  | Override this function with a different logic if you are using this library | 
|  | outside a CLI. | 
|  |  | 
|  | Args: | 
|  | text (str): message for the password prompt. | 
|  |  | 
|  | Returns: | 
|  | str: password string. | 
|  | """ | 
|  | return getpass.getpass(text) | 
|  |  | 
|  |  | 
|  | @six.add_metaclass(abc.ABCMeta) | 
|  | class ReauthChallenge(object): | 
|  | """Base class for reauth challenges.""" | 
|  |  | 
|  | @property | 
|  | @abc.abstractmethod | 
|  | def name(self):  # pragma: NO COVER | 
|  | """Returns the name of the challenge.""" | 
|  | raise NotImplementedError("name property must be implemented") | 
|  |  | 
|  | @property | 
|  | @abc.abstractmethod | 
|  | def is_locally_eligible(self):  # pragma: NO COVER | 
|  | """Returns true if a challenge is supported locally on this machine.""" | 
|  | raise NotImplementedError("is_locally_eligible property must be implemented") | 
|  |  | 
|  | @abc.abstractmethod | 
|  | def obtain_challenge_input(self, metadata):  # pragma: NO COVER | 
|  | """Performs logic required to obtain credentials and returns it. | 
|  |  | 
|  | Args: | 
|  | metadata (Mapping): challenge metadata returned in the 'challenges' field in | 
|  | the initial reauth request. Includes the 'challengeType' field | 
|  | and other challenge-specific fields. | 
|  |  | 
|  | Returns: | 
|  | response that will be send to the reauth service as the content of | 
|  | the 'proposalResponse' field in the request body. Usually a dict | 
|  | with the keys specific to the challenge. For example, | 
|  | ``{'credential': password}`` for password challenge. | 
|  | """ | 
|  | raise NotImplementedError("obtain_challenge_input method must be implemented") | 
|  |  | 
|  |  | 
|  | class PasswordChallenge(ReauthChallenge): | 
|  | """Challenge that asks for user's password.""" | 
|  |  | 
|  | @property | 
|  | def name(self): | 
|  | return "PASSWORD" | 
|  |  | 
|  | @property | 
|  | def is_locally_eligible(self): | 
|  | return True | 
|  |  | 
|  | @_helpers.copy_docstring(ReauthChallenge) | 
|  | def obtain_challenge_input(self, unused_metadata): | 
|  | passwd = get_user_password("Please enter your password:") | 
|  | if not passwd: | 
|  | passwd = " "  # avoid the server crashing in case of no password :D | 
|  | return {"credential": passwd} | 
|  |  | 
|  |  | 
|  | class SecurityKeyChallenge(ReauthChallenge): | 
|  | """Challenge that asks for user's security key touch.""" | 
|  |  | 
|  | @property | 
|  | def name(self): | 
|  | return "SECURITY_KEY" | 
|  |  | 
|  | @property | 
|  | def is_locally_eligible(self): | 
|  | return True | 
|  |  | 
|  | @_helpers.copy_docstring(ReauthChallenge) | 
|  | def obtain_challenge_input(self, metadata): | 
|  | try: | 
|  | import pyu2f.convenience.authenticator | 
|  | import pyu2f.errors | 
|  | import pyu2f.model | 
|  | except ImportError: | 
|  | raise exceptions.ReauthFailError( | 
|  | "pyu2f dependency is required to use Security key reauth feature. " | 
|  | "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`." | 
|  | ) | 
|  | sk = metadata["securityKey"] | 
|  | challenges = sk["challenges"] | 
|  | app_id = sk["applicationId"] | 
|  |  | 
|  | challenge_data = [] | 
|  | for c in challenges: | 
|  | kh = c["keyHandle"].encode("ascii") | 
|  | key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh))) | 
|  | challenge = c["challenge"].encode("ascii") | 
|  | challenge = base64.urlsafe_b64decode(challenge) | 
|  | challenge_data.append({"key": key, "challenge": challenge}) | 
|  |  | 
|  | try: | 
|  | api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator( | 
|  | REAUTH_ORIGIN | 
|  | ) | 
|  | response = api.Authenticate( | 
|  | app_id, challenge_data, print_callback=sys.stderr.write | 
|  | ) | 
|  | return {"securityKey": response} | 
|  | except pyu2f.errors.U2FError as e: | 
|  | if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE: | 
|  | sys.stderr.write("Ineligible security key.\n") | 
|  | elif e.code == pyu2f.errors.U2FError.TIMEOUT: | 
|  | sys.stderr.write("Timed out while waiting for security key touch.\n") | 
|  | else: | 
|  | raise e | 
|  | except pyu2f.errors.NoDeviceFoundError: | 
|  | sys.stderr.write("No security key found.\n") | 
|  | return None | 
|  |  | 
|  |  | 
|  | class SamlChallenge(ReauthChallenge): | 
|  | """Challenge that asks the users to browse to their ID Providers. | 
|  |  | 
|  | Currently SAML challenge is not supported. When obtaining the challenge | 
|  | input, exception will be raised to instruct the users to run | 
|  | `gcloud auth login` for reauthentication. | 
|  | """ | 
|  |  | 
|  | @property | 
|  | def name(self): | 
|  | return "SAML" | 
|  |  | 
|  | @property | 
|  | def is_locally_eligible(self): | 
|  | return True | 
|  |  | 
|  | def obtain_challenge_input(self, metadata): | 
|  | # Magic Arch has not fully supported returning a proper dedirect URL | 
|  | # for programmatic SAML users today. So we error our here and request | 
|  | # users to use gcloud to complete a login. | 
|  | raise exceptions.ReauthSamlChallengeFailError(SAML_CHALLENGE_MESSAGE) | 
|  |  | 
|  |  | 
|  | AVAILABLE_CHALLENGES = { | 
|  | challenge.name: challenge | 
|  | for challenge in [SecurityKeyChallenge(), PasswordChallenge(), SamlChallenge()] | 
|  | } |