| # |
| # Copyright (C) 2018 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| import logging |
| |
| from vts.proto import AndroidSystemControlMessage_pb2 as ASysCtrlMsg |
| from vts.proto import VtsResourceControllerMessage_pb2 as ResControlMsg |
| from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg |
| from vts.utils.python.mirror import mirror_object |
| |
| |
| class ResourceFmqMirror(mirror_object.MirrorObject): |
| """This is a class that mirrors FMQ resource allocated on the target side. |
| |
| Attributes: |
| SUPPORTED_SCALAR_TYPES: set, contains all scalar types supported by FMQ. |
| If the type of FMQ is one of those, this class |
| prepares the write data from caller provided |
| Python data. |
| _client: VtsTcpClient, the TCP client instance. |
| _queue_id: int, used to identify the queue object on the target side. |
| _data_type: type of data in the queue. |
| _sync: bool, whether the queue is synchronized. |
| """ |
| |
| SUPPORTED_SCALAR_TYPES = { |
| "uint8_t", "int8_t", "uint16_t", "int16_t", "uint32_t", "int32_t", |
| "uint64_t", "int64_t", "bool_t", "double_t" |
| } |
| |
| def __init__(self, data_type, sync, client, queue_id=-1): |
| """Initialize a FMQ mirror. |
| |
| Args: |
| data_type: string, type of data in the queue |
| (e.g. "uint32_t", "int16_t"). |
| sync: bool, whether queue is synchronized (only has one reader). |
| client: VtsTcpClient, specifies the session that this mirror use. |
| queue_id: int, identifies the queue on the target side. |
| Optional if caller initializes a new FMQ mirror. |
| """ |
| super(ResourceFmqMirror, self).__init__(client) |
| self._data_type = data_type |
| self._sync = sync |
| self._queue_id = queue_id |
| |
| def _create(self, queue_id, queue_size, blocking, reset_pointers): |
| """Initiate a fast message queue object on the target side. |
| |
| This method registers a FMQ object on the target side, and stores |
| the queue_id in the class attribute. |
| Users should not directly call this method because it will overwrite |
| the original queue_id stored in the mirror object, leaving that |
| queue object out of reference. |
| Users should always call InitFmq() in mirror_tracker.py to obtain a |
| new queue object. |
| |
| Args: |
| queue_id: int, identifies the message queue object on the target side. |
| queue_size: int, size of the queue. |
| blocking: bool, whether blocking is enabled in the queue. |
| reset_pointers: bool, whether to reset read/write pointers when |
| creating a message queue object based on an existing message queue. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_CREATE, queue_id) |
| request_msg.queue_size = queue_size |
| request_msg.blocking = blocking |
| request_msg.reset_pointers = reset_pointers |
| |
| # Send and receive data. |
| fmq_response = self._client.SendFmqRequest(request_msg) |
| if fmq_response is not None and fmq_response.queue_id != -1: |
| self._queue_id = fmq_response.queue_id |
| else: |
| self._queue_id = -1 |
| logging.error("Failed to create a new queue object.") |
| |
| def read(self, data, data_size): |
| """Initiate a non-blocking read request to FMQ driver. |
| |
| Args: |
| data: list, data to be filled by this function. The list will |
| be emptied before the function starts to put read data into |
| it, which is consistent with the function behavior on the |
| target side. |
| data_size: int, length of data to read. |
| |
| Returns: |
| bool, true if the operation succeeds, |
| false otherwise. |
| """ |
| # Prepare arguments. |
| del data[:] |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_READ, self._queue_id) |
| request_msg.read_data_size = data_size |
| |
| # Send and receive data. |
| fmq_response = self._client.SendFmqRequest(request_msg) |
| if fmq_response is not None and fmq_response.success: |
| self._extractReadData(fmq_response, data) |
| return True |
| return False |
| |
| # TODO: support long-form blocking read in the future when there is use case. |
| def readBlocking(self, data, data_size, time_out_nanos=0): |
| """Initiate a blocking read request (short-form) to FMQ driver. |
| |
| Args: |
| data: list, data to be filled by this function. The list will |
| be emptied before the function starts to put read data into |
| it, which is consistent with the function behavior on the |
| target side. |
| data_size: int, length of data to read. |
| time_out_nanos: int, wait time (in nanoseconds) when blocking. |
| The default value is 0 (no blocking). |
| |
| Returns: |
| bool, true if the operation succeeds, |
| false otherwise. |
| """ |
| # Prepare arguments. |
| del data[:] |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_READ_BLOCKING, self._queue_id) |
| request_msg.read_data_size = data_size |
| request_msg.time_out_nanos = time_out_nanos |
| |
| # Send and receive data. |
| fmq_response = self._client.SendFmqRequest(request_msg) |
| if fmq_response is not None and fmq_response.success: |
| self._extractReadData(fmq_response, data) |
| return True |
| return False |
| |
| def write(self, data, data_size): |
| """Initiate a non-blocking write request to FMQ driver. |
| |
| Args: |
| data: list, data to be written. |
| data_size: int, length of data to write. |
| The function will only write data up until data_size, |
| i.e. extraneous data will be discarded. |
| |
| Returns: |
| bool, true if the operation succeeds, |
| false otherwise. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_WRITE, self._queue_id) |
| prepare_result = self._prepareWriteData(request_msg, data[:data_size]) |
| if not prepare_result: |
| # Prepare write data failure, error logged in _prepareWriteData(). |
| return False |
| |
| # Send and receive data. |
| fmq_response = self._client.SendFmqRequest(request_msg) |
| if fmq_response is not None: |
| return fmq_response.success |
| return False |
| |
| # TODO: support long-form blocking write in the future when there is use case. |
| def writeBlocking(self, data, data_size, time_out_nanos=0): |
| """Initiate a blocking write request (short-form) to FMQ driver. |
| |
| Args: |
| data: list, data to be written. |
| data_size: int, length of data to write. |
| The function will only write data up until data_size, |
| i.e. extraneous data will be discarded. |
| time_out_nanos: int, wait time (in nanoseconds) when blocking. |
| The default value is 0 (no blocking). |
| |
| Returns: |
| bool, true if the operation succeeds, |
| false otherwise. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_WRITE_BLOCKING, self._queue_id) |
| prepare_result = self._prepareWriteData(request_msg, data[:data_size]) |
| if not prepare_result: |
| # Prepare write data failure, error logged in _prepareWriteData(). |
| return False |
| request_msg.time_out_nanos = time_out_nanos |
| |
| # Send and receive data. |
| fmq_response = self._client.SendFmqRequest(request_msg) |
| if fmq_response is not None: |
| return fmq_response.success |
| return False |
| |
| def availableToWrite(self): |
| """Get space available to write in the queue. |
| |
| Returns: |
| int, number of slots available. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_AVAILABLE_WRITE, self._queue_id) |
| |
| # Send and receive data. |
| return self._processUtilMethod(request_msg) |
| |
| def availableToRead(self): |
| """Get number of items available to read. |
| |
| Returns: |
| int, number of items. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_AVAILABLE_READ, self._queue_id) |
| |
| # Send and receive data. |
| return self._processUtilMethod(request_msg) |
| |
| def getQuantumSize(self): |
| """Get size of item in the queue. |
| |
| Returns: |
| int, size of item. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_GET_QUANTUM_SIZE, self._queue_id) |
| |
| # send and receive data |
| return self._processUtilMethod(request_msg) |
| |
| def getQuantumCount(self): |
| """Get number of items that fit in the queue. |
| |
| Returns: |
| int, number of items. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_GET_QUANTUM_COUNT, self._queue_id) |
| |
| # Send and receive data. |
| return self._processUtilMethod(request_msg) |
| |
| def isValid(self): |
| """Check if the queue is valid. |
| |
| Returns: |
| bool, true if the queue is valid. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.FMQ_IS_VALID, self._queue_id) |
| |
| # Send and receive data. |
| fmq_response = self._client.SendFmqRequest(request_msg) |
| if fmq_response is not None: |
| return fmq_response.success |
| return False |
| |
| @property |
| def queueId(self): |
| """Gets the id assigned from the target side. |
| |
| Returns: |
| int, id of the queue. |
| """ |
| return self._queue_id |
| |
| @property |
| def dataType(self): |
| """Get the type of data of this FMQ mirror. |
| |
| Returns: |
| string, type of data in the queue |
| """ |
| return self._data_type |
| |
| @property |
| def sync(self): |
| """Get the synchronization option of this FMQ mirror. |
| |
| Returns: |
| bool, true if the queue is synchronized (only has one reader). |
| """ |
| return self._sync |
| |
| def _createTemplateRequestMessage(self, operation, queue_id): |
| """Creates a template FmqRequestMessage with common arguments among |
| all FMQ operations. |
| |
| Args: |
| operation: FmqOp, fmq operations. |
| (see test/vts/proto/VtsResourceControllerMessage.proto). |
| queue_id: int, identifies the message queue object on target side. |
| |
| Returns: |
| FmqRequestMessage, fmq request message. |
| (See test/vts/proto/VtsResourceControllerMessage.proto). |
| """ |
| request_msg = ResControlMsg.FmqRequestMessage() |
| request_msg.operation = operation |
| request_msg.data_type = self._data_type |
| request_msg.sync = self._sync |
| request_msg.queue_id = queue_id |
| return request_msg |
| |
| def _prepareWriteData(self, request_msg, data): |
| """Converts python list to repeated protobuf field. |
| |
| If the type of data in the queue is a supported scalar, caller can |
| directly supply the python native value. Otherwise, caller needs to |
| supply a list of VariableSpecificationMessage. |
| |
| Args: |
| request_msg: FmqRequestMessage, arguments for a FMQ operation |
| request. |
| data: VariableSpecificationMessage list or a list of scalar values. |
| If the type of FMQ is scalar type, caller can directly |
| specify the Python scalar data. Otherwise, caller has to |
| provide each item as VariableSpecificationMessage. |
| |
| Returns: |
| bool, true if preparation succeeds, false otherwise. |
| This function can fail if caller doesn't provide a list of |
| VariableSpecificationMessage when type of data in the queue |
| is not a supported scalar type. |
| """ |
| for curr_value in data: |
| new_message = request_msg.write_data.add() |
| if isinstance(curr_value, |
| CompSpecMsg.VariableSpecificationMessage): |
| new_message.CopyFrom(curr_value) |
| elif self._data_type in self.SUPPORTED_SCALAR_TYPES: |
| new_message.type = CompSpecMsg.TYPE_SCALAR |
| new_message.scalar_type = self._data_type |
| setattr(new_message.scalar_value, self._data_type, curr_value) |
| else: |
| logging.error("Need to provide VariableSpecificationMessage " + |
| "if type of data in the queue is not a " + |
| "supported scalar type.") |
| return False |
| return True |
| |
| def _extractReadData(self, response_msg, data): |
| """Extracts read data from the response message returned by client. |
| |
| Args: |
| response_msg: FmqResponseMessage, contains response from FMQ driver. |
| data: list, to be filled by this function. data buffer is provided |
| by caller, so this function will append every element to the |
| buffer. |
| """ |
| for item in response_msg.read_data: |
| data.append(self._client.GetPythonDataOfVariableSpecMsg(item)) |
| |
| def _processUtilMethod(self, request_msg): |
| """Sends request message and process response message for util methods |
| that return an unsigned integer, |
| e.g. availableToWrite, availableToRead. |
| |
| Args: |
| request_msg: FmqRequestMessage, arguments for a FMQ operation request. |
| |
| Returns: int, information about the queue, |
| None if the operation is unsuccessful. |
| """ |
| fmq_response = self._client.SendFmqRequest(request_msg) |
| if fmq_response is not None and fmq_response.success: |
| return fmq_response.sizet_return_val |
| return None |
| |
| |
| class ResourceHidlMemoryMirror(mirror_object.MirrorObject): |
| """This class mirrors hidl_memory resource allocated on the target side. |
| |
| Attributes: |
| _client: the TCP client instance. |
| _mem_id: int, used to identify the memory region on the target side. |
| """ |
| |
| def __init__(self, client, mem_id=-1): |
| super(ResourceHidlMemoryMirror, self).__init__(client) |
| self._mem_id = mem_id |
| |
| def _allocate(self, mem_size): |
| """Initiate a hidl_memory region on the target side. |
| |
| This method stores the mem_id in the class attribute. |
| Users should not directly call this method to get a new memory region, |
| because it will overwrite the original memory object with mem_id, |
| making that memory object out of reference. |
| Users should always call InitHidlMemory() in mirror_tracker.py to get |
| a new memory region. |
| |
| Args: |
| mem_size: int, size of the requested memory region. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.MEM_PROTO_ALLOCATE) |
| request_msg.mem_size = mem_size |
| |
| # Send and receive data. |
| response_msg = self._client.SendHidlMemoryRequest(request_msg) |
| if response_msg is not None and response_msg.new_mem_id != -1: |
| self._mem_id = response_msg.new_mem_id |
| else: |
| logging.error("Failed to allocate memory region.") |
| |
| def read(self): |
| """Notify that caller will read the entire memory region. |
| |
| Before every actual read operation, caller must call this method |
| or readRange() first. |
| |
| Returns: |
| bool, true if the operation succeeds, false otherwise. |
| """ |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.MEM_PROTO_START_READ) |
| |
| response_msg = self._client.SendHidlMemoryRequest(request_msg) |
| if response_msg is not None: |
| if not response_msg.success: |
| logging.error("Failed to find memory region with id %d", |
| self._mem_id) |
| return response_msg.success |
| return False |
| |
| def readRange(self, start, length): |
| """Notify that caller will read only part of memory region. |
| |
| Notify that caller will read starting at start and |
| ending at start + length. |
| Before every actual read operation, caller must call this method |
| or read() first. |
| |
| Args: |
| start: int, offset from the start of memory region to be modified. |
| length: int, number of bytes to be modified. |
| |
| Returns: |
| bool, true if the operation succeeds, false otherwise. |
| """ |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.MEM_PROTO_START_READ_RANGE) |
| request_msg.start = start |
| request_msg.length = length |
| |
| response_msg = self._client.SendHidlMemoryRequest(request_msg) |
| if response_msg is not None: |
| if not response_msg.success: |
| logging.error("Failed to find memory region with id %d", |
| self._mem_id) |
| return response_msg.success |
| return False |
| |
| def update(self): |
| """Notify that caller will possibly write to all memory region. |
| |
| Before every actual write operation, caller must call this method |
| or updateRange() first. |
| |
| Returns: |
| bool, true if the operation succeeds, false otherwise. |
| """ |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.MEM_PROTO_START_UPDATE) |
| |
| response_msg = self._client.SendHidlMemoryRequest(request_msg) |
| if response_msg is not None: |
| if not response_msg.success: |
| logging.error("Failed to find memory region with id %d", |
| self._mem_id) |
| return response_msg.success |
| return False |
| |
| def updateRange(self, start, length): |
| """Notify that caller will only write to part of memory region. |
| |
| Notify that caller will only write starting at start and |
| ending at start + length. |
| Before every actual write operation, caller must call this method |
| or update() first. |
| |
| Args: |
| start: int, offset from the start of memory region to be modified. |
| length: int, number of bytes to be modified. |
| |
| Returns: |
| bool, true if the operation succeeds, false otherwise. |
| """ |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.MEM_PROTO_START_UPDATE_RANGE) |
| request_msg.start = start |
| request_msg.length = length |
| |
| response_msg = self._client.SendHidlMemoryRequest(request_msg) |
| if response_msg is not None: |
| if not response_msg.success: |
| logging.error("Failed to find memory region with id %d", |
| self._mem_id) |
| return response_msg.success |
| return False |
| |
| def readBytes(self, length, start=0): |
| """This method performs actual read operation. |
| |
| This method helps caller perform actual read operation on the |
| memory region, because host side won't be able to cast c++ pointers. |
| |
| Args: |
| length: int, number of bytes to read. |
| start: int, offset from the start of memory region to read. |
| |
| Returns: |
| string, data read from memory. |
| Caller can perform conversion on the result to obtain the |
| corresponding data structure in python. |
| None, indicate if the read fails. |
| """ |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.MEM_PROTO_READ_BYTES) |
| request_msg.start = start |
| request_msg.length = length |
| |
| response_msg = self._client.SendHidlMemoryRequest(request_msg) |
| if response_msg is not None: |
| if response_msg.success: |
| return response_msg.read_data |
| logging.error("Failed to find memory region with id %d", |
| self._mem_id) |
| return None |
| |
| def updateBytes(self, data, length, start=0): |
| """This method performs actual write operation. |
| |
| This method helps caller perform actual write operation on the |
| memory region, because host side won't be able to cast c++ pointers. |
| |
| Args: |
| data: string, bytes to be written into memory. |
| Caller can use bytearray() function to convert python |
| data structures into python, and call str() on the resulting |
| bytearray object. |
| length: int, number of bytes to write. |
| start: int, offset from the start of memory region to be modified. |
| |
| Returns: |
| bool, true if the operation succeeds, false otherwise. |
| """ |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.MEM_PROTO_UPDATE_BYTES) |
| request_msg.write_data = data |
| request_msg.start = start |
| request_msg.length = length |
| |
| response_msg = self._client.SendHidlMemoryRequest(request_msg) |
| if response_msg is not None: |
| if not response_msg.success: |
| logging.error("Failed to find memory region with id %d", |
| self._mem_id) |
| return response_msg.success |
| return False |
| |
| def commit(self): |
| """Caller signals done with operating on the memory region. |
| |
| Caller needs to call this method after reading/writing. |
| |
| Returns: |
| bool, true if the operation succeeds, false otherwise. |
| """ |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.MEM_PROTO_COMMIT) |
| |
| response_msg = self._client.SendHidlMemoryRequest(request_msg) |
| if response_msg is not None: |
| if not response_msg.success: |
| logging.error("Failed to find memory region with id %d", |
| self._mem_id) |
| return response_msg.success |
| return False |
| |
| def getSize(self): |
| """Gets the size of the memory region. |
| |
| Returns: |
| int, size of memory region, -1 to signal operation failure. |
| """ |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.MEM_PROTO_GET_SIZE) |
| |
| response_msg = self._client.SendHidlMemoryRequest(request_msg) |
| if response_msg is not None: |
| if response_msg.success: |
| return response_msg.mem_size |
| logging.error("Failed to find memory region with id %d", |
| self._mem_id) |
| return -1 |
| |
| @property |
| def memId(self): |
| """Gets the id assigned from the target side. |
| |
| Returns: |
| int, id of the memory object. |
| """ |
| return self._mem_id |
| |
| def _createTemplateRequestMessage(self, operation): |
| """Creates a template HidlMemoryRequestMessage. |
| |
| This method creates a message that contains common arguments among |
| all hidl_memory operations. |
| |
| Args: |
| operation: HidlMemoryOp, hidl_memory operations. |
| (see test/vts/proto/VtsResourceControllerMessage.proto). |
| |
| Returns: |
| HidlMemoryRequestMessage, hidl_memory request message. |
| (See test/vts/proto/VtsResourceControllerMessage.proto). |
| """ |
| request_msg = ResControlMsg.HidlMemoryRequestMessage() |
| request_msg.operation = operation |
| request_msg.mem_id = self._mem_id |
| return request_msg |
| |
| |
| class ResourceHidlHandleMirror(mirror_object.MirrorObject): |
| """This class mirrors hidl_handle resource allocated on the target side. |
| |
| TODO: support more than file types in the future, e.g. socket, pipe. |
| |
| Attributes: |
| _client: the TCP client instance. |
| _handle_id: int, used to identify the handle object on the target side. |
| """ |
| |
| def __init__(self, client, handle_id=-1): |
| super(ResourceHidlHandleMirror, self).__init__(client) |
| self._handle_id = handle_id |
| |
| def CleanUp(self): |
| """Close open file descriptors on target-side drivers. |
| |
| Developers can call this method to close open file descriptors |
| in all handle objects. |
| Note: This method needs to be called before self._client |
| is disconnected. self._client is most likely initialized in |
| one of the hal_mirror. |
| """ |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.HANDLE_PROTO_DELETE) |
| self._client.SendHidlHandleRequest(request_msg) |
| |
| def _createHandleForSingleFile(self, filepath, mode, int_data): |
| """Initiate a hidl_handle object containing a single file descriptor. |
| |
| This method stores the handle_id in the class attribute. |
| Users should not directly call this method to create a new |
| handle object, because it will overwrite the original handle object, |
| making that handle object out of reference. |
| Users should always call InitHidlHandle() in mirror_tracker.py to get |
| a new handle object. |
| |
| Args: |
| filepath: string, path to the file to be opened. |
| mode: string, specifying the mode to open the file. |
| int_data: int list, useful integers to store in the handle object. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.HANDLE_PROTO_CREATE_FILE) |
| request_msg.handle_info.num_fds = 1 |
| request_msg.handle_info.num_ints = len(int_data) |
| |
| # TODO: support more than one file descriptors at once. |
| # Add the file information into proto message. |
| fd_message = request_msg.handle_info.fd_val.add() |
| fd_message.type = CompSpecMsg.FILE_TYPE |
| fd_message.file_mode_str = mode |
| fd_message.file_name = filepath |
| |
| # Add the integers into proto message. |
| request_msg.handle_info.int_val.extend(int_data) |
| |
| # Send and receive data. |
| response_msg = self._client.SendHidlHandleRequest(request_msg) |
| if response_msg is not None and response_msg.new_handle_id != -1: |
| self._handle_id = response_msg.new_handle_id |
| else: |
| logging.error("Failed to create handle object.") |
| |
| def readFile(self, read_data_size, index=0): |
| """Reads from a given file in the handle object. |
| |
| Args: |
| read_data_size: int, number of bytes to read. |
| index: int, index of file among all files in the handle object. |
| Optional if host only wants to read from one file. |
| |
| Returns: |
| string, data read from the file. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.HANDLE_PROTO_READ_FILE) |
| request_msg.read_data_size = read_data_size |
| |
| # Send and receive data. |
| response_msg = self._client.SendHidlHandleRequest(request_msg) |
| if response_msg is not None and response_msg.success: |
| return response_msg.read_data |
| # TODO: more detailed error message. |
| logging.error("Failed to read from the file.") |
| return None |
| |
| def writeFile(self, write_data, index=0): |
| """Writes to a given file to the handle object. |
| |
| Args: |
| write_data: string, data to be written into file. |
| index: int, index of file among all files in the handle object. |
| Optional if host only wants to write into one file. |
| |
| Returns: |
| int, number of bytes written. |
| """ |
| # Prepare arguments. |
| request_msg = self._createTemplateRequestMessage( |
| ResControlMsg.HANDLE_PROTO_WRITE_FILE) |
| request_msg.write_data = write_data |
| |
| # Send and receive data. |
| response_msg = self._client.SendHidlHandleRequest(request_msg) |
| if response_msg is not None and response_msg.success: |
| return response_msg.write_data_size |
| # TODO: more detailed error message. |
| logging.error("Failed to write into the file.") |
| return None |
| |
| @property |
| def handleId(self): |
| """Gets the id assigned from the target side. |
| |
| Returns: |
| int, id of the handle object. |
| """ |
| return self._handle_id |
| |
| def _createTemplateRequestMessage(self, operation): |
| """Creates a template HidlHandleRequestMessage. |
| |
| This method creates a HidlHandleRequestMessage with common arguments |
| among all hidl_handle operations. |
| |
| Args: |
| operation: HidlHandleOp, hidl_handle operations. |
| (see test/vts/proto/VtsResourceControllerMessage.proto). |
| |
| Returns: |
| HidlHandleRequestMessage, hidl_handle request message. |
| (See test/vts/proto/VtsResourceControllerMessage.proto). |
| """ |
| request_msg = ResControlMsg.HidlHandleRequestMessage() |
| request_msg.operation = operation |
| request_msg.handle_id = self._handle_id |
| return request_msg |