# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import json
import logging
import os
import time
import urllib
import uuid

import webapp2

from google.appengine.api import taskqueue
from google.appengine.api import urlfetch

from perf_insights.endpoints.cloud_mapper import cloud_helper
from perf_insights.endpoints.cloud_mapper import job_info
from perf_insights import cloud_config

DEFAULT_TRACES_PER_INSTANCE = 64


class TaskPage(webapp2.RequestHandler):
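  """Task queue handler that runs mapper jobs.

  A 'create' task dispatches a queued job's mapper to worker instances, and a
  'check' task polls cloud storage for their results.
  """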

  def _QueryForTraces(self, corpus, query):
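    """Returns the JSON-decoded list of traces in |corpus| matching |query|."""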
    payload = urllib.urlencode({'q': query})
    query_url = '%s/query?%s' % (corpus, payload)
    headers = {
        'X-URLFetch-Service-Id': cloud_config.Get().urlfetch_service_id
    }
    result = urlfetch.fetch(url=query_url,
                            method=urlfetch.GET,
                            headers=headers,
                            follow_redirects=False,
                            deadline=10)
    logging.info(result.content)
    return json.loads(result.content)

  def _DispatchTracesAndWaitForResult(self, job, traces, num_instances):
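    """Fans |traces| out across |num_instances| mapper tasks.

    The job's mapper is first written to cloud storage so each worker can
    fetch it; a follow-up 'check' task is queued to poll for results.
    """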

    def _slice_it(li, cols=2):
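      # Yields |cols| contiguous slices of |li| of roughly equal size, e.g.
      # cols=2 splits [1, 2, 3, 4, 5] into [1, 2, 3] and [4, 5].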
      start = 0
      for i in xrange(cols):
        stop = start + len(li[i::cols])
        yield li[start:stop]
        start = stop

    # TODO(simonhatch): In the future it might be possible to specify only a
    # reducer and no mapper. Revisit this.
    bucket_path = cloud_config.Get().control_bucket_path + '/jobs/'
    mapper_url = '%s%s.mapper' % (bucket_path, job.key.id())
    mapper_text = job.mapper.encode('ascii', 'ignore')
    cloud_helper.WriteGCS(mapper_url, mapper_text)

    version = self._GetVersion()
    tasks = {}

    # Split the traces up into N buckets.
    for current_traces in _slice_it(traces, num_instances):
      task_id = str(uuid.uuid4())
      payload = {
          'revision': job.revision,
          'traces': json.dumps(current_traces),
          'result': '%s%s.result' % (bucket_path, task_id),
          'mapper': mapper_url,
          'mapper_function': job.mapper_function
      }
      taskqueue.add(
          queue_name='mapper-queue',
          url='/cloud_worker/task',
          target=version,
          name=task_id,
          params=payload)
      tasks[task_id] = {'status': 'IN_PROGRESS'}

    # On production servers, we could just sit and wait for the results, but
    # dev_server is single threaded and won't run any other tasks until the
    # current one is finished. We'll just do the easy thing for now and
    # queue a task to check for the result.
    timeout = (
        datetime.datetime.now() + datetime.timedelta(minutes=10)).strftime(
            '%Y-%m-%d %H:%M:%S')
    taskqueue.add(
        queue_name='default',
        url='/cloud_mapper/task',
        target=version,
        countdown=1,
        params={'jobid': job.key.id(),
                'type': 'check',
                'tasks': json.dumps(tasks),
                'timeout': timeout})

  def _GetVersion(self):
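    """Returns the module version that queued tasks should target."""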
    version = os.environ['CURRENT_VERSION_ID'].split('.')[0]
    if cloud_config._is_devserver():
      version = taskqueue.DEFAULT_APP_VERSION
    return version

  def _CancelTasks(self, tasks):
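    """Deletes any still-pending mapper tasks from the queue by name."""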
    task_names = [task_id for task_id, _ in tasks.iteritems()]
    taskqueue.Queue('mapper-queue').delete_tasks_by_name(task_names)

  def _CheckOnResults(self, job):
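    """Checks cloud storage for the result files of |job|'s mapper tasks.

    If no result has appeared yet, this requeues itself until the timeout
    passes; since there is no reducer yet, the job is marked COMPLETE as soon
    as any task's result file exists.
    """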
    tasks = json.loads(self.request.get('tasks'))

    # TODO: There's no reducer yet, so we can't actually collapse multiple
    # results into one results file.
    results = None
    for task_id, _ in tasks.iteritems():
      task_results_path = '%s/jobs/%s.result' % (
          cloud_config.Get().control_bucket_path, task_id)
      stat_result = cloud_helper.StatGCS(task_results_path)
      if stat_result is not None:
        logging.info(str(stat_result))
        tasks[task_id]['status'] = 'DONE'
        results = task_results_path

    if not results:
      timeout = datetime.datetime.strptime(
          self.request.get('timeout'), '%Y-%m-%d %H:%M:%S')
      if datetime.datetime.now() > timeout:
        self._CancelTasks(tasks)
        job.status = 'ERROR'
        job.put()
        logging.error('Task timed out waiting for results.')
        return
      taskqueue.add(
          url='/cloud_mapper/task',
          target=self._GetVersion(),
          countdown=1,
          params={'jobid': job.key.id(),
                  'type': 'check',
                  'tasks': json.dumps(tasks),
                  'timeout': self.request.get('timeout')})
      return

    logging.info('Finished all tasks.')
    job.status = 'COMPLETE'
    job.results = results
    job.put()

  def _CalculateNumInstancesNeeded(self, num_traces):
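    """Returns how many worker instances to use for |num_traces| traces.

    One instance per DEFAULT_TRACES_PER_INSTANCE traces, plus one.
    """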
    return 1 + int(num_traces / DEFAULT_TRACES_PER_INSTANCE)

  def _RunMappers(self, job):
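    """Queries the corpus for the job's traces and dispatches mapper tasks."""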
    # Get all the traces to process.
    traces = self._QueryForTraces(job.corpus, job.query)

    # We can probably be smarter about this down the road, maybe breaking
    # this into many smaller tasks and allowing each instance to run
    # several tasks at once. For now we'll just break it into a few big ones.
    num_instances = self._CalculateNumInstancesNeeded(len(traces))

    return self._DispatchTracesAndWaitForResult(job, traces, num_instances)

  def _CreateMapperJob(self, job):
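    """Moves a QUEUED job to IN_PROGRESS and kicks off its mappers."""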
    if job.status != 'QUEUED':
      return

    job.status = 'IN_PROGRESS'
    job.put()

    self._RunMappers(job)

  def post(self):
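    """Handles 'create' and 'check' task requests for a mapper job."""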
    self.response.headers['Content-Type'] = 'text/plain'

    jobid = self.request.get('jobid')
    job = job_info.JobInfo.get_by_id(jobid)
    if not job:
      return

    try:
      if self.request.get('type') == 'create':
        self._CreateMapperJob(job)
      elif self.request.get('type') == 'check':
        self._CheckOnResults(job)
    except Exception as e:
      job.status = 'ERROR'
      job.put()
      logging.exception('Failed job: %s' % e)


app = webapp2.WSGIApplication([('/cloud_mapper/task', TaskPage)])