blob: 838aed07f6dabfad346d334bb91088bf74373c38 [file] [log] [blame]
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import enum
import logging
from typing import Any, Dict, Optional
from googleapiclient import discovery
import googleapiclient.errors
# TODO(sergiitk): replace with tenacity
import retrying
from framework.infrastructure import gcp
logger = logging.getLogger(__name__)
class ComputeV1(gcp.api.GcpProjectApiResource):
# TODO(sergiitk): move someplace better
_WAIT_FOR_BACKEND_SEC = 60 * 10
_WAIT_FOR_OPERATION_SEC = 60 * 5
@dataclasses.dataclass(frozen=True)
class GcpResource:
name: str
url: str
@dataclasses.dataclass(frozen=True)
class ZonalGcpResource(GcpResource):
zone: str
def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
super().__init__(api_manager.compute('v1'), project)
class HealthCheckProtocol(enum.Enum):
TCP = enum.auto()
GRPC = enum.auto()
class BackendServiceProtocol(enum.Enum):
HTTP2 = enum.auto()
GRPC = enum.auto()
def create_health_check(self,
name: str,
protocol: HealthCheckProtocol,
*,
port: Optional[int] = None) -> GcpResource:
if protocol is self.HealthCheckProtocol.TCP:
health_check_field = 'tcpHealthCheck'
elif protocol is self.HealthCheckProtocol.GRPC:
health_check_field = 'grpcHealthCheck'
else:
raise TypeError(f'Unexpected Health Check protocol: {protocol}')
health_check_settings = {}
if port is None:
health_check_settings['portSpecification'] = 'USE_SERVING_PORT'
else:
health_check_settings['portSpecification'] = 'USE_FIXED_PORT'
health_check_settings['port'] = port
return self._insert_resource(
self.api.healthChecks(), {
'name': name,
'type': protocol.name,
health_check_field: health_check_settings,
})
def delete_health_check(self, name):
self._delete_resource(self.api.healthChecks(), 'healthCheck', name)
def create_backend_service_traffic_director(
self,
name: str,
health_check: GcpResource,
protocol: Optional[BackendServiceProtocol] = None) -> GcpResource:
if not isinstance(protocol, self.BackendServiceProtocol):
raise TypeError(f'Unexpected Backend Service protocol: {protocol}')
return self._insert_resource(
self.api.backendServices(),
{
'name': name,
'loadBalancingScheme':
'INTERNAL_SELF_MANAGED', # Traffic Director
'healthChecks': [health_check.url],
'protocol': protocol.name,
})
def get_backend_service_traffic_director(self, name: str) -> GcpResource:
return self._get_resource(self.api.backendServices(),
backendService=name)
def patch_backend_service(self, backend_service, body, **kwargs):
self._patch_resource(collection=self.api.backendServices(),
backendService=backend_service.name,
body=body,
**kwargs)
def backend_service_add_backends(self, backend_service, backends):
backend_list = [{
'group': backend.url,
'balancingMode': 'RATE',
'maxRatePerEndpoint': 5
} for backend in backends]
self._patch_resource(collection=self.api.backendServices(),
body={'backends': backend_list},
backendService=backend_service.name)
def backend_service_remove_all_backends(self, backend_service):
self._patch_resource(collection=self.api.backendServices(),
body={'backends': []},
backendService=backend_service.name)
def delete_backend_service(self, name):
self._delete_resource(self.api.backendServices(), 'backendService',
name)
def create_url_map(
self,
name: str,
matcher_name: str,
src_hosts,
dst_default_backend_service: GcpResource,
dst_host_rule_match_backend_service: Optional[GcpResource] = None,
) -> GcpResource:
if dst_host_rule_match_backend_service is None:
dst_host_rule_match_backend_service = dst_default_backend_service
return self._insert_resource(
self.api.urlMaps(), {
'name':
name,
'defaultService':
dst_default_backend_service.url,
'hostRules': [{
'hosts': src_hosts,
'pathMatcher': matcher_name,
}],
'pathMatchers': [{
'name': matcher_name,
'defaultService': dst_host_rule_match_backend_service.url,
}],
})
def delete_url_map(self, name):
self._delete_resource(self.api.urlMaps(), 'urlMap', name)
def create_target_grpc_proxy(
self,
name: str,
url_map: GcpResource,
) -> GcpResource:
return self._insert_resource(self.api.targetGrpcProxies(), {
'name': name,
'url_map': url_map.url,
'validate_for_proxyless': True,
})
def delete_target_grpc_proxy(self, name):
self._delete_resource(self.api.targetGrpcProxies(), 'targetGrpcProxy',
name)
def create_target_http_proxy(
self,
name: str,
url_map: GcpResource,
) -> GcpResource:
return self._insert_resource(self.api.targetHttpProxies(), {
'name': name,
'url_map': url_map.url,
})
def delete_target_http_proxy(self, name):
self._delete_resource(self.api.targetHttpProxies(), 'targetHttpProxy',
name)
def create_forwarding_rule(
self,
name: str,
src_port: int,
target_proxy: GcpResource,
network_url: str,
) -> GcpResource:
return self._insert_resource(
self.api.globalForwardingRules(),
{
'name': name,
'loadBalancingScheme':
'INTERNAL_SELF_MANAGED', # Traffic Director
'portRange': src_port,
'IPAddress': '0.0.0.0',
'network': network_url,
'target': target_proxy.url,
})
def delete_forwarding_rule(self, name):
self._delete_resource(self.api.globalForwardingRules(),
'forwardingRule', name)
@staticmethod
def _network_endpoint_group_not_ready(neg):
return not neg or neg.get('size', 0) == 0
def wait_for_network_endpoint_group(self, name, zone):
@retrying.retry(retry_on_result=self._network_endpoint_group_not_ready,
stop_max_delay=60 * 1000,
wait_fixed=2 * 1000)
def _wait_for_network_endpoint_group_ready():
try:
neg = self.get_network_endpoint_group(name, zone)
logger.debug(
'Waiting for endpoints: NEG %s in zone %s, '
'current count %s', neg['name'], zone, neg.get('size'))
except googleapiclient.errors.HttpError as error:
# noinspection PyProtectedMember
reason = error._get_reason()
logger.debug('Retrying NEG load, got %s, details %s',
error.resp.status, reason)
raise
return neg
network_endpoint_group = _wait_for_network_endpoint_group_ready()
# TODO(sergiitk): dataclass
return self.ZonalGcpResource(network_endpoint_group['name'],
network_endpoint_group['selfLink'], zone)
def get_network_endpoint_group(self, name, zone):
neg = self.api.networkEndpointGroups().get(project=self.project,
networkEndpointGroup=name,
zone=zone).execute()
# TODO(sergiitk): dataclass
return neg
def wait_for_backends_healthy_status(
self,
backend_service,
backends,
timeout_sec=_WAIT_FOR_BACKEND_SEC,
wait_sec=4,
):
pending = set(backends)
@retrying.retry(retry_on_result=lambda result: not result,
stop_max_delay=timeout_sec * 1000,
wait_fixed=wait_sec * 1000)
def _retry_backends_health():
for backend in pending:
result = self.get_backend_service_backend_health(
backend_service, backend)
if 'healthStatus' not in result:
logger.debug('Waiting for instances: backend %s, zone %s',
backend.name, backend.zone)
continue
backend_healthy = True
for instance in result['healthStatus']:
logger.debug(
'Backend %s in zone %s: instance %s:%s health: %s',
backend.name, backend.zone, instance['ipAddress'],
instance['port'], instance['healthState'])
if instance['healthState'] != 'HEALTHY':
backend_healthy = False
if backend_healthy:
logger.info('Backend %s in zone %s reported healthy',
backend.name, backend.zone)
pending.remove(backend)
return not pending
_retry_backends_health()
def get_backend_service_backend_health(self, backend_service, backend):
return self.api.backendServices().getHealth(
project=self.project,
backendService=backend_service.name,
body={
"group": backend.url
}).execute()
def _get_resource(self, collection: discovery.Resource,
**kwargs) -> GcpResource:
resp = collection.get(project=self.project, **kwargs).execute()
logger.info('Loaded compute resource:\n%s',
self._resource_pretty_format(resp))
return self.GcpResource(resp['name'], resp['selfLink'])
def _insert_resource(self, collection: discovery.Resource,
body: Dict[str, Any]) -> GcpResource:
logger.info('Creating compute resource:\n%s',
self._resource_pretty_format(body))
resp = self._execute(collection.insert(project=self.project, body=body))
return self.GcpResource(body['name'], resp['targetLink'])
def _patch_resource(self, collection, body, **kwargs):
logger.info('Patching compute resource:\n%s',
self._resource_pretty_format(body))
self._execute(
collection.patch(project=self.project, body=body, **kwargs))
def _delete_resource(self, collection: discovery.Resource,
resource_type: str, resource_name: str) -> bool:
try:
params = {"project": self.project, resource_type: resource_name}
self._execute(collection.delete(**params))
return True
except googleapiclient.errors.HttpError as error:
if error.resp and error.resp.status == 404:
logger.info(
'Resource %s "%s" not deleted since it does not exist',
resource_type, resource_name)
else:
logger.warning('Failed to delete %s "%s", %r', resource_type,
resource_name, error)
return False
@staticmethod
def _operation_status_done(operation):
return 'status' in operation and operation['status'] == 'DONE'
def _execute(self,
request,
*,
test_success_fn=None,
timeout_sec=_WAIT_FOR_OPERATION_SEC):
operation = request.execute(num_retries=self._GCP_API_RETRIES)
logger.debug('Response %s', operation)
# TODO(sergiitk) try using wait() here
# https://googleapis.github.io/google-api-python-client/docs/dyn/compute_v1.globalOperations.html#wait
operation_request = self.api.globalOperations().get(
project=self.project, operation=operation['name'])
if test_success_fn is None:
test_success_fn = self._operation_status_done
logger.debug('Waiting for global operation %s, timeout %s sec',
operation['name'], timeout_sec)
response = self.wait_for_operation(operation_request=operation_request,
test_success_fn=test_success_fn,
timeout_sec=timeout_sec)
if 'error' in response:
logger.debug('Waiting for global operation failed, response: %r',
response)
raise Exception(f'Operation {operation["name"]} did not complete '
f'within {timeout_sec}s, error={response["error"]}')
return response