blob: 1c8a3d0f7c747558e2ec746028f5e14f65340f36 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import logging
import webapp2
from webapp.src import vtslab_status as Status
from webapp.src.proto import model
from webapp.src.utils import logger
def StrGT(left, right):
"""Returns true if `left` string is greater than `right` in value."""
if len(left) > len(right):
right = "0" * (len(left) - len(right)) + right
elif len(right) > len(left):
left = "0" * (len(right) - len(left)) + left
return left > right
class PeriodicScheduler(webapp2.RequestHandler):
"""Main class for /tasks/schedule servlet.
This class creates jobs from registered schedules periodically.
Attributes:
logger: Logger class
"""
logger = logger.Logger()
def ReserveDevices(self, target_device_serials):
"""Reserves devices.
Args:
target_device_serials: a list of strings, containing target device
serial numbers.
"""
device_query = model.DeviceModel.query(
model.DeviceModel.serial.IN(target_device_serials))
devices = device_query.fetch()
for device in devices:
device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
"reserved"]
device.put()
def FindBuildId(self, new_job):
"""Finds build ID for a new job.
Args:
new_job: JobModel, a new job.
Return:
string, build ID found.
"""
build_id = ""
build_query = model.BuildModel.query(
model.BuildModel.manifest_branch == new_job.manifest_branch)
builds = build_query.fetch()
if builds:
self.logger.Println("-- Find build ID")
# Remove builds if build_id info is none
build_id_filled = [x for x in builds if x.build_id]
sorted_list = sorted(
build_id_filled, key=lambda x: int(x.build_id), reverse=True)
filtered_list = [
x for x in sorted_list
if (all(
hasattr(x, attrs)
for attrs in ["build_target", "build_type", "build_id"])
and x.build_target and x.build_type)
]
for device_build in filtered_list:
candidate_build_target = "-".join(
[device_build.build_target, device_build.build_type])
if (new_job.build_target == candidate_build_target and
(not new_job.require_signed_device_build or
device_build.signed)):
build_id = device_build.build_id
break
return build_id
def get(self):
"""Generates an HTML page based on the task schedules kept in DB."""
self.logger.Clear()
schedule_query = model.ScheduleModel.query()
schedules = schedule_query.fetch()
if schedules:
for schedule in schedules:
self.logger.Println("Schedule: %s (%s %s)" %
(schedule.test_name,
schedule.manifest_branch,
schedule.build_target))
self.logger.Indent()
if self.NewPeriod(schedule):
self.logger.Println("- Need new job")
target_host, target_device, target_device_serials =\
self.SelectTargetLab(schedule)
self.logger.Println("- Target host: %s" % target_host)
self.logger.Println("- Target device: %s" % target_device)
self.logger.Println(
"- Target serials: %s" % target_device_serials)
# TODO: update device status
# create job and add.
if target_host:
new_job = model.JobModel()
new_job.hostname = target_host
new_job.priority = schedule.priority
new_job.test_name = schedule.test_name
new_job.require_signed_device_build = (
schedule.require_signed_device_build)
new_job.device = target_device
new_job.period = schedule.period
new_job.serial.extend(target_device_serials)
new_job.build_storage_type = schedule.build_storage_type
new_job.manifest_branch = schedule.manifest_branch
new_job.build_target = schedule.build_target
new_job.shards = schedule.shards
new_job.param = schedule.param
new_job.retry_count = schedule.retry_count
new_job.gsi_storage_type = schedule.gsi_storage_type
new_job.gsi_branch = schedule.gsi_branch
new_job.gsi_build_target = schedule.gsi_build_target
new_job.gsi_pab_account_id = schedule.gsi_pab_account_id
new_job.test_storage_type = schedule.test_storage_type
new_job.test_branch = schedule.test_branch
new_job.test_build_target = schedule.test_build_target
new_job.test_pab_account_id = (
schedule.test_pab_account_id)
new_job.build_id = ""
if new_job.build_storage_type == (
Status.STORAGE_TYPE_DICT["PAB"]):
new_job.build_id = self.FindBuildId(new_job)
if new_job.build_id:
self.ReserveDevices(target_device_serials)
new_job.status = Status.JOB_STATUS_DICT[
"ready"]
new_job.timestamp = datetime.datetime.now()
new_job.put()
self.logger.Println("NEW JOB")
else:
self.logger.Println("NO BUILD FOUND")
elif new_job.build_storage_type == (
Status.STORAGE_TYPE_DICT["GCS"]):
new_job.status = Status.JOB_STATUS_DICT["ready"]
new_job.timestamp = datetime.datetime.now()
new_job.put()
self.logger.Println("NEW JOB - GCS")
else:
self.logger.Println("Unexpected storage type.")
self.logger.Unindent()
self.response.write(
"<pre>\n" + "\n".join(self.logger.Get()) + "\n</pre>")
def NewPeriod(self, schedule):
"""Checks whether a new job creation is needed.
Args:
schedule: a proto containing schedule information.
Returns:
True if new job is required, False otherwise.
"""
job_query = model.JobModel.query(
model.JobModel.manifest_branch == schedule.manifest_branch,
model.JobModel.build_target == schedule.build_target,
model.JobModel.test_name == schedule.test_name,
model.JobModel.period == schedule.period,
model.JobModel.shards == schedule.shards,
model.JobModel.retry_count == schedule.retry_count,
model.JobModel.gsi_branch == schedule.gsi_branch,
model.JobModel.test_branch == schedule.test_branch)
same_jobs = job_query.fetch()
same_jobs = [
x for x in same_jobs
if (set(x.param) == set(schedule.param)
and x.device in schedule.device)
]
if not same_jobs:
return True
outdated_jobs = [
x for x in same_jobs
if (datetime.datetime.now() - x.timestamp > datetime.timedelta(
minutes=x.period))
]
outdated_ready_jobs = [
x for x in outdated_jobs
if x.status == Status.JOB_STATUS_DICT["expired"]
]
if outdated_ready_jobs:
msg = ("Job key[{}] is(are) outdated. "
"They became infra-err status.").format(
", ".join(
[str(x.key.id()) for x in outdated_ready_jobs]))
logging.debug(msg)
self.logger.Println(msg)
for job in outdated_ready_jobs:
job.status = Status.JOB_STATUS_DICT["infra-err"]
job.put()
outdated_leased_jobs = [
x for x in outdated_jobs
if x.status == Status.JOB_STATUS_DICT["leased"]
]
if outdated_leased_jobs:
msg = ("Job key[{}] is(are) expected to be completed "
"however still in leased status.").format(
", ".join(
[str(x.key.id()) for x in outdated_leased_jobs]))
logging.debug(msg)
self.logger.Println(msg)
recent_jobs = [x for x in same_jobs if x not in outdated_jobs]
if recent_jobs or outdated_leased_jobs:
return False
else:
return True
def SelectTargetLab(self, schedule):
"""Find target host and devices to schedule a new job.
Args:
schedule: a proto containing the information of a schedule.
Returns:
a string which represents hostname,
a string containing target lab and product with '/' separator,
a list of selected devices serial (see whether devices will be
selected later when the job is picked up.)
"""
for target_device in schedule.device:
if "/" not in target_device:
# device malformed
continue
target_lab, target_product_type = target_device.split("/")
self.logger.Println("- Seeking product %s in lab %s" %
(target_product_type, target_lab))
self.logger.Indent()
lab_query = model.LabModel.query(model.LabModel.name == target_lab)
target_labs = lab_query.fetch()
available_devices = {}
if target_labs:
for lab in target_labs:
self.logger.Println("- target lab found")
self.logger.Println("- target device %s %s" %
(lab.hostname, target_product_type))
self.logger.Indent()
device_query = model.DeviceModel.query(
model.DeviceModel.hostname == lab.hostname)
host_devices = device_query.fetch()
for device in host_devices:
self.logger.Println("- check device %s %s" %
(device.status, device.product))
if ((device.status in [
Status.DEVICE_STATUS_DICT["fastboot"],
Status.DEVICE_STATUS_DICT["online"],
Status.DEVICE_STATUS_DICT["ready"]
]) and (device.scheduling_status ==
Status.DEVICE_SCHEDULING_STATUS_DICT["free"])
and device.product == target_product_type):
self.logger.Println(
"- a device found %s" % device.serial)
if device.hostname not in available_devices:
available_devices[device.hostname] = set()
available_devices[device.hostname].add(
device.serial)
self.logger.Unindent()
for host in available_devices:
self.logger.Println("- len(devices) %s >= shards %s ?" %
(len(available_devices[host]),
schedule.shards))
if len(available_devices[host]) >= schedule.shards:
self.logger.Unindent()
return host, target_device, list(
available_devices[host])[:schedule.shards]
self.logger.Unindent()
return None, None, []