# blob: 8380573eadec31bde9c0d13397b288b412e91371 [file] [log] [blame]
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import httplib2
import logging
import threading
import time
from googleapiclient import discovery
from googleapiclient import http
from oauth2client.service_account import ServiceAccountCredentials
from host_controller.tfc import command_task
# Name of the API. Used as the default api_name of CreateTfcClient, where it
# is interpolated into the discovery URL and passed to discovery.build.
API_NAME = "tradefed_cluster"
# Version of the API. Used as the default api_version of CreateTfcClient.
API_VERSION = "v1"
# OAuth2 scopes. Used as the default scopes of CreateTfcClient, where they are
# passed to the service account credentials.
SCOPES = ['https://www.googleapis.com/auth/userinfo.email']
class TfcClient(object):
    """The class for accessing TFC API.

    Each public method builds one request on the wrapped service object,
    logs the outgoing payload, and executes the request synchronously by
    calling execute() on it.

    Attributes:
        _service: The TFC service.
    """

    def __init__(self, service):
        """Initializes the client.

        Args:
            service: The TFC service object that exposes the API resources
                     (tasks, requests, host_events, command_events).
        """
        self._service = service

    def LeaseHostTasks(self, cluster_id, next_cluster_ids, hostname,
                       device_infos):
        """Calls leasehosttasks.

        Args:
            cluster_id: A string, the primary cluster to lease tasks from.
            next_cluster_ids: A list of Strings, the secondary clusters to
                              lease tasks from.
            hostname: A string, the name of the TradeFed host.
            device_infos: A list of DeviceInfo, the information about the
                          devices connected to the host.

        Returns:
            A list of command_task.CommandTask, the leased tasks. The list is
            empty if the response does not contain a "tasks" field.
        """
        lease = {"hostname": hostname,
                 "cluster": cluster_id,
                 "next_cluster_ids": next_cluster_ids,
                 "device_infos": [x.ToLeaseHostTasksJson()
                                  for x in device_infos]}
        logging.info("tasks.leasehosttasks body=%s", lease)
        tasks = self._service.tasks().leasehosttasks(body=lease).execute()
        logging.info("tasks.leasehosttasks response=%s", tasks)
        # The "tasks" field may be absent from the response; treat that as
        # nothing leased.
        if "tasks" not in tasks:
            return []
        return [command_task.CommandTask(**task) for task in tasks["tasks"]]

    def TestResourceList(self, request_id):
        """Calls testResource.list.

        Args:
            request_id: int, id of request to grab resources for

        Returns:
            A list of TestResources, exactly as found in the
            "test_resources" field of the response. The list is empty if the
            response does not contain that field.
        """
        logging.info("request.testResource.list request_id=%s", request_id)
        test_resources = self._service.requests().testResource().list(
            request_id=request_id).execute()
        logging.info("request.testResource.list response=%s", test_resources)
        if 'test_resources' not in test_resources:
            # Return an empty list (not a dict) so that the result type
            # matches the documented contract and LeaseHostTasks.
            return []
        return test_resources['test_resources']

    @staticmethod
    def CreateDeviceSnapshot(cluster_id, hostname, dev_infos):
        """Creates a DeviceSnapshot which can be uploaded as host event.

        The snapshot is stamped with the current time in whole seconds.

        Args:
            cluster_id: A string, the cluster to upload snapshot to.
            hostname: A string, the name of the TradeFed host.
            dev_infos: A list of DeviceInfo.

        Returns:
            A JSON object.
        """
        obj = {"time": int(time.time()),
               "data": {},
               "cluster": cluster_id,
               "hostname": hostname,
               "tf_version": "(unknown)",
               "type": "DeviceSnapshot",
               "device_infos": [x.ToDeviceSnapshotJson() for x in dev_infos]}
        return obj

    def SubmitHostEvents(self, host_events):
        """Calls host_events.submit.

        Args:
            host_events: A list of JSON objects. Currently DeviceSnapshot is
                         the only type of host events.
        """
        json_obj = {"host_events": host_events}
        logging.info("host_events.submit body=%s", json_obj)
        self._service.host_events().submit(body=json_obj).execute()

    def SubmitCommandEvents(self, command_events):
        """Calls command_events.submit.

        Args:
            command_events: A list of JSON objects converted from
                            CommandAttempt.
        """
        json_obj = {"command_events": command_events}
        logging.info("command_events.submit body=%s", json_obj)
        self._service.command_events().submit(body=json_obj).execute()

    def NewRequest(self, request):
        """Calls requests.new.

        Args:
            request: An instance of Request. Its GetBody() result is sent as
                     the request body and its GetParameters() result is
                     expanded into keyword arguments of the API call.

        Returns:
            A JSON object, the new request queued in the cluster.
            Sample
            {'state': 'UNKNOWN',
             'command_line': 'vts-codelab --run-target sailfish',
             'id': '2',
             'user': 'testuser'}
        """
        body = request.GetBody()
        params = request.GetParameters()
        logging.info("requests.new parameters=%s body=%s", params, body)
        return self._service.requests().new(body=body, **params).execute()
def CreateTfcClient(api_root, oauth2_service_json,
                    api_name=API_NAME, api_version=API_VERSION, scopes=SCOPES):
    """Builds a TfcClient for the TFC service at a given URL.

    Args:
        api_root: The URL to the service.
        oauth2_service_json: The path to service account key file.
        api_name: The name of the API, used in the discovery URL.
        api_version: The version of the API, used in the discovery URL.
        scopes: The OAuth2 scopes requested for the service account
                credentials.

    Returns:
        A TfcClient object.
    """
    discovery_url = "%s/discovery/v1/apis/%s/%s/rest" % (
        api_root, api_name, api_version)
    logging.info("Build service from: %s", discovery_url)
    creds = ServiceAccountCredentials.from_json_keyfile_name(
        oauth2_service_json, scopes=scopes)

    # httplib2.Http is not thread-safe, so every thread gets its own
    # authorized instance, created the first time that thread needs one.
    per_thread = threading.local()

    def _ThreadHttp():
        """Returns the calling thread's authorized Http, creating it once."""
        if not hasattr(per_thread, "http"):
            per_thread.http = creds.authorize(httplib2.Http())
        return per_thread.http

    def _BuildRequest(unused_http, *args, **kwargs):
        """Builds a request bound to the calling thread's Http object.

        The Http object supplied by the caller is ignored on purpose.
        """
        return http.HttpRequest(_ThreadHttp(), *args, **kwargs)

    return TfcClient(discovery.build(
        api_name, api_version,
        http=_ThreadHttp(),
        discoveryServiceUrl=discovery_url,
        requestBuilder=_BuildRequest))