blob: e1a926f73a8c3e3f27a3e0f822f12f293e0f085e [file]
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import hashlib
import json
from unittest import mock
import urllib.parse
from cryptography import x509
import pytest
from google.auth import _agent_identity_utils
from google.auth import environment_vars
from google.auth import exceptions
# A mock PEM-encoded certificate without an Agent Identity SPIFFE ID.
NON_AGENT_IDENTITY_CERT_BYTES = (
b"-----BEGIN CERTIFICATE-----\n"
b"MIIDIzCCAgugAwIBAgIJAMfISuBQ5m+5MA0GCSqGSIb3DQEBBQUAMBUxEzARBgNV\n"
b"BAMTCnVuaXQtdGVzdHMwHhcNMTExMjA2MTYyNjAyWhcNMjExMjAzMTYyNjAyWjAV\n"
b"MRMwEQYDVQQDEwp1bml0LXRlc3RzMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB\n"
b"CgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZgkdmM\n"
b"7oVK2OfgrSj/FCTkInKPqaCR0gD7K80q+mLBrN3PUkDrJQZpvRZIff3/xmVU1Wer\n"
b"uQLFJjnFb2dqu0s/FY/2kWiJtBCakXvXEOb7zfbINuayL+MSsCGSdVYsSliS5qQp\n"
b"gyDap+8b5fpXZVJkq92hrcNtbkg7hCYUJczt8n9hcCTJCfUpApvaFQ18pe+zpyl4\n"
b"+WzkP66I28hniMQyUlA1hBiskT7qiouq0m8IOodhv2fagSZKjOTTU2xkSBc//fy3\n"
b"ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQABo3YwdDAdBgNVHQ4EFgQU2RQ8yO+O\n"
b"gN8oVW2SW7RLrfYd9jEwRQYDVR0jBD4wPIAU2RQ8yO+OgN8oVW2SW7RLrfYd9jGh\n"
b"GaQXMBUxEzARBgNVBAMTCnVuaXQtdGVzdHOCCQDHyErgUOZvuTAMBgNVHRMEBTAD\n"
b"AQH/MA0GCSqGSIb3DQEBBQUAA4IBAQBRv+M/6+FiVu7KXNjFI5pSN17OcW5QUtPr\n"
b"odJMlWrJBtynn/TA1oJlYu3yV5clc/71Vr/AxuX5xGP+IXL32YDF9lTUJXG/uUGk\n"
b"+JETpKmQviPbRsvzYhz4pf6ZIOZMc3/GIcNq92ECbseGO+yAgyWUVKMmZM0HqXC9\n"
b"ovNslqe0M8C1sLm1zAR5z/h/litE7/8O2ietija3Q/qtl2TOXJdCA6sgjJX2WUql\n"
b"ybrC55ct18NKf3qhpcEkGQvFU40rVYApJpi98DiZPYFdx1oBDp/f4uZ3ojpxRVFT\n"
b"cDwcJLfNRCPUhormsY7fDS9xSyThiHsW9mjJYdcaKQkwYZ0F11yB\n"
b"-----END CERTIFICATE-----\n"
)
class TestAgentIdentityUtils:
@mock.patch("cryptography.x509.load_pem_x509_certificate")
def test_parse_certificate(self, mock_load_cert):
result = _agent_identity_utils.parse_certificate(b"cert_bytes")
mock_load_cert.assert_called_once_with(b"cert_bytes")
assert result == mock_load_cert.return_value
def test__is_agent_identity_certificate_invalid(self):
cert = _agent_identity_utils.parse_certificate(NON_AGENT_IDENTITY_CERT_BYTES)
assert not _agent_identity_utils._is_agent_identity_certificate(cert)
def test__is_agent_identity_certificate_valid_spiffe(self):
mock_cert = mock.MagicMock()
mock_ext = mock.MagicMock()
mock_san_value = mock.MagicMock()
mock_cert.extensions.get_extension_for_oid.return_value = mock_ext
mock_ext.value = mock_san_value
mock_san_value.get_values_for_type.return_value = [
"spiffe://agents.global.proj-12345.system.id.goog/workload"
]
assert _agent_identity_utils._is_agent_identity_certificate(mock_cert)
def test__is_agent_identity_certificate_non_matching_spiffe(self):
mock_cert = mock.MagicMock()
mock_ext = mock.MagicMock()
mock_san_value = mock.MagicMock()
mock_cert.extensions.get_extension_for_oid.return_value = mock_ext
mock_ext.value = mock_san_value
mock_san_value.get_values_for_type.return_value = [
"spiffe://other.domain.com/workload"
]
assert not _agent_identity_utils._is_agent_identity_certificate(mock_cert)
def test__is_agent_identity_certificate_no_san(self):
mock_cert = mock.MagicMock()
mock_cert.extensions.get_extension_for_oid.side_effect = x509.ExtensionNotFound(
"Test extension not found", None
)
assert not _agent_identity_utils._is_agent_identity_certificate(mock_cert)
def test__is_agent_identity_certificate_not_spiffe_uri(self):
mock_cert = mock.MagicMock()
mock_ext = mock.MagicMock()
mock_san_value = mock.MagicMock()
mock_cert.extensions.get_extension_for_oid.return_value = mock_ext
mock_ext.value = mock_san_value
mock_san_value.get_values_for_type.return_value = ["https://example.com"]
assert not _agent_identity_utils._is_agent_identity_certificate(mock_cert)
def test_calculate_certificate_fingerprint(self):
mock_cert = mock.MagicMock()
mock_cert.public_bytes.return_value = b"der-bytes"
# Expected: base64 (standard), unpadded, then URL-encoded
base64_fingerprint = base64.b64encode(
hashlib.sha256(b"der-bytes").digest()
).decode("utf-8")
unpadded_base64_fingerprint = base64_fingerprint.rstrip("=")
expected_fingerprint = urllib.parse.quote(unpadded_base64_fingerprint)
fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(mock_cert)
assert fingerprint == expected_fingerprint
@mock.patch("google.auth._agent_identity_utils._is_agent_identity_certificate")
def test_should_request_bound_token(self, mock_is_agent, monkeypatch):
# Agent cert, default env var (opt-in)
mock_is_agent.return_value = True
monkeypatch.delenv(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
raising=False,
)
assert _agent_identity_utils.should_request_bound_token(mock.sentinel.cert)
# Agent cert, explicit opt-in
monkeypatch.setenv(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"true",
)
assert _agent_identity_utils.should_request_bound_token(mock.sentinel.cert)
# Agent cert, explicit opt-out
monkeypatch.setenv(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"false",
)
assert not _agent_identity_utils.should_request_bound_token(mock.sentinel.cert)
# Non-agent cert, opt-in
mock_is_agent.return_value = False
monkeypatch.setenv(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"true",
)
assert not _agent_identity_utils.should_request_bound_token(mock.sentinel.cert)
def test_get_agent_identity_certificate_path_success(self, tmpdir, monkeypatch):
cert_path = tmpdir.join("cert.pem")
cert_path.write("cert_content")
config_path = tmpdir.join("config.json")
config_path.write(
json.dumps({"cert_configs": {"workload": {"cert_path": str(cert_path)}}})
)
monkeypatch.setenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path)
)
result = _agent_identity_utils.get_agent_identity_certificate_path()
assert result == str(cert_path)
@mock.patch("time.sleep")
def test_get_agent_identity_certificate_path_retry(
self, mock_sleep, tmpdir, monkeypatch
):
config_path = tmpdir.join("config.json")
monkeypatch.setenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path)
)
# File doesn't exist initially
with pytest.raises(exceptions.RefreshError):
_agent_identity_utils.get_agent_identity_certificate_path()
assert mock_sleep.call_count == 100
@mock.patch("time.sleep")
def test_get_agent_identity_certificate_path_failure(
self, mock_sleep, tmpdir, monkeypatch
):
config_path = tmpdir.join("non_existent_config.json")
monkeypatch.setenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path)
)
with pytest.raises(exceptions.RefreshError) as excinfo:
_agent_identity_utils.get_agent_identity_certificate_path()
assert "not found after multiple retries" in str(excinfo.value)
assert (
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES
in str(excinfo.value)
)
assert mock_sleep.call_count == 100
@mock.patch("time.sleep")
@mock.patch("os.path.exists")
def test_get_agent_identity_certificate_path_cert_not_found(
self, mock_exists, mock_sleep, tmpdir, monkeypatch
):
cert_path_str = str(tmpdir.join("cert.pem"))
config_path = tmpdir.join("config.json")
config_path.write(
json.dumps({"cert_configs": {"workload": {"cert_path": cert_path_str}}})
)
monkeypatch.setenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path)
)
def exists_side_effect(path):
return path == str(config_path)
mock_exists.side_effect = exists_side_effect
with pytest.raises(exceptions.RefreshError):
_agent_identity_utils.get_agent_identity_certificate_path()
assert mock_sleep.call_count == 100
@mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path")
def test_get_and_parse_agent_identity_certificate_opted_out(
self, mock_get_path, monkeypatch
):
monkeypatch.setenv(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"false",
)
result = _agent_identity_utils.get_and_parse_agent_identity_certificate()
assert result is None
mock_get_path.assert_not_called()
@mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path")
def test_get_and_parse_agent_identity_certificate_no_path(
self, mock_get_path, monkeypatch
):
monkeypatch.setenv(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"true",
)
mock_get_path.return_value = None
result = _agent_identity_utils.get_and_parse_agent_identity_certificate()
assert result is None
mock_get_path.assert_called_once()
@mock.patch("google.auth._agent_identity_utils.parse_certificate")
@mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path")
def test_get_and_parse_agent_identity_certificate_success(
self, mock_get_path, mock_parse_certificate, monkeypatch
):
monkeypatch.setenv(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"true",
)
mock_get_path.return_value = "/fake/cert.pem"
mock_open = mock.mock_open(read_data=b"cert_bytes")
with mock.patch("builtins.open", mock_open):
result = _agent_identity_utils.get_and_parse_agent_identity_certificate()
mock_open.assert_called_once_with("/fake/cert.pem", "rb")
mock_parse_certificate.assert_called_once_with(b"cert_bytes")
assert result == mock_parse_certificate.return_value
@mock.patch("time.sleep", return_value=None)
@mock.patch("google.auth._agent_identity_utils._is_certificate_file_ready")
def test_get_agent_identity_certificate_path_fallback_to_well_known_path(
self, mock_is_ready, mock_sleep, monkeypatch
):
# Set a dummy config path that won't be found.
monkeypatch.setenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, "/dummy/config.json"
)
# First, the primary path from the (mocked) config is not ready.
# Then, the fallback well-known path is ready.
mock_is_ready.side_effect = [False, True]
result = _agent_identity_utils.get_agent_identity_certificate_path()
assert result == _agent_identity_utils._WELL_KNOWN_CERT_PATH
# The sleep should have been called once before the fallback is checked.
mock_sleep.assert_called_once()
assert mock_is_ready.call_count == 2
def test_get_cached_cert_fingerprint_no_cert(self):
with pytest.raises(ValueError, match="mTLS connection is not configured."):
_agent_identity_utils.get_cached_cert_fingerprint(None)
def test_get_cached_cert_fingerprint_with_cert(self):
fingerprint = _agent_identity_utils.get_cached_cert_fingerprint(
NON_AGENT_IDENTITY_CERT_BYTES
)
assert isinstance(fingerprint, str)
class TestAgentIdentityUtilsNoCryptography:
@pytest.fixture(autouse=True)
def mock_cryptography_import(self):
with mock.patch.dict(
"sys.modules",
{
"cryptography": None,
"cryptography.hazmat": None,
"cryptography.hazmat.primitives": None,
"cryptography.hazmat.primitives.serialization": None,
},
):
yield
def test_parse_certificate_raises_import_error(self):
with pytest.raises(ImportError, match="The cryptography library is required"):
_agent_identity_utils.parse_certificate(b"cert_bytes")
def test_is_agent_identity_certificate_raises_import_error(self):
with pytest.raises(ImportError, match="The cryptography library is required"):
_agent_identity_utils._is_agent_identity_certificate(mock.sentinel.cert)
def test_calculate_certificate_fingerprint_raises_import_error(self):
with pytest.raises(ImportError, match="The cryptography library is required"):
_agent_identity_utils.calculate_certificate_fingerprint(mock.sentinel.cert)