blob: f924abb5f48d593fea13e7335d4383de77a5bcc2 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A tool to run long running tasks.
This allows a task to run in Task Queue which gives about 10 minutes execution
time.
Usage:
In https://chromeperf.appspot.com/_ah/stats/shell, pass a function to
task_runner.Run. Task function should be picklable and must include any
required imports within the function's body.
Example:
from dashboard import task_runner
def unique_test_suite_names():
from dashboard.models import graph_data
query = graph_data.Test.query(graph_data.Test.parent_test == None)
test_keys = query.fetch(limit=50000, keys_only=True)
return sorted(set(k.string_id() for k in test_keys))
task_runner.Run(unique_test_suite_names)
The task function return value and stdouts will be displayed at:
https://chromeperf.appspot.com/get_logs?namespace=task_runner&name=report
WARNING:
Running code in Appstats does affect live dashboard. So watchout for any
datastore writes that may corrupt or unintentionally delete data.
"""
import datetime
import marshal
import cStringIO
import sys
import time
import types
from google.appengine.ext import deferred
from dashboard import quick_logger
_TASK_QUEUE_NAME = 'task-runner-queue'
_REPORT_TEMPLATE = """%(function_name)s: %(start_time)s
Stdout:
%(stdout)s
Elapsed: %(elapsed_time)f seconds.
Returned results:
%(returned_results)s
"""
def Run(task_function):
"""Runs task in task queue."""
# Since defer uses pickle and pickle can't serialize non-global function,
# we'll use marshal to serialize and deserialize the function code object
# before and after defer.
code_string = marshal.dumps(task_function.func_code)
deferred.defer(_TaskWrapper, code_string, task_function.__name__,
_queue=_TASK_QUEUE_NAME)
def _TaskWrapper(code_string, function_name):
"""Runs the task and captures the stdout and the returned results."""
formatted_start_time = datetime.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S %Z')
_AddReportToLog('Starting task "%s" at %s.' %
(function_name, formatted_start_time))
code = marshal.loads(code_string)
task_function = types.FunctionType(code, globals(), 'TaskFunction')
stdout_original = sys.stdout
sys.stdout = stream = cStringIO.StringIO()
start_time = time.time()
try:
returned_results = task_function()
except Exception as e: # Intentionally broad -- pylint: disable=broad-except
print str(e)
returned_results = ''
elapsed_time = time.time() - start_time
stdout = stream.getvalue()
sys.stdout = stdout_original
results = {
'function_name': function_name,
'start_time': formatted_start_time,
'stdout': stdout,
'returned_results': returned_results,
'elapsed_time': elapsed_time
}
_AddReportToLog(_REPORT_TEMPLATE % results)
def _AddReportToLog(report):
"""Adds a log for bench results."""
formatter = quick_logger.Formatter()
logger = quick_logger.QuickLogger('task_runner', 'report', formatter)
logger.Log(report)
logger.Save()