| # Copyright 2025 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import base64 |
| import hashlib |
| import json |
| from unittest import mock |
| import urllib.parse |
| |
| from cryptography import x509 |
| import pytest |
| |
| from google.auth import _agent_identity_utils |
| from google.auth import environment_vars |
| from google.auth import exceptions |
| |
| # A mock PEM-encoded certificate without an Agent Identity SPIFFE ID. |
| NON_AGENT_IDENTITY_CERT_BYTES = ( |
| b"-----BEGIN CERTIFICATE-----\n" |
| b"MIIDIzCCAgugAwIBAgIJAMfISuBQ5m+5MA0GCSqGSIb3DQEBBQUAMBUxEzARBgNV\n" |
| b"BAMTCnVuaXQtdGVzdHMwHhcNMTExMjA2MTYyNjAyWhcNMjExMjAzMTYyNjAyWjAV\n" |
| b"MRMwEQYDVQQDEwp1bml0LXRlc3RzMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB\n" |
| b"CgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZgkdmM\n" |
| b"7oVK2OfgrSj/FCTkInKPqaCR0gD7K80q+mLBrN3PUkDrJQZpvRZIff3/xmVU1Wer\n" |
| b"uQLFJjnFb2dqu0s/FY/2kWiJtBCakXvXEOb7zfbINuayL+MSsCGSdVYsSliS5qQp\n" |
| b"gyDap+8b5fpXZVJkq92hrcNtbkg7hCYUJczt8n9hcCTJCfUpApvaFQ18pe+zpyl4\n" |
| b"+WzkP66I28hniMQyUlA1hBiskT7qiouq0m8IOodhv2fagSZKjOTTU2xkSBc//fy3\n" |
| b"ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQABo3YwdDAdBgNVHQ4EFgQU2RQ8yO+O\n" |
| b"gN8oVW2SW7RLrfYd9jEwRQYDVR0jBD4wPIAU2RQ8yO+OgN8oVW2SW7RLrfYd9jGh\n" |
| b"GaQXMBUxEzARBgNVBAMTCnVuaXQtdGVzdHOCCQDHyErgUOZvuTAMBgNVHRMEBTAD\n" |
| b"AQH/MA0GCSqGSIb3DQEBBQUAA4IBAQBRv+M/6+FiVu7KXNjFI5pSN17OcW5QUtPr\n" |
| b"odJMlWrJBtynn/TA1oJlYu3yV5clc/71Vr/AxuX5xGP+IXL32YDF9lTUJXG/uUGk\n" |
| b"+JETpKmQviPbRsvzYhz4pf6ZIOZMc3/GIcNq92ECbseGO+yAgyWUVKMmZM0HqXC9\n" |
| b"ovNslqe0M8C1sLm1zAR5z/h/litE7/8O2ietija3Q/qtl2TOXJdCA6sgjJX2WUql\n" |
| b"ybrC55ct18NKf3qhpcEkGQvFU40rVYApJpi98DiZPYFdx1oBDp/f4uZ3ojpxRVFT\n" |
| b"cDwcJLfNRCPUhormsY7fDS9xSyThiHsW9mjJYdcaKQkwYZ0F11yB\n" |
| b"-----END CERTIFICATE-----\n" |
| ) |
| |
| |
| class TestAgentIdentityUtils: |
| @mock.patch("cryptography.x509.load_pem_x509_certificate") |
| def test_parse_certificate(self, mock_load_cert): |
| result = _agent_identity_utils.parse_certificate(b"cert_bytes") |
| mock_load_cert.assert_called_once_with(b"cert_bytes") |
| assert result == mock_load_cert.return_value |
| |
| def test__is_agent_identity_certificate_invalid(self): |
| cert = _agent_identity_utils.parse_certificate(NON_AGENT_IDENTITY_CERT_BYTES) |
| assert not _agent_identity_utils._is_agent_identity_certificate(cert) |
| |
| def test__is_agent_identity_certificate_valid_spiffe(self): |
| mock_cert = mock.MagicMock() |
| mock_ext = mock.MagicMock() |
| mock_san_value = mock.MagicMock() |
| mock_cert.extensions.get_extension_for_oid.return_value = mock_ext |
| mock_ext.value = mock_san_value |
| mock_san_value.get_values_for_type.return_value = [ |
| "spiffe://agents.global.proj-12345.system.id.goog/workload" |
| ] |
| assert _agent_identity_utils._is_agent_identity_certificate(mock_cert) |
| |
| def test__is_agent_identity_certificate_non_matching_spiffe(self): |
| mock_cert = mock.MagicMock() |
| mock_ext = mock.MagicMock() |
| mock_san_value = mock.MagicMock() |
| mock_cert.extensions.get_extension_for_oid.return_value = mock_ext |
| mock_ext.value = mock_san_value |
| mock_san_value.get_values_for_type.return_value = [ |
| "spiffe://other.domain.com/workload" |
| ] |
| assert not _agent_identity_utils._is_agent_identity_certificate(mock_cert) |
| |
| def test__is_agent_identity_certificate_no_san(self): |
| mock_cert = mock.MagicMock() |
| mock_cert.extensions.get_extension_for_oid.side_effect = x509.ExtensionNotFound( |
| "Test extension not found", None |
| ) |
| assert not _agent_identity_utils._is_agent_identity_certificate(mock_cert) |
| |
| def test__is_agent_identity_certificate_not_spiffe_uri(self): |
| mock_cert = mock.MagicMock() |
| mock_ext = mock.MagicMock() |
| mock_san_value = mock.MagicMock() |
| mock_cert.extensions.get_extension_for_oid.return_value = mock_ext |
| mock_ext.value = mock_san_value |
| mock_san_value.get_values_for_type.return_value = ["https://example.com"] |
| assert not _agent_identity_utils._is_agent_identity_certificate(mock_cert) |
| |
| def test_calculate_certificate_fingerprint(self): |
| mock_cert = mock.MagicMock() |
| mock_cert.public_bytes.return_value = b"der-bytes" |
| |
| # Expected: base64 (standard), unpadded, then URL-encoded |
| base64_fingerprint = base64.b64encode( |
| hashlib.sha256(b"der-bytes").digest() |
| ).decode("utf-8") |
| unpadded_base64_fingerprint = base64_fingerprint.rstrip("=") |
| expected_fingerprint = urllib.parse.quote(unpadded_base64_fingerprint) |
| |
| fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(mock_cert) |
| |
| assert fingerprint == expected_fingerprint |
| |
| @mock.patch("google.auth._agent_identity_utils._is_agent_identity_certificate") |
| def test_should_request_bound_token(self, mock_is_agent, monkeypatch): |
| # Agent cert, default env var (opt-in) |
| mock_is_agent.return_value = True |
| monkeypatch.delenv( |
| environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES, |
| raising=False, |
| ) |
| assert _agent_identity_utils.should_request_bound_token(mock.sentinel.cert) |
| |
| # Agent cert, explicit opt-in |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES, |
| "true", |
| ) |
| assert _agent_identity_utils.should_request_bound_token(mock.sentinel.cert) |
| |
| # Agent cert, explicit opt-out |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES, |
| "false", |
| ) |
| assert not _agent_identity_utils.should_request_bound_token(mock.sentinel.cert) |
| |
| # Non-agent cert, opt-in |
| mock_is_agent.return_value = False |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES, |
| "true", |
| ) |
| assert not _agent_identity_utils.should_request_bound_token(mock.sentinel.cert) |
| |
| def test_get_agent_identity_certificate_path_success(self, tmpdir, monkeypatch): |
| cert_path = tmpdir.join("cert.pem") |
| cert_path.write("cert_content") |
| config_path = tmpdir.join("config.json") |
| config_path.write( |
| json.dumps({"cert_configs": {"workload": {"cert_path": str(cert_path)}}}) |
| ) |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path) |
| ) |
| |
| result = _agent_identity_utils.get_agent_identity_certificate_path() |
| assert result == str(cert_path) |
| |
| @mock.patch("time.sleep") |
| def test_get_agent_identity_certificate_path_retry( |
| self, mock_sleep, tmpdir, monkeypatch |
| ): |
| config_path = tmpdir.join("config.json") |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path) |
| ) |
| |
| # File doesn't exist initially |
| with pytest.raises(exceptions.RefreshError): |
| _agent_identity_utils.get_agent_identity_certificate_path() |
| |
| assert mock_sleep.call_count == 100 |
| |
| @mock.patch("time.sleep") |
| def test_get_agent_identity_certificate_path_failure( |
| self, mock_sleep, tmpdir, monkeypatch |
| ): |
| config_path = tmpdir.join("non_existent_config.json") |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path) |
| ) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| _agent_identity_utils.get_agent_identity_certificate_path() |
| |
| assert "not found after multiple retries" in str(excinfo.value) |
| assert ( |
| environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES |
| in str(excinfo.value) |
| ) |
| assert mock_sleep.call_count == 100 |
| |
| @mock.patch("time.sleep") |
| @mock.patch("os.path.exists") |
| def test_get_agent_identity_certificate_path_cert_not_found( |
| self, mock_exists, mock_sleep, tmpdir, monkeypatch |
| ): |
| cert_path_str = str(tmpdir.join("cert.pem")) |
| config_path = tmpdir.join("config.json") |
| config_path.write( |
| json.dumps({"cert_configs": {"workload": {"cert_path": cert_path_str}}}) |
| ) |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path) |
| ) |
| |
| def exists_side_effect(path): |
| return path == str(config_path) |
| |
| mock_exists.side_effect = exists_side_effect |
| |
| with pytest.raises(exceptions.RefreshError): |
| _agent_identity_utils.get_agent_identity_certificate_path() |
| |
| assert mock_sleep.call_count == 100 |
| |
| @mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path") |
| def test_get_and_parse_agent_identity_certificate_opted_out( |
| self, mock_get_path, monkeypatch |
| ): |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES, |
| "false", |
| ) |
| result = _agent_identity_utils.get_and_parse_agent_identity_certificate() |
| assert result is None |
| mock_get_path.assert_not_called() |
| |
| @mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path") |
| def test_get_and_parse_agent_identity_certificate_no_path( |
| self, mock_get_path, monkeypatch |
| ): |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES, |
| "true", |
| ) |
| mock_get_path.return_value = None |
| result = _agent_identity_utils.get_and_parse_agent_identity_certificate() |
| assert result is None |
| mock_get_path.assert_called_once() |
| |
| @mock.patch("google.auth._agent_identity_utils.parse_certificate") |
| @mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path") |
| def test_get_and_parse_agent_identity_certificate_success( |
| self, mock_get_path, mock_parse_certificate, monkeypatch |
| ): |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES, |
| "true", |
| ) |
| mock_get_path.return_value = "/fake/cert.pem" |
| mock_open = mock.mock_open(read_data=b"cert_bytes") |
| |
| with mock.patch("builtins.open", mock_open): |
| result = _agent_identity_utils.get_and_parse_agent_identity_certificate() |
| |
| mock_open.assert_called_once_with("/fake/cert.pem", "rb") |
| mock_parse_certificate.assert_called_once_with(b"cert_bytes") |
| assert result == mock_parse_certificate.return_value |
| |
| @mock.patch("time.sleep", return_value=None) |
| @mock.patch("google.auth._agent_identity_utils._is_certificate_file_ready") |
| def test_get_agent_identity_certificate_path_fallback_to_well_known_path( |
| self, mock_is_ready, mock_sleep, monkeypatch |
| ): |
| # Set a dummy config path that won't be found. |
| monkeypatch.setenv( |
| environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, "/dummy/config.json" |
| ) |
| |
| # First, the primary path from the (mocked) config is not ready. |
| # Then, the fallback well-known path is ready. |
| mock_is_ready.side_effect = [False, True] |
| |
| result = _agent_identity_utils.get_agent_identity_certificate_path() |
| |
| assert result == _agent_identity_utils._WELL_KNOWN_CERT_PATH |
| # The sleep should have been called once before the fallback is checked. |
| mock_sleep.assert_called_once() |
| assert mock_is_ready.call_count == 2 |
| |
| def test_get_cached_cert_fingerprint_no_cert(self): |
| with pytest.raises(ValueError, match="mTLS connection is not configured."): |
| _agent_identity_utils.get_cached_cert_fingerprint(None) |
| |
| def test_get_cached_cert_fingerprint_with_cert(self): |
| fingerprint = _agent_identity_utils.get_cached_cert_fingerprint( |
| NON_AGENT_IDENTITY_CERT_BYTES |
| ) |
| assert isinstance(fingerprint, str) |
| |
| |
| class TestAgentIdentityUtilsNoCryptography: |
| @pytest.fixture(autouse=True) |
| def mock_cryptography_import(self): |
| with mock.patch.dict( |
| "sys.modules", |
| { |
| "cryptography": None, |
| "cryptography.hazmat": None, |
| "cryptography.hazmat.primitives": None, |
| "cryptography.hazmat.primitives.serialization": None, |
| }, |
| ): |
| yield |
| |
| def test_parse_certificate_raises_import_error(self): |
| with pytest.raises(ImportError, match="The cryptography library is required"): |
| _agent_identity_utils.parse_certificate(b"cert_bytes") |
| |
| def test_is_agent_identity_certificate_raises_import_error(self): |
| with pytest.raises(ImportError, match="The cryptography library is required"): |
| _agent_identity_utils._is_agent_identity_certificate(mock.sentinel.cert) |
| |
| def test_calculate_certificate_fingerprint_raises_import_error(self): |
| with pytest.raises(ImportError, match="The cryptography library is required"): |
| _agent_identity_utils.calculate_certificate_fingerprint(mock.sentinel.cert) |