blob: c54b01242a836840a4010cff1664e200d86467d2 [file] [log] [blame]
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for GUI
This module might get loaded in workers. By separating it out, we
spare workers from having to load GUI modules.
"""
import logging
from itertools import count
from collections import defaultdict
import numpy as np
from modernmp.util import (
ChainableFuture,
ClosingContextManager,
safe_close,
the,
)
from modernmp.process import WorkerProcess
from modernmp.apartment import proxy_for, Proxy
from .model import TraceAnalysisSession, FileTraceContext
from .util import (
ExplicitInheritance,
Immutable,
final,
future_exception,
override,
tattr,
)
from .query import (
QuerySchema,
QueryNode,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class QpRemoteSession(ExplicitInheritance):
    """Query-process state object

    Owns a TraceAnalysisSession and reports the results of batched
    queries through the completion callback supplied at construction
    time.
    """

    # Class-level default: if __init__ raises before the session has
    # been created, __del__ sees False here and skips the close.
    __initialized = False

    @override
    def __init__(self, *, query_completion_callback):
        """Create the underlying TraceAnalysisSession

        QUERY_COMPLETION_CALLBACK must be callable; batch_sql_query()
        invokes it as QUERY_COMPLETION_CALLBACK(qids, array) for each
        result that becomes available.
        """
        assert callable(query_completion_callback)
        self.__session = TraceAnalysisSession()
        self.__query_completion_callback = query_completion_callback
        self.__initialized = True

    def __del__(self):
        # Only close a session that __init__ finished creating.
        if self.__initialized:
            safe_close(self.__session)

    def mount_trace(self,
                    mount_path,
                    trace_file_name,
                    *,
                    progress_callback=None):
        """Mount a trace in the underlying trace session

        TRACE_FILE_NAME is wrapped in a FileTraceContext and mounted at
        MOUNT_PATH.  PROGRESS_CALLBACK is currently ignored; a warning
        is logged when one is supplied.
        """
        if progress_callback:
            log.warning("ignoring progress_callback from GUI: FIXME")
            # TODO(dancol): make progress_callback work again
        self.__session.mount_trace(
            mount_path,
            FileTraceContext(trace_file_name))

    def batch_sql_query(self,
                        query_bundles_and_qids,
                        *,
                        progress_callback=None):
        """Execute a batch of SQL queries

        QUERY_BUNDLES_AND_QIDS is an iterable of (query bundle, QID)
        pairs; each bundle supplies the `sql` and `args` of one query.
        PROGRESS_CALLBACK is passed through to the query engine.

        The pre-set QUERY_COMPLETION_CALLBACK supplied to this class's
        constructor is invoked once for each query result that becomes
        available, in arbitrary order.  QUERY_COMPLETION_CALLBACK is
        invoked with two arguments: a sequence of QIDs and a numpy array
        providing data for those QIDs.  (It's possible for a single
        array to supply the data for multiple input queries.)

        NULL masks on masked-array results are currently dropped (with
        a warning), and string-typed columns are translated through the
        session's string table before being reported.
        """
        session = self.__session
        # Bundles that parse to the same query node share one execution;
        # every QID that asked for that node receives the same array.
        qids_by_qn = defaultdict(list)
        # We require that each query bundle describe a single-columned
        # SQL query, which is why the QueryNode.coerce_ below is valid.
        for query_bundle, qid in query_bundles_and_qids:
            qn = QueryNode.coerce_(
                session.parse_sql_query(
                    query_bundle.sql,
                    query_bundle.args))
            qids_by_qn[qn].append(qid)
        # Created lazily, on the first string-typed result.
        st_cache = None
        # GUI queries are typically of modest size (because windowing)
        # and so we query by whole columns instead of accepting row
        # chunks.
        for qn, array in session.qe.execute_for_columns(
                qids_by_qn.keys(),
                progress_callback=progress_callback):
            if isinstance(array, np.ma.masked_array):
                log.warning("XXX ignoring NULLs in result set")
                array = array.data  # TODO(dancol): XXX FIXME WTF
            if qn.schema.is_string:
                # Compare against None rather than testing truthiness:
                # a freshly made cache may itself be falsy (e.g. an
                # empty container) and must not be rebuilt each time.
                if st_cache is None:
                    st_cache = session.st.make_lookup_cache()
                array = session.st.vlookup(array, st_cache)
            self.__query_completion_callback(qids_by_qn[qn], array)

    def study_query_bundle(self, query_bundle, *, progress_callback=None):
        """Analyze a query, returning a StudyResult

        The result carries the columns of the parsed query together
        with the schema of each of those columns, in the same order.
        PROGRESS_CALLBACK is validated but otherwise unused.
        """
        # TODO(dancol): use progress_callback?
        assert not progress_callback or callable(progress_callback)
        qt = self.__session.parse_sql_query(query_bundle.sql)
        return StudyResult(
            columns=qt.columns,
            schemas=[qt[column].schema for column in qt.columns],
        )
class StudyResult(Immutable):
    """Outcome of studying a query so the GUI can display it"""

    # Columns of the studied query.
    columns = tattr(str)

    # Schemas of the studied query's columns.
    schemas = tattr(QuerySchema)
@final
class RemoteSession(ExplicitInheritance, ClosingContextManager):
    """Holder for trace in subprocess

    The GUI process holds an instance of this object, which provides
    convenience methods for asynchronously performing operations on the
    corresponding TraceAnalysisSession object in the query process.
    """

    # Proxy for the QpRemoteSession in the query process.  Assigned by
    # open_async() and reset to None by _do_close().
    __qp_remote_session = None

    @override
    def __init__(self, wp):
        """Don't call: use RemoteSession.open_async() instead"""
        super().__init__()
        self.wp = the(WorkerProcess, wp)
        # Query futures that have not completed yet, keyed by query ID.
        self.__pending_queries_by_qid = {}
        # Source of fresh query IDs.
        self.__qid_counter = count()

    @staticmethod
    async def open_async():
        """Begin analyzing a trace

        Create a RemoteSession around a new WorkerProcess, have the
        worker construct a QpRemoteSession whose completion callback
        targets this object, and return the RemoteSession once the
        proxy for that remote object is available.
        """
        # pylint: disable=protected-access
        remote_session = RemoteSession(WorkerProcess())
        query_completion_callback = proxy_for(remote_session) \
            .__on_query_completion.call_oneway
        qp_remote_session = await remote_session.wp.call_gate \
            .fn(QpRemoteSession).proxy_ret().call_aio(
                query_completion_callback=query_completion_callback)
        assert isinstance(qp_remote_session, Proxy)
        remote_session.__qp_remote_session = qp_remote_session
        return remote_session

    async def mount_trace_async(self,
                                mount_path,
                                trace_file_name,
                                *,
                                progress_callback=None):
        """Asynchronously mount a trace in the query process"""
        return await self.__qp_remote_session.mount_trace.call_aio(
            mount_path,
            trace_file_name,
            progress_callback=progress_callback,
        )

    def __on_query_completion(self, qids, array):
        """Deliver ARRAY to the pending future of every QID in QIDS"""
        for qid in qids:
            query_future = self.__pending_queries_by_qid.get(qid)
            # QIDs that are no longer pending are skipped.
            if query_future is not None:
                query_future.set_result(array)

    def __on_query_future_done(self, query_future):
        """Forget a query future once it has completed"""
        # pylint: disable=protected-access
        del self.__pending_queries_by_qid[query_future.__qid]

    @staticmethod
    def __on_batch_future_done(batch_future):
        """Propagate a batch failure to its unfinished query futures"""
        # pylint: disable=protected-access
        ex = future_exception(batch_future)
        if ex:
            for query_future in batch_future.__query_futures:
                if not query_future.done():
                    query_future.set_exception(ex)
        else:
            # A successful batch must have completed every query.
            assert all(query_future.done()
                       for query_future in batch_future.__query_futures)

    def batch_sql_query_async(self,
                              query_bundles,
                              *,
                              progress_callback=None):
        """Run a batch of queries.

        QUERY_BUNDLES is a sequence of SqlBundle objects describing
        queries to run simultaneously.

        Return (BATCH_FUTURE, QUERY_FUTURES), where BATCH_FUTURE is the
        future representing the completion of the batch as a whole
        (yielding None on success) and QUERY_FUTURES is a sequence of
        futures representing the corresponding queries in
        QUERY_BUNDLES, each yielding a numpy array.

        If setting up the batch fails, the query futures registered so
        far are removed from the pending table and the exception is
        re-raised.
        """
        # pylint: disable=protected-access
        query_futures = []
        query_bundles_and_qids = []
        try:
            for query_bundle in query_bundles:
                qid = next(self.__qid_counter)
                query_future = ChainableFuture()
                # TODO(dancol): support cancelation of these futures
                query_future.set_running_or_notify_cancel()
                # Tag the future with its QID so that the done callback
                # can find its entry in the pending table.
                query_future.__qid = qid
                query_futures.append(query_future)
                self.__pending_queries_by_qid[qid] = query_future
                query_future.add_done_callback(self.__on_query_future_done)
                query_bundles_and_qids.append((query_bundle, qid))
            assert len(query_futures) == len(query_bundles_and_qids)
            batch_future = (
                self.__qp_remote_session.batch_sql_query.call_async(
                    query_bundles_and_qids,
                    progress_callback=progress_callback,
                ))
            # Remember the per-query futures on the batch future so the
            # static done callback can reach them.
            batch_future.__query_futures = query_futures
            batch_future.add_done_callback(self.__on_batch_future_done)
            return batch_future, query_futures
        except BaseException:
            # Deliberately catches everything: this handler only undoes
            # the registrations made above and always re-raises.
            for query_future in query_futures:
                self.__pending_queries_by_qid.pop(query_future.__qid, None)
            raise

    async def study_query_bundle_async(self,
                                       query_bundle,
                                       *,
                                       progress_callback=None):
        """Study a query"""
        return await self.__qp_remote_session.study_query_bundle.call_aio(
            query_bundle,
            progress_callback=progress_callback,
        )

    @override
    def _do_close(self):
        super()._do_close()
        self.__qp_remote_session = None  # Drop reference