# blob: 05acb7dd9f529aa267d41007b9073911489331ba [file] [log] [blame]
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A module to process TFC events."""
import datetime
import json
import logging
import os
import threading
from typing import Union
import zlib
import flask
from protorpc import protojson
from tradefed_cluster import api_messages
from tradefed_cluster import common
from tradefed_cluster.services import task_scheduler
from tradefed_cluster.util import ndb_shim as ndb
from multitest_transport.models import event_log
from multitest_transport.models import ndb_models
from multitest_transport.models import test_run_hook
from multitest_transport.models import sql_models
from multitest_transport.test_scheduler import test_result_handler
from multitest_transport.test_scheduler import test_scheduler
from multitest_transport.util import analytics
from multitest_transport.util import file_util
from multitest_transport.util import tfc_client
from multitest_transport.util import olcs_session_stub
# Maps TFC request states to the corresponding MTT test run states. Used when
# applying a request state change event; unmapped states become UNKNOWN.
TEST_RUN_STATE_MAP = {
    api_messages.RequestState.UNKNOWN: ndb_models.TestRunState.UNKNOWN,
    api_messages.RequestState.QUEUED: ndb_models.TestRunState.QUEUED,
    api_messages.RequestState.RUNNING: ndb_models.TestRunState.RUNNING,
    api_messages.RequestState.CANCELED: ndb_models.TestRunState.CANCELED,
    api_messages.RequestState.COMPLETED: ndb_models.TestRunState.COMPLETED,
    api_messages.RequestState.ERROR: ndb_models.TestRunState.ERROR,
}
# Ascending failed-test-count thresholds reported to analytics; see
# _FindFailedTestCountThreshold.
FAILED_TEST_COUNT_THRESHOLDS = [1, 10, 50, 100, 200, 500, 1000]
# Test ID reported to analytics in place of a local test ID.
LOCAL_ID_TAG = 'custom'
# Flask app exposing the TFC event queue handler (HandleTask).
APP = flask.Flask(__name__)
# Thread-local storage; caches one OlcsSessionStub per thread.
_tls = threading.local()
def _GetOlcsSessionStub() -> olcs_session_stub.OlcsSessionStub:
  """Returns the calling thread's OlcsSessionStub for TFC.

  The stub is created lazily on first use in each thread and cached in
  thread-local storage afterwards.
  """
  if hasattr(_tls, 'olcs_session_stub'):
    return _tls.olcs_session_stub
  stub = olcs_session_stub.OlcsSessionStub(None)
  _tls.olcs_session_stub = stub
  return stub
def _ProcessSubscribedSessionResponse(response):
  """Session response subscriber.

  Looks up the request for the session in the response, wraps it in a
  REQUEST_STATE_CHANGED event stamped with the current time, and hands it to
  ProcessRequestEvent. Any exception is logged rather than propagated to the
  subscription machinery.

  Args:
    response: a subscribed session response carrying
      get_session_response.session_detail.session_id.id.
  """
  try:
    session_id = response.get_session_response.session_detail.session_id.id
    session_request = _GetOlcsSessionStub().GetRequest(session_id)
    ProcessRequestEvent(
        api_messages.RequestEventMessage(
            type=common.ObjectEventType.REQUEST_STATE_CHANGED,
            request_id=session_id,
            new_state=session_request.state,
            request=session_request,
            event_time=datetime.datetime.now(),
        ))
  except Exception as e:  # pylint: disable=broad-except
    logging.exception(
        'Exception %s when processing subscribed session response', e
    )
@ndb.transactional()
def _AfterTestRunHandler(test_run_id):
  """Performs after test run tasks.

  Called once a test run is in one of the final states. Within a single
  datastore transaction this:
    - marks the test run as finalized (callers skip this handler when
      is_finalized is already set),
    - fetches the TFC test context of the run's request (if any) and stores
      it as the run's next_test_context,
    - schedules report merging, the ON_SUCCESS/ON_ERROR and AFTER_RUN hooks,
      metrics tracking and a final-state event log entry, all as
      transactional tasks, and
    - schedules the next test run when the run belongs to a sequence.

  Args:
    test_run_id: id for the test run.
  """
  test_run = ndb_models.TestRun.get_by_id(test_run_id)
  if not test_run:
    return
  test_run.is_finalized = True  # Mark post-run handlers as completed
  # Query and store next test context
  if test_run.request_id:
    tfc_test_context = _GetTestContext(test_run.request_id)
    if tfc_test_context:
      test_run.next_test_context = ndb_models.TestContextObj(
          command_line=tfc_test_context.command_line,
          env_vars=[
              ndb_models.NameValuePair(name=p.key, value=p.value)
              for p in tfc_test_context.env_vars
          ],
          test_resources=[
              _ConvertToTestResourceObj(r)
              for r in tfc_test_context.test_resources
          ])
      logging.debug(
          'Setting the next_test_context = %s', test_run.next_test_context)
  test_run.put()
  # Schedule the report merging, it happens before plugin execution as some
  # plugins like apfe, ants may need to access the merged report.
  task_scheduler.AddCallableTask(
      test_result_handler.MergeReports, test_run_id, _transactional=True
  )
  # Invoke after run hooks: ON_SUCCESS or ON_ERROR depending on the final
  # state (neither for other final states), then AFTER_RUN unconditionally.
  if test_run.state == ndb_models.TestRunState.COMPLETED:
    task_scheduler.AddCallableTask(test_run_hook.ExecuteHooks,
                                   test_run.key.id(),
                                   ndb_models.TestRunPhase.ON_SUCCESS,
                                   _transactional=True)
  elif test_run.state == ndb_models.TestRunState.ERROR:
    task_scheduler.AddCallableTask(test_run_hook.ExecuteHooks,
                                   test_run.key.id(),
                                   ndb_models.TestRunPhase.ON_ERROR,
                                   _transactional=True)
  task_scheduler.AddCallableTask(test_run_hook.ExecuteHooks, test_run.key.id(),
                                 ndb_models.TestRunPhase.AFTER_RUN,
                                 _transactional=True)
  # Record metrics
  task_scheduler.AddCallableTask(_TrackTestRun, test_run.key.id(),
                                 _transactional=True)
  # Log final state
  task_scheduler.AddCallableTask(event_log.Info, test_run.key,
                                 'Test run reached final state',
                                 _transactional=True)
  # Schedule next run in sequence
  # NOTE(review): unlike every task above, this one is not added with
  # _transactional=True, so presumably it is not tied to this transaction
  # committing and could be enqueued again on a transaction retry. Up to five
  # transactional tasks are already added above; this may be deliberate
  # (e.g. a per-transaction limit on transactional tasks) -- confirm before
  # changing it.
  if test_run.sequence_id:
    task_scheduler.AddCallableTask(test_scheduler.ScheduleNextTestRun,
                                   test_run.sequence_id, test_run.key)
def ProcessRequestEvent(message: api_messages.RequestEventMessage):
  """Process a TFC request state change event message.

  Applies the event to the associated test run in a transaction. If the test
  run ends up in a final state, it then updates the test run summary (when the
  test defines a result file) and runs the after-test-run handler unless it
  has already run.

  Args:
    message: the TFC request state change event.
  """
  test_run = _GetTestRunToUpdate(message)
  if not test_run:
    return
  test_run_id = test_run.key.id()
  _ProcessRequestEvent(test_run_id, message)
  # Re-read the entity to see the changes written by the transaction above.
  test_run = test_run.key.get()
  if not test_run:
    # The test run may have been deleted concurrently; nothing left to do.
    logging.warning('Test run %s no longer exists; skipping', test_run_id)
    return
  if not test_run.IsFinal():
    return
  if test_run.test.result_file:
    test_result_handler.UpdateTestRunSummary(test_run_id)
  if not test_run.is_finalized:
    _AfterTestRunHandler(test_run_id)
@ndb.transactional()
def _ProcessRequestEvent(test_run_id, message):
  """Process a TFC request state change event message.

  Runs in a datastore transaction and updates the test run's event/update
  times. When IS_OMNILAB_BASED is 'true' and the request names a next attempt
  session, the test run is re-pointed to that session and subscribed to it,
  and no other fields are updated. Otherwise the state is mapped from the
  event (only while the run is not yet final), and OmniLab attempt results
  are processed once final. Partial test counts are copied from the event
  when the test has no result file. Finally the cancel reason is stored.

  Args:
    test_run_id: id of the test run to update.
    message: the TFC request state change event. message.request may be
      None.
  """
  test_run = ndb_models.TestRun.get_by_id(test_run_id)
  if not test_run:
    return
  request = message.request
  is_omnilab_based = os.environ.get('IS_OMNILAB_BASED') == 'true'
  # Update test run information
  test_run.request_event_time = message.event_time
  test_run.update_time = message.event_time
  # message.request can be None (see cancel_reason below), so guard it here.
  if is_omnilab_based and request and request.next_attempt_session_id:
    test_run.request_id = request.next_attempt_session_id
    test_run.put()
    # Listen to retry request's status update.
    # NOTE(review): this side effect runs inside the transaction; if the
    # transaction is retried the subscription may be started more than once.
    # Confirm StartSubscribeSession tolerates that.
    _GetOlcsSessionStub().StartSubscribeSession(
        test_run.request_id,
        ndb.with_ndb_context(_ProcessSubscribedSessionResponse),
    )
    return
  # Never move a test run out of a final state.
  if not test_run.IsFinal():
    test_run.state = TEST_RUN_STATE_MAP.get(
        message.new_state, ndb_models.TestRunState.UNKNOWN)
  if is_omnilab_based and request and test_run.IsFinal():
    _ProcessCommandAttemptResult(test_run_id, test_run, request)
  if not test_run.test.result_file:
    # No test results file to parse, use partial test counts
    test_run.total_test_count = (message.failed_test_count +
                                 message.passed_test_count)
    test_run.failed_test_count = message.failed_test_count
    test_run.failed_test_run_count = message.failed_test_run_count
  test_run.cancel_reason = request.cancel_reason if request else None
  test_run.put()
def _ProcessCommandAttemptResult(test_run_id, test_run, request):
  """Process command attempt result.

  For each command attempt of the request, adds any devices not yet recorded
  on the test run to test_run.test_devices (in memory; the caller persists
  the entity). If the attempt has a result URL and no test module results
  are stored for it yet, stores its test results.

  Args:
    test_run_id: the id of the test run.
    test_run: the test run entity.
    request: the test run's request. Nothing is done if it is None.
  """
  if not request:
    return
  for attempt in request.command_attempts:
    # Recomputed per attempt because test_devices grows inside this loop.
    device_serials = {
        device.device_serial for device in test_run.test_devices
    }
    logging.info('attempt.device_serials: %s', attempt.device_serials)
    # device_serials may be unset; treat it as empty, as elsewhere in this
    # module.
    test_run.test_devices.extend(
        _GetTestDeviceInfos(set(attempt.device_serials or []) - device_serials)
    )
    logging.info('test_run.test_devices: %s', test_run.test_devices)
    results = sql_models.GetTestModuleResults([attempt.attempt_id])
    result_url = file_util.GetResultUrl(test_run, attempt)
    # Skip loading test results if they already exist in DB.
    if result_url and not results:
      test_result_handler.StoreTestResults(
          test_run_id, attempt.attempt_id, result_url
      )
def ProcessCommandAttemptEvent(
    message: api_messages.CommandAttemptEventMessage):
  """Process a TFC attempt state change event message.

  The event is ignored when no matching test run exists or the event is
  older than the last attempt event already applied.

  Args:
    message: the TFC command attempt state change event.
  """
  target_run = _GetTestRunToUpdate(message)
  if not target_run:
    return
  _ProcessCommandAttemptEvent(target_run.key.id(), message)
@ndb.transactional()
def _ProcessCommandAttemptEvent(test_run_id, message):
  """Process a TFC attempt state change event message.

  Runs in a datastore transaction. Updates the test run's attempt/update
  times and records any new devices used by the attempt. When the attempt is
  in a final command state, it schedules storing of its test results (if a
  result URL exists) and, unless the test run is already finalized, the
  AFTER_ATTEMPT hooks. Both are scheduled as transactional tasks.

  Args:
    test_run_id: id of the test run to update.
    message: the TFC command attempt state change event.
  """
  test_run = ndb_models.TestRun.get_by_id(test_run_id)
  if not test_run:
    return
  # Update test run information
  attempt = message.attempt
  test_run.attempt_event_time = message.event_time
  test_run.update_time = message.event_time
  device_serials = {device.device_serial for device in test_run.test_devices}
  # device_serials may be unset; treat it as empty, as elsewhere in this module.
  test_run.test_devices.extend(
      _GetTestDeviceInfos(set(attempt.device_serials or []) - device_serials)
  )
  test_run.put()
  # Store attempt test results and invoke after attempt hooks
  if common.IsFinalCommandState(attempt.state):
    result_url = file_util.GetResultUrl(test_run, attempt)
    if result_url:
      task_scheduler.AddCallableTask(test_result_handler.StoreTestResults,
                                     test_run_id, attempt.attempt_id,
                                     result_url, _transactional=True)
    else:
      logging.warning('No result file for test run %s, skip processing',
                      test_run_id)
    if not test_run.is_finalized:
      task_scheduler.AddCallableTask(test_run_hook.ExecuteHooks, test_run_id,
                                     ndb_models.TestRunPhase.AFTER_ATTEMPT,
                                     attempt_id=attempt.attempt_id,
                                     _transactional=True)
def _GetTestContext(request_id):
  """Retrieve the TFC test context for a request.

  Args:
    request_id: the TFC request ID.

  Returns:
    The test context of the request's first command, or None if the request
    has no commands.
  """
  tfc_request = tfc_client.GetRequest(request_id)
  if not tfc_request.commands:
    return None
  # TODO: merge test contexts if there is more than one.
  first_command = tfc_request.commands[0]
  return tfc_client.GetTestContext(tfc_request.id, first_command.id)
def _ConvertToTestResourceObj(msg):
  """Convert TFC api_messages.TestResource to ndb_models.TestResourceObj.

  Args:
    msg: the TFC test resource message.

  Returns:
    An equivalent ndb_models.TestResourceObj. params is None when the message
    has no params.
  """
  resource_params = None
  if msg.params:
    resource_params = ndb_models.TestResourceParameters(
        decompress_files=msg.params.decompress_files)
  return ndb_models.TestResourceObj(
      name=msg.name,
      url=msg.url,
      decompress=msg.decompress,
      decompress_dir=msg.decompress_dir,
      params=resource_params)
def _TrackTestRun(test_run_id):
  """Generate and send test run analytics information after completion.

  When the test run has a TFC request, attempt and device counts are derived
  from its command attempts, and per-attempt analytics are sent through
  _TrackTestInvocations.

  Args:
    test_run_id: id of the test run. Nothing is sent if it does not exist.
  """
  test_run = ndb_models.TestRun.get_by_id(test_run_id)
  if not test_run:
    return
  package = test_run.test_package_info
  # Measured before any TFC calls below.
  run_duration = _GetCurrentTime() - test_run.create_time
  device_count, attempt_count = None, None
  if test_run.request_id:
    tfc_request = tfc_client.GetRequest(test_run.request_id)
    command_attempts = tfc_request.command_attempts
    if command_attempts:
      attempt_count = len(command_attempts)
      device_count = max(
          len(ca.device_serials or []) for ca in command_attempts)
    _TrackTestInvocations(tfc_request, test_run)
  test_name, test_version = (
      (package.name, package.version) if package else (None, None))
  analytics.Log(
      analytics.TEST_RUN_CATEGORY,
      analytics.END_ACTION,
      test_name=test_name,
      test_version=test_version,
      state=test_run.state.name,
      duration_seconds=int(run_duration.total_seconds()),
      device_count=device_count,
      attempt_count=attempt_count,
      is_rerun=test_run.prev_test_run_key is not None,
      failed_module_count=test_run.failed_test_run_count,
      test_count=test_run.total_test_count,
      failed_test_count=test_run.failed_test_count)
def _TrackTestInvocations(request, test_run):
  """Generate and send analytics for each command attempt in a request.

  For reruns, the chain of previous test runs is followed to find the
  create time of the first run. That time is used to compute each attempt's
  elapsed time. The previous run's counts are the baseline for the first
  attempt, and each attempt's counts become the baseline for the next one.

  Args:
    request: the TFC request whose command_attempts are reported.
    test_run: the test run associated with the request.
  """
  if not request.command_attempts:
    return
  attempts = request.command_attempts
  test_run_config = test_run.test_run_config
  test_run_command = test_run_config.command if test_run_config else ''
  test_run_retry_command = (
      test_run_config.retry_command if test_run_config else '')
  # test_run_config may be unset (handled above), so don't dereference it
  # unconditionally when computing the test ID.
  test_key = test_run_config.test_key if test_run_config else None
  test_id = test_key.id() if test_key else None
  if test_id and ndb_models.IsLocalId(test_id):
    test_id = LOCAL_ID_TAG
  package = test_run.test_package_info
  original_start_time = test_run.create_time
  prev_total_test_count = None
  prev_failed_module_count = None
  prev_failed_test_count = None
  # A previous test context without a previous local test run.
  missing_previous_run = bool(test_run.prev_test_context and
                              not test_run.prev_test_run_key)
  # If the test run is a rerun, get data from the previous run
  if test_run.prev_test_run_key:
    prev_test_run = ndb_models.TestRun.get_by_id(
        test_run.prev_test_run_key.id())
    if prev_test_run:
      original_start_time = prev_test_run.create_time
      prev_total_test_count = prev_test_run.total_test_count
      prev_failed_module_count = prev_test_run.failed_test_run_count
      prev_failed_test_count = prev_test_run.failed_test_count
      # Trace retries to get the create time of the first run
      while prev_test_run.prev_test_run_key:
        prev_test_run = ndb_models.TestRun.get_by_id(
            prev_test_run.prev_test_run_key.id())
        if not prev_test_run:
          break
        original_start_time = prev_test_run.create_time
    # Check if first run was a rerun of a non-local run (or if a run in the
    # chain could not be found)
    if not prev_test_run or prev_test_run.prev_test_context:
      missing_previous_run = True
  # Log data for each attempt
  for attempt in attempts:
    # Cancelled attempts could still be running and might not have an end time,
    # in that case the request cancellation time is used
    attempt_end_time = attempt.end_time or request.update_time
    # Attempts can also be cancelled before starting, this ensures duration >= 0
    attempt_start_time = attempt.start_time or attempt_end_time
    duration = max(attempt_end_time - attempt_start_time, datetime.timedelta(0))
    elapsed_time = attempt_end_time - original_start_time
    failed_test_count_threshold = _FindFailedTestCountThreshold(
        attempt.failed_test_count, prev_failed_test_count)
    if failed_test_count_threshold:
      failed_test_count_threshold = str(failed_test_count_threshold)
    analytics.Log(analytics.INVOCATION_CATEGORY, analytics.END_ACTION,
                  command=request.command_line,
                  test_run_command=test_run_command,
                  test_run_retry_command=test_run_retry_command,
                  test_id=test_id,
                  test_name=package.name if package else None,
                  test_version=package.version if package else None,
                  state=attempt.state.name,
                  failed_test_count_threshold=failed_test_count_threshold,
                  duration_seconds=int(duration.total_seconds()),
                  elapsed_time_seconds=int(elapsed_time.total_seconds()),
                  device_count=len(attempt.device_serials or []),
                  test_count=attempt.total_test_count,
                  failed_module_count=attempt.failed_test_run_count,
                  failed_test_count=attempt.failed_test_count,
                  prev_total_test_count=prev_total_test_count,
                  prev_failed_module_count=prev_failed_module_count,
                  prev_failed_test_count=prev_failed_test_count,
                  missing_previous_run=missing_previous_run,
                  is_sequence_run=test_run.sequence_id is not None)
    # This attempt's counts are the baseline for the next attempt.
    prev_total_test_count = attempt.total_test_count
    prev_failed_module_count = attempt.failed_test_run_count
    prev_failed_test_count = attempt.failed_test_count
def _FindFailedTestCountThreshold(failed_test_count, prev_failed_test_count):
  """Returns the smallest threshold crossed when the failure count drops.

  Scans FAILED_TEST_COUNT_THRESHOLDS in order and returns the first threshold
  t with failed_test_count < t <= prev_failed_test_count. A missing previous
  count is treated as unbounded.

  Args:
    failed_test_count: current failed test count, or None.
    prev_failed_test_count: previous failed test count, or None.

  Returns:
    The matching threshold, or None if failed_test_count is None or no
    threshold lies in that range.
  """
  if failed_test_count is None:
    return None
  upper_bound = (float('inf') if prev_failed_test_count is None
                 else prev_failed_test_count)
  return next(
      (limit for limit in FAILED_TEST_COUNT_THRESHOLDS
       if failed_test_count < limit <= upper_bound),
      None)
def _GetCurrentTime():
  """Returns the current local time as a naive datetime.

  Kept as a separate function so tests can substitute the clock.
  """
  current_time = datetime.datetime.now()
  return current_time
def _GetTestRunToUpdate(
    message: Union[api_messages.RequestEventMessage,
                   api_messages.CommandAttemptEventMessage]):
  """Retrieve the associated test run if it exists and should be updated.

  Args:
    message: a request or command attempt event. The request ID is read from
      message.request_id or message.attempt.request_id respectively.

  Returns:
    The test run whose request_id matches the event, or None if none is
    found or the event is older than the last processed event of the same
    kind (request vs. attempt).
  """
  is_request_event = isinstance(message, api_messages.RequestEventMessage)
  request_id = (message.request_id if is_request_event
                else message.attempt.request_id)
  query = ndb_models.TestRun.query(ndb_models.TestRun.request_id == request_id)
  test_run_key = query.get(keys_only=True)
  # Bypass the caches so the event time comparison below uses fresh data. The
  # entity may also be gone even though the query still returned its key.
  test_run = (test_run_key.get(use_cache=False, use_memcache=False)
              if test_run_key else None)
  if not test_run:
    logging.warning('Cannot find test run for request %s', request_id)
    return None
  last_event_time = (test_run.request_event_time if is_request_event
                     else test_run.attempt_event_time)
  if last_event_time and message.event_time < last_event_time:
    logging.warning('Event timestamp too old (%s < %s); skipping message: %s',
                    message.event_time, last_event_time, message)
    return None
  return test_run
def _GetTestDeviceInfos(device_serials):
  """Retrieve information about the test devices.

  Args:
    device_serials: iterable of device serials to look up via TFC.

  Returns:
    A list of ndb_models.TestDeviceInfo, one per device found. Devices that
    cannot be found are logged as errors and skipped.
  """
  infos = []
  for serial in device_serials:
    tfc_device = tfc_client.GetDeviceInfo(serial)
    if not tfc_device:
      logging.error('Device %s not found', serial)
      continue
    infos.append(
        ndb_models.TestDeviceInfo(
            device_serial=tfc_device.device_serial,
            hostname=tfc_device.hostname,
            run_target=tfc_device.run_target,
            build_id=tfc_device.build_id,
            product=tfc_device.product,
            sdk_version=tfc_device.sdk_version))
  return infos
# Event message classes and handlers
# Maps each TFC event type to a (message class used to decode the payload,
# handler called with the decoded message) pair; looked up by HandleTask.
_EVENT_HANDLERS = {
    common.ObjectEventType.COMMAND_ATTEMPT_STATE_CHANGED: (
        api_messages.CommandAttemptEventMessage, ProcessCommandAttemptEvent),
    common.ObjectEventType.REQUEST_STATE_CHANGED: (
        api_messages.RequestEventMessage, ProcessRequestEvent)
}
# Sets the callback method for the request event.
# Runs at import time, registering ProcessRequestEvent with tfc_client.
tfc_client.SetRequestEventMessageHandler(ProcessRequestEvent)
@APP.route('/', methods=['POST'])
@APP.route('/<path:fake>', methods=['POST'])
def HandleTask(fake=None):
  """Handle tasks from the TFC event queue.

  The POST body is a JSON event message, optionally zlib-compressed. Its
  'type' field selects the message class and handler from _EVENT_HANDLERS.

  Args:
    fake: path captured by the catch-all route; unused.

  Returns:
    common.HTTP_OK once the handler has run.

  Raises:
    ValueError: if the event type has no registered handler.
  """
  del fake
  body = flask.request.get_data()
  try:
    body = zlib.decompress(body)
  except zlib.error:
    logging.warning(
        'payload may not be compressed: %s', body, exc_info=True)
  message_type = json.loads(body).get('type')
  # Use .get() so unknown types reach the ValueError below; indexing would
  # raise a KeyError and make that check unreachable.
  message_cls, handler = _EVENT_HANDLERS.get(message_type, (None, None))
  if not message_cls or not handler:
    raise ValueError('Unknown TFC event type %s' % message_type)
  message = protojson.decode_message(message_cls, body)  # pytype: disable=module-attr
  handler(message)
  return common.HTTP_OK