| # |
| # Copyright (C) 2017 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import logging |
| import socket |
| import threading |
| |
| import httplib2 |
| from googleapiclient import errors |
| |
| from host_controller.tfc import command_attempt |
| from host_controller.tradefed import remote_operation |
| |
| |
| class InvocationThread(threading.Thread): |
| """The thread that remotely executes a command task. |
| |
| Attributes: |
| _remote_client: The RemoteClient which executes the command. |
| _tfc_client: The TfcClient to which the command events are sent. |
| _attempt: The CommandAttempt whose events are sent to TFC. |
| _command: A list of strings, the command and arguments. |
| device_serials: A list of strings, the serial numbers of the devices |
| which need to be allocated to the task. |
| _allocated_serials: A list of strings, the serial numbers of the devices |
| which are successfully allocated. |
| _tfc_heartbeat_interval: The interval of TestRunInProgress events in |
| seconds. |
| """ |
| |
| def __init__(self, |
| remote_client, |
| tfc_client, |
| attempt, |
| command, |
| device_serials, |
| tfc_heartbeat_interval=5 * 60): |
| """Initializes the attributes.""" |
| super(InvocationThread, self).__init__() |
| self._remote_client = remote_client |
| self._tfc_client = tfc_client |
| self._attempt = attempt |
| self._command = command |
| self.device_serials = device_serials |
| self._allocated_serials = None |
| # The value in Java implementation is 5 minutes. |
| self._tfc_heartbeat_interval = tfc_heartbeat_interval |
| |
| def _AllocateDevices(self): |
| """Allocates all of device_serial.""" |
| for serial in self.device_serials: |
| self._remote_client.SendOperation( |
| remote_operation.AllocateDevice(serial)) |
| self._allocated_serials.append(serial) |
| |
| def _StartInvocation(self): |
| """Starts executing command and sends the event to TFC.""" |
| self._remote_client.SendOperation( |
| remote_operation.ExecuteCommand(self.device_serials[0], |
| *self._command)) |
| event = self._attempt.CreateCommandEvent( |
| command_attempt.EventType.INVOCATION_STARTED) |
| self._tfc_client.SubmitCommandEvents([event]) |
| |
| def _WaitForCommandResult(self): |
| """Waits for command result and keeps sending heartbeat to TFC |
| |
| Returns: |
| A JSON object returned from TradeFed remote manager. |
| """ |
| while True: |
| result = self._remote_client.WaitForCommandResult( |
| self.device_serials[0], self._tfc_heartbeat_interval) |
| if result: |
| return result |
| event = self._attempt.CreateCommandEvent( |
| command_attempt.EventType.TEST_RUN_IN_PROGRESS) |
| self._tfc_client.SubmitCommandEvents([event]) |
| |
| def _CompleteInvocation(self, result): |
| """Sends InvocationCompleted event according to the result. |
| |
| Args: |
| result: A JSON object returned from TradeFed remote manager. |
| """ |
| if result["status"] == "INVOCATION_SUCCESS": |
| event = self._attempt.CreateInvocationCompletedEvent( |
| str(result), 1, 0) |
| else: |
| event = self._attempt.CreateInvocationCompletedEvent( |
| str(result), 1, 1, error=str(result)) |
| self._tfc_client.SubmitCommandEvents([event]) |
| |
| def _FreeAllocatedDevices(self): |
| """Frees allocated devices and tolerates RemoteOperationException.""" |
| for serial in self._allocated_serials: |
| try: |
| self._remote_client.SendOperation( |
| remote_operation.FreeDevice(serial)) |
| except remote_operation.RemoteOperationException as e: |
| logging.exception(e) |
| except socket.error as e: |
| logging.exception(e) |
| break |
| self._allocated_serials = [] |
| |
| def _SubmitErrorEvent(self, event_type, error_msg): |
| """Submits an error event and tolerates http exceptions. |
| |
| Args: |
| event_type: A string, the type of the command event. |
| error_msg: A string, the error message. |
| """ |
| try: |
| self._tfc_client.SubmitCommandEvents( |
| [self._attempt.CreateCommandEvent(event_type, error_msg)]) |
| except (httplib2.HttpLib2Error, errors.HttpError) as e: |
| logging.exception(e) |
| |
| # @Override |
| def run(self): |
| """Executes a command task with exception handling.""" |
| self._allocated_serials = [] |
| last_error = None |
| error_event = command_attempt.EventType.ALLOCATION_FAILED |
| try: |
| self._AllocateDevices() |
| error_event = command_attempt.EventType.EXECUTE_FAILED |
| self._StartInvocation() |
| result = self._WaitForCommandResult() |
| self._CompleteInvocation(result) |
| error_event = None |
| except errors.HttpError as e: |
| logging.exception(e) |
| last_error = e |
| except remote_operation.RemoteOperationException as e: |
| logging.exception(e) |
| last_error = e |
| # ConfigurationException on TradeFed remote manager. |
| if str(e).startswith("Config error: "): |
| error_event = command_attempt.EventType.CONFIGURATION_ERROR |
| except httplib2.HttpLib2Error as e: |
| logging.exception("Cannot communicate with TradeFed cluster: %s\n" |
| "Skip submitting event %s.", e, error_event) |
| last_error = e |
| error_event = None |
| except socket.error as e: |
| logging.exception("Cannot communicate with TradeFed remote " |
| "manager: %s\nSkip freeing devices %s.", |
| e, self._allocated_serials) |
| last_error = e |
| self._allocated_serials = [] |
| finally: |
| if error_event: |
| self._SubmitErrorEvent(error_event, str(last_error)) |
| self._FreeAllocatedDevices() |