# blob: d801ff6aa46015691d1b31fb41e68653cd1a2871 [file] [log] [blame]
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from vts.proto import AndroidSystemControlMessage_pb2 as ASysCtrlMsg
from vts.proto import VtsResourceControllerMessage_pb2 as ResControlMsg
from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg
from vts.utils.python.mirror import mirror_object
class ResourceFmqMirror(mirror_object.MirrorObject):
    """Host-side mirror of a fast message queue (FMQ) on the target side.

    Each operation builds an FmqRequestMessage, sends it through the TCP
    client and interprets the response that comes back.

    Attributes:
        SUPPORTED_SCALAR_TYPES: set, names of the scalar types supported by
                                FMQ. When the queue holds one of these types,
                                write data may be given as plain Python
                                values and is wrapped by this class.
        _client: VtsTcpClient, the TCP client instance.
        _queue_id: int, identifies the queue object on the target side.
        _data_type: type of data in the queue.
        _sync: bool, whether the queue is synchronized.
    """

    SUPPORTED_SCALAR_TYPES = {
        "uint8_t",
        "int8_t",
        "uint16_t",
        "int16_t",
        "uint32_t",
        "int32_t",
        "uint64_t",
        "int64_t",
        "bool_t",
        "double_t",
    }

    def __init__(self, data_type, sync, client, queue_id=-1):
        """Sets up a FMQ mirror.

        Args:
            data_type: string, type of data in the queue
                       (e.g. "uint32_t", "int16_t").
            sync: bool, whether queue is synchronized (only has one reader).
            client: VtsTcpClient, the session this mirror talks through.
            queue_id: int, identifies the queue on the target side.
                      May be omitted when a brand new mirror is initialized.
        """
        super(ResourceFmqMirror, self).__init__(client)
        self._queue_id = queue_id
        self._sync = sync
        self._data_type = data_type

    def _create(self, queue_id, queue_size, blocking, reset_pointers):
        """Registers a fast message queue object on the target side.

        The id reported by the target is stored in this mirror; when the
        request fails the stored id becomes -1 and an error is logged.

        This method is not meant to be called directly: it overwrites the
        queue_id already held by the mirror, which leaves the previous queue
        object unreferenced. Use InitFmq() in mirror_tracker.py to obtain a
        new queue object instead.

        Args:
            queue_id: int, identifies the message queue object on the target
                      side.
            queue_size: int, size of the queue.
            blocking: bool, whether blocking is enabled in the queue.
            reset_pointers: bool, whether to reset read/write pointers when
                            the new queue object is based on an existing
                            message queue.
        """
        # Build the request.
        request = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_CREATE, queue_id)
        request.queue_size = queue_size
        request.blocking = blocking
        request.reset_pointers = reset_pointers

        # Exchange messages with the target.
        response = self._client.SendFmqRequest(request)
        if response is None or response.queue_id == -1:
            self._queue_id = -1
            logging.error("Failed to create a new queue object.")
            return
        self._queue_id = response.queue_id

    def read(self, data, data_size):
        """Issues a non-blocking read request to the FMQ driver.

        Args:
            data: list, output buffer. It is emptied first and then filled
                  with the items that were read, matching the behavior of
                  the function on the target side.
            data_size: int, length of data to read.

        Returns:
            bool, true if the operation succeeds,
                  false otherwise.
        """
        # The caller's buffer is cleared even if the request later fails.
        del data[:]
        request = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_READ, self._queue_id)
        request.read_data_size = data_size

        response = self._client.SendFmqRequest(request)
        if response is None or not response.success:
            return False
        self._extractReadData(response, data)
        return True

    # TODO: support long-form blocking read in the future when there is use case.
    def readBlocking(self, data, data_size, time_out_nanos=0):
        """Issues a blocking read request (short-form) to the FMQ driver.

        Args:
            data: list, output buffer. It is emptied first and then filled
                  with the items that were read, matching the behavior of
                  the function on the target side.
            data_size: int, length of data to read.
            time_out_nanos: int, wait time (in nanoseconds) when blocking.
                            The default value is 0 (no blocking).

        Returns:
            bool, true if the operation succeeds,
                  false otherwise.
        """
        # The caller's buffer is cleared even if the request later fails.
        del data[:]
        request = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_READ_BLOCKING, self._queue_id)
        request.read_data_size = data_size
        request.time_out_nanos = time_out_nanos

        response = self._client.SendFmqRequest(request)
        if response is None or not response.success:
            return False
        self._extractReadData(response, data)
        return True

    def write(self, data, data_size):
        """Issues a non-blocking write request to the FMQ driver.

        Args:
            data: list, data to be written.
            data_size: int, length of data to write.
                       Only the first data_size items are sent; anything
                       beyond that is discarded.

        Returns:
            bool, true if the operation succeeds,
                  false otherwise.
        """
        request = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_WRITE, self._queue_id)
        if not self._prepareWriteData(request, data[:data_size]):
            # The failure has already been logged by _prepareWriteData().
            return False

        response = self._client.SendFmqRequest(request)
        if response is None:
            return False
        return response.success

    # TODO: support long-form blocking write in the future when there is use case.
    def writeBlocking(self, data, data_size, time_out_nanos=0):
        """Issues a blocking write request (short-form) to the FMQ driver.

        Args:
            data: list, data to be written.
            data_size: int, length of data to write.
                       Only the first data_size items are sent; anything
                       beyond that is discarded.
            time_out_nanos: int, wait time (in nanoseconds) when blocking.
                            The default value is 0 (no blocking).

        Returns:
            bool, true if the operation succeeds,
                  false otherwise.
        """
        request = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_WRITE_BLOCKING, self._queue_id)
        if not self._prepareWriteData(request, data[:data_size]):
            # The failure has already been logged by _prepareWriteData().
            return False
        request.time_out_nanos = time_out_nanos

        response = self._client.SendFmqRequest(request)
        if response is None:
            return False
        return response.success

    def availableToWrite(self):
        """Queries the space available to write in the queue.

        Returns:
            int, number of slots available,
            None if the operation is unsuccessful.
        """
        return self._processUtilMethod(
            self._createTemplateRequestMessage(
                ResControlMsg.FMQ_AVAILABLE_WRITE, self._queue_id))

    def availableToRead(self):
        """Queries the number of items available to read.

        Returns:
            int, number of items,
            None if the operation is unsuccessful.
        """
        return self._processUtilMethod(
            self._createTemplateRequestMessage(
                ResControlMsg.FMQ_AVAILABLE_READ, self._queue_id))

    def getQuantumSize(self):
        """Queries the size of an item in the queue.

        Returns:
            int, size of item,
            None if the operation is unsuccessful.
        """
        return self._processUtilMethod(
            self._createTemplateRequestMessage(
                ResControlMsg.FMQ_GET_QUANTUM_SIZE, self._queue_id))

    def getQuantumCount(self):
        """Queries the number of items that fit in the queue.

        Returns:
            int, number of items,
            None if the operation is unsuccessful.
        """
        return self._processUtilMethod(
            self._createTemplateRequestMessage(
                ResControlMsg.FMQ_GET_QUANTUM_COUNT, self._queue_id))

    def isValid(self):
        """Checks whether the queue is valid.

        Returns:
            bool, true if the queue is valid; false when it is not or when
                  no response was received.
        """
        request = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_IS_VALID, self._queue_id)
        response = self._client.SendFmqRequest(request)
        if response is None:
            return False
        return response.success

    @property
    def queueId(self):
        """Gets the id assigned from the target side.

        Returns:
            int, id of the queue.
        """
        return self._queue_id

    @property
    def dataType(self):
        """Gets the type of data of this FMQ mirror.

        Returns:
            string, type of data in the queue
        """
        return self._data_type

    @property
    def sync(self):
        """Gets the synchronization option of this FMQ mirror.

        Returns:
            bool, true if the queue is synchronized (only has one reader).
        """
        return self._sync

    def _createTemplateRequestMessage(self, operation, queue_id):
        """Builds a FmqRequestMessage holding the arguments every FMQ
        operation shares.

        Args:
            operation: FmqOp, fmq operations.
                       (see test/vts/proto/VtsResourceControllerMessage.proto).
            queue_id: int, identifies the message queue object on target side.

        Returns:
            FmqRequestMessage, fmq request message.
                (See test/vts/proto/VtsResourceControllerMessage.proto).
        """
        template = ResControlMsg.FmqRequestMessage()
        template.operation = operation
        template.data_type = self._data_type
        template.sync = self._sync
        template.queue_id = queue_id
        return template

    def _prepareWriteData(self, request_msg, data):
        """Copies a python list into the request's repeated write_data field.

        Items that already are VariableSpecificationMessage are copied as-is.
        Any other item is wrapped as a scalar, which is only possible when
        the queue's data type is one of SUPPORTED_SCALAR_TYPES.

        Args:
            request_msg: FmqRequestMessage, arguments for a FMQ operation
                         request.
            data: VariableSpecificationMessage list or a list of scalar
                  values. Plain Python scalars are accepted when the FMQ type
                  is a supported scalar type; otherwise every item has to be
                  a VariableSpecificationMessage.

        Returns:
            bool, true if preparation succeeds, false otherwise.
            Preparation fails when an item is not a
            VariableSpecificationMessage and the type of data in the queue
            is not a supported scalar type.
        """
        for value in data:
            entry = request_msg.write_data.add()
            if isinstance(value, CompSpecMsg.VariableSpecificationMessage):
                entry.CopyFrom(value)
                continue
            if self._data_type not in self.SUPPORTED_SCALAR_TYPES:
                logging.error(
                    "Need to provide VariableSpecificationMessage "
                    "if type of data in the queue is not a "
                    "supported scalar type.")
                return False
            entry.type = CompSpecMsg.TYPE_SCALAR
            entry.scalar_type = self._data_type
            # The scalar value field is named after the data type itself.
            setattr(entry.scalar_value, self._data_type, value)
        return True

    def _extractReadData(self, response_msg, data):
        """Appends the items carried by a response to the caller's buffer.

        Args:
            response_msg: FmqResponseMessage, contains response from FMQ
                          driver.
            data: list, buffer provided by the caller; every item of the
                  response is converted by the client and appended to it.
        """
        data.extend(
            self._client.GetPythonDataOfVariableSpecMsg(entry)
            for entry in response_msg.read_data)

    def _processUtilMethod(self, request_msg):
        """Sends a request for one of the util methods that return an
        unsigned integer, e.g. availableToWrite, availableToRead.

        Args:
            request_msg: FmqRequestMessage, arguments for a FMQ operation
                         request.

        Returns:
            int, information about the queue,
            None if the operation is unsuccessful.
        """
        response = self._client.SendFmqRequest(request_msg)
        if response is None or not response.success:
            return None
        return response.sizet_return_val
class ResourceHidlMemoryMirror(mirror_object.MirrorObject):
    """This class mirrors hidl_memory resource allocated on the target side.

    Attributes:
        _client: the TCP client instance.
        _mem_id: int, used to identify the memory region on the target side.
    """

    def __init__(self, client, mem_id=-1):
        """Initializes a hidl_memory mirror.

        Args:
            client: the TCP client instance used to reach the target.
            mem_id: int, identifies the memory region on the target side.
                    Defaults to -1.
        """
        super(ResourceHidlMemoryMirror, self).__init__(client)
        self._mem_id = mem_id

    def _allocate(self, mem_size):
        """Initiate a hidl_memory region on the target side.

        This method stores the mem_id in the class attribute. When the
        request fails an error is logged and the stored id is left as it was.

        Users should not directly call this method to get a new memory region,
        because it will overwrite the original memory object with mem_id,
        making that memory object out of reference.
        Users should always call InitHidlMemory() in mirror_tracker.py to get
        a new memory region.

        Args:
            mem_size: int, size of the requested memory region.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_ALLOCATE)
        request_msg.mem_size = mem_size

        # Send and receive data.
        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None and response_msg.new_mem_id != -1:
            self._mem_id = response_msg.new_mem_id
        else:
            logging.error("Failed to allocate memory region.")

    def read(self):
        """Notify that caller will read the entire memory region.

        Before every actual read operation, caller must call this method
        or readRange() first.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_START_READ)
        return self._sendAndCheckSuccess(request_msg)

    def readRange(self, start, length):
        """Notify that caller will read only part of memory region.

        Notify that caller will read starting at start and
        ending at start + length.
        Before every actual read operation, caller must call this method
        or read() first.

        Args:
            start: int, offset from the start of memory region to be read.
            length: int, number of bytes to be read.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_START_READ_RANGE)
        request_msg.start = start
        request_msg.length = length
        return self._sendAndCheckSuccess(request_msg)

    def update(self):
        """Notify that caller will possibly write to all memory region.

        Before every actual write operation, caller must call this method
        or updateRange() first.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_START_UPDATE)
        return self._sendAndCheckSuccess(request_msg)

    def updateRange(self, start, length):
        """Notify that caller will only write to part of memory region.

        Notify that caller will only write starting at start and
        ending at start + length.
        Before every actual write operation, caller must call this method
        or update() first.

        Args:
            start: int, offset from the start of memory region to be modified.
            length: int, number of bytes to be modified.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_START_UPDATE_RANGE)
        request_msg.start = start
        request_msg.length = length
        return self._sendAndCheckSuccess(request_msg)

    def readBytes(self, length, start=0):
        """This method performs actual read operation.

        This method helps caller perform actual read operation on the
        memory region, because host side won't be able to cast c++ pointers.

        Args:
            length: int, number of bytes to read.
            start: int, offset from the start of memory region to read.

        Returns:
            string, data read from memory.
                    Caller can perform conversion on the result to obtain the
                    corresponding data structure in python.
            None, indicate if the read fails.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_READ_BYTES)
        request_msg.start = start
        request_msg.length = length

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is None:
            return None
        if response_msg.success:
            return response_msg.read_data
        logging.error("Failed to find memory region with id %d", self._mem_id)
        return None

    def updateBytes(self, data, length, start=0):
        """This method performs actual write operation.

        This method helps caller perform actual write operation on the
        memory region, because host side won't be able to cast c++ pointers.

        Args:
            data: string, bytes to be written into memory.
                  Caller can use bytearray() function to convert python
                  data structures into bytes, and call str() on the resulting
                  bytearray object.
            length: int, number of bytes to write.
            start: int, offset from the start of memory region to be modified.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_UPDATE_BYTES)
        request_msg.write_data = data
        request_msg.start = start
        request_msg.length = length
        return self._sendAndCheckSuccess(request_msg)

    def commit(self):
        """Caller signals done with operating on the memory region.

        Caller needs to call this method after reading/writing.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_COMMIT)
        return self._sendAndCheckSuccess(request_msg)

    def getSize(self):
        """Gets the size of the memory region.

        Returns:
            int, size of memory region, -1 to signal operation failure.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_GET_SIZE)

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is None:
            return -1
        if response_msg.success:
            return response_msg.mem_size
        logging.error("Failed to find memory region with id %d", self._mem_id)
        return -1

    @property
    def memId(self):
        """Gets the id assigned from the target side.

        Returns:
            int, id of the memory object.
        """
        return self._mem_id

    def _sendAndCheckSuccess(self, request_msg):
        """Sends a request whose only result is a success flag.

        Shared by the operations that report nothing but success or failure
        (read, readRange, update, updateRange, updateBytes, commit).

        Args:
            request_msg: HidlMemoryRequestMessage, the request to send.

        Returns:
            bool, the success field of the response; false when no response
                  was received.
        """
        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is None:
            return False
        if not response_msg.success:
            # Any unsuccessful response is logged with this message.
            logging.error("Failed to find memory region with id %d",
                          self._mem_id)
        return response_msg.success

    def _createTemplateRequestMessage(self, operation):
        """Creates a template HidlMemoryRequestMessage.

        This method creates a message that contains common arguments among
        all hidl_memory operations.

        Args:
            operation: HidlMemoryOp, hidl_memory operations.
                       (see test/vts/proto/VtsResourceControllerMessage.proto).

        Returns:
            HidlMemoryRequestMessage, hidl_memory request message.
                (See test/vts/proto/VtsResourceControllerMessage.proto).
        """
        request_msg = ResControlMsg.HidlMemoryRequestMessage()
        request_msg.operation = operation
        request_msg.mem_id = self._mem_id
        return request_msg
class ResourceHidlHandleMirror(mirror_object.MirrorObject):
    """Host-side mirror of a hidl_handle resource on the target side.

    TODO: support more than file types in the future, e.g. socket, pipe.

    Attributes:
        _client: the TCP client instance.
        _handle_id: int, used to identify the handle object on the target
                    side.
    """

    def __init__(self, client, handle_id=-1):
        """Sets up a hidl_handle mirror.

        Args:
            client: the TCP client instance used to reach the target.
            handle_id: int, identifies the handle object on the target side.
                       Defaults to -1.
        """
        super(ResourceHidlHandleMirror, self).__init__(client)
        self._handle_id = handle_id

    def CleanUp(self):
        """Closes open file descriptors on target-side drivers.

        Developers can call this method to close open file descriptors
        in all handle objects. The response to the request is not inspected.

        Note: This method needs to be called before self._client
        is disconnected. self._client is most likely initialized in
        one of the hal_mirror.
        """
        self._client.SendHidlHandleRequest(
            self._createTemplateRequestMessage(
                ResControlMsg.HANDLE_PROTO_DELETE))

    def _createHandleForSingleFile(self, filepath, mode, int_data):
        """Creates a hidl_handle object that wraps one file descriptor.

        The id returned by the target is stored in this mirror. When the
        request fails an error is logged and the stored id is left as it was.

        This method is not meant to be called directly to create a new
        handle object: it overwrites the handle already tracked by the
        mirror, which leaves that handle object unreferenced. Use
        InitHidlHandle() in mirror_tracker.py to get a new handle object.

        Args:
            filepath: string, path to the file to be opened.
            mode: string, specifying the mode to open the file.
            int_data: int list, useful integers to store in the handle object.
        """
        request = self._createTemplateRequestMessage(
            ResControlMsg.HANDLE_PROTO_CREATE_FILE)
        handle_info = request.handle_info
        handle_info.num_fds = 1
        handle_info.num_ints = len(int_data)

        # TODO: support more than one file descriptors at once.
        # Describe the single file in the proto message.
        file_entry = handle_info.fd_val.add()
        file_entry.type = CompSpecMsg.FILE_TYPE
        file_entry.file_mode_str = mode
        file_entry.file_name = filepath

        # Attach the integers to the proto message.
        handle_info.int_val.extend(int_data)

        # Exchange messages with the target.
        response = self._client.SendHidlHandleRequest(request)
        if response is None or response.new_handle_id == -1:
            logging.error("Failed to create handle object.")
            return
        self._handle_id = response.new_handle_id

    def readFile(self, read_data_size, index=0):
        """Reads from a given file in the handle object.

        Args:
            read_data_size: int, number of bytes to read.
            index: int, index of file among all files in the handle object.
                   Optional if host only wants to read from one file.
                   This method currently does not put it in the request.

        Returns:
            string, data read from the file,
            None if the read fails.
        """
        request = self._createTemplateRequestMessage(
            ResControlMsg.HANDLE_PROTO_READ_FILE)
        request.read_data_size = read_data_size

        response = self._client.SendHidlHandleRequest(request)
        if response is None or not response.success:
            # TODO: more detailed error message.
            logging.error("Failed to read from the file.")
            return None
        return response.read_data

    def writeFile(self, write_data, index=0):
        """Writes to a given file in the handle object.

        Args:
            write_data: string, data to be written into file.
            index: int, index of file among all files in the handle object.
                   Optional if host only wants to write into one file.
                   This method currently does not put it in the request.

        Returns:
            int, number of bytes written,
            None if the write fails.
        """
        request = self._createTemplateRequestMessage(
            ResControlMsg.HANDLE_PROTO_WRITE_FILE)
        request.write_data = write_data

        response = self._client.SendHidlHandleRequest(request)
        if response is None or not response.success:
            # TODO: more detailed error message.
            logging.error("Failed to write into the file.")
            return None
        return response.write_data_size

    @property
    def handleId(self):
        """Gets the id assigned from the target side.

        Returns:
            int, id of the handle object.
        """
        return self._handle_id

    def _createTemplateRequestMessage(self, operation):
        """Builds a HidlHandleRequestMessage holding the arguments every
        hidl_handle operation shares.

        Args:
            operation: HidlHandleOp, hidl_handle operations.
                       (see test/vts/proto/VtsResourceControllerMessage.proto).

        Returns:
            HidlHandleRequestMessage, hidl_handle request message.
                (See test/vts/proto/VtsResourceControllerMessage.proto).
        """
        template = ResControlMsg.HidlHandleRequestMessage()
        template.operation = operation
        template.handle_id = self._handle_id
        return template