blob: ebd798b84c297c451f8fa0d3ca0d0ebd9d9f0e1f [file] [log] [blame]
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A module for streamz metric objects shared across the application."""
import datetime
import enum
import logging
import pytz
from tradefed_cluster import env_config
from tradefed_cluster.util import metric_util
# Field (dimension) names attached to the metrics defined in this module.
METRIC_FIELD_ACTION = 'action'
METRIC_FIELD_CLUSTER = 'cluster'
METRIC_FIELD_HOSTNAME = 'hostname'
METRIC_FIELD_QUEUE = 'queue'
METRIC_FIELD_RUN_TARGET = 'run_target'
METRIC_FIELD_STATE = 'state'
METRIC_FIELD_TEST_RUNNER = 'test_runner'
METRIC_FIELD_TYPE = 'type'
def _Metric(name, descriptor):
  """Builds a metric bound to the application's configured metric client.

  Args:
    name: the metric name, e.g. 'queue/size'.
    descriptor: a metric_util.MetricDescriptor describing the metric.
  Returns:
    a metric_util.Metric object that reports through
    env_config.CONFIG.metric_client.
  """
  client = env_config.CONFIG.metric_client
  return metric_util.Metric(name, descriptor=descriptor, client=client)
QUEUE_METRIC_FIELDS = [(METRIC_FIELD_QUEUE, str)]

# Number of tasks still waiting in a queue.
queue_size = _Metric(
    name='queue/size',
    descriptor=metric_util.MetricDescriptor(
        desc='Number of tasks remaining in the queue',
        type_=metric_util.MetricType.VALUE,
        value_type=int,
        fields=QUEUE_METRIC_FIELDS))

# Number of tasks leased from a queue during the last minute.
queue_leased = _Metric(
    name='queue/leased',
    descriptor=metric_util.MetricDescriptor(
        desc='Number of tasks leased in the last minute',
        type_=metric_util.MetricType.VALUE,
        value_type=int,
        fields=QUEUE_METRIC_FIELDS))

# Seconds between now and the oldest task ETA in a queue.
queue_eta = _Metric(
    name='queue/eta',
    descriptor=metric_util.MetricDescriptor(
        desc='Seconds from now to the oldest ETA',
        type_=metric_util.MetricType.VALUE,
        value_type=int,
        units=metric_util.Units.SECONDS,
        fields=QUEUE_METRIC_FIELDS))
COMMAND_EVENT_METRIC_FIELDS = [(METRIC_FIELD_TYPE, str)]

# Counts command events, broken down by event type.
command_event_type_count = _Metric(
    name='command_event/type',
    descriptor=metric_util.MetricDescriptor(
        desc='Number of command events of this type',
        type_=metric_util.MetricType.COUNTER,
        fields=COMMAND_EVENT_METRIC_FIELDS))

# Counts command events handled by the legacy processing pipeline.
command_event_legacy_processing_count = _Metric(
    name='command_event/legacy_processing_count',
    descriptor=metric_util.MetricDescriptor(
        desc='Command event legacy processing counts',
        type_=metric_util.MetricType.COUNTER))
COMMAND_ATTEMPT_METRIC_FIELDS = [
    (METRIC_FIELD_CLUSTER, str),
    (METRIC_FIELD_RUN_TARGET, str),
    (METRIC_FIELD_STATE, str),
]

# Counts command attempts per cluster, run target and attempt state.
command_attempt_count = _Metric(
    name='command_attempt/count',
    descriptor=metric_util.MetricDescriptor(
        desc='Command attempt counts',
        type_=metric_util.MetricType.COUNTER,
        fields=COMMAND_ATTEMPT_METRIC_FIELDS))
def RecordCommandAttemptMetric(cluster_id, run_target, state):
  """Counts one command attempt.

  Increments the command attempt counter by one for the given cluster, run
  target and state. No timing information is recorded.

  Args:
    cluster_id: Cluster ID of the command leased.
    run_target: Run target of the command leased.
    state: State of the command attempt.
  """
  metric_fields = {
      METRIC_FIELD_CLUSTER: cluster_id,
      METRIC_FIELD_RUN_TARGET: run_target,
      METRIC_FIELD_STATE: state,
  }
  command_attempt_count.Increment(metric_fields)
# Fields shared by the command timing and command action count metrics.
COMMAND_METRIC_FIELDS = [
    (field_name, str)
    for field_name in (METRIC_FIELD_CLUSTER,
                       METRIC_FIELD_RUN_TARGET,
                       METRIC_FIELD_ACTION)
]
class CommandAction(enum.Enum):
  """Applicable actions for command metrics.

  Members:
    LEASE: latency to lease a command.
    RESCHEDULE: latency to reschedule a command.
    CANCEL: latency to cancel a command.
    INVOCATION_FETCH_BUILD: time taken to fetch the build.
    INVOCATION_SETUP: time taken to set up the test (preparers).
    ENSURE_LEASABLE: latency to ensure that a command is leasable.
    INVOCATION_COMPLETED: time taken for an invocation to complete.
  """
  LEASE = 1
  RESCHEDULE = 2
  CANCEL = 3
  INVOCATION_FETCH_BUILD = 4
  INVOCATION_SETUP = 5
  ENSURE_LEASABLE = 6
  INVOCATION_COMPLETED = 7
# Per-action command timing, recorded in seconds.
command_timing = _Metric(
    name='command/timing',
    descriptor=metric_util.MetricDescriptor(
        desc='Command timing breakdown',
        type_=metric_util.MetricType.EVENT,
        units=metric_util.Units.SECONDS,
        fields=COMMAND_METRIC_FIELDS))

# Number of times each command action was recorded.
command_action_count = _Metric(
    name='command/action_count',
    descriptor=metric_util.MetricDescriptor(
        desc='Command action counts',
        type_=metric_util.MetricType.COUNTER,
        fields=COMMAND_METRIC_FIELDS))
def RecordCommandTimingMetric(cluster_id,
                              run_target,
                              command_action,
                              latency_secs=None,
                              create_timestamp=None,
                              count=False):
  """Records timing metrics on command tasks.

  Either latency_secs or create_timestamp must be specified. When
  latency_secs is omitted, the latency is the time elapsed between
  create_timestamp and now.

  Args:
    cluster_id: Cluster ID of the command leased.
    run_target: Run target of the command leased.
    command_action: CommandAction to log; its name is used as the action
      metric field.
    latency_secs: Latency of the action in seconds. Takes precedence over
      create_timestamp when both are given.
    create_timestamp: Task creation timestamp. A naive datetime is treated as
      UTC; an aware datetime is converted to UTC first.
    count: If True, also increments the command action counter.

  Raises:
    ValueError: if neither latency_secs nor create_timestamp is specified.
  """
  metric_fields = {
      METRIC_FIELD_CLUSTER: cluster_id,
      METRIC_FIELD_RUN_TARGET: run_target,
      METRIC_FIELD_ACTION: command_action.name,
  }
  if latency_secs is None:
    if create_timestamp is None:
      # Previously this failed with an opaque AttributeError on None.tzinfo.
      raise ValueError(
          'Either latency_secs or create_timestamp must be specified.')
    if (create_timestamp.tzinfo is not None and
        create_timestamp.tzinfo.utcoffset(create_timestamp) is not None):
      # create_timestamp is an aware datetime. Convert it to a naive UTC
      # datetime so it can be subtracted from the naive UTC "now" below.
      create_timestamp = create_timestamp.astimezone(pytz.utc).replace(
          tzinfo=None)
    # Naive current UTC time; equivalent to the deprecated
    # datetime.datetime.utcnow().
    now = datetime.datetime.now(pytz.utc).replace(tzinfo=None)
    latency_secs = (now - create_timestamp).total_seconds()
  logging.debug(
      '%s command for cluster %s and run target %s took %s seconds',
      command_action.name, cluster_id, run_target, latency_secs)
  command_timing.Record(latency_secs, metric_fields)
  if count:
    command_action_count.Increment(metric_fields)
_TEST_RUNNER_METRIC_FIELDS = [
    (METRIC_FIELD_CLUSTER, str),
    (METRIC_FIELD_HOSTNAME, str),
    (METRIC_FIELD_TEST_RUNNER, str),
]

# String-valued metric holding the test runner version reported for a host.
_test_runner_version_metric = _Metric(
    name='host/test_runner_version',
    descriptor=metric_util.MetricDescriptor(
        desc="Host's test runner version.",
        type_=metric_util.MetricType.VALUE,
        value_type=str,
        fields=_TEST_RUNNER_METRIC_FIELDS))
def SetHostTestRunnerVersion(runner, runner_version, cluster, hostname):
  """Sets the test runner version metric for a host.

  Args:
    runner: test runner on the host.
    runner_version: version of that test runner; used as the metric value.
    cluster: cluster name.
    hostname: hostname.
  """
  fields = {
      METRIC_FIELD_CLUSTER: cluster,
      METRIC_FIELD_HOSTNAME: hostname,
      METRIC_FIELD_TEST_RUNNER: runner,
  }
  _test_runner_version_metric.Set(runner_version, fields)