| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
#     http://www.apache.org/licenses/LICENSE-2.0
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Query operator implementations |
| |
| This file provides non-core query engine operators, e.g., some of the |
| complicated span operations and associated data manipulation |
| facilities. If something feels like basic SQL, it probably goes in |
| query.py. |
| """ |
| |
| import logging |
| from functools import partial |
| |
| import numpy as np |
| from numpy import ndarray |
| from numpy.ma import masked_array, nomask |
| |
| from cytoolz import ( |
| cons, |
| ) |
| |
| from modernmp.util import the, assert_seq_type |
| |
| from .util import ( |
| AutoNumber, |
| BOOL, |
| EqImmutable, |
| FLOAT64, |
| INT64, |
| UINT64, |
| abstract, |
| all_same, |
| cached_property, |
| consume_list, |
| enumattr, |
| final, |
| iattr, |
| is_nondecreasing, |
| load_pandas, |
| make_pd_dataframe, |
| override, |
| sattr, |
| tattr, |
| ) |
| |
| from .query import ( |
| C_UNIQUE, |
| DTYPE_KINDS, |
| DURATION_SCHEMA, |
| GenericQueryTable, |
| INDEXER_SCHEMA, |
| InvalidQueryException, |
| QueryAction, |
| QueryNode, |
| QuerySchema, |
| QueryTable, |
| SPAN_UNPARTITIONED_TIME_MAJOR, |
| SimpleQueryNode, |
| TS_SCHEMA, |
| TableKind, |
| TableSchema, |
| TableSorting, |
| check_same_lengths, |
| iattr_query_node_int, |
| ) |
| |
| from ._native import ( |
| DTYPE_COMMAND_STREAM, |
| TSTS_CLOSE, |
| TSTS_CLOSE_BROADCAST, |
| TSTS_OPEN, |
| agg, |
| backfill, |
| get_supported_native_aggregations, |
| merge_spans_into_events, |
| native_aggregation_info, |
| native_aggregation_result_dtype, |
| npy_broadcast_to, |
| npy_explode, |
| npy_get_mask, |
| npy_has_mask, |
| npy_make_rw_view, |
| span_fixup, |
| span_group, |
| span_group_column, |
| span_merge_join, |
| span_pivot, |
| span_pivot_column, |
| stackify_start_stop_qqqq, |
| time_series_to_span, |
| ) |
| |
| from .querythread import( |
| SynchronousInput, |
| SynchronousOutput, |
| SynchronousQueryExecution, |
| ) |
| |
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
| |
class AutoMultiQueryBaseAction(QueryAction):
  """Common base for the Action classes of AutoMultiQuery families

  Every hook forwards to the AutoMultiQuery "config" object stored in
  CONFIG.
  """
  # The precise config type is asserted by the subclass: either a
  # hand-written Action or the one AutoMultiQueryMeta synthesizes.
  config = iattr(kwonly=True)

  @override
  def _compute_outputs(self):
    config = self.config
    return config.compute_outputs()

  @override
  def _compute_inputs(self):
    config = self.config
    return config.compute_inputs()

  @override
  async def run_async(self, qe):
    # Use the config's run_async only if its class actually overrides
    # the abstract placeholder on AutoMultiQuery; otherwise defer to
    # QueryAction's implementation.
    config_run_async = type(self.config).run_async
    if config_run_async is AutoMultiQuery.run_async:
      await super().run_async(qe)
    else:
      await self.config.run_async(qe)
| |
class AutoMultiQueryMeta(type(EqImmutable)):
  """Metaclass for AutoMultiQuery

  For every class it creates other than the one named "AutoMultiQuery",
  this metaclass attaches two nested classes:

    Action: the class's own Action if it defines one (which must derive
      from AutoMultiQueryBaseAction), else a generated
      AutoMultiQueryBaseAction subclass whose CONFIG must be an
      instance of the class.

    MetaQuery: a QueryNode with (config, meta) attributes.  Its schema
      comes from the Meta class's _meta_schema table, and its action is
      an Action built from its config.
  """
  def __new__(mcs, cls_name, bases, dict_):
    # pylint: disable=missing-docstring
    cls = super().__new__(mcs, cls_name, bases, dict_)
    # The abstract base itself gets no generated classes.
    if cls_name == "AutoMultiQuery":
      return cls
    Meta = cls.Meta
    assert issubclass(Meta, AutoMultiQuery.AutoMeta), \
      ("AutoMultiQuery Meta inner class must be subclass of "
       "AutoMultiQuery.AutoMeta: found {}".format(Meta))
    # Maps each Meta member to a callable taking the config object and
    # returning that metadatum's QuerySchema (built by
    # AutoMultiQueryMetaMeta).
    meta_schema = Meta._meta_schema  # pylint: disable=protected-access
    Action = getattr(cls, "Action", None)
    if Action:
      assert issubclass(Action, AutoMultiQueryBaseAction)
    else:
      # No hand-written Action: synthesize one that only checks that its
      # config belongs to this AutoMultiQuery subclass.
      class Action(AutoMultiQueryBaseAction):  # pylint: disable=function-redefined
        @override
        def _post_init_assert(self):
          super()._post_init_assert()
          assert isinstance(self.config, cls)
      Action.__qualname__ = cls.__qualname__ + "." + Action.__name__
      cls.Action = Action

    # One query node per (config, metadatum) pair.  It closes over
    # CLS, META_SCHEMA, and ACTION from this __new__ call.
    class MetaQuery(QueryNode):
      config = iattr(cls)
      meta = enumattr(cls.Meta)
      @override
      def _compute_schema(self):
        return meta_schema[self.meta](self.config)

      @override
      def countq(self):
        # N.B. this passes the MetaQuery node itself (not the Meta enum
        # member) to countq_for_meta; a falsy result means "use the
        # QueryNode default".
        return self.config.countq_for_meta(self) or super().countq()

      @override
      def make_action(self):
        return Action(config=self.config)
    MetaQuery.__qualname__ = cls.__qualname__ + "." + MetaQuery.__name__
    cls.MetaQuery = MetaQuery

    return cls
| |
class AutoMultiQueryMetaMeta(type(AutoNumber)):
  """Schematize meta queries from AutoMultiQuery

  Metaclass of AutoMultiQuery.AutoMeta.  In a class deriving directly
  from AutoMeta (an AutoMultiQuery subclass's Meta), every attribute
  whose name does not start with "__" must be either a QuerySchema or a
  callable that takes the config object and returns a QuerySchema.
  Each such attribute is replaced with () before the enumeration class
  is created.  The new class then gets a _meta_schema dict mapping each
  resulting member to its schema generator: a callable taking the
  config object and returning the member's QuerySchema.
  """
  def __new__(mcs, cls_name, bases, dict_):
    # The AutoMeta base itself declares no metadata.
    if cls_name == "AutoMeta":
      return super().__new__(mcs, cls_name, bases, dict_)
    # Meta classes must derive directly, and only, from AutoMeta.
    assert bases == (AutoMultiQuery.AutoMeta,)
    def _make_schema_generator(value):
      # Normalize VALUE to a callable: a fixed QuerySchema becomes a
      # function that ignores the config and returns that schema.
      assert callable(value) or isinstance(value, QuerySchema), \
        ("AutoMultiQuery metadata value should be schema or "
         "lambda yielding a schema: found {!r}".format(value))
      return value if callable(value) else lambda _: value
    schemas = []
    # Snapshot the keys: the loop body rewrites entries of dict_.
    for name in tuple(dict_):
      if not name.startswith("__"):
        schemas.append((name, _make_schema_generator(dict_[name])))
        # Presumably () asks AutoNumber to assign the next number, as in
        # this module's other AutoNumber enumerations -- TODO confirm.
        dict_[name] = ()
    cls = super().__new__(mcs, cls_name, bases, dict_)
    # Key the table by the enumeration members now that they exist.
    # pylint: disable=protected-access
    cls._meta_schema = {
      getattr(cls, name): schema
      for name, schema in schemas
    }
    return cls
| |
class AutoMultiQuery(EqImmutable, metaclass=AutoMultiQueryMeta):
  """Automatically define QueryNode boilerplate

  A multi-input, multi-output query is described by a family of related
  *Config, *Meta, *MetaQuery and *Action classes.  We used to write all
  of them by hand.  They now follow a consistent enough pattern that we
  generate them instead.

  To use AutoMultiQuery, derive a class from it; that class becomes the
  query family's "config" object.  The class defines a nested Meta
  class deriving from AutoMultiQuery.AutoMeta that lists the metadata
  outputs and their schemas, and it implements compute_inputs and
  compute_outputs.  It usually also implements run_async; when it does
  not, the generated Action falls back to QueryAction.run_async.
  AutoMultiQueryMeta attaches the generated Action and MetaQuery
  classes.
  """

  class AutoMeta(AutoNumber, metaclass=AutoMultiQueryMetaMeta):
    """Base for the Meta enumeration of an AutoMultiQuery subclass"""

  @abstract
  def compute_inputs(self):
    """Return the input queries of this family's Action"""
    raise NotImplementedError("abstract")

  @abstract
  def compute_outputs(self):
    """Return the output queries of this family's Action"""
    raise NotImplementedError("abstract")

  @abstract
  async def run_async(self, qe):
    """Execute the query asynchronously under query execution QE"""
    raise NotImplementedError("abstract")

  def metaq(self, meta):
    """Return the query node yielding metadatum META of this config"""
    # pylint: disable=no-member
    meta_query_class = self.MetaQuery
    return meta_query_class(self, meta)

  def countq_for_meta(self, meta):  # pylint: disable=unused-argument,no-self-use
    """Return a count query for a meta query, or None

    META is the generated MetaQuery node being counted.  Returning None
    selects the default count query.
    """
    return None

  def make_query_table(self):
    """Build a query table holding every metadatum of this config.

    Each column is named by lowercasing the name of the corresponding
    member of the Meta enumeration, which suits most AutoMultiQuery
    users.
    """
    # pylint: disable=no-member
    columns = []
    for meta_name, meta in self.Meta.__members__.items():
      columns.append((meta_name.lower(), self.metaq(meta)))
    return GenericQueryTable(columns)
| |
@final
class SpanTableConfig(EqImmutable):
  """Span table query bundle

  We accept a SpanTableConfig, rather than a plain QueryTable, wherever
  we want a span table.  A SpanTableConfig is an immutable, comparable
  bundle of queries; a QueryTable is neither comparable nor meant to
  be long-lived.
  """

  ts = TS_SCHEMA.qn_iattr()
  duration = DURATION_SCHEMA.qn_iattr()
  partition = iattr(QueryNode, nullable=True, default=None)

  @staticmethod
  def from_qt(qt):
    """Build a SpanTableConfig from the span QueryTable QT

    The result refers to the specific QueryNode instances taken from QT,
    not to QT itself.  Raise InvalidQueryException if QT is not a span
    table.
    """
    table_schema = qt.table_schema
    if table_schema.kind != TableKind.SPAN:
      raise InvalidQueryException("table is not a span table")
    column_queries = [qt["_ts"], qt["_duration"]]
    if table_schema.partition:
      column_queries.append(qt[table_schema.partition])
    return SpanTableConfig(*column_queries)

  @property
  def inputs(self):
    """Query inputs: (ts, duration), followed by partition if present"""
    required = (self.ts, self.duration)
    if not self.partition:
      return required
    return required + (self.partition,)

  def make_synchronous_input(self,
                             fill_partition=False,
                             extra=()):
    """Make a SynchronousInput over the span columns followed by EXTRA

    A partition column, when present, is marked as such.  We need to know
    which column is the partition so we can apply special relaxed
    casting rules to it.  Without a partition, FILL_PARTITION puts a
    constant zero where the partition column would go; otherwise that
    position is omitted.
    """
    if self.partition:
      partition_column = SynchronousInput.Column(self.partition,
                                                 partition=True)
      return SynchronousInput(self.ts, self.duration, partition_column,
                              *extra)
    leading = [self.ts, self.duration]
    if fill_partition:
      leading.append(0)
    return SynchronousInput(*leading, *extra)
| |
class EventTableConfig(EqImmutable):
  """Event table query bundle

  The event-table counterpart of SpanTableConfig: an immutable,
  comparable bundle of the timestamp query and an optional partition
  query.
  """

  ts = TS_SCHEMA.qn_iattr()
  partition = iattr(QueryNode, nullable=True, default=None)

  @property
  def inputs(self):
    """Generate query inputs: (ts,) or (ts, partition)"""
    if self.partition:
      return (self.ts, self.partition)
    return (self.ts,)

  def make_synchronous_input(self, fill_partition=False, extra=()):
    """Make a synchronous input object

    The input yields the ts column, then the partition column if there
    is one (marked as a partition column), then any EXTRA columns.  When
    there is no partition, FILL_PARTITION substitutes a constant zero in
    the partition position; otherwise that position is omitted.  EXTRA
    mirrors SpanTableConfig.make_synchronous_input.  It defaults to no
    extra columns, which gives the original behavior.
    """
    if self.partition:
      return SynchronousInput(
        self.ts,
        SynchronousInput.Column(
          self.partition,
          partition=True),
        *extra)
    if fill_partition:
      return SynchronousInput(self.ts, 0, *extra)
    return SynchronousInput(self.ts, *extra)
| |
| |
| # Grouping |
| |
class GroupMeta(AutoNumber):
  """Metadata needed for a group-by operation

  PandasGroupMetaAction produces all of these at once.  Member order
  matters: PandasGroupMetaAction enumerates GroupMeta to list its
  outputs, and emits the metadata arrays in this same order.
  """
  # Per-row group label (INT64).  GroupSizesQuery does not count rows
  # labeled -1 toward any group.
  LABELS = ()
  # Group ids reported by the Pandas grouper; one entry per group.
  GROUP_IDS = ()
  # Single-element array holding the number of groups.
  NGROUPS = ()
| |
@final
class PandasGroupMetaQuery(QueryNode):
  """Query for one GroupMeta metadatum of grouping by GROUP_BY

  Every metadatum is an INT64 column.  All except NGROUPS carry the
  "group_meta" domain.  PandasGroupMetaAction computes them all in one
  pass.
  """

  group_by = tattr(QueryNode)
  meta = enumattr(GroupMeta)

  @override
  def _compute_schema(self):
    # TODO(dancol): do we ever want to apply constraints here?
    if self.meta is GroupMeta.NGROUPS:
      return QuerySchema(INT64, domain=None)
    return QuerySchema(INT64, domain="group_meta")

  @override
  def make_action(self):
    return PandasGroupMetaAction(self.group_by)
| |
def _make_grouper(arrays):
  """Build an unsorted Pandas BaseGrouper with one Grouping per array.

  ARRAYS is a list of arrays, presumably all the same length; the group
  axis is sized from the first one.  This function empties the ARRAYS
  list (via consume_list) but never modifies the array data.
  """
  def _groupable(array):
    # This whole helper shouldn't be necessary.  We should write our own
    # grouping factorization that works on any dtype.
    if array.dtype == UINT64:
      return npy_make_rw_view(array, INT64)  # Ugh
    return npy_make_rw_view(array, None)
  views = [_groupable(a) for a in consume_list(arrays)]
  pd = load_pandas()
  groupby_impl = pd.core.groupby.groupby
  group_axis = pd.RangeIndex(len(views[0]))
  groupings = [
    groupby_impl.Grouping(group_axis, view, sort=False)
    for view in views
  ]
  return groupby_impl.BaseGrouper(group_axis, groupings, sort=False)
| |
def expand_nullable(arrays):
  """Potentially expand ARRAYS with nullability information.

  Return (EXPANDED_ARRAYS, IS_EXPANDED).  For each array in ARRAYS,
  EXPANDED_ARRAYS holds either one entry (the array's data) or two
  entries (the data followed by an int64 column that is nonzero where
  the array is masked).  An array with two entries is said to be
  "expanded".  IS_EXPANDED is a sequence of booleans, equal in length
  to ARRAYS, recording for each original array whether it was expanded.

  N.B. ARRAYS may be any iterable yielding ndarray or masked_array
  instances.  We iterate it exactly once, so it may be a generator that
  frees resources as it produces values.
  """
  expanded_arrays = []
  is_expanded = []
  for array in arrays:
    array, mask = npy_explode(array)
    # TODO(dancol): do we need this mask.any()? ISTM adding an extra
    # exploded column is expensive enough that we should try
    # extra-hard to avoid it, but this check isn't necessary
    # for correctness.
    has_nulls = mask is not nomask and mask.any()
    is_expanded.append(has_nulls)
    expanded_arrays.append(array)
    if not has_nulls:
      continue
    assert not array[mask].any(), \
      "values under a mask should be zero"
    # Ugh: if you try to group-by a bool array, Pandas uses the
    # group-by-_Object_ code, which is slow.  Instead, convert the mask
    # to int64; PandasGroupMetaAction converts it back to bool when it
    # packages the group columns.
    assert mask.dtype is BOOL
    expanded_arrays.append(mask.astype(INT64))
  return expanded_arrays, is_expanded
| |
@final
class PandasGroupMetaAction(QueryAction):
  """Action that sucks grouping metadata from Pandas

  A single run groups the GROUP_BY columns and produces every GroupMeta
  metadatum (as PandasGroupMetaQuery outputs) plus the per-group value
  of each grouping column (as GroupLabelsQuery outputs).
  """

  # N.B. We need this separate QueryAction class (as opposed to just
  # making PandasGroupMetaQuery a SimpleQueryAction) because this one
  # action produces _all_ the group metadata at once (which is the
  # only efficient way to do it), and a SimpleQueryNode can't have
  # multiple outputs.

  group_by = tattr(QueryNode)

  def __metaq(self, meta):
    # Query for one GroupMeta metadatum of this grouping
    return PandasGroupMetaQuery(self.group_by, meta)

  def __labels(self, column):
    # Query for the per-group values of grouping column COLUMN
    return GroupLabelsQuery(self.group_by, column)

  @override
  def _compute_inputs(self):
    return self.group_by

  @override
  def _compute_outputs(self):
    # Comprehension ensures we fail if we add new group members
    return ([self.__metaq(meta) for meta in GroupMeta] +
            [self.__labels(column) for column in self.group_by])

  @staticmethod
  def __compute_groups(column_arrays, column_expanded, dtypes):
    # Group COLUMN_ARRAYS (expand_nullable output; the list is consumed
    # by _make_grouper).  COLUMN_EXPANDED and DTYPES are parallel to the
    # original, pre-expansion columns.  Return
    # [labels, group ids, [ngroups]] followed by one per-group value
    # array per original column; expanded columns come back as masked
    # arrays.
    grouper = _make_grouper(column_arrays)
    assert type(grouper).__name__ == "BaseGrouper"
    # pylint: disable=unpacking-non-sequence
    comp_ids, group_ids, ngroups = grouper.group_info
    result_index = grouper.result_index
    assert len(group_ids) == ngroups
    del grouper
    # Pandas converts to platform dtype internally, so use it
    # ourselves to avoid a conversion on query. Compression should
    # take care of extra padding.
    # TODO(dancol): we can't move to our own grouping soon enough
    # (The next assert holds only where intp is 64 bits wide.)
    assert np.dtype(np.intp) is INT64
    comp_ids = np.asarray(comp_ids, INT64)
    # Same order as the GroupMeta members: LABELS, GROUP_IDS, NGROUPS.
    meta_out = [comp_ids, group_ids, np.asarray([ngroups])]
    assert len(meta_out) == len(tuple(GroupMeta))
    # IDX walks the levels of result_index: one level per original
    # column, plus a second level (its is-NULL flag) if it was expanded.
    idx = 0

    def _fix_group_values_type(group_values, dtype):
      # Restore the column's original DTYPE.  This undoes the
      # uint64->int64 view made in _make_grouper, and permits the unsafe
      # int64->bool cast that undoes the bool->int64 conversion done
      # before grouping.
      if group_values.dtype is dtype:
        return group_values
      if dtype is UINT64 and group_values.dtype is INT64:
        return group_values.view(UINT64)
      casting = "same_kind"
      if dtype is BOOL and group_values.dtype is INT64:
        casting = "unsafe"
      return group_values.astype(dtype, copy=False, casting=casting)

    def _package_group_column(is_expanded, dtype):
      # Produce the per-group values of the next original column,
      # advancing IDX past the level(s) it occupies.
      nonlocal idx
      group_values = result_index.get_level_values(idx).values  # pylint: disable=no-member
      assert len(group_values) == ngroups
      group_values = _fix_group_values_type(group_values, dtype)
      idx += 1
      if is_expanded:
        # The following level is this column's is-NULL flag (int64 per
        # expand_nullable); turn it back into a boolean mask.
        # pylint: disable=no-member
        mask = result_index.get_level_values(idx).values.astype(
          BOOL, copy=False)
        idx += 1
        return masked_array(group_values, mask)
      return group_values
    label_out = [
      _package_group_column(is_expanded, dtype)
      for is_expanded, dtype in zip(column_expanded, dtypes)
    ]
    return meta_out + label_out

  @override
  async def run_async(self, qe):
    # Here, we delve into Pandas internals in order to save the
    # internal grouper information, allowing us to save and restore it
    # via the usual query caching system.

    # The Pandas grouping code won't do NULLs right, since it
    # understands NaN as NULL for floats and doesn't have a concept of
    # an integer NULL at all. Instead of relying on Pandas NULL
    # support, we just provide non-NULL values and model NULL as a
    # separate grouping column. Here, we expand the group-by column
    # list so that we substitute for each grouped column that might
    # contain NULL values a non-NULL group (filling in a dummy value
    # for the NULL rows) and an is-NULL flag.

    # TODO(dancol): switch to our own grouping code
    # TODO(dancol): stream-ify this more
    group_by = self.group_by
    assert all(isinstance(g, QueryNode) for g in group_by)
    # Must match the order of the arrays __compute_groups returns.
    output_queries = [
      self.__metaq(GroupMeta.LABELS),
      self.__metaq(GroupMeta.GROUP_IDS),
      self.__metaq(GroupMeta.NGROUPS),
    ] + [
      self.__labels(column) for column in group_by
    ]

    ics, ocs = await qe.async_setup(group_by, output_queries)
    # TODO(dancol): add a "produce as available and in arbitrary
    # order" option for QueryExecution.async_io()?
    input_huge_blocks = await qe.async_io(*(ic.read_all() for ic in ics))
    # Original column dtypes, used to restore per-group value types.
    dtypes = [ic.dtype for ic in ics]
    def _maybe_munge_bool(array):
      # See the bool comment in expand_nullable
      if array.dtype is BOOL:
        array = array.astype(INT64)
      return array
    # consume_list lets each input block be released as soon as the
    # generator has converted it.
    column_arrays, column_expanded = expand_nullable(
      (_maybe_munge_bool(block.as_array()) for block in
       consume_list(input_huge_blocks)))
    assert not input_huge_blocks
    outs = self.__compute_groups(column_arrays,
                                 column_expanded,
                                 dtypes)
    assert not column_arrays, "should have been consumed"
    assert len(ocs) == len(outs)
    write_ops = [oc.write(out, True) for oc, out in zip(ocs, outs)]
    outs.clear()  # Make sure the output operations own memory exclusively
    await qe.async_io(*write_ops)
| |
# Names of the aggregations supported by the native (C++) aggregation
# code, as reported by get_supported_native_aggregations().
WELL_KNOWN_AGGREGATIONS = frozenset(get_supported_native_aggregations())
| |
class NativeAggregationQuery(SimpleQueryNode):
  """Perform an aggregation using DCTV C++ code

  Abstract base of the ungrouped and grouped native aggregations.
  AGGREGATION names the native aggregation, DATA is the column being
  aggregated, and DISTINCT is passed through to the native agg() call.
  """

  __abstract__ = True

  aggregation = iattr(str, kwonly=True)
  data = iattr(QueryNode, kwonly=True)
  distinct = iattr(bool, default=False, kwonly=True)

  @override
  def _compute_inputs(self):
    # A null result is a constant NULL and needs no data.
    if self.schema.is_null:
      return ()
    return (self.data,)

  @final
  @override
  def _compute_schema(self):
    """Derive the result schema from DATA's schema and the aggregation

    The aggregation class ACLS reported by native_aggregation_info
    selects the rule:

      "=": the result dtype equals the data dtype, and the data schema
           is propagated.
      "i": the result is a plain schema of the native result dtype.
      "+": rejected for strings; otherwise the data schema is evolved to
           the result dtype with its constraints dropped.
      "*": like "+", but also rejected when the data has a unit.

    If DATA is null and the aggregation has no empty value, the data
    schema is propagated without consulting ACLS.

    Raises InvalidQueryException when DATA has a domain, or when the
    rules above reject the combination.
    """
    data_schema = self.data.schema
    aggregation = self.aggregation
    if data_schema.domain:
      raise InvalidQueryException(
        "aggregation {!r} does not support domain {!r}"
        .format(aggregation, data_schema.domain))
    acls, has_empty_value = \
      native_aggregation_info(aggregation)
    if data_schema.is_null and not has_empty_value:
      return data_schema.propagate_constraints(C_UNIQUE)
    dtype = native_aggregation_result_dtype(
      aggregation, data_schema.dtype)
    if acls == "=":
      assert dtype == data_schema.dtype
      return data_schema.propagate_constraints(C_UNIQUE)
    if acls == "i":
      return QuerySchema(dtype)
    # Tuple membership, not `acls in "+*"`: substring containment would
    # also accept "" and "+*".
    assert acls in ("+", "*"), acls
    if data_schema.is_string:
      raise InvalidQueryException(
        "aggregation {!r} does not support strings"
        .format(aggregation))
    if not data_schema.unit or acls == "+":
      return data_schema.evolve(
        dtype=dtype, constraints=frozenset())
    assert acls == "*"
    raise InvalidQueryException(
      "aggregation {!r} does not make sense with a value in unit "
      "{!r}".format(aggregation, data_schema.unit))
| |
@final
class NativeUngroupedAggregationQuery(NativeAggregationQuery):
  """Aggregate an entire column down to a single value"""

  @override
  async def run_async(self, qe):
    if self.schema.is_null:
      # A null schema means the aggregation has no non-NULL empty value
      # (NativeAggregationQuery._compute_schema checked for one), so the
      # answer is always a single NULL.
      [], [oc] = await qe.async_setup((), (self,))
      await oc.write(masked_array([0], [True], dtype=oc.dtype), True)
      return
    # A constant 0 fills the slot where the grouped variant feeds group
    # labels, and agg() is told there is exactly one group.
    label_column = SynchronousInput.Column(0)
    data_column = SynchronousInput.Column(self.data, masked=True)
    data = SynchronousInput(label_column, data_column)
    out = SynchronousOutput(self, masked=True)
    sync_qe, (), () = await SynchronousQueryExecution.async_setup(
      qe, (), (), sync_io=(data, out))

    def _run_operator():
      data_schema = self.data.schema
      if data_schema.is_string:
        st = qe.st
      else:
        st = None
      # TODO(dancol): support user-specified collations
      if not st:
        collation = None
      elif self.aggregation == "biggest":
        collation = "length"
      else:
        collation = "binary"
      agg(qe, self.aggregation, self.distinct, data_schema.dtype,
          1, data, out, st, collation)
    await sync_qe.async_run(_run_operator)
| |
@final
class NativeGroupedAggregationQuery(NativeAggregationQuery):
  """Aggregate DATA separately within each group defined by GROUP_BY"""
  group_by = tattr(QueryNode, kwonly=True)

  def __meta(self, meta):
    return PandasGroupMetaQuery(self.group_by, meta)

  @override
  def _compute_inputs(self):
    data_inputs = super()._compute_inputs()
    group_inputs = (
      self.__meta(GroupMeta.LABELS),
      self.__meta(GroupMeta.NGROUPS),
    )
    return data_inputs + group_inputs

  @override
  async def run_async(self, qe):
    if self.schema.is_null:
      # A null schema means the aggregation has no non-NULL empty value
      # (NativeAggregationQuery._compute_schema checked for one), so we
      # emit one NULL per group.
      [_ic_labels, ic_ngroups], [oc] = await qe.async_setup(
        (self.__meta(GroupMeta.LABELS), self.__meta(GroupMeta.NGROUPS)),
        (self,))
      ngroups = await ic_ngroups.read_int()
      null_value = masked_array([0], [True], dtype=oc.dtype)
      await oc.write(npy_broadcast_to(null_value, ngroups), True)
      return
    # Group labels come first, then the (maskable) data column.
    labels_q = self.__meta(GroupMeta.LABELS)
    data = SynchronousInput(
      labels_q,
      SynchronousInput.Column(self.data, masked=True))
    out = SynchronousOutput(self, masked=True)
    sync_qe, [ic_ngroups], () = \
      await SynchronousQueryExecution.async_setup(
        qe,
        (self.__meta(GroupMeta.NGROUPS),),
        (), sync_io=(data, out))
    ngroups = await ic_ngroups.read_int()

    def _run_operator():
      data_schema = self.data.schema
      if data_schema.is_string:
        st = qe.st
      else:
        st = None
      # TODO(dancol): support user-specified collations
      if not st:
        collation = None
      elif self.aggregation == "biggest":
        collation = "length"
      else:
        collation = "binary"
      agg(qe, self.aggregation, self.distinct, data_schema.dtype,
          ngroups, data, out, st, collation)
    await sync_qe.async_run(_run_operator)
| |
| |
@final
class GroupSizesQuery(SimpleQueryNode):
  """Query that returns the sizes of each group

  Yields one INT64 row count per group of GROUP_BY, indexed by group
  label.  Rows labeled -1 are not counted toward any group.
  """

  group_by = tattr(QueryNode)

  @override
  def _compute_schema(self):
    return QuerySchema(INT64)

  def __meta(self, meta):
    return PandasGroupMetaQuery(self.group_by, meta)

  @override
  def _compute_inputs(self):
    return (
      self.__meta(GroupMeta.LABELS),
      self.__meta(GroupMeta.NGROUPS),
    )

  @override
  def countq(self):
    # One output row per group
    return self.__meta(GroupMeta.NGROUPS)

  @override
  async def run_async(self, qe):
    [ic_labels, ic_ngroups], [oc] = await qe.async_setup(
      (self.__meta(GroupMeta.LABELS),
       self.__meta(GroupMeta.NGROUPS)),
      (self,))
    labels_block, ngroups = await qe.async_io(ic_labels.read_all(),
                                              ic_ngroups.read_int())
    assert not labels_block.has_mask
    def _filter_labels():
      # bincount rejects negative values, so drop rows labeled -1.
      labels = labels_block.as_array()
      return labels[labels != -1]
    # minlength=ngroups gives trailing empty groups an explicit zero
    # count.  When there are no groups, minlength=0 already means "no
    # minimum".  The old `ngroups or None` passed minlength=None, which
    # NumPy deprecated in 1.14 and newer releases reject.
    group_sizes = np.bincount(_filter_labels(), minlength=ngroups)
    out_io = oc.write(group_sizes, True)
    del group_sizes
    del labels_block
    await out_io
| |
def GroupCountQuery(group_by):  # pylint: disable=invalid-name
  """Make a query yielding the number of groups when grouping by GROUP_BY.

  GROUP_BY is the grouping columns, as accepted by PandasGroupMetaQuery;
  see GroupMeta.  The query is the NGROUPS metadatum of that grouping.
  """
  count_meta = GroupMeta.NGROUPS
  return PandasGroupMetaQuery(group_by, count_meta)
| |
class GroupLabelsQuery(QueryNode):
  """Return the entries in a group-by operation

  Yields, for each group of GROUP_BY, that group's value of COLUMN,
  which must be one of the GROUP_BY columns.  PandasGroupMetaAction
  computes these values together with the rest of the group metadata.
  """

  group_by = tattr(QueryNode)
  column = iattr(QueryNode)

  @override
  def _post_init_assert(self):
    super()._post_init_assert()
    # Only grouping columns have per-group values.
    assert self.column in self.group_by

  @override
  def make_action(self):
    # The same action produces every metadatum of this grouping.
    return PandasGroupMetaAction(self.group_by)

  @override
  def _compute_schema(self):
    column_schema = self.column.schema
    return column_schema.propagate_constraints(C_UNIQUE)
| |
| |
| # Time series to span conversion: time series |
| |
class TimeSeriesSourceRole(AutoNumber):
  """Kind of event source

  TimeSeriesToSpan.SOURCE_KINDS maps each role to the event kind the
  native time_series_to_span code understands.
  """
  # Events start a span, closing any span already active (see the
  # TimeSeriesToSpan docstring).  Maps to TSTS_OPEN.
  START = ()
  # Events stop the active span.  Maps to TSTS_CLOSE.
  STOP = ()
  # Maps to TSTS_CLOSE_BROADCAST.  Such sources may be unpartitioned
  # even when the other sources are partitioned; presumably one event
  # closes spans across partitions -- TODO confirm.
  STOP_BROADCAST = ()
| |
class TimeSeriesEventEdge(AutoNumber):
  """Specifies the edge from which to extract event data

  See the TimeSeriesToSpan docstring for the definition of edges.
  """
  # The event that began the span
  RISING = ()
  # The event that terminated the span
  FALLING = ()
| |
@final
class TimeSeriesSource(EqImmutable):
  """One event source feeding a TimeSeriesToSpan conversion"""
  # Event timestamps.  TimeSeriesQueryTable takes them from an event
  # table re-sorted time-major.
  ts = TS_SCHEMA.qn_iattr()
  # Partition column, or None for an unpartitioned source.
  partition = iattr(QueryNode, nullable=True)
  # Whether this source's events start or stop spans
  role = enumattr(TimeSeriesSourceRole)
  # Passed through to the native time_series_to_span.  Presumably it
  # orders simultaneous events from different sources -- TODO confirm.
  priority = iattr(int)
| |
@final
class TimeSeriesToSpan(AutoMultiQuery):
  """Analyze a time series as a set of spans

  This query family efficiently converts a set of discrete events into
  a table of regular spans, with each row in the resulting span table
  having a distinct start time and duration.

  The input tables describe "events" that "arrive" at specific times
  and that tell us when certain activities have started or stopped.
  (These tables must be ordered by a "_ts" column.)

  Events come in two kinds: "start" events and "stop" events. When a
  span is not active and we begin a span, the event that starts the
  span is said to be the rising edge of that span. The event that
  terminates a span is the "falling" edge of that span. If we already
  have a span active and see another "start" event, we terminate the
  active span and start a new span in the same spot. The event that
  prompts us to stop and start this way is _simultaneously_ the
  falling-edge event for the span we close and the rising-edge event
  for the span we start in its place.

  These edges are important for determining where we get the values
  making up the columns that this table provides. When you create a
  TimeSeriesQueryTable, you specify which event source a column's
  values come from and which _edge_ supplies those values.

  Partition is another important concept. Sometimes spans start and
  stop in different "domains" and don't interfere with each other
  across domains. Consider system process scheduling: we can model the
  activity of a CPU as a sequence of spans, but different CPUs act
  independently. The value of the partition column divides events into
  independent overlapping series.

  Outputs are the TS and DURATION span metadata, PARTITION when any
  source is partitioned, and, for every source and every edge, a
  RawIndexerQuery yielding an unsorted take indexer.
  """

  sources = tattr(TimeSeriesSource)

  class Meta(AutoMultiQuery.AutoMeta):
    """Metadata for time series spans"""
    TS = TS_SCHEMA
    DURATION = DURATION_SCHEMA
    # Depends on the sources, so it is computed from the config.
    PARTITION = lambda config: config.partition_schema

  @cached_property
  def partition_schema(self):
    """The schema of the partition column

    The concatenation of the unconstrained schemas of every source's
    partition column; unpartitioned sources are skipped.
    """
    return QuerySchema.concat(
      (source.partition.schema.unconstrain()
       for source in self.sources
       if source.partition))

  @final
  class RawIndexerQuery(QueryNode):
    """Unsorted indexer queries: not directly usable!"""

    # A TimeSeriesToSpan (checked in _post_init_assert)
    config = iattr()
    # Index into config.sources
    source_index = iattr(int)
    edge = enumattr(TimeSeriesEventEdge)

    @override
    def _post_init_assert(self):
      super()._post_init_assert()
      assert isinstance(self.config, TimeSeriesToSpan)
      assert 0 <= self.source_index < len(self.config.sources)

    @override
    def _compute_schema(self):
      return INDEXER_SCHEMA

    @override
    def make_action(self):
      # Produced by the same action as all other outputs of this config.
      # pylint: disable=no-member
      return TimeSeriesToSpan.Action(config=self.config)

  def raw_indexerq(self, source_index, edge):
    """Make a query yielding an unsorted take indexer"""
    return TimeSeriesToSpan.RawIndexerQuery(self, source_index, edge)

  # Native event kind for each source role
  SOURCE_KINDS = {
    TimeSeriesSourceRole.START: TSTS_OPEN,
    TimeSeriesSourceRole.STOP: TSTS_CLOSE,
    TimeSeriesSourceRole.STOP_BROADCAST: TSTS_CLOSE_BROADCAST,
  }

  @override
  def compute_inputs(self):
    # Each source's ts, followed by its partition when it has one
    inputs = []
    for source in self.sources:
      inputs.append(source.ts)
      if source.partition:
        inputs.append(source.partition)
    return inputs

  @override
  def compute_outputs(self):
    # Must agree with the queries run_async writes.
    Meta = self.Meta
    outputs = [
      self.metaq(Meta.TS),
      self.metaq(Meta.DURATION),
    ]
    any_has_partition = False
    for source_index, source in enumerate(self.sources):
      for edge in TimeSeriesEventEdge:
        outputs.append(self.raw_indexerq(source_index, edge))
      any_has_partition = any_has_partition or source.partition
    if any_has_partition:
      outputs.append(self.metaq(Meta.PARTITION))
    return outputs

  @override
  async def run_async(self, qe):
    # TODO(dancol): we should be able to make do with just one output
    # iterator! We should just define a C++ specialization for some
    # small maximum number of sources.
    Meta = self.Meta
    # Every SynchronousInput/SynchronousOutput to register with the
    # synchronous query execution
    io_specs = []
    # One (input, output, kind, priority) tuple per source for the
    # native time_series_to_span
    source_specs = []
    any_has_partition = False

    def _handle_source(source_index, source):
      nonlocal any_has_partition
      # Local function avoids unwanted closure-capture
      if source.partition:
        source_input = SynchronousInput(
          source.ts,
          SynchronousInput.Column(
            source.partition,
            partition=True))
        io_specs.append(source_input)
        any_has_partition = True
      else:
        source_input = SynchronousInput(source.ts)
        io_specs.append(source_input)
        # Broadcast a zero in place of the partition
        source_input = ((input_array, 0) for [input_array] in source_input)
      source_output = SynchronousOutput(
        self.raw_indexerq(source_index, TimeSeriesEventEdge.RISING),
        self.raw_indexerq(source_index, TimeSeriesEventEdge.FALLING),
      )
      io_specs.append(source_output)
      source_specs.append((
        source_input,
        source_output,
        self.SOURCE_KINDS[source.role],
        source.priority,
      ))

    for source_index, source in enumerate(self.sources):
      _handle_source(source_index, source)

    partition_output_q = \
      self.metaq(Meta.PARTITION) if any_has_partition else None
    metadata_output = SynchronousOutput(
      self.metaq(Meta.TS),
      self.metaq(Meta.DURATION),
      partition_output_q,
      partition=partition_output_q)
    io_specs.append(metadata_output)
    if not any_has_partition:
      # With no partition output, drop the third (partition) argument
      # before forwarding to the real output.  MO is bound as a default
      # because METADATA_OUTPUT is rebound on the next line.
      def _metadata_output_wrapper(ts, duration, _partition, *,
                                   mo=metadata_output):
        mo(ts, duration)
      metadata_output = _metadata_output_wrapper  # pylint: disable=redefined-variable-type

    sync_qe, (), () = await SynchronousQueryExecution.async_setup(
      qe, (), (), sync_io=io_specs)

    def _run_operator():
      time_series_to_span(
        source_specs,
        metadata_output,
        qe)
    await sync_qe.async_run(_run_operator)
| |
@final
class TimeSeriesQueryTable(QueryTable):
  """Convenient query table for a time series conversion

  SOURCES is a sequence of event-table specs. Each spec is a QueryTable,
  a dict of keyword arguments (source, role, priority, nickname), or a
  list/tuple splatted positionally. COLUMNS is a sequence of column
  specs. Each column spec is a column name, a dict of keyword arguments
  (column, source, source_column, edge), or a list/tuple splatted
  positionally. PARTITION names the output partition column. It must be
  given if and only if the event sources are partitioned.

  Raises InvalidQueryException on malformed sources or columns,
  including unknown or duplicate source nicknames and out-of-range
  source indexes.
  """

  _config = iattr(TimeSeriesToSpan)
  _columns = iattr(dict)

  @override
  def __new__(cls, *,
              sources,
              columns,
              partition=None):
    my_es = []  # One TimeSeriesSource per event source
    es_nicknames = {}  # nickname -> index into my_es / es_sources
    es_sources = []  # The time-major-sorted source tables
    has_partition = False

    def _event_source_spec(source,
                           *,
                           role=TimeSeriesSourceRole.START,
                           priority=0,
                           nickname=None):
      # Validate one event source table, sort it time-major, and
      # register it under its index (and nickname, if any).
      nonlocal has_partition
      if not isinstance(source, QueryTable):
        raise InvalidQueryException("source is not a table, but a {}: {!r}"
                                    .format(type(source), source))
      source_table_schema = source.table_schema
      if source_table_schema.kind != TableKind.EVENT:
        raise InvalidQueryException("invalid event source table type {}"
                                    .format(source_table_schema))
      source = source.to_schema(sorting=TableSorting.TIME_MAJOR)
      source_partition = source.table_schema.partition
      ts_q = source["_ts"]
      partition_q = None if not source_partition else source[source_partition]
      has_partition = has_partition or partition_q
      if nickname:
        nickname = the(str, nickname)
        if nickname in es_nicknames:
          raise InvalidQueryException(
            "duplicate source nickname {!r}".format(nickname))
        es_nicknames[nickname] = len(my_es)
      es_sources.append(source)
      my_es.append(TimeSeriesSource(ts_q,
                                    partition_q,
                                    TimeSeriesSourceRole(role),
                                    priority))

    for event_source_spec in sources:
      if isinstance(event_source_spec, dict):
        _event_source_spec(**event_source_spec)
      elif isinstance(event_source_spec, (list, tuple)):
        _event_source_spec(*event_source_spec)
      else:
        _event_source_spec(event_source_spec)

    config = TimeSeriesToSpan(my_es)
    if has_partition and not partition:
      raise InvalidQueryException(
        "partitioned event sources but output partition not specified")
    if partition and not has_partition:
      raise InvalidQueryException(
        "output partition specified but sources are unpartitioned")
    if (has_partition and
        not all(source.partition for source in config.sources
                if source.role != TimeSeriesSourceRole.STOP_BROADCAST)):
      raise InvalidQueryException(
        "if one input is partitioned, they all must be, except for "
        "STOP_BROADCAST sources")

    # TODO(dancol): add a mode for built-in column coalescing

    my_columns = {}
    def _column_spec(column,
                     source=None,
                     source_column=None,
                     edge=TimeSeriesEventEdge.RISING):
      # Resolve one output column to a (source index, source column
      # query, edge) triple. SOURCE may be a nickname or an index; it
      # may be omitted only when there is exactly one source.
      if source is None:
        if len(my_es) == 1:
          source = 0
        else:
          raise InvalidQueryException("multiple sources: must specify")

      source_column = source_column or column
      if isinstance(source, str):
        try:
          source_index = es_nicknames[source]
        except KeyError:
          raise InvalidQueryException(
            "unknown source nickname {!r}".format(source)) from None
      else:
        source_index = source
      try:
        source = es_sources[source_index]
      except IndexError:
        raise InvalidQueryException(
          "source index {!r} out of range".format(source_index)) from None
      assert isinstance(source, QueryTable)
      if source_column not in source:
        raise InvalidQueryException("column {!r} not in source {}"
                                    .format(source_column, source))
      my_columns[column] = (source_index,
                            source[source_column],
                            TimeSeriesEventEdge(edge))

    for column_spec in columns:
      if isinstance(column_spec, dict):
        _column_spec(**column_spec)
      elif isinstance(column_spec, (list, tuple)):
        _column_spec(*column_spec)
      else:
        _column_spec(column_spec)
    table_schema = (SPAN_UNPARTITIONED_TIME_MAJOR
                    if not partition else
                    TableSchema(TableKind.SPAN,
                                partition,
                                TableSorting.NONE))
    return cls._do_new(cls,
                       _config=config,
                       _columns=my_columns,
                       table_schema=table_schema)

  @override
  def _make_column_tuple(self):
    # Metadata columns first, then the user-requested columns in
    # specification order.
    return self.table_schema.meta_columns + tuple(self._columns)

  @override
  def _make_column_query(self, column_name):
    if column_name == "_ts":
      return self._config.metaq(TimeSeriesToSpan.Meta.TS)
    if column_name == "_duration":
      return self._config.metaq(TimeSeriesToSpan.Meta.DURATION)
    if column_name == self.table_schema.partition:
      return self._config.metaq(TimeSeriesToSpan.Meta.PARTITION)
    # User column: take the source column's rows selected by the
    # converter's indexer for that source and edge.
    source_index, source_query, edge = self._columns[column_name]
    return source_query.take(
      self._config.raw_indexerq(source_index, edge))
| |
| |
| # Time series to span conversion: hierarchy |
| |
# INT64 schemas for the stack-id and stack-depth columns (used by
# Stackify's metadata outputs).
STACK_ID_SCHEMA = QuerySchema(
  INT64,
  domain="stack_id",
)
STACK_DEPTH_SCHEMA = QuerySchema(
  INT64,
  domain="stack_depth",
)
| |
class Stackify(AutoMultiQuery):
  """Turn time series into stacks

  Inputs are the TS, PARTITION, FRAME_ID, and END_FLAG columns. Outputs
  are two groups of metadata columns: a stack history (SH_*: timestamp,
  partition, stack id) and a stack description (SD_*: stack id, depth,
  frame id), both filled by the native stackify_start_stop_qqqq kernel.
  """

  # TODO(dancol): just use NULL to indicate endings
  # TODO(dancol): generate real span tables on output

  ts = TS_SCHEMA.qn_iattr()
  partition = iattr(QueryNode)
  frame_id = iattr(QueryNode)
  end_flag = iattr(QueryNode)

  class Meta(AutoMultiQuery.AutoMeta):
    """Metadata selector"""
    SH_TS = TS_SCHEMA
    SH_STACK_ID = STACK_ID_SCHEMA
    SH_PARTITION = lambda config: config.partition.schema
    SD_STACK_ID = STACK_ID_SCHEMA
    SD_DEPTH = STACK_DEPTH_SCHEMA
    SD_FRAME_ID = lambda config: config.frame_id.schema

  @override
  def compute_inputs(self):
    return (self.ts, self.partition, self.frame_id, self.end_flag)

  @override
  def compute_outputs(self):
    # One output per metadatum, in Meta declaration order.
    return list(map(self.metaq, self.Meta))

  @override
  async def run_async(self, qe):
    # TODO(dancol): make ordering of fields canonical
    meta = self.Meta
    metaq = self.metaq
    meta_input = SynchronousInput(
      self.partition, self.ts, self.frame_id, self.end_flag)
    history_output = SynchronousOutput(
      metaq(meta.SH_TS), metaq(meta.SH_PARTITION), metaq(meta.SH_STACK_ID))
    description_output = SynchronousOutput(
      metaq(meta.SD_STACK_ID), metaq(meta.SD_DEPTH), metaq(meta.SD_FRAME_ID))
    sync_io = (meta_input, history_output, description_output)
    sync_qe, (), () = await SynchronousQueryExecution.async_setup(
      qe, (), (), sync_io=sync_io)

    def _stackify():
      stackify_start_stop_qqqq(
        meta_input, history_output, description_output, qe)

    await sync_qe.async_run(_stackify)
| |
| |
| # Joins |
| |
# Join flavors accepted by JoinMetaQuery / JoinMetaAction.
JOIN_KINDS = (
  "inner",
  "left",
  "right",
  "outer",
)

class JoinMeta(AutoNumber):
  """Metadatum for a join operation

  Selects which of the two parallel row-indexer outputs of a join a
  JoinMetaQuery denotes.
  """
  LEFT_INDEXER = ()  # Row numbers into the left-hand input
  RIGHT_INDEXER = ()  # Row numbers into the right-hand input
| |
@final
class JoinMetaQuery(QueryNode):
  """Query producing join metadata

  Denotes one JoinMeta output (left or right row indexer) of the
  JoinMetaAction built from the same keys, null policy, and join kind.
  """

  # TODO(dancol): should the keys be sattr, not tattr? Does key
  # ordering matter? It should be possible to have an sattr of pairs
  # instead of two tattrs.

  _left_key = tattr(QueryNode, name="left_key")
  _right_key = tattr(QueryNode, name="right_key")
  _null_allowed = tattr(bool, name="null_allowed")
  _kind = iattr(str, name="kind")
  _meta = enumattr(JoinMeta, name="meta")

  @override
  def _post_init_assert(self):
    super()._post_init_assert()
    assert len(self._left_key) == len(self._right_key)
    # One null policy per key pair; this mirrors the check JoinMetaAction
    # performs, but fails earlier, at query construction time.
    assert len(self._null_allowed) == len(self._left_key)
    assert self._kind in JOIN_KINDS

  @override
  def _compute_schema(self):
    return INDEXER_SCHEMA

  @override
  def make_action(self):
    return JoinMetaAction(self._left_key, self._right_key,
                          self._null_allowed, self._kind)
| |
def _float_bits_for_factorize(array):
  """Reinterpret float ARRAY as INT64 bit patterns, folding -0.0 to +0.0.

  -0.0 and +0.0 compare equal but have different bit patterns. Without
  folding, a key of -0.0 would never match a key of 0.0. NaN values are
  left untouched and are still compared bitwise.
  """
  widened = array.astype(FLOAT64)  # astype copies, so editing is safe
  widened[widened == 0] = 0.0
  return widened.view(INT64)

# dtype.kind -> function mapping an array of that kind to an INT64 array
# whose values are equal exactly when the original key values are equal.
_INT64_FOR_FACTORIZE = {
  "b": lambda array: array.astype(INT64, copy=False),
  "i": lambda array: array.astype(INT64, copy=False),
  "u": lambda array: array.astype(UINT64, copy=False).view(INT64),
  "f": _float_bits_for_factorize,
}

def _int64_for_factorize(array):
  """Transform ARRAY into an INT64 array for factorization

  ARRAY must be a base ndarray (not a masked array) of bool, signed,
  unsigned, or floating dtype. Any other dtype kind raises KeyError.
  """
  assert type(array) is ndarray, (  # pylint: disable=unidiomatic-typecheck
    "array {!r} has type {!r} but we want base ndarray".format(
      array, type(array)))
  # We care only about the identity of the keys, so just reinterpret
  # any value of the appropriate size as int64 (normalizing float
  # signed zero so that equal floats get equal bits).
  array = _INT64_FOR_FACTORIZE[array.dtype.kind](array)
  assert array.dtype is INT64
  return array
| |
def _factorize_keys(pd, lk, rk):
  """Intern the keys of LK and RK into one shared dense label space.

  Returns (LLAB, RLAB, COUNT): per-row integer labels for LK and RK,
  where equal keys receive equal labels, and COUNT is the number of
  distinct labels. If either side has masked (NULL) entries, all of
  them share one extra label, COUNT - 1.

  Simplified version of the pandas original. Interning up front lets
  later passes index directly into tables instead of hashing at every
  step.
  """
  left_values, left_mask = npy_explode(lk)
  left_values = _int64_for_factorize(left_values)
  right_values, right_mask = npy_explode(rk)
  right_values = _int64_for_factorize(right_values)
  # TODO(dancol): use our own hash table instead of Pandas's. We can
  # directly support all data types instead of needing
  # _int64_for_factorize above.
  factorizer_class = pd._libs.hashtable.Int64Factorizer  # pylint: disable=protected-access
  factorizer = factorizer_class(max(len(left_values), len(right_values)))
  left_labels = factorizer.factorize(left_values)
  right_labels = factorizer.factorize(right_values)
  nlabels = factorizer.get_count()
  if not (left_mask.any() or right_mask.any()):
    return left_labels, right_labels, nlabels
  # Masked entries on either side get a brand-new shared label.
  left_labels[left_mask] = nlabels
  right_labels[right_mask] = nlabels
  return left_labels, right_labels, nlabels + 1
| |
def _get_join_keys(pd, llab, rlab, shape):
  """Combine per-level join labels into a single INT64 key per row.

  LLAB and RLAB are lists of INT64 label arrays, one per key level.
  SHAPE[i] is the number of distinct labels at level i. Returns
  (LKEY, RKEY) such that rows have equal keys exactly when all their
  level labels are equal.

  Simplified version of the pandas original. Levels are "stretched" by
  a stride so that each later level fits between the values of the
  earlier ones. When the combined key space could overflow int64, the
  levels built so far are densified via _factorize_keys and the process
  repeats on the shorter level list.
  """
  overflow_possible = pd.core.sorting.is_int64_overflow_possible
  while True:
    assert all(labels.dtype is INT64 for labels in llab)
    assert all(labels.dtype is INT64 for labels in rlab)
    # Largest prefix of levels whose combined key space fits in int64.
    nlev = next(depth for depth in range(len(shape), 0, -1)
                if not overflow_possible(shape[:depth]))
    stride = np.prod(shape[1:nlev], dtype=INT64)
    # Call np.multiply with an explicit dtype. The infix operator (or a
    # dtype-less np.multiply) yields "q"-dtype results from big "l"-dtype
    # inputs. Those are bitwise-identical on 64-bit machines but carry the
    # wrong dtype and trip the assertions above.
    lkey = np.multiply(stride, llab[0], dtype=INT64)
    rkey = np.multiply(stride, rlab[0], dtype=INT64)
    for level in range(1, nlev):
      with np.errstate(divide="ignore"):
        stride //= shape[level]
      lkey += llab[level] * stride
      rkey += rlab[level] * stride
    if nlev == len(shape):
      return lkey, rkey
    # Densify the combined prefix to avoid overflow, then continue with
    # it as the new first level.
    lkey, rkey, count = _factorize_keys(pd, lkey, rkey)
    llab = [lkey] + llab[nlev:]
    rlab = [rkey] + rlab[nlev:]
    shape = [count] + shape[nlev:]
| |
def _remove_nulls_from_join_keys(keys, null_allowed):
  """Drop rows whose key is masked in a column where NULL may not join.

  KEYS is a sequence of equal-length key columns. NULL_ALLOWED holds one
  flag per column. Returns (KEYS, None) when no row needs dropping.
  Otherwise returns (FILTERED_KEYS, REMAP), where FILTERED_KEYS is a
  list of the kept rows of every column and REMAP[i] is the original
  row number of kept row i.
  """
  assert all_same(map(len, keys))
  assert len(keys) == len(null_allowed)
  strict_masked_keys = [key for key, allow_null in zip(keys, null_allowed)
                        if not allow_null and npy_has_mask(key)]
  if not strict_masked_keys:
    return keys, None
  nrows = len(keys[0])
  # Concatenate (nrows, 1) column views rather than using np.stack,
  # which makes useless internal copies via np.asanyarray.
  mask_columns = [npy_get_mask(key).reshape((nrows, 1))
                  for key in strict_masked_keys]
  null_in_row = np.logical_or.reduce(  # pylint: disable=no-member
    np.concatenate(mask_columns, axis=1), axis=1)
  if not null_in_row.any():
    return keys, None
  keep = np.logical_not(null_in_row, out=null_in_row)
  return [key[keep] for key in keys], np.arange(nrows)[keep]
| |
def _remap_indexer(indexer, remap):
  """Map INDEXER through REMAP, preserving -1 ("no matching row") entries.

  REMAP, when not None, maps row numbers of a null-filtered key array
  back to row numbers of the unfiltered input. A plain REMAP.take()
  would wrap a -1 entry around to REMAP[-1], silently pairing an
  unmatched row with the last surviving row.
  """
  if remap is None:
    return indexer
  remapped = remap.take(indexer)
  remapped[indexer < 0] = -1
  return remapped

def _get_join_indexers(pd, left_keys, right_keys, null_allowed, kind):
  """Compute parallel row indexers joining LEFT_KEYS with RIGHT_KEYS.

  LEFT_KEYS and RIGHT_KEYS are parallel lists of key columns.
  NULL_ALLOWED has one flag per key column saying whether masked (NULL)
  keys may match each other. KIND is one of JOIN_KINDS. Returns
  (LEFT_INDEXER, RIGHT_INDEXER): arrays of row numbers into the
  original left and right inputs. An entry of -1 (produced by the
  pandas outer-join kernels) means "no matching row on this side". For
  inner joins the result is reordered so LEFT_INDEXER is non-decreasing.

  This is a simplified version of the pandas original that also
  understands DCTV masks.

  NOTE(review): rows whose key is NULL in a column with null_allowed
  false are removed from both sides before joining. In "left", "right",
  and "outer" joins such rows therefore vanish from the output instead
  of appearing unmatched as in SQL. Confirm this is intended.
  """
  assert len(left_keys) == len(right_keys)
  fkeys = partial(_factorize_keys, pd)
  left_keys, left_remap = _remove_nulls_from_join_keys(
    left_keys, null_allowed)
  right_keys, right_remap = _remove_nulls_from_join_keys(
    right_keys, null_allowed)
  # get left & right join labels and num. of levels at each location
  llab, rlab, shape = map(list, zip(*map(fkeys, left_keys, right_keys)))
  lkey, rkey = _get_join_keys(pd, llab, rlab, shape)
  # factorize keys to a dense i8 space
  # `count` is the num. of unique keys
  # set(lkey) | set(rkey) == range(count)
  lkey, rkey, count = fkeys(lkey, rkey)
  libjoin = pd._libs.join  # pylint: disable=protected-access
  if kind == "inner":
    left_indexer, right_indexer = libjoin.inner_join(lkey, rkey, count)
    # Same algorithm left join uses to preserve frame order, but we do
    # it by hand here because it's not built-in like it is for
    # left_outer_join (with sort=False). We have to use
    # groupsort_indexer unconditionally because the original sorter
    # isn't available to us outside inner_join.

    # TODO(dancol): benchmark against using left_outer_join
    # unconditionally and filtering.
    # pylint: disable=protected-access
    rev, _ = pd._libs.algos.groupsort_indexer(left_indexer, len(lkey))
    left_indexer = left_indexer.take(rev)
    right_indexer = right_indexer.take(rev)
  elif kind == "left":
    left_indexer, right_indexer = \
      libjoin.left_outer_join(lkey, rkey, count, sort=False)
  elif kind == "right":
    left_indexer, right_indexer = \
      libjoin.left_outer_join(rkey, lkey, count, sort=False)[::-1]
  else:
    assert kind == "outer"
    left_indexer, right_indexer = \
      libjoin.full_outer_join(lkey, rkey, count)
  # Translate filtered-row numbers back to original row numbers, keeping
  # the -1 "no match" markers intact.
  left_indexer = _remap_indexer(left_indexer, left_remap)
  right_indexer = _remap_indexer(right_indexer, right_remap)
  return left_indexer, right_indexer
| |
class JoinMetaAction(QueryAction):
  """Compute join metadata

  Reads the left and right key columns and writes the two JoinMeta
  outputs: parallel LEFT_INDEXER and RIGHT_INDEXER arrays of row numbers
  computed by _get_join_indexers. For "left" and "inner" joins the left
  side is streamed in chunks, while the right side is always read whole.
  """

  _left_key = tattr(QueryNode, name="left_key")
  _right_key = tattr(QueryNode, name="right_key")
  _null_allowed = tattr(bool, name="null_allowed")
  _kind = iattr(str, name="kind")

  @override
  def _post_init_check(self):
    super()._post_init_check()
    assert len(self._left_key) == len(self._right_key)
    assert len(self._null_allowed) == len(self._left_key)
    assert self._kind in JOIN_KINDS
    for left_key_q, right_key_q in zip(self._left_key, self._right_key):
      # Make sure the joined columns are compatible by side effect
      ls = left_key_q.schema
      rs = right_key_q.schema
      if ls.is_null or rs.is_null:
        # Skip the checks for this pair only; the remaining key pairs
        # must still be validated.
        continue
      if ls.is_string != rs.is_string:
        raise InvalidQueryException("cannot join string and non-string")
      if ls.domain != rs.domain:
        raise InvalidQueryException("cannot join domain {!r} with domain {!r}"
                                    .format(ls.domain, rs.domain))
      # TODO(dancol): support non-lossy unit conversions?
      if ls.unit != rs.unit:
        raise InvalidQueryException("cannot join unit {!r} with unit {!r}"
                                    .format(ls.unit, rs.unit))
      if (ls.dtype.kind != rs.dtype.kind and
          not (ls.dtype.kind in "iu" and rs.dtype.kind in "iu")):
        raise InvalidQueryException(
          "cannot join {} with {}".format(
            DTYPE_KINDS[ls.dtype.kind],
            DTYPE_KINDS[rs.dtype.kind]))

  @override
  def _compute_inputs(self):
    return self._left_key + self._right_key

  def __metaq(self, meta):
    return JoinMetaQuery(self._left_key,
                         self._right_key,
                         self._null_allowed,
                         self._kind, meta)

  @override
  def _compute_outputs(self):
    return tuple(self.__metaq(meta) for meta in JoinMeta)

  @override
  async def run_async(self, qe):
    def _list_split(things):
      # Split list THINGS in half (in place); return both halves.
      n = len(things) // 2
      right_things = things[n:]
      del things[n:]
      left_things = things
      assert len(left_things) == len(right_things)
      return left_things, right_things

    # pylint: disable=protected-access
    ics, [oc_lindexer, oc_rindexer] = await qe.async_setup(
      (self._left_key + self._right_key),
      (self.__metaq(JoinMeta.LEFT_INDEXER),
       self.__metaq(JoinMeta.RIGHT_INDEXER)))
    if not ics:
      return  # Nothing to do
    left_ics, right_ics = _list_split(list(ics))
    # TODO(dancol): make iterative join work in right-join mode by
    # inverting the join internally. If we really need
    # larger-than-core full outer join, we can do it with
    # multiple passes.
    # TODO(dancol): make NULL-safe
    join_block_size = 0
    kind = self._kind

    pd = load_pandas()

    if kind in ("left", "inner"):
      # We know in this join mode that the left side indexer values will
      # be non-decreasing, allowing us to loop over chunks of the left
      # side, joining each chunk against the entire right side, and
      # avoid loading the entire left array into memory.
      join_block_size = (qe.env.get("join_block_size")
                         or 10 * qe.block_size)
    if join_block_size:
      rleft = lambda ic: ic.read_at_least(join_block_size)
    else:
      rleft = lambda ic: ic.read_all()
    left_reqs = [rleft(ic) for ic in left_ics]
    right_reqs = [ic.read_all() for ic in right_ics]
    inps = await qe.async_io(*(left_reqs + right_reqs))
    left_inps, right_inps = _list_split(inps)
    check_same_lengths(left_ics, left_inps)
    check_same_lengths(right_ics, right_inps)
    right_key_arrays = [npy_make_rw_view(block) for block in right_inps]
    left_rows_consumed = 0
    # Right and outer joins emit the unmatched right rows even when the
    # left side is empty, so they must run the join at least once.
    must_join = kind in ("right", "outer")

    while True:
      chunk_size = len(left_inps[0])
      if not chunk_size and not must_join:
        break
      must_join = False
      lindexer, rindexer = \
        _get_join_indexers(
          pd,
          [npy_make_rw_view(block) for block in left_inps],
          right_key_arrays,
          self._null_allowed,
          kind)
      assert len(lindexer) == len(rindexer)
      if kind in ("inner", "left"):
        # Chunk-relative left row numbers become absolute ones.
        assert is_nondecreasing(lindexer)
        lindexer += left_rows_consumed
      is_eof = ((not join_block_size) or
                chunk_size < join_block_size)
      left_rows_consumed += chunk_size
      _, _, *left_inps = await qe.async_io(
        oc_lindexer.write(lindexer, is_eof),
        oc_rindexer.write(rindexer, is_eof),
        *[rleft(ic) for ic in left_ics])
| |
| |
| # Dropping duplicates |
| |
class DropDuplicatesAction(QueryAction):
  """Implementation of dropping duplicates

  Reads every column of GROUP, removes rows that are duplicates across
  the whole group (via pandas drop_duplicates), and writes the indexes
  of the kept rows plus the kept values of each member column.
  """

  _group = sattr(QueryNode, name="group")

  @override
  def _compute_inputs(self):
    return self._group

  @override
  def _compute_outputs(self):
    member_queries = (DropDuplicatesQuery(self._group, member)
                      for member in self._group)
    return cons(DropDuplicatesIndexerQuery(self._group), member_queries)

  @override
  async def run_async(self, qe):
    # TODO(dancol): if we want larger-than-RAM support here, we're
    # going to have to manually sort the input group and iterate
    # over it in chunks, taking care to remember the block boundary
    # values. Let's see whether it's actually a problem in practice.
    members = tuple(self._group)
    indexer_query = DropDuplicatesIndexerQuery(self._group)
    member_queries = [DropDuplicatesQuery(self._group, member)
                      for member in members]
    ics, [oc_indexer, *oc_members] = await qe.async_setup(
      members, [indexer_query] + member_queries)
    blocks = await qe.async_io(*[ic.read_all() for ic in ics])
    arrays = [block.as_array() for block in blocks]
    # Nullable columns are expanded (IS_EXPANDED flags which ones);
    # each expanded column contributes a data array and then a mask array.
    arrays, is_expanded = expand_nullable(arrays)

    def _make_out_reqs():
      column_names = ["col{}".format(i) for i in range(len(arrays))]
      df = make_pd_dataframe(
        dict(zip(column_names, map(npy_make_rw_view, arrays))))
      # Release the list's references to the input arrays.
      arrays.clear()
      df.drop_duplicates(inplace=True)
      names = iter(column_names)
      out_reqs = [oc_indexer.write(df.index.values, True)]
      for oc_member, c_is_expanded in zip(oc_members, is_expanded):
        values = df[next(names)].values
        if c_is_expanded:
          values = masked_array(values, df[next(names)].values)
        out_reqs.append(oc_member.write(values, True))
      return out_reqs

    await qe.async_io(*_make_out_reqs())
| |
class DropDuplicatesIndexerQuery(QueryNode):
  """Yields row numbers of kept values

  One of the outputs of DropDuplicatesAction. Its values come from the
  DataFrame index of the rows that survive drop_duplicates.
  """

  _group = sattr(QueryNode, name="group")

  @override
  def _compute_schema(self):
    # Every kept row is listed once, so the row numbers are unique.
    unique_indexer = INDEXER_SCHEMA.constrain(C_UNIQUE)
    return unique_indexer

  @override
  def make_action(self):
    action = DropDuplicatesAction(self._group)
    return action
| |
class DropDuplicatesQuery(QueryNode):
  """QueryNode for dropping duplicates

  Yields the values of COLUMN that DropDuplicatesAction keeps after
  removing rows that are duplicates across every column of GROUP.
  """

  _group = sattr(QueryNode, name="group")
  _column = iattr(QueryNode)

  @override
  def _post_init_assert(self):
    super()._post_init_assert()
    assert self._column in self._group

  @override
  def _compute_schema(self):
    # A one-column group makes this column itself UNIQUE. For wider
    # groups, only a column that was already UNIQUE stays UNIQUE, and
    # that is carried by the base schema.
    base_schema = self._column.schema
    if len(self._group) != 1:
      return base_schema
    return base_schema.constrain(C_UNIQUE)

  @override
  def make_action(self):
    return DropDuplicatesAction(self._group)

  @classmethod
  def of1(cls, query):
    """Convenience function for when we want to compress one query"""
    singleton_group = frozenset([query])
    return cls(singleton_group, query)
| |
| |
| # Span join |
| |
class SpanJoin(AutoMultiQuery):
  """Join spans together along the time axis

  SOURCES is a tuple of SpanJoin.Source. The native span_merge_join
  kernel produces one row indexer per source, plus timestamp, duration,
  and (when any source is partitioned) partition columns for the output
  spans.
  """

  sources = tattr()

  @override
  def _post_init_assert(self):
    super()._post_init_assert()
    assert assert_seq_type(tuple, SpanJoin.Source, self.sources)

  class Meta(AutoMultiQuery.AutoMeta):
    """Metadata selection"""
    TIMESTAMP = TS_SCHEMA
    DURATION = DURATION_SCHEMA
    PARTITION = lambda config: config.partition_schema

  @cached_property
  def partition_schema(self):
    """The schema of the result partition

    Concatenation of the unconstrained partition schemas of every
    partitioned source.
    """
    partitions = (source.span.partition
                  for source in self.sources
                  if source.span.partition)
    return QuerySchema.concat(p.schema.unconstrain() for p in partitions)

  class Source(EqImmutable):
    """One source for a span join"""
    span = iattr(SpanTableConfig)
    required = iattr(bool, kwonly=True)

  class IndexerQuery(QueryNode):
    """Input row mappings from a span join"""
    config = iattr()
    source_index = iattr(int)

    @override
    def _post_init_assert(self):
      super()._post_init_assert()
      assert isinstance(self.config, SpanJoin)
      assert 0 <= self.source_index < len(self.config.sources)

    @override
    def _compute_schema(self):
      return INDEXER_SCHEMA  # Not unique

    @override
    def make_action(self):
      return SpanJoin.Action(config=self.config)  # pylint: disable=no-member

  def indexerq(self, source_index):
    """Return a query for the indexes to take for a given source"""
    # TODO(dancol): incremental taking
    return self.IndexerQuery(self, source_index)

  @override
  async def run_async(self, qe):
    meta = self.Meta
    io_specs = []
    source_specs = []
    for source_index, source in enumerate(self.sources):
      span = source.span
      source_input = span.make_synchronous_input(fill_partition=True)
      source_output = SynchronousOutput(self.indexerq(source_index))
      io_specs += (source_input, source_output)
      source_specs.append((
        source_input,
        source_output,
        source.required,
        bool(span.partition),
      ))
    any_has_partition = any(spec[3] for spec in source_specs)
    if any_has_partition:
      partition_output_q = self.metaq(meta.PARTITION)
    else:
      partition_output_q = None
    metadata_output = SynchronousOutput(
      self.metaq(meta.TIMESTAMP),
      self.metaq(meta.DURATION),
      partition_output_q,
      partition=partition_output_q)
    io_specs.append(metadata_output)
    sync_qe, _, _ = await SynchronousQueryExecution.async_setup(
      qe, (), (), sync_io=io_specs)

    def _merge_join():
      span_merge_join(source_specs, metadata_output, qe)

    await sync_qe.async_run(_merge_join)

  @override
  def compute_inputs(self):
    return [query
            for source in self.sources
            for query in source.span.inputs]

  @override
  def compute_outputs(self):
    meta = self.Meta
    outputs = [self.indexerq(index) for index in range(len(self.sources))]
    outputs += [self.metaq(meta.TIMESTAMP), self.metaq(meta.DURATION)]
    if any(source.span.partition for source in self.sources):
      outputs.append(self.metaq(meta.PARTITION))
    return outputs
| |
| |
| # Span group |
| |
# Command-stream schemas. The domain identifies which operator produced
# a command stream.
COMMAND_STREAM_SPAN_PIVOT_SCHEMA = QuerySchema(DTYPE_COMMAND_STREAM,
                                               domain="command_stream_span_pivot")
COMMAND_STREAM_SPAN_GROUP_SCHEMA = QuerySchema(DTYPE_COMMAND_STREAM,
                                               domain="command_stream_span_group")

# Command-stream domain -> native column aggregation kernel; looked up
# by NativeSpanAggregationQuery.
COLUMN_AGG_BY_DOMAIN = dict((
  ("command_stream_span_pivot", span_pivot_column),
  ("command_stream_span_group", span_group_column),
))
| |
@final
class NativeSpanAggregationQuery(NativeAggregationQuery):
  """Perform an aggregation using native DCTV kernels

  COMMAND is a command-stream query whose schema domain selects the
  kernel in COLUMN_AGG_BY_DOMAIN. DISTINCT aggregations are rejected.
  """
  command = iattr(QueryNode, kwonly=True)

  @override
  def _post_init_assert(self):
    super()._post_init_assert()
    assert self.command.schema.domain in COLUMN_AGG_BY_DOMAIN

  @override
  def _post_init_check(self):
    super()._post_init_check()
    if not self.distinct:
      return
    raise NotImplementedError(
      "DISTINCT on native aggregations not implemented")

  @override
  def _compute_inputs(self):
    return super()._compute_inputs() + (self.command,)

  @override
  async def run_async(self, qe):
    masked_data_column = SynchronousInput.Column(self.data, masked=True)
    data = SynchronousInput(masked_data_column)
    command = SynchronousInput(self.command)
    out = SynchronousOutput(self, masked=True)
    sync_qe, (), () = await SynchronousQueryExecution.async_setup(
      qe, (), (), sync_io=(data, command, out))

    def _aggregate():
      data_schema = self.data.schema
      if data_schema.is_string:
        st = qe.st
      else:
        st = None
      # String data is compared with binary collation.
      collation = None if not st else "binary"
      kernel = COLUMN_AGG_BY_DOMAIN[self.command.schema.domain]
      kernel(data, command, out,
             data_schema.dtype, self.aggregation,
             qe, st, collation)

    await sync_qe.async_run(_aggregate)
| |
class SpanPivot(AutoMultiQuery):
  """Group spans by their partitions

  GROUPED must be a partitioned span table. The native span_pivot
  kernel produces timestamp, duration, and a command stream that
  aggregation queries from dataq() consume. It also produces an output
  partition column when OUTPUT_PARTITION is given.
  """

  class Meta(AutoMultiQuery.AutoMeta):
    """Metadata selector for span pivot"""
    TIMESTAMP = TS_SCHEMA
    DURATION = DURATION_SCHEMA
    COMMAND = lambda config: COMMAND_STREAM_SPAN_PIVOT_SCHEMA
    OUTPUT_PARTITION = lambda config: config.output_partition_schema

  grouped = iattr(SpanTableConfig, kwonly=True)
  output_partition = iattr(QueryNode, nullable=True, kwonly=True)
  min_npartitions = iattr_query_node_int(kwonly=True)

  @property
  def output_partition_schema(self):
    """The schema of the output partition column

    Error if we don't have an output partition
    """
    return self.output_partition.schema.unconstrain()

  @override
  def compute_outputs(self):
    meta = self.Meta
    wanted = [meta.TIMESTAMP, meta.DURATION, meta.COMMAND]
    if self.output_partition:
      wanted.append(meta.OUTPUT_PARTITION)
    return [self.metaq(which) for which in wanted]

  @override
  def compute_inputs(self):
    extra_inputs = (self.min_npartitions,)
    if self.output_partition:
      extra_inputs += (self.output_partition,)
    return self.grouped.inputs + extra_inputs

  def dataq(self, aggregation, data, distinct):
    """Create a data query for this span pivot operation.

    AGGREGATION is the name of an aggregation to apply. DATA is a
    QueryNode to aggregate. DISTINCT is whether the aggregation is
    limited to distinct values.
    """
    return NativeSpanAggregationQuery(
      aggregation=aggregation,
      data=data,
      distinct=distinct,
      command=self.metaq(self.Meta.COMMAND),
    )

  @override
  async def run_async(self, qe):
    meta = self.Meta
    grouped = self.grouped
    assert grouped.partition
    output_partition = self.output_partition
    grouped_partition_column = SynchronousInput.Column(
      grouped.partition, partition=True)
    if output_partition:
      output_partition_column = SynchronousInput.Column(
        output_partition, partition=True)
    else:
      output_partition_column = 0  # Dummy stand-in column
    grouped_input = SynchronousInput(
      grouped.ts,
      grouped.duration,
      grouped_partition_column,
      output_partition_column,
    )
    command_output = SynchronousOutput(self.metaq(meta.COMMAND))
    if output_partition:
      partition_output_q = self.metaq(meta.OUTPUT_PARTITION)
    else:
      partition_output_q = None
    metadata_output = SynchronousOutput(
      self.metaq(meta.TIMESTAMP),
      self.metaq(meta.DURATION),
      partition_output_q,
      partition=partition_output_q)

    sync_qe, [ic_min_npartitions], _ = \
      await SynchronousQueryExecution.async_setup(
        qe, [self.min_npartitions], (),
        sync_io=(grouped_input, command_output, metadata_output))
    min_npartitions = await ic_min_npartitions.read_int()

    if output_partition:
      emit_metadata = metadata_output
    else:
      def emit_metadata(ts_array, duration_array, op_array,
                        sink=metadata_output):
        # Discard the dummy partition array, which must be all zero.
        assert not op_array.any()
        return sink(ts_array, duration_array)

    def _pivot():
      span_pivot(grouped_input,
                 emit_metadata,
                 command_output,
                 min_npartitions,
                 qe)

    await sync_qe.async_run(_pivot)
| |
| class SpanGroup(AutoMultiQuery): |
| """Group spans or events by spans""" |
| |
| class Meta(AutoMultiQuery.AutoMeta): |
| """Metadata selector for span group""" |
| TIMESTAMP = TS_SCHEMA |
| DURATION = DURATION_SCHEMA |
| COMMAND = COMMAND_STREAM_SPAN_GROUP_SCHEMA |
| PARTITION = lambda config: config.partition_schema |
| |
| grouped = iattr((SpanTableConfig, EventTableConfig), kwonly=True) |
| grouper = iattr(SpanTableConfig, kwonly=True) |
| unique_pvals = iattr(QueryNode, nullable=True, kwonly=True) |
| intersect = iattr(bool, kwonly=True) |
| |
| @override |
| def _post_init_assert(self): |
| super()._post_init_assert() |
| is_broadcast_union = bool((not self.intersect) and |
| self.grouped.partition and |
| (not self.grouper.partition)) |
| assert bool(self.unique_pvals) == is_broadcast_union, \ |
| "unique_pvals should be supplied iff broadcast union " \ |
| "up:{!r} ib:{!r} is:{!r} p:{!r} gp:{!r}" \ |
| .format(self.unique_pvals, |
| is_broadcast_union, |
| self.intersect, |
| self.grouped.partition, |
| self.grouper.partition) |
| |
| @cached_property |
| def is_output_partitioned(self): |
| """True iff the output is partitioned""" |
| return self.grouped.partition or self.grouper.partition |
| |
| @property |
| def partition_schema(self): |
| """The schema of the result partition""" |
| return self.is_output_partitioned.schema.unconstrain() |
| |
| @override |
| def compute_outputs(self): |
| Meta = self.Meta |
| outputs = [ |
| self.metaq(Meta.TIMESTAMP), |
| self.metaq(Meta.DURATION), |
| self.metaq(Meta.COMMAND), |
| ] |
| if self.is_output_partitioned: |
| outputs.append(self.metaq(Meta.PARTITION)) |
| return outputs |
| |
| @override |
| def compute_inputs(self): |
| inputs = self.grouped.inputs + self.grouper.inputs |
| if self.unique_pvals: |
| inputs += (self.unique_pvals,) |
| return inputs |
| |
| @override |
| async def run_async(self, qe): |
| Meta = self.Meta |
| grouped_input = self.grouped.make_synchronous_input( |
| fill_partition=True) |
| grouper_input = self.grouper.make_synchronous_input( |
| fill_partition=True) |
| output_partition = self.grouped.partition or self.grouper.partition |
| partition_output_q = \ |
| self.metaq(Meta.PARTITION) if output_partition else None |
| metadata_output = SynchronousOutput(self.metaq(Meta.TIMESTAMP), |
| self.metaq(Meta.DURATION), |
| partition_output_q, |
| partition=partition_output_q) |
| command_output = SynchronousOutput(self.metaq(Meta.COMMAND)) |
| inps = (self.unique_pvals,) if self.unique_pvals else () |
| sync_qe, ics, _ = await SynchronousQueryExecution.async_setup( |
| qe, inps, (), sync_io=( |
| grouped_input, |
| grouper_input, |
| metadata_output, |
| command_output, |
| )) |
| |
| if self.unique_pvals: |
| unique_pvals = (await ics.pop().read_all()).as_array() |
| else: |
| unique_pvals = None |
| assert not ics |
| |
| def _run_operator(): |
| span_group( |
| unique_pvals, |
| grouped_input, |
| grouper_input, |
| metadata_output, |
| command_output, |
| qe, |
| bool(self.grouper.partition), |
| bool(self.grouped.partition), |
| isinstance(self.grouped, SpanTableConfig), |
| not self.intersect) |
| await sync_qe.async_run(_run_operator) |
| |
def dataq(self, aggregation, data, distinct):
    """Build a data query that aggregates DATA over this span grouping.

    AGGREGATION names the aggregation to apply. DATA is the QueryNode to
    aggregate. DISTINCT says whether only distinct values are
    aggregated. The resulting query is wired to this operation's COMMAND
    metadata output.
    """
    command_q = self.metaq(self.Meta.COMMAND)
    return NativeSpanAggregationQuery(aggregation=aggregation,
                                      data=data,
                                      distinct=distinct,
                                      command=command_q)
| |
| |
| # Event operations |
| |
class EventJoin(AutoMultiQuery):
    """Merge an event table with a span table

    No operator is needed to join two event tables: that is plain
    interleaving, which ordinary SQL operators already handle.
    """

    event = iattr(EventTableConfig)
    span = iattr(SpanTableConfig)
    span_is_required = iattr(bool)

    class Meta(AutoMultiQuery.AutoMeta):
        """Output columns produced by an event join"""
        TIMESTAMP = TS_SCHEMA
        PARTITION = lambda config: config.partition_schema
        EVENT_INDEX = INDEXER_SCHEMA
        SPAN_INDEX = INDEXER_SCHEMA

    @cached_property
    def partition_schema(self):
        """Unconstrained schema of the output partition.

        When both inputs are partitioned, this is the concatenation of
        the event partition schema and the span partition schema.
        Otherwise it is the schema of whichever input is partitioned.
        """
        evt_part = self.event.partition
        spn_part = self.span.partition
        if not (evt_part and spn_part):
            # Only meaningful when at least one side is partitioned.
            assert evt_part or spn_part
            return (evt_part or spn_part).schema.unconstrain()
        combined = QuerySchema.concat((evt_part.schema, spn_part.schema))
        return combined.unconstrain()

    @override
    def compute_inputs(self):
        event_inputs = self.event.inputs
        span_inputs = self.span.inputs
        return event_inputs + span_inputs

    @property
    def _has_partitioned_output(self):
        # Truthy (the partition node itself) when either input is
        # partitioned; the span side is consulted only if the event
        # side is not partitioned.
        event_partition = self.event.partition
        return event_partition or self.span.partition

    @override
    def compute_outputs(self):
        meta = self.Meta
        columns = (
            self.metaq(meta.TIMESTAMP),
            self.metaq(meta.EVENT_INDEX),
            self.metaq(meta.SPAN_INDEX),
        )
        if self._has_partitioned_output:
            columns += (self.metaq(meta.PARTITION),)
        return columns

    @override
    async def run_async(self, qe):
        meta = self.Meta
        event_in = self.event.make_synchronous_input(fill_partition=True)
        span_in = self.span.make_synchronous_input(fill_partition=True)
        if self._has_partitioned_output:
            part_q = self.metaq(meta.PARTITION)
        else:
            part_q = None
        # The partition query doubles as a column and as the partition
        # designator; both are None for unpartitioned output.
        meta_out = SynchronousOutput(
            self.metaq(meta.TIMESTAMP),
            part_q,
            partition=part_q)
        idx_out = SynchronousOutput(
            self.metaq(meta.EVENT_INDEX),
            self.metaq(meta.SPAN_INDEX))

        sync_qe, _, _ = await SynchronousQueryExecution.async_setup(
            qe, (), (),
            sync_io=(event_in, span_in, meta_out, idx_out))

        def _join():
            merge_spans_into_events(
                event_in,
                span_in,
                meta_out,
                idx_out,
                qe,
                bool(self.event.partition),
                bool(self.span.partition),
                self.span_is_required)
        await sync_qe.async_run(_join)
| |
| |
| # Span growth |
| |
class SpanFixup(AutoMultiQuery):
    """Query that fixes up "bad" span tables into valid span tables"""

    # The input need not be a valid span table: turning an invalid,
    # span-table-like input into a real span table is the whole point of
    # this operator.  So we take individual columns rather than a
    # SpanTableConfig.  Each column must still be span-table-compatible,
    # and we check that upon initialization (presumably via the
    # schema-typed attributes below -- TODO confirm).

    ts = TS_SCHEMA.qn_iattr()
    duration = DURATION_SCHEMA.qn_iattr()
    partition = iattr(QueryNode, nullable=True)

    class Meta(AutoMultiQuery.AutoMeta):
        """Output columns of a span fixup query"""
        TS = TS_SCHEMA
        DURATION = DURATION_SCHEMA
        PARTITION = lambda config: config.partition.schema.unconstrain()

    @override
    def compute_inputs(self):
        base = [self.ts, self.duration]
        return base + ([self.partition] if self.partition else [])

    @override
    def compute_outputs(self):
        meta = self.Meta
        produced = [self.metaq(meta.TS), self.metaq(meta.DURATION)]
        if self.partition:
            produced.append(self.metaq(meta.PARTITION))
        return produced

    @override
    async def run_async(self, qe):
        # TODO(dancol): simplify the operator by providing a partition
        # unconditionally and just broadcasting zero if we don't have a
        # "real" partition.
        meta = self.Meta
        if self.partition:
            partition_col = SynchronousInput.Column(
                self.partition,
                partition=True)
        else:
            # No real partition: a constant 0 stands in for the column.
            partition_col = 0
        span_input = SynchronousInput(self.ts, self.duration, partition_col)

        out_queries = [self.metaq(meta.TS), self.metaq(meta.DURATION)]
        part_q = None
        if self.partition:
            part_q = self.metaq(meta.PARTITION)
            out_queries.append(part_q)
        span_output = SynchronousOutput(*out_queries, partition=part_q)

        sync_qe, _, _ = await SynchronousQueryExecution.async_setup(
            qe, (), (), sync_io=(span_input, span_output))

        def _fixup():
            span_fixup(span_input, span_output, qe, bool(self.partition))
        await sync_qe.async_run(_fixup)
| |
| |
| # Backfill |
| |
class Backfill(AutoMultiQuery):
    """Automatically fill missing data in spans"""

    class Meta(AutoMultiQuery.AutoMeta):
        """Output column of the backfill operation"""
        VALUE = lambda config: config.value.schema

    span_meta = iattr(SpanTableConfig)
    value = iattr(QueryNode)

    @override
    def compute_inputs(self):
        # The span table's own inputs, followed by the value column.
        span_inputs = self.span_meta.inputs
        return span_inputs + (self.value,)

    @override
    def compute_outputs(self):
        # One output query per Meta member.
        return list(map(self.metaq, self.Meta))

    @override
    async def run_async(self, qe):
        # The value column rides along with the span metadata as a
        # masked extra column; the output is masked as well.
        value_column = SynchronousInput.Column(self.value, masked=True)
        meta_input = self.span_meta.make_synchronous_input(
            fill_partition=True,
            extra=[value_column])
        out = SynchronousOutput(self.metaq(self.Meta.VALUE), masked=True)
        sync_qe, _, _ = await SynchronousQueryExecution.async_setup(
            qe, (), (), sync_io=[meta_input, out])

        def _backfill():
            backfill(meta_input, out, qe, self.value.schema.dtype)
        await sync_qe.async_run(_backfill)