| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http:#www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Utility functions for GUI |
| |
| This module might get loaded in workers. By separating it out, we |
| spare workers from having to load GUI modules. |
| """ |
| |
| import logging |
| from itertools import count |
| from collections import defaultdict |
| import numpy as np |
| from modernmp.util import ( |
| ChainableFuture, |
| ClosingContextManager, |
| safe_close, |
| the, |
| ) |
| from modernmp.process import WorkerProcess |
| from modernmp.apartment import proxy_for, Proxy |
| from .model import TraceAnalysisSession, FileTraceContext |
| from .util import ( |
| ExplicitInheritance, |
| Immutable, |
| final, |
| future_exception, |
| override, |
| tattr, |
| ) |
| from .query import ( |
| QuerySchema, |
| QueryNode, |
| ) |
| |
| log = logging.getLogger(__name__) |
| |
| class QpRemoteSession(ExplicitInheritance): |
| """Query-process state object""" |
| |
| __initialized = False |
| |
| @override |
| def __init__(self, *, query_completion_callback): |
| assert callable(query_completion_callback) |
| self.__session = TraceAnalysisSession() |
| self.__query_completion_callback = query_completion_callback |
| self.__initialized = True |
| |
| def __del__(self): |
| if self.__initialized: |
| safe_close(self.__session) |
| |
| def mount_trace(self, |
| mount_path, |
| trace_file_name, |
| *, |
| progress_callback=None): |
| """Mount a trace in the underlying trace session""" |
| if progress_callback: |
| log.warning("ignoring progress_callback from GUI: FIXME") |
| # TODO(dancol): make progress_callback work again |
| self.__session.mount_trace( |
| mount_path, |
| FileTraceContext(trace_file_name)) |
| |
| def batch_sql_query(self, |
| query_bundles_and_qids, |
| *, |
| progress_callback=None): |
| """Execute a batch of SQL queries |
| |
| The pre-set QUERY_COMPLETION_CALLBACK supplied to this class's |
| constructor is invoked once for each query result that becomes |
| available, in arbitrary order. QUERY_COMPLETION_CALLBACK is |
| invoked with two arguments: a sequence of QIDs and a numpy array |
| providing data for those QIDs. (It's possible for a single array |
| to supply the data for multiple input queries.) |
| """ |
| session = self.__session |
| qids_by_qn = defaultdict(list) |
| # We require that each query bundle describe a single-columned SQL |
| # query, which is why the QueryNode.coerce_ below is valid. |
| for query_bundle, qid in query_bundles_and_qids: |
| qn = QueryNode.coerce_( |
| session.parse_sql_query( |
| query_bundle.sql, |
| query_bundle.args)) |
| qids_by_qn[qn].append(qid) |
| st_cache = None |
| # GUI queries are typically of modest size (because windowing) and |
| # so we query by whole columns instead of accepting row chunks. |
| for qn, array in session.qe.execute_for_columns( |
| qids_by_qn.keys(), |
| progress_callback=progress_callback): |
| if isinstance(array, np.ma.masked_array): |
| log.warning("XXX ignoring NULLs in result set") |
| array = array.data # TODO(dancol): XXX FIXME WTF |
| if qn.schema.is_string: |
| if not st_cache: |
| st_cache = session.st.make_lookup_cache() |
| array = session.st.vlookup(array, st_cache) |
| self.__query_completion_callback(qids_by_qn[qn], array) |
| |
| def study_query_bundle(self, query_bundle, *, progress_callback=None): |
| """Analyze a query, returning a StudyResult""" |
| # TODO(dancol): use progress_callback? |
| assert not progress_callback or callable(progress_callback) |
| qt = self.__session.parse_sql_query(query_bundle.sql) |
| return StudyResult( |
| columns=qt.columns, |
| schemas=[qt[column].schema for column in qt.columns], |
| ) |
| |
| class StudyResult(Immutable): |
| """Result of analyzing a query for GUI display""" |
| columns = tattr(str) |
| schemas = tattr(QuerySchema) |
| |
| @final |
| class RemoteSession(ExplicitInheritance, ClosingContextManager): |
| """Holder for trace in subprocess |
| |
| The GUI process holds an instance of this object, which provides |
| convenience methods for asynchronously performing operations on the |
| corresponding TraceAnalysisSession object in the query process. |
| """ |
| |
| __qp_remote_session = None |
| |
| @override |
| def __init__(self, wp): |
| """Don't call: use RemoteSession.open_async() instead""" |
| super().__init__() |
| self.wp = the(WorkerProcess, wp) |
| self.__pending_queries_by_qid = {} |
| self.__qid_counter = count() |
| |
| @staticmethod |
| async def open_async(): |
| """Begin analyzing a trace""" |
| # pylint: disable=protected-access |
| remote_session = RemoteSession(WorkerProcess()) |
| query_completion_callback = proxy_for(remote_session) \ |
| .__on_query_completion.call_oneway |
| qp_remote_session = await remote_session.wp.call_gate \ |
| .fn(QpRemoteSession).proxy_ret().call_aio( |
| query_completion_callback=query_completion_callback) |
| assert isinstance(qp_remote_session, Proxy) |
| remote_session.__qp_remote_session = qp_remote_session |
| return remote_session |
| |
| async def mount_trace_async(self, |
| mount_path, |
| trace_file_name, |
| *, |
| progress_callback=None): |
| """Asynchronously mount a trace in the query process""" |
| return await self.__qp_remote_session.mount_trace.call_aio( |
| mount_path, |
| trace_file_name, |
| progress_callback=progress_callback, |
| ) |
| |
| def __on_query_completion(self, qids, array): |
| for qid in qids: |
| query_future = self.__pending_queries_by_qid.get(qid) |
| if query_future: |
| query_future.set_result(array) |
| |
| def __on_query_future_done(self, query_future): |
| # pylint: disable=protected-access |
| del self.__pending_queries_by_qid[query_future.__qid] |
| |
| @staticmethod |
| def __on_batch_future_done(batch_future): |
| # pylint: disable=protected-access |
| if future_exception(batch_future): |
| ex = future_exception(batch_future) |
| for query_future in batch_future.__query_futures: |
| if not query_future.done(): |
| query_future.set_exception(ex) |
| else: |
| assert all(query_future.done() |
| for query_future in batch_future.__query_futures) |
| |
| def batch_sql_query_async(self, |
| query_bundles, |
| *, |
| progress_callback=None): |
| """Run a batch of queries. |
| |
| QUERY_BUNDLES is a sequence of SqlBundle objects describing |
| queries to run simultaneously. |
| |
| Return (BATCH_FUTURE, QUERY_FUTURES), where BATCH_FUTURE is the |
| future representing the completion of the batch as a whole |
| (yielding None on success) and QUERY_FUTURES is a sequence of |
| features representing the corresponding queries in QUERY_BUNDLES, |
| each yielding a numpy array. |
| """ |
| # pylint: disable=protected-access |
| query_futures = [] |
| query_bundles_and_qids = [] |
| try: |
| for query_bundle in query_bundles: |
| qid = next(self.__qid_counter) |
| query_future = ChainableFuture() |
| |
| # TODO(dancol): support cancelation of these futures |
| query_future.set_running_or_notify_cancel() |
| |
| query_future.__qid = qid |
| query_futures.append(query_future) |
| self.__pending_queries_by_qid[qid] = query_future |
| query_future.add_done_callback(self.__on_query_future_done) |
| query_bundles_and_qids.append((query_bundle, qid)) |
| assert len(query_futures) == len(query_bundles_and_qids) |
| batch_future = self.__qp_remote_session.batch_sql_query.call_async( |
| query_bundles_and_qids, |
| progress_callback=progress_callback, |
| ) |
| batch_future.__query_futures = query_futures |
| batch_future.add_done_callback(self.__on_batch_future_done) |
| return batch_future, query_futures |
| except: |
| for query_future in query_futures: |
| self.__pending_queries_by_qid.pop(query_future.__qid, None) |
| raise |
| |
| async def study_query_bundle_async(self, |
| query_bundle, |
| *, |
| progress_callback=None): |
| """Study a query""" |
| return await self.__qp_remote_session.study_query_bundle.call_aio( |
| query_bundle, |
| progress_callback=progress_callback, |
| ) |
| |
| @override |
| def _do_close(self): |
| super()._do_close() |
| self.__qp_remote_session = None # Drop reference |