# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Query operator implementations
This file provides non-core query engine operators, e.g., some of the
complicated span operations and associated data manipulation
facilities. If something feels like basic SQL, it probably goes in
query.py.
"""
import logging
from functools import partial
import numpy as np
from numpy import ndarray
from numpy.ma import masked_array, nomask
from cytoolz import (
cons,
)
from modernmp.util import the, assert_seq_type
from .util import (
AutoNumber,
BOOL,
EqImmutable,
FLOAT64,
INT64,
UINT64,
abstract,
all_same,
cached_property,
consume_list,
enumattr,
final,
iattr,
is_nondecreasing,
load_pandas,
make_pd_dataframe,
override,
sattr,
tattr,
)
from .query import (
C_UNIQUE,
DTYPE_KINDS,
DURATION_SCHEMA,
GenericQueryTable,
INDEXER_SCHEMA,
InvalidQueryException,
QueryAction,
QueryNode,
QuerySchema,
QueryTable,
SPAN_UNPARTITIONED_TIME_MAJOR,
SimpleQueryNode,
TS_SCHEMA,
TableKind,
TableSchema,
TableSorting,
check_same_lengths,
iattr_query_node_int,
)
from ._native import (
DTYPE_COMMAND_STREAM,
TSTS_CLOSE,
TSTS_CLOSE_BROADCAST,
TSTS_OPEN,
agg,
backfill,
get_supported_native_aggregations,
merge_spans_into_events,
native_aggregation_info,
native_aggregation_result_dtype,
npy_broadcast_to,
npy_explode,
npy_get_mask,
npy_has_mask,
npy_make_rw_view,
span_fixup,
span_group,
span_group_column,
span_merge_join,
span_pivot,
span_pivot_column,
stackify_start_stop_qqqq,
time_series_to_span,
)
from .querythread import (
SynchronousInput,
SynchronousOutput,
SynchronousQueryExecution,
)
log = logging.getLogger(__name__)
class AutoMultiQueryBaseAction(QueryAction):
"""Base class for auto-generated MIMO QueryAction classes"""
# Precise type asserted in subclass, manual or automatic
config = iattr(kwonly=True)
@override
def _compute_outputs(self):
return self.config.compute_outputs()
@override
def _compute_inputs(self):
return self.config.compute_inputs()
@override
async def run_async(self, qe):
if type(self.config).run_async is not AutoMultiQuery.run_async:
await self.config.run_async(qe)
else:
await super().run_async(qe)
class AutoMultiQueryMeta(type(EqImmutable)):
"""Metaclass for AutoMultiQuery"""
def __new__(mcs, cls_name, bases, dict_):
# pylint: disable=missing-docstring
cls = super().__new__(mcs, cls_name, bases, dict_)
if cls_name == "AutoMultiQuery":
return cls
Meta = cls.Meta
assert issubclass(Meta, AutoMultiQuery.AutoMeta), \
("AutoMultiQuery Meta inner class must be subclass of "
"AutoMultiQuery.AutoMeta: found {}".format(Meta))
meta_schema = Meta._meta_schema # pylint: disable=protected-access
Action = getattr(cls, "Action", None)
if Action:
assert issubclass(Action, AutoMultiQueryBaseAction)
else:
class Action(AutoMultiQueryBaseAction): # pylint: disable=function-redefined
@override
def _post_init_assert(self):
super()._post_init_assert()
assert isinstance(self.config, cls)
Action.__qualname__ = cls.__qualname__ + "." + Action.__name__
cls.Action = Action
class MetaQuery(QueryNode):
config = iattr(cls)
meta = enumattr(cls.Meta)
@override
def _compute_schema(self):
return meta_schema[self.meta](self.config)
@override
def countq(self):
return self.config.countq_for_meta(self) or super().countq()
@override
def make_action(self):
return Action(config=self.config)
MetaQuery.__qualname__ = cls.__qualname__ + "." + MetaQuery.__name__
cls.MetaQuery = MetaQuery
return cls
class AutoMultiQueryMetaMeta(type(AutoNumber)):
"""Schematize meta queries from AutoMultiQuery"""
def __new__(mcs, cls_name, bases, dict_):
if cls_name == "AutoMeta":
return super().__new__(mcs, cls_name, bases, dict_)
assert bases == (AutoMultiQuery.AutoMeta,)
def _make_schema_generator(value):
assert callable(value) or isinstance(value, QuerySchema), \
("AutoMultiQuery metadata value should be schema or "
"lambda yielding a schema: found {!r}".format(value))
return value if callable(value) else lambda _: value
schemas = []
for name in tuple(dict_):
if not name.startswith("__"):
schemas.append((name, _make_schema_generator(dict_[name])))
dict_[name] = ()
cls = super().__new__(mcs, cls_name, bases, dict_)
# pylint: disable=protected-access
cls._meta_schema = {
getattr(cls, name): schema
for name, schema in schemas
}
return cls
class AutoMultiQuery(EqImmutable, metaclass=AutoMultiQueryMeta):
"""Automatically define QueryNode boilerplate
This class is useful for defining a family of related *Config,
*Meta, *MetaQuery, *Action classes that together describe a
multi-input, multi-output query. Previously, we just wrote all
these classes by hand, but since they've come to obey a consistent
pattern, we generate them instead.
To use AutoMultiQuery, derive a class from this one. The class will
become the query family "config" object.
"""
class AutoMeta(AutoNumber, metaclass=AutoMultiQueryMetaMeta):
"""Meta class for AutoMultiQueryMetaMeta"""
@abstract
def compute_inputs(self):
"""Compute action inputs"""
raise NotImplementedError("abstract")
@abstract
def compute_outputs(self):
"""Compute action outputs"""
raise NotImplementedError("abstract")
@abstract
async def run_async(self, qe):
"""Execute the query asynchronously"""
raise NotImplementedError("abstract")
def metaq(self, meta):
"""Create a query object for a metadatum"""
# pylint: disable=no-member
return self.MetaQuery(self, meta)
def countq_for_meta(self, meta): # pylint: disable=unused-argument,no-self-use
"""Count query for a meta query
Return None to use the default"""
return None
def make_query_table(self):
"""Make a query table for the metadata of this configuration.
The column names are the lowercase versions of the values in the
metadata enumeration. This conversion is appropriate for most
AutoMultiQuery users.
"""
# pylint: disable=no-member
return GenericQueryTable([
(meta_name.lower(), self.metaq(meta))
for meta_name, meta in self.Meta.__members__.items()
])
@final
class SpanTableConfig(EqImmutable):
"""Span table query bundle"""
# The reason we have a SpanTableConfig object instead of just
# accepting a QueryTable where we want a span table is that
# SpanTableConfig is conceptually a query bundle that's immutable
# and comparable, whereas a QueryTable isn't meant to be long-lived
# or comparable.
ts = TS_SCHEMA.qn_iattr()
duration = DURATION_SCHEMA.qn_iattr()
partition = iattr(QueryNode, nullable=True, default=None)
@staticmethod
def from_qt(qt):
"""Build from a QueryTable
The resulting SpanTableConfig contains references to specific
QueryNode instances from QT and not QT itself.
"""
table_schema = qt.table_schema
if table_schema.kind != TableKind.SPAN:
raise InvalidQueryException("table is not a span table")
if table_schema.partition:
return SpanTableConfig(
qt["_ts"],
qt["_duration"],
qt[table_schema.partition])
return SpanTableConfig(qt["_ts"], qt["_duration"])
@property
def inputs(self):
"""Generate query inputs"""
if self.partition:
return (self.ts,
self.duration,
self.partition)
return (self.ts,
self.duration)
def make_synchronous_input(self,
fill_partition=False,
extra=()):
"""Make a synchronous input object"""
# We need to know the fill column so that we can apply special
# relaxed casting rules to it.
if self.partition:
return SynchronousInput(
self.ts,
self.duration,
SynchronousInput.Column(
self.partition,
partition=True),
*extra)
if fill_partition:
return SynchronousInput(self.ts, self.duration, 0, *extra)
return SynchronousInput(self.ts, self.duration, *extra)
class EventTableConfig(EqImmutable):
"""Event table query bundle"""
ts = TS_SCHEMA.qn_iattr()
partition = iattr(QueryNode, nullable=True, default=None)
@property
def inputs(self):
"""Generate query inputs"""
if self.partition:
return (self.ts, self.partition)
return (self.ts,)
def make_synchronous_input(self, fill_partition=False):
"""Like inputs, but substitute missing partition with zero"""
if self.partition:
return SynchronousInput(
self.ts,
SynchronousInput.Column(
self.partition,
partition=True),
)
if fill_partition:
return SynchronousInput(self.ts, 0)
return SynchronousInput(self.ts)
# Grouping
class GroupMeta(AutoNumber):
"""Metadata needed for a group-by operation"""
LABELS = ()
GROUP_IDS = ()
NGROUPS = ()
@final
class PandasGroupMetaQuery(QueryNode):
"""Query that yields the factorized labels assigned to groups"""
group_by = tattr(QueryNode)
meta = enumattr(GroupMeta)
@override
def _compute_schema(self):
# TODO(dancol): do we ever want to apply constraints here?
domain = None if self.meta is GroupMeta.NGROUPS else "group_meta"
return QuerySchema(INT64, domain=domain)
@override
def make_action(self):
return PandasGroupMetaAction(self.group_by)
def _make_grouper(arrays):
# Can modify arrays list destructively, but not the array data
def _fix_array(array):
# This whole function shouldn't be necessary. We should write our
# own grouping factorization stuff that works on any dtype.
dtype = None
if array.dtype == UINT64:
dtype = INT64 # Ugh
return npy_make_rw_view(array, dtype)
arrays = [_fix_array(a) for a in consume_list(arrays)]
pd = load_pandas()
Grouping = pd.core.groupby.groupby.Grouping
group_axis = pd.RangeIndex(len(arrays[0]))
return pd.core.groupby.groupby.BaseGrouper(
group_axis,
[Grouping(group_axis, array, sort=False) for array in arrays],
sort=False)
def expand_nullable(arrays):
"""Potentially expand ARRAYS with nullability information.
Return (EXPANDED_ARRAYS, IS_EXPANDED). EXPANDED_ARRAYS is a
sequence of arrays with one or two arrays for each array in ARRAYS.
In the latter case, the original array is said to be "expanded", and
the extra entry in EXPANDED_ARRAYS is an int64 column that's nonzero
if the corresponding array is masked. IS_EXPANDED is a sequence of
booleans equal in length to ARRAYS indicating, for each original
array in ARRAYS, whether we supplied one or two entries in
EXPANDED_ARRAYS.
N.B. ARRAYS, to be specific, must be a generic iterable that yields
ndarray or masked_array instances. We iterate it only once; it may
be a generator that frees resources as it produces values.
"""
expanded_arrays = []
is_expanded = []
for array in arrays:
array, mask = npy_explode(array)
# TODO(dancol): do we need this mask.any()? ISTM adding an extra
# exploded column is expensive enough that we should try
# extra-hard to avoid it, but this check isn't necessary
# for correctness.
has_nulls = mask is not nomask and mask.any()
is_expanded.append(has_nulls)
if has_nulls:
assert not array[mask].any(), \
"values under a mask should be zero"
expanded_arrays.append(array)
# Ugh: if you try to group-by a bool array, Pandas uses the
# group-by-_Object_ code, which is slow. Instead, convert the
# mask to int64; we'll convert it back to bool in
# _package_group_column below.
assert mask.dtype is BOOL
expanded_arrays.append(mask.astype(INT64))
else:
expanded_arrays.append(array)
return expanded_arrays, is_expanded
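# A worked illustration of the contract above: a masked int64 input such as
# masked_array([1, 0, 3], mask=[False, True, False]) contributes two entries,
# the zero-filled values [1, 0, 3] and an int64 is-NULL column [0, 1, 0], with
# is_expanded recording True for it; an unmasked input passes through as a
# single entry with is_expanded False.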
@final
class PandasGroupMetaAction(QueryAction):
"""Action that sucks grouping metadata from Pandas"""
# N.B. We need this separate QueryAction class (as opposed to just
# making PandasGroupMetaQuery a SimpleQueryAction) because this one
# action produces _all_ the group metadata at once (which is the
# only efficient way to do it), and a SimpleQueryNode can't have
# multiple outputs.
group_by = tattr(QueryNode)
def __metaq(self, meta):
return PandasGroupMetaQuery(self.group_by, meta)
def __labels(self, column):
return GroupLabelsQuery(self.group_by, column)
@override
def _compute_inputs(self):
return self.group_by
@override
def _compute_outputs(self):
# Comprehension ensures we fail if we add new group members
return ([self.__metaq(meta) for meta in GroupMeta] +
[self.__labels(column) for column in self.group_by])
@staticmethod
def __compute_groups(column_arrays, column_expanded, dtypes):
grouper = _make_grouper(column_arrays)
assert type(grouper).__name__ == "BaseGrouper"
# pylint: disable=unpacking-non-sequence
comp_ids, group_ids, ngroups = grouper.group_info
result_index = grouper.result_index
assert len(group_ids) == ngroups
del grouper
# Pandas converts to platform dtype internally, so use it
# ourselves to avoid a conversion on query. Compression should
# take care of extra padding.
# TODO(dancol): we can't move to our own grouping soon enough
assert np.dtype(np.intp) is INT64
comp_ids = np.asarray(comp_ids, INT64)
meta_out = [comp_ids, group_ids, np.asarray([ngroups])]
assert len(meta_out) == len(tuple(GroupMeta))
idx = 0
def _fix_group_values_type(group_values, dtype):
if group_values.dtype is dtype:
return group_values
if dtype is UINT64 and group_values.dtype is INT64:
return group_values.view(UINT64)
casting = "same_kind"
if dtype is BOOL and group_values.dtype is INT64:
casting = "unsafe"
return group_values.astype(dtype, copy=False, casting=casting)
def _package_group_column(is_expanded, dtype):
nonlocal idx
group_values = result_index.get_level_values(idx).values # pylint: disable=no-member
assert len(group_values) == ngroups
group_values = _fix_group_values_type(group_values, dtype)
idx += 1
if is_expanded:
# pylint: disable=no-member
mask = result_index.get_level_values(idx).values.astype(
BOOL, copy=False)
idx += 1
return masked_array(group_values, mask)
return group_values
label_out = [
_package_group_column(is_expanded, dtype)
for is_expanded, dtype in zip(column_expanded, dtypes)
]
return meta_out + label_out
@override
async def run_async(self, qe):
# Here, we delve into Pandas internals in order to save the
# internal grouper information, allowing us to save and restore it
# via the usual query caching system.
# The Pandas grouping code won't do NULLs right, since it
# understands NaN as NULL for floats and doesn't have a concept of
# an integer NULL at all. Instead of relying on Pandas NULL
# support, we just provide non-NULL values and model NULL as a
# separate grouping column. Here, we expand the group-by column
# list so that we substitute for each grouped column that might
# contain NULL values a non-NULL group (filling in a dummy value
# for the NULL rows) and an is-NULL flag.
# TODO(dancol): switch to our own grouping code
# TODO(dancol): stream-ify this more
group_by = self.group_by
assert all(isinstance(g, QueryNode) for g in group_by)
output_queries = [
self.__metaq(GroupMeta.LABELS),
self.__metaq(GroupMeta.GROUP_IDS),
self.__metaq(GroupMeta.NGROUPS),
] + [
self.__labels(column) for column in group_by
]
ics, ocs = await qe.async_setup(group_by, output_queries)
# TODO(dancol): add a "produce as available and in arbitrary
# order" option for QueryExecution.async_io()?
input_huge_blocks = await qe.async_io(*(ic.read_all() for ic in ics))
dtypes = [ic.dtype for ic in ics]
def _maybe_munge_bool(array):
# See the bool comment in expand_nullable
if array.dtype is BOOL:
array = array.astype(INT64)
return array
column_arrays, column_expanded = expand_nullable(
(_maybe_munge_bool(block.as_array()) for block in
consume_list(input_huge_blocks)))
assert not input_huge_blocks
outs = self.__compute_groups(column_arrays,
column_expanded,
dtypes)
assert not column_arrays, "should have been consumed"
assert len(ocs) == len(outs)
write_ops = [oc.write(out, True) for oc, out in zip(ocs, outs)]
outs.clear() # Make sure the output operations own memory exclusively
await qe.async_io(*write_ops)
WELL_KNOWN_AGGREGATIONS = frozenset(get_supported_native_aggregations())
class NativeAggregationQuery(SimpleQueryNode):
"""Perform an aggregation using DCTV C++ code"""
__abstract__ = True
aggregation = iattr(str, kwonly=True)
data = iattr(QueryNode, kwonly=True)
distinct = iattr(bool, default=False, kwonly=True)
@override
def _compute_inputs(self):
if self.schema.is_null:
return ()
return (self.data,)
@final
@override
def _compute_schema(self):
data_schema = self.data.schema
aggregation = self.aggregation
if data_schema.domain:
raise InvalidQueryException(
"aggregation {!r} does not support domain {!r}"
.format(aggregation, data_schema.domain))
acls, has_empty_value = \
native_aggregation_info(aggregation)
if data_schema.is_null and not has_empty_value:
return data_schema.propagate_constraints(C_UNIQUE)
dtype = native_aggregation_result_dtype(
aggregation, data_schema.dtype)
if acls == "=":
assert dtype == data_schema.dtype
return data_schema.propagate_constraints(C_UNIQUE)
if acls == "i":
return QuerySchema(dtype)
assert acls in "+*"
if data_schema.is_string:
raise InvalidQueryException(
"aggregation {!r} does not support strings"
.format(aggregation))
if not data_schema.unit or acls == "+":
return data_schema.evolve(
dtype=dtype, constraints=frozenset())
assert acls == "*"
raise InvalidQueryException(
"aggregation {!r} does not make sense with a value in unit "
"{!r}".format(aggregation, data_schema.unit))
@final
class NativeUngroupedAggregationQuery(NativeAggregationQuery):
"""Aggregate a whole sequence"""
@override
async def run_async(self, qe):
if self.schema.is_null:
# We know we don't have a non-NULL empty value because
# NativeAggregationQuery.compute_schema() checked for it.
# Therefore, we always emit a simple null here.
[], [oc] = await qe.async_setup((), (self,))
await oc.write(masked_array([0], [True], dtype=oc.dtype), True)
return
data = SynchronousInput(
SynchronousInput.Column(0),
SynchronousInput.Column(
self.data,
masked=True,
))
out = SynchronousOutput(self, masked=True)
sync_qe, (), () = await SynchronousQueryExecution.async_setup(
qe, (), (), sync_io=(data, out))
def _run_operator():
data_schema = self.data.schema
st = qe.st if data_schema.is_string else None
# TODO(dancol): support user-specified collations
collation = "binary" if st else None
if collation and self.aggregation == "biggest":
collation = "length"
agg(qe,
self.aggregation,
self.distinct,
data_schema.dtype,
1,
data,
out,
st,
collation)
await sync_qe.async_run(_run_operator)
@final
class NativeGroupedAggregationQuery(NativeAggregationQuery):
"""Group a column by another column and aggregation values"""
group_by = tattr(QueryNode, kwonly=True)
def __meta(self, meta):
return PandasGroupMetaQuery(self.group_by, meta)
@override
def _compute_inputs(self):
return super()._compute_inputs() + (
self.__meta(GroupMeta.LABELS),
self.__meta(GroupMeta.NGROUPS),
)
@override
async def run_async(self, qe):
if self.schema.is_null:
# We know we don't have a non-NULL empty value because
# NativeAggregationQuery.compute_schema() checked for it.
# Therefore, we always emit a simple null here.
[_ic_labels, ic_ngroups], [oc] = await qe.async_setup(
(self.__meta(GroupMeta.LABELS), self.__meta(GroupMeta.NGROUPS)),
(self,))
ngroups = await ic_ngroups.read_int()
await oc.write(npy_broadcast_to(
masked_array([0], [True], dtype=oc.dtype),
ngroups), True)
return
data = SynchronousInput(
self.__meta(GroupMeta.LABELS),
SynchronousInput.Column(self.data, masked=True))
out = SynchronousOutput(self, masked=True)
sync_qe, [ic_ngroups], () = \
await SynchronousQueryExecution.async_setup(
qe,
(self.__meta(GroupMeta.NGROUPS),),
(), sync_io=(data, out))
ngroups = await ic_ngroups.read_int()
def _run_operator():
data_schema = self.data.schema
st = qe.st if data_schema.is_string else None
# TODO(dancol): support user-specified collations
collation = "binary" if st else None
if collation and self.aggregation == "biggest":
collation = "length"
agg(qe,
self.aggregation,
self.distinct,
data_schema.dtype,
ngroups,
data,
out,
st,
collation)
await sync_qe.async_run(_run_operator)
@final
class GroupSizesQuery(SimpleQueryNode):
"""Query that returns the sizes of each group"""
group_by = tattr(QueryNode)
@override
def _compute_schema(self):
return QuerySchema(INT64)
def __meta(self, meta):
return PandasGroupMetaQuery(self.group_by, meta)
@override
def _compute_inputs(self):
return (
self.__meta(GroupMeta.LABELS),
self.__meta(GroupMeta.NGROUPS),
)
@override
def countq(self):
return self.__meta(GroupMeta.NGROUPS)
@override
async def run_async(self, qe):
[ic_labels, ic_ngroups], [oc] = await qe.async_setup(
(self.__meta(GroupMeta.LABELS),
self.__meta(GroupMeta.NGROUPS)),
(self,))
labels_block, ngroups = await qe.async_io(ic_labels.read_all(),
ic_ngroups.read_int())
assert not labels_block.has_mask
def _filter_labels():
labels = labels_block.as_array()
return labels[labels != -1]
group_sizes = np.bincount(_filter_labels(), minlength=ngroups or None)
out_io = oc.write(group_sizes, True)
del group_sizes
del labels_block
await out_io
def GroupCountQuery(group_by): # pylint: disable=invalid-name
"""Make a new group-count query.
GROUP_BY is the grouping column; see GroupMeta.
"""
return PandasGroupMetaQuery(group_by, GroupMeta.NGROUPS)
class GroupLabelsQuery(QueryNode):
"""Return the entries in a group-by operation"""
group_by = tattr(QueryNode)
column = iattr(QueryNode)
@override
def _post_init_assert(self):
super()._post_init_assert()
assert self.column in self.group_by
@override
def _compute_schema(self):
return self.column.schema.propagate_constraints(C_UNIQUE)
@override
def make_action(self):
return PandasGroupMetaAction(self.group_by)
# Time series to span conversion: time series
class TimeSeriesSourceRole(AutoNumber):
"""Kind of event source"""
START = ()
STOP = ()
STOP_BROADCAST = ()
class TimeSeriesEventEdge(AutoNumber):
"""Specifies the edge from which to extract event data"""
RISING = ()
FALLING = ()
@final
class TimeSeriesSource(EqImmutable):
"""Source description"""
ts = TS_SCHEMA.qn_iattr()
partition = iattr(QueryNode, nullable=True)
role = enumattr(TimeSeriesSourceRole)
priority = iattr(int)
@final
class TimeSeriesToSpan(AutoMultiQuery):
"""Analyze a time series as a set of spans
This operation efficiently converts a set of discrete events into a
table of regular spans, with each row in the resulting span table
having a distinct start time and duration.
The input tables describe "events" that "arrive" at specific times
and that tell us when certain activities have started or stopped.
(These tables must be ordered by a "_ts" column.)
Events come in two kinds: "start" events and "stop" events. When a
span is not active and we begin a span, the event that starts the
span is said to be the rising edge of that span. The event that
terminates a span is the "falling" edge of that span. If we already
have a span active and see another "start" event, we terminate the
active span and start a new span in the same spot. The event that
prompts us to stop and start this way is _simultaneously_ the
falling-edge event for the span we close and the rising-edge event
for the span we start in its place.
These edges are important for determining where we get the values
making up the columns that this table provides. When you create a
TimeSeriesQueryTable, you specify which event source a
column's values come from and which _edge_ supplies that value.
Partition is another important concept. Sometimes spans start and
stop in different "domains" and don't interfere with each other
across domains. Consider system process scheduling: we can model the
activity of a CPU as a sequence of spans, but different CPUs act
independently. The value of the partition column divides events into
independent series that may overlap one another.
"""
sources = tattr(TimeSeriesSource)
class Meta(AutoMultiQuery.AutoMeta):
"""Metadata for time series spans"""
TS = TS_SCHEMA
DURATION = DURATION_SCHEMA
PARTITION = lambda config: config.partition_schema
@cached_property
def partition_schema(self):
"""The schema of the partition column"""
return QuerySchema.concat(
(source.partition.schema.unconstrain()
for source in self.sources
if source.partition))
@final
class RawIndexerQuery(QueryNode):
"""Unsorted indexer queries: not directly usable!"""
config = iattr()
source_index = iattr(int)
edge = enumattr(TimeSeriesEventEdge)
@override
def _post_init_assert(self):
super()._post_init_assert()
assert isinstance(self.config, TimeSeriesToSpan)
assert 0 <= self.source_index < len(self.config.sources)
@override
def _compute_schema(self):
return INDEXER_SCHEMA
@override
def make_action(self):
# pylint: disable=no-member
return TimeSeriesToSpan.Action(config=self.config)
def raw_indexerq(self, source_index, edge):
"""Make a query yielding an unsorted take indexer"""
return TimeSeriesToSpan.RawIndexerQuery(self, source_index, edge)
SOURCE_KINDS = {
TimeSeriesSourceRole.START: TSTS_OPEN,
TimeSeriesSourceRole.STOP: TSTS_CLOSE,
TimeSeriesSourceRole.STOP_BROADCAST: TSTS_CLOSE_BROADCAST,
}
@override
def compute_inputs(self):
inputs = []
for source in self.sources:
inputs.append(source.ts)
if source.partition:
inputs.append(source.partition)
return inputs
@override
def compute_outputs(self):
Meta = self.Meta
outputs = [
self.metaq(Meta.TS),
self.metaq(Meta.DURATION),
]
any_has_partition = False
for source_index, source in enumerate(self.sources):
for edge in TimeSeriesEventEdge:
outputs.append(self.raw_indexerq(source_index, edge))
any_has_partition = any_has_partition or source.partition
if any_has_partition:
outputs.append(self.metaq(Meta.PARTITION))
return outputs
@override
async def run_async(self, qe):
# TODO(dancol): we should be able to make do with just one output
# iterator! We should just define a C++ specialization for some
# small maximum number of sources.
Meta = self.Meta
io_specs = []
source_specs = []
any_has_partition = False
def _handle_source(source_index, source):
nonlocal any_has_partition
# Local function avoids unwanted closure-capture
if source.partition:
source_input = SynchronousInput(
source.ts,
SynchronousInput.Column(
source.partition,
partition=True))
io_specs.append(source_input)
any_has_partition = True
else:
source_input = SynchronousInput(source.ts)
io_specs.append(source_input)
# Broadcast a zero in place of the partition
source_input = ((input_array, 0) for [input_array] in source_input)
source_output = SynchronousOutput(
self.raw_indexerq(source_index, TimeSeriesEventEdge.RISING),
self.raw_indexerq(source_index, TimeSeriesEventEdge.FALLING),
)
io_specs.append(source_output)
source_specs.append((
source_input,
source_output,
self.SOURCE_KINDS[source.role],
source.priority,
))
for source_index, source in enumerate(self.sources):
_handle_source(source_index, source)
partition_output_q = \
self.metaq(Meta.PARTITION) if any_has_partition else None
metadata_output = SynchronousOutput(
self.metaq(Meta.TS),
self.metaq(Meta.DURATION),
partition_output_q,
partition=partition_output_q)
io_specs.append(metadata_output)
if not any_has_partition:
def _metadata_output_wrapper(ts, duration, _partition, *,
mo=metadata_output):
mo(ts, duration)
metadata_output = _metadata_output_wrapper # pylint: disable=redefined-variable-type
sync_qe, (), () = await SynchronousQueryExecution.async_setup(
qe, (), (), sync_io=io_specs)
def _run_operator():
time_series_to_span(
source_specs,
metadata_output,
qe)
await sync_qe.async_run(_run_operator)
@final
class TimeSeriesQueryTable(QueryTable):
"""Convenient query table for a time series conversion"""
_config = iattr(TimeSeriesToSpan)
_columns = iattr(dict)
@override
def __new__(cls, *,
sources,
columns,
partition=None):
my_es = []
es_nicknames = {}
es_sources = []
has_partition = False
def _event_source_spec(source,
*,
role=TimeSeriesSourceRole.START,
priority=0,
nickname=None):
if not isinstance(source, QueryTable):
raise InvalidQueryException("source is not a table, but a {}: {!r}"
.format(type(source), source))
source_table_schema = source.table_schema
if source_table_schema.kind != TableKind.EVENT:
raise InvalidQueryException("invalid event source table type {}"
.format(source_table_schema))
source = source.to_schema(sorting=TableSorting.TIME_MAJOR)
source_table_schema = source.table_schema
partition = source_table_schema.partition
ts_q = source["_ts"]
partition_q = None if not partition else source[partition]
nonlocal has_partition
has_partition = has_partition or partition_q
if nickname:
es_nicknames[the(str, nickname)] = len(my_es)
es_sources.append(source)
my_es.append(TimeSeriesSource(ts_q,
partition_q,
TimeSeriesSourceRole(role),
priority))
for event_source_spec in sources:
if isinstance(event_source_spec, dict):
_event_source_spec(**event_source_spec)
elif isinstance(event_source_spec, (list, tuple)):
_event_source_spec(*event_source_spec)
else:
_event_source_spec(event_source_spec)
config = TimeSeriesToSpan(my_es)
if has_partition and not partition:
raise InvalidQueryException(
"partitioned event sources but output partition not specified")
if partition and not has_partition:
raise InvalidQueryException(
"output partition specified but sources are unpartitioned")
if has_partition \
and not all(source.partition for source in config.sources
if source.role != TimeSeriesSourceRole.STOP_BROADCAST):
raise InvalidQueryException(
"if one input is partitioned, they all must be, except for "
"STOP_BROADCAST sources")
# TODO(dancol): add a mode for built-in column coalescing
my_columns = {}
def _column_spec(column,
source=None,
source_column=None,
edge=TimeSeriesEventEdge.RISING):
if source is None:
if len(my_es) == 1:
source = 0
else:
raise InvalidQueryException("multiple sources: must specify")
source_column = source_column or column
if isinstance(source, str):
source_index = es_nicknames[source]
else:
source_index = source
source = es_sources[source_index]
assert isinstance(source, QueryTable)
if source_column not in source:
raise InvalidQueryException("column {!r} not in source {}"
.format(source_column, source))
assert source_column in source
my_columns[column] = (source_index,
source[source_column],
TimeSeriesEventEdge(edge))
for column_spec in columns:
if isinstance(column_spec, dict):
_column_spec(**column_spec)
elif isinstance(column_spec, (list, tuple)):
_column_spec(*column_spec)
else:
_column_spec(column_spec)
table_schema = (SPAN_UNPARTITIONED_TIME_MAJOR
if not partition else
TableSchema(TableKind.SPAN,
partition,
TableSorting.NONE))
return cls._do_new(cls,
_config=config,
_columns=my_columns,
table_schema=table_schema)
@override
def _make_column_tuple(self):
return self.table_schema.meta_columns + tuple(self._columns)
@override
def _make_column_query(self, column_name):
if column_name == "_ts":
return self._config.metaq(TimeSeriesToSpan.Meta.TS)
if column_name == "_duration":
return self._config.metaq(TimeSeriesToSpan.Meta.DURATION)
if column_name == self.table_schema.partition:
return self._config.metaq(TimeSeriesToSpan.Meta.PARTITION)
source_index, source_query, edge = self._columns[column_name]
return source_query.take(
self._config.raw_indexerq(source_index, edge))
# Time series to span conversion: hierarchy
STACK_ID_SCHEMA = QuerySchema(INT64, domain="stack_id")
STACK_DEPTH_SCHEMA = QuerySchema(INT64, domain="stack_depth")
class Stackify(AutoMultiQuery):
"""Turn time series into stacks"""
# TODO(dancol): just use NULL to indicate endings
# TODO(dancol): generate real span tables on output
ts = TS_SCHEMA.qn_iattr()
partition = iattr(QueryNode)
frame_id = iattr(QueryNode)
end_flag = iattr(QueryNode)
class Meta(AutoMultiQuery.AutoMeta):
"""Metadata selector"""
SH_TS = TS_SCHEMA
SH_STACK_ID = STACK_ID_SCHEMA
SH_PARTITION = lambda config: config.partition.schema
SD_STACK_ID = STACK_ID_SCHEMA
SD_DEPTH = STACK_DEPTH_SCHEMA
SD_FRAME_ID = lambda config: config.frame_id.schema
@override
def compute_inputs(self):
return (
self.ts,
self.partition,
self.frame_id,
self.end_flag,
)
@override
def compute_outputs(self):
return [self.metaq(meta) for meta in self.Meta]
@override
async def run_async(self, qe):
# TODO(dancol): make ordering of fields canonical
Meta = self.Meta
meta_input = SynchronousInput(self.partition,
self.ts,
self.frame_id,
self.end_flag)
history_output = SynchronousOutput(self.metaq(Meta.SH_TS),
self.metaq(Meta.SH_PARTITION),
self.metaq(Meta.SH_STACK_ID))
description_output = SynchronousOutput(self.metaq(Meta.SD_STACK_ID),
self.metaq(Meta.SD_DEPTH),
self.metaq(Meta.SD_FRAME_ID))
sync_qe, (), () = await SynchronousQueryExecution.async_setup(
qe, (), (), sync_io=(meta_input, history_output, description_output))
def _run_operator():
stackify_start_stop_qqqq(meta_input,
history_output,
description_output,
qe)
await sync_qe.async_run(_run_operator)
# Joins
JOIN_KINDS = ("inner", "left", "right", "outer")
class JoinMeta(AutoNumber):
"""Metadatum for a join operation"""
LEFT_INDEXER = ()
RIGHT_INDEXER = ()
@final
class JoinMetaQuery(QueryNode):
"""Query producing join metadata"""
# TODO(dancol): should the keys be sattr, not tattr? Does key
# ordering matter? It should be possible to have an sattr of pairs
# instead of two tattrs.
_left_key = tattr(QueryNode, name="left_key")
_right_key = tattr(QueryNode, name="right_key")
_null_allowed = tattr(bool, name="null_allowed")
_kind = iattr(str, name="kind")
_meta = enumattr(JoinMeta, name="meta")
@override
def _post_init_assert(self):
super()._post_init_assert()
assert len(self._left_key) == len(self._right_key)
assert self._kind in JOIN_KINDS
@override
def _compute_schema(self):
return INDEXER_SCHEMA
@override
def make_action(self):
return JoinMetaAction(self._left_key, self._right_key,
self._null_allowed, self._kind)
_INT64_FOR_FACTORIZE = {
"b": lambda array: array.astype(INT64, copy=False),
"i": lambda array: array.astype(INT64, copy=False),
"u": lambda array: array.astype(UINT64, copy=False).view(INT64),
"f": lambda array: array.astype(FLOAT64).view(INT64),
}
def _int64_for_factorize(array):
"""Transform ARRAY into an INT64 array for factorization"""
assert type(array) is ndarray, ( # pylint: disable=unidiomatic-typecheck
"array {!r} has type {!r} but we want base ndarray".format(
array, type(array)))
# We care only about the bitwise identity of the keys, so just
# interpret any value of the appropriate size to int64.
array = _INT64_FOR_FACTORIZE[array.dtype.kind](array)
assert array.dtype is INT64
return array
def _factorize_keys(pd, lk, rk):
# This function is a simplified version of the Pandas original.
# The gist of it is to take arbitrary keys in LK and RK and "intern"
# them in a hash table, converting each to a small integer group
# number in that hash table. (Equal keys get interned into the same
# group number.) This way, for subsequent passes, we can just index
# directly into a table instead of having to do hash lookups at each
# step. (We effectively front-load the hash lookups.)
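# For instance (an illustrative sketch, ignoring masks): lk = [7, 3, 7] and
# rk = [3, 9] factorize to llab = [0, 1, 0] and rlab = [1, 2] with count = 3,
# since the keys 7, 3, and 9 intern to group numbers 0, 1, and 2 in order of
# first appearance.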
lk, lmask = npy_explode(lk)
lk = _int64_for_factorize(lk)
rk, rmask = npy_explode(rk)
rk = _int64_for_factorize(rk)
# TODO(dancol): use our own hash table instead of Pandas's. We can
# directly support all data types instead of needing
# _int64_for_factorize above.
klass = pd._libs.hashtable.Int64Factorizer # pylint: disable=protected-access
rizer = klass(max(len(lk), len(rk)))
llab = rizer.factorize(lk)
rlab = rizer.factorize(rk)
count = rizer.get_count()
# Make a brand-new label for the masked value.
if lmask.any() or rmask.any():
llab[lmask] = count
rlab[rmask] = count
count += 1
return llab, rlab, count
def _get_join_keys(pd, llab, rlab, shape):
# This function is a simplified version of the Pandas original, with
# more elaborate comments. The gist of the function is to condense
# a potentially large space of keys into a smaller space that fits
# in an int64; for the common case where we don't have enough keys
# to cause an int64 overflow, we don't re-hash.
assert all(v.dtype is INT64 for v in llab)
assert all(v.dtype is INT64 for v in rlab)
is_int64_overflow_possible = pd.core.sorting.is_int64_overflow_possible
# Find the maximum number of levels we can process as a chunk
# without overflow. If we have more than this many levels, we take
# the levels we've built by "stretching" and condense them to dense
# labels, then repeat the process.
pred = lambda i: not is_int64_overflow_possible(shape[:i])
nlev = next(filter(pred, range(len(shape), 0, -1)))
# Get keys for the first `nlev` levels, effectively "stretching"
# each level to make room for the values from the next level to fit
# in the spaces between the stretched values.
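# Concretely (an illustrative sketch): with shape == [3, 4], the combined key
# for level values (l0, l1) is l0 * 4 + l1, so every distinct (l0, l1) pair
# maps to a distinct int64 in range(12) with no collisions.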
stride = np.prod(shape[1:nlev], dtype=INT64)
# Invoke np.multiply explicitly (with explicit dtype) because the
# infix multiply operator (or dtype-less np.multiply) produces
# "q"-dtype outputs from big "l"-type inputs. "q"-dtype results are
# bitwise-equivalent to "l"-dtype ones (on 64-bit machines) but have
# the wrong dtype, triggering assertions later.
lkey = np.multiply(stride, llab[0], dtype=INT64)
rkey = np.multiply(stride, rlab[0], dtype=INT64)
for i in range(1, nlev):
with np.errstate(divide="ignore"):
stride //= shape[i]
lkey += llab[i] * stride
rkey += rlab[i] * stride
if nlev == len(shape): # all done!
return lkey, rkey
# Densify current keys to avoid overflow.
lkey, rkey, count = _factorize_keys(pd, lkey, rkey)
llab = [lkey] + llab[nlev:]
rlab = [rkey] + rlab[nlev:]
shape = [count] + shape[nlev:]
return _get_join_keys(pd, llab, rlab, shape)
def _remove_nulls_from_join_keys(keys, null_allowed):
assert all_same(map(len, keys))
assert len(keys) == len(null_allowed)
if not any((npy_has_mask(k) and not null_allowed)
for k, null_allowed
in zip(keys, null_allowed)):
return keys, None
# Use np.concatenate directly because np.stack makes useless
# internal copies (because it calls np.asanyarray).
n = len(keys[0])
column_shape = (n, 1)
any_mask = np.logical_or.reduce( # pylint: disable=no-member
np.concatenate(
[npy_get_mask(k).reshape(column_shape)
for k, null_allowed in zip(keys, null_allowed)
if not null_allowed and npy_has_mask(k)] or ((False,),),
axis=1),
axis=1)
if not any_mask.any():
return keys, None
keep_mask = np.logical_not(any_mask, out=any_mask)
munged_keys = [k[keep_mask] for k in keys]
remap = np.arange(n)[keep_mask]
return munged_keys, remap
def _get_join_indexers(pd, left_keys, right_keys, null_allowed, kind):
# This function is a simplified version of the Pandas original.
# It actually computes the join. This version also understands
# DCTV masks.
assert len(left_keys) == len(right_keys)
fkeys = partial(_factorize_keys, pd)
left_keys, left_remap = _remove_nulls_from_join_keys(
left_keys, null_allowed)
right_keys, right_remap = _remove_nulls_from_join_keys(
right_keys, null_allowed)
# get left & right join labels and num. of levels at each location
llab, rlab, shape = map(list, zip(*map(fkeys, left_keys, right_keys)))
lkey, rkey = _get_join_keys(pd, llab, rlab, shape)
# factorize keys to a dense i8 space
# `count` is the num. of unique keys
# set(lkey) | set(rkey) == range(count)
lkey, rkey, count = fkeys(lkey, rkey)
libjoin = pd._libs.join # pylint: disable=protected-access
if kind == "inner":
left_indexer, right_indexer = libjoin.inner_join(lkey, rkey, count)
# Same algorithm left join uses to preserve frame order, but we do
# it by hand here because it's not built-in like it is for
# left_outer_join (with sort=False). We have to use
# groupsort_indexer unconditionally because the original sorter
# isn't available to us outside inner_join.
# TODO(dancol): benchmark against using left_outer_join
# unconditionally and filtering.
# pylint: disable=protected-access
rev, _ = pd._libs.algos.groupsort_indexer(left_indexer, len(lkey))
left_indexer = left_indexer.take(rev)
right_indexer = right_indexer.take(rev)
elif kind == "left":
left_indexer, right_indexer = \
libjoin.left_outer_join(lkey, rkey, count, sort=False)
elif kind == "right":
left_indexer, right_indexer = \
libjoin.left_outer_join(rkey, lkey, count, sort=False)[::-1]
else:
assert kind == "outer"
left_indexer, right_indexer = \
libjoin.full_outer_join(lkey, rkey, count)
if left_remap is not None:
left_indexer = left_remap.take(left_indexer)
if right_remap is not None:
right_indexer = right_remap.take(right_indexer)
return left_indexer, right_indexer
class JoinMetaAction(QueryAction):
"""Compute join metadata"""
_left_key = tattr(QueryNode, name="left_key")
_right_key = tattr(QueryNode, name="right_key")
_null_allowed = tattr(bool, name="null_allowed")
_kind = iattr(str, name="kind")
@override
def _post_init_check(self):
super()._post_init_check()
assert len(self._left_key) == len(self._right_key)
assert len(self._null_allowed) == len(self._left_key)
assert self._kind in JOIN_KINDS
for left_key_q, right_key_q in zip(self._left_key, self._right_key):
# Make sure the joined columns are compatible by side effect
ls = left_key_q.schema
rs = right_key_q.schema
if ls.is_null or rs.is_null:
return
if ls.is_string != rs.is_string:
raise InvalidQueryException("cannot join string and non-string")
if ls.domain != rs.domain:
raise InvalidQueryException("cannot join domain {!r} with domain {!r}"
.format(ls.domain, rs.domain))
# TODO(dancol): support non-lossy unit conversions?
if ls.unit != rs.unit:
raise InvalidQueryException("cannot join unit {!r} with unit {!r}"
.format(ls.unit, rs.unit))
if (ls.dtype.kind != rs.dtype.kind and
not (ls.dtype.kind in "iu" and rs.dtype.kind in "iu")):
raise InvalidQueryException(
"cannot join {} with {}".format(
DTYPE_KINDS[ls.dtype.kind],
DTYPE_KINDS[rs.dtype.kind]))
@override
def _compute_inputs(self):
return self._left_key + self._right_key
def __metaq(self, meta):
return JoinMetaQuery(self._left_key,
self._right_key,
self._null_allowed,
self._kind, meta)
@override
def _compute_outputs(self):
return tuple(self.__metaq(meta) for meta in JoinMeta)
@override
async def run_async(self, qe):
def _list_split(things):
n = len(things) // 2
right_things = things[n:]
del things[n:]
left_things = things
assert len(left_things) == len(right_things)
return left_things, right_things
# pylint: disable=protected-access
ics, [oc_lindexer, oc_rindexer] = await qe.async_setup(
(self._left_key + self._right_key),
(self.__metaq(JoinMeta.LEFT_INDEXER),
self.__metaq(JoinMeta.RIGHT_INDEXER)))
if not ics:
return # Nothing to do
left_ics, right_ics = _list_split(list(ics))
# TODO(dancol): make iterative join work in right-join mode by
# inverting the join internally. If we really need
# larger-than-core full outer join, we can do it with
# multiple passes.
# TODO(dancol): make NULL-safe
join_block_size = 0
kind = self._kind
pd = load_pandas()
if kind in ("left", "inner"):
# We know in this join mode that the left side indexer values will
# be non-decreasing, allowing us to loop over chunks of the left
# side, joining each chunk against the entire right side, and
# avoid loading the entire left array into memory.
join_block_size = (qe.env.get("join_block_size")
or 10 * qe.block_size)
if join_block_size:
rleft = lambda ic: ic.read_at_least(join_block_size)
else:
rleft = lambda ic: ic.read_all()
left_reqs = [rleft(ic) for ic in left_ics]
right_reqs = [ic.read_all() for ic in right_ics]
inps = await qe.async_io(*(left_reqs + right_reqs))
left_inps, right_inps = _list_split(inps)
check_same_lengths(left_ics, left_inps)
check_same_lengths(right_ics, right_inps)
right_key_arrays = [npy_make_rw_view(block) for block in right_inps]
left_rows_consumed = 0
while True:
chunk_size = len(left_inps[0])
if not chunk_size:
break
lindexer, rindexer = \
_get_join_indexers(
pd,
[npy_make_rw_view(block) for block in left_inps],
right_key_arrays,
self._null_allowed,
kind)
assert len(lindexer) == len(rindexer)
if kind in ("inner", "left"):
assert is_nondecreasing(lindexer)
lindexer += left_rows_consumed
is_eof = ((not join_block_size) or
chunk_size < join_block_size)
left_rows_consumed += chunk_size
_, _, *left_inps = await qe.async_io(
oc_lindexer.write(lindexer, is_eof),
oc_rindexer.write(rindexer, is_eof),
*[rleft(ic) for ic in left_ics])
# Dropping duplicates
class DropDuplicatesAction(QueryAction):
"""Implemention of dropping duplicates"""
_group = sattr(QueryNode, name="group")
@override
def _compute_inputs(self):
return self._group
@override
def _compute_outputs(self):
return cons(DropDuplicatesIndexerQuery(self._group),
(DropDuplicatesQuery(self._group, member)
for member in self._group))
@override
async def run_async(self, qe):
# TODO(dancol): if we want larger-than-RAM support here, we're
# going to have to manually sort the input group and iterate
# over it in chunks, taking care to remember the block boundary
# values. Let's see whether it's actually a problem in practice.
members = tuple(self._group)
out_indexer_query = DropDuplicatesIndexerQuery(self._group)
out_member_queries = [DropDuplicatesQuery(self._group, member)
for member in members]
ics, [oc_indexer, *oc_members] = await qe.async_setup(
members, [out_indexer_query] + out_member_queries)
arrays = [block.as_array() for block in
await qe.async_io(*[ic.read_all() for ic in ics])]
arrays, is_expanded = expand_nullable(arrays)
def _make_out_reqs():
column_names = ["col{}".format(i) for i in range(len(arrays))]
df = make_pd_dataframe(
dict(zip(column_names, map(npy_make_rw_view, arrays))))
arrays.clear()
df.drop_duplicates(inplace=True)
out_reqs = [oc_indexer.write(df.index.values, True)]
for oc_member, c_is_expanded in zip(oc_members, is_expanded):
if c_is_expanded:
out_array = masked_array(
df[column_names.pop(0)].values,
df[column_names.pop(0)].values)
else:
out_array = df[column_names.pop(0)].values
out_reqs.append(oc_member.write(out_array, True))
return out_reqs
await qe.async_io(*_make_out_reqs())
class DropDuplicatesIndexerQuery(QueryNode):
"""Yields row numbers of kept values"""
_group = sattr(QueryNode, name="group")
@override
def _compute_schema(self):
return INDEXER_SCHEMA.constrain(C_UNIQUE)
@override
def make_action(self):
return DropDuplicatesAction(self._group)
class DropDuplicatesQuery(QueryNode):
"""QueryNode for dropping duplicates"""
_group = sattr(QueryNode, name="group")
_column = iattr(QueryNode)
@override
def _post_init_assert(self):
super()._post_init_assert()
assert self._column in self._group
@override
def _compute_schema(self):
# We're guaranteed columnwise UNIQUE only if our group has size
# one. But if the base column is already UNIQUE, our output is
# UNIQUE regardless of group size.
schema = self._column.schema
if len(self._group) == 1:
schema = schema.constrain(C_UNIQUE)
return schema
@override
def make_action(self):
return DropDuplicatesAction(self._group)
@classmethod
def of1(cls, query):
"""Convenience function for when we want to compress one query"""
return cls(frozenset((query,)), query)
# Span join
class SpanJoin(AutoMultiQuery):
"""Join spans together along the time axis"""
sources = tattr()
@override
def _post_init_assert(self):
super()._post_init_assert()
assert assert_seq_type(tuple, SpanJoin.Source, self.sources)
class Meta(AutoMultiQuery.AutoMeta):
"""Metadata selection"""
TIMESTAMP = TS_SCHEMA
DURATION = DURATION_SCHEMA
PARTITION = lambda config: config.partition_schema
@cached_property
def partition_schema(self):
"""The schema of the result partition"""
return QuerySchema.concat(
(source.span.partition.schema.unconstrain()
for source in self.sources
if source.span.partition))
class Source(EqImmutable):
"""One source for a span join"""
span = iattr(SpanTableConfig)
required = iattr(bool, kwonly=True)
class IndexerQuery(QueryNode):
"""Input row mappings from a span join"""
config = iattr()
source_index = iattr(int)
@override
def _post_init_assert(self):
super()._post_init_assert()
assert isinstance(self.config, SpanJoin)
assert 0 <= self.source_index < len(self.config.sources)
@override
def _compute_schema(self):
return INDEXER_SCHEMA # Not unique
@override
def make_action(self):
return SpanJoin.Action(config=self.config) # pylint: disable=no-member
def indexerq(self, source_index):
"""Return a query for the indexes to take for a given source"""
# TODO(dancol): incremental taking
return self.IndexerQuery(self, source_index)
@override
async def run_async(self, qe):
Meta = self.Meta
io_specs = []
source_specs = []
any_has_partition = False
for source_index, source in enumerate(self.sources):
sp = source.span
source_input = sp.make_synchronous_input(fill_partition=True)
io_specs.append(source_input)
source_output = SynchronousOutput(self.indexerq(source_index))
io_specs.append(source_output)
has_partition = bool(source.span.partition)
any_has_partition = any_has_partition or has_partition
source_specs.append((
source_input,
source_output,
source.required,
has_partition,
))
partition_output_q = \
self.metaq(Meta.PARTITION) if any_has_partition else None
metadata_output = SynchronousOutput(
self.metaq(Meta.TIMESTAMP),
self.metaq(Meta.DURATION),
partition_output_q,
partition=partition_output_q)
io_specs.append(metadata_output)
sync_qe, _, _ = \
await SynchronousQueryExecution.async_setup(
qe, (), (), sync_io=io_specs)
def _run_operator():
span_merge_join(source_specs, metadata_output, qe)
await sync_qe.async_run(_run_operator)
@override
def compute_inputs(self):
inputs = []
for source in self.sources:
inputs.extend(source.span.inputs)
return inputs
@override
def compute_outputs(self):
Meta = self.Meta
outputs = []
any_has_partition = False
for source_index, source in enumerate(self.sources):
any_has_partition = any_has_partition or source.span.partition
outputs.append(self.indexerq(source_index))
outputs.append(self.metaq(Meta.TIMESTAMP))
outputs.append(self.metaq(Meta.DURATION))
if any_has_partition:
outputs.append(self.metaq(Meta.PARTITION))
return outputs
# Span group
COMMAND_STREAM_SPAN_PIVOT_SCHEMA = QuerySchema(
DTYPE_COMMAND_STREAM,
domain="command_stream_span_pivot")
COMMAND_STREAM_SPAN_GROUP_SCHEMA = QuerySchema(
DTYPE_COMMAND_STREAM,
domain="command_stream_span_group")
COLUMN_AGG_BY_DOMAIN = {
"command_stream_span_pivot": span_pivot_column,
"command_stream_span_group": span_group_column,
}
@final
class NativeSpanAggregationQuery(NativeAggregationQuery):
"""Perform an aggregation using native DCTV kernels"""
command = iattr(QueryNode, kwonly=True)
@override
def _post_init_assert(self):
super()._post_init_assert()
assert self.command.schema.domain in COLUMN_AGG_BY_DOMAIN
@override
def _post_init_check(self):
super()._post_init_check()
if self.distinct:
raise NotImplementedError(
"DISTINCT on native aggregations not implemented")
@override
def _compute_inputs(self):
return super()._compute_inputs() + (self.command,)
@override
async def run_async(self, qe):
data = SynchronousInput(
SynchronousInput.Column(
self.data,
masked=True,
))
command = SynchronousInput(self.command)
out = SynchronousOutput(self, masked=True)
sync_qe, (), () = await SynchronousQueryExecution.async_setup(
qe, (), (), sync_io=(data, command, out))
def _run_operator():
data_schema = self.data.schema
st = qe.st if data_schema.is_string else None
collation = "binary" if st else None
COLUMN_AGG_BY_DOMAIN[self.command.schema.domain](
data,
command,
out,
data_schema.dtype,
self.aggregation,
qe,
st,
collation,
)
await sync_qe.async_run(_run_operator)
class SpanPivot(AutoMultiQuery):
"""Group spans by their partitions"""
class Meta(AutoMultiQuery.AutoMeta):
"""Metadata selector for span group"""
TIMESTAMP = TS_SCHEMA
DURATION = DURATION_SCHEMA
COMMAND = lambda config: COMMAND_STREAM_SPAN_PIVOT_SCHEMA
OUTPUT_PARTITION = lambda config: config.output_partition_schema
grouped = iattr(SpanTableConfig, kwonly=True)
output_partition = iattr(QueryNode, nullable=True, kwonly=True)
min_npartitions = iattr_query_node_int(kwonly=True)
@property
def output_partition_schema(self):
"""The schema of the output partition column
Error if we don't have an output partition
"""
return self.output_partition.schema.unconstrain()
@override
def compute_outputs(self):
Meta = self.Meta
outputs = [
self.metaq(Meta.TIMESTAMP),
self.metaq(Meta.DURATION),
self.metaq(Meta.COMMAND),
]
if self.output_partition:
outputs.append(self.metaq(Meta.OUTPUT_PARTITION))
return outputs
@override
def compute_inputs(self):
inputs = self.grouped.inputs + (self.min_npartitions,)
if self.output_partition:
inputs += (self.output_partition,)
return inputs
def dataq(self, aggregation, data, distinct):
"""Create a data query for this span group operation.
AGGREGATION is the name of an aggregation to apply. DATA is a
QueryNode to aggregate. DISTINCT is whether the aggregation is
limited to distinct values.
"""
return NativeSpanAggregationQuery(
aggregation=aggregation,
data=data,
distinct=distinct,
command=self.metaq(self.Meta.COMMAND),
)
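# Hedged usage sketch ("pivot" and "duration_q" are hypothetical, and the
# aggregation name must be one reported by get_supported_native_aggregations):
#   total_q = pivot.dataq("sum", duration_q, distinct=False)
# yields a query that aggregates duration_q over the pivoted spans by
# replaying this configuration's COMMAND stream.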
@override
async def run_async(self, qe):
Meta = self.Meta
grouped = self.grouped
assert grouped.partition
output_partition = self.output_partition
grouped_input = SynchronousInput(
grouped.ts,
grouped.duration,
SynchronousInput.Column(
grouped.partition,
partition=True,
),
(0 if not output_partition
else SynchronousInput.Column(output_partition, partition=True,)),
)
command_output = SynchronousOutput(self.metaq(Meta.COMMAND))
partition_output_q = (self.metaq(Meta.OUTPUT_PARTITION)
if output_partition else None)
metadata_output = SynchronousOutput(
self.metaq(Meta.TIMESTAMP),
self.metaq(Meta.DURATION),
partition_output_q,
partition=partition_output_q)
sync_qe, [ic_min_npartitions], _ = \
await SynchronousQueryExecution.async_setup(
qe, [self.min_npartitions], (),
sync_io=(grouped_input, command_output, metadata_output))
min_npartitions = await ic_min_npartitions.read_int()
if not output_partition:
def metadata_output(ts_array, duration_array, op_array, # pylint: disable=function-redefined
og=metadata_output):
# Throw away the dummy partition array
assert not op_array.any() # Should be all zero in dummy case
return og(ts_array, duration_array)
def _run_operator():
span_pivot(
grouped_input,
metadata_output,
command_output,
min_npartitions,
qe)
await sync_qe.async_run(_run_operator)
class SpanGroup(AutoMultiQuery):
"""Group spans or events by spans"""
class Meta(AutoMultiQuery.AutoMeta):
"""Metadata selector for span group"""
TIMESTAMP = TS_SCHEMA
DURATION = DURATION_SCHEMA
COMMAND = COMMAND_STREAM_SPAN_GROUP_SCHEMA
PARTITION = lambda config: config.partition_schema
grouped = iattr((SpanTableConfig, EventTableConfig), kwonly=True)
grouper = iattr(SpanTableConfig, kwonly=True)
unique_pvals = iattr(QueryNode, nullable=True, kwonly=True)
intersect = iattr(bool, kwonly=True)
@override
def _post_init_assert(self):
super()._post_init_assert()
is_broadcast_union = bool((not self.intersect) and
self.grouped.partition and
(not self.grouper.partition))
assert bool(self.unique_pvals) == is_broadcast_union, \
"unique_pvals should be supplied iff broadcast union " \
"up:{!r} ib:{!r} is:{!r} p:{!r} gp:{!r}" \
.format(self.unique_pvals,
is_broadcast_union,
self.intersect,
self.grouped.partition,
self.grouper.partition)
@cached_property
def is_output_partitioned(self):
"""True iff the output is partitioned"""
return self.grouped.partition or self.grouper.partition
@property
def partition_schema(self):
"""The schema of the result partition"""
return self.is_output_partitioned.schema.unconstrain()
@override
def compute_outputs(self):
Meta = self.Meta
outputs = [
self.metaq(Meta.TIMESTAMP),
self.metaq(Meta.DURATION),
self.metaq(Meta.COMMAND),
]
if self.is_output_partitioned:
outputs.append(self.metaq(Meta.PARTITION))
return outputs
@override
def compute_inputs(self):
inputs = self.grouped.inputs + self.grouper.inputs
if self.unique_pvals:
inputs += (self.unique_pvals,)
return inputs
@override
async def run_async(self, qe):
Meta = self.Meta
grouped_input = self.grouped.make_synchronous_input(
fill_partition=True)
grouper_input = self.grouper.make_synchronous_input(
fill_partition=True)
output_partition = self.grouped.partition or self.grouper.partition
partition_output_q = \
self.metaq(Meta.PARTITION) if output_partition else None
metadata_output = SynchronousOutput(self.metaq(Meta.TIMESTAMP),
self.metaq(Meta.DURATION),
partition_output_q,
partition=partition_output_q)
command_output = SynchronousOutput(self.metaq(Meta.COMMAND))
inps = (self.unique_pvals,) if self.unique_pvals else ()
sync_qe, ics, _ = await SynchronousQueryExecution.async_setup(
qe, inps, (), sync_io=(
grouped_input,
grouper_input,
metadata_output,
command_output,
))
if self.unique_pvals:
unique_pvals = (await ics.pop().read_all()).as_array()
else:
unique_pvals = None
assert not ics
def _run_operator():
span_group(
unique_pvals,
grouped_input,
grouper_input,
metadata_output,
command_output,
qe,
bool(self.grouper.partition),
bool(self.grouped.partition),
isinstance(self.grouped, SpanTableConfig),
not self.intersect)
await sync_qe.async_run(_run_operator)
def dataq(self, aggregation, data, distinct):
"""Create a data query for this span group operation.
AGGREGATION is the name of an aggregation to apply. DATA is a
QueryNode to aggregate. DISTINCT is whether the aggregation is
limited to distinct values.
"""
return NativeSpanAggregationQuery(
aggregation=aggregation,
data=data,
distinct=distinct,
command=self.metaq(self.Meta.COMMAND),
)
# Event operations
class EventJoin(AutoMultiQuery):
"""Merging events with spans
(Joining two event tables is just interleaving and can be done with
normal SQL operators.)
"""
event = iattr(EventTableConfig)
span = iattr(SpanTableConfig)
span_is_required = iattr(bool)
class Meta(AutoMultiQuery.AutoMeta):
"""Metadata for event join"""
TIMESTAMP = TS_SCHEMA
PARTITION = lambda config: config.partition_schema
EVENT_INDEX = INDEXER_SCHEMA
SPAN_INDEX = INDEXER_SCHEMA
@cached_property
def partition_schema(self):
"""Schema of the output partition"""
event_partition = self.event.partition
span_partition = self.span.partition
if event_partition and span_partition:
return QuerySchema.concat(
(event_partition.schema,
span_partition.schema)).unconstrain()
assert event_partition or span_partition
return (event_partition or span_partition).schema.unconstrain()
@override
def compute_inputs(self):
return self.event.inputs + self.span.inputs
@property
def _has_partitioned_output(self):
return self.event.partition or self.span.partition
@override
def compute_outputs(self):
Meta = self.Meta
outputs = ((
(self.metaq(Meta.TIMESTAMP)),
(self.metaq(Meta.EVENT_INDEX)),
(self.metaq(Meta.SPAN_INDEX)),
)) + (() if not self._has_partitioned_output else (
(self.metaq(Meta.PARTITION)),
))
return outputs
@override
async def run_async(self, qe):
Meta = self.Meta
event_input = self.event.make_synchronous_input(fill_partition=True)
span_input = self.span.make_synchronous_input(fill_partition=True)
partition_output_q = (self.metaq(Meta.PARTITION)
if self._has_partitioned_output else None)
metadata_output = SynchronousOutput(
self.metaq(Meta.TIMESTAMP),
partition_output_q,
partition=partition_output_q)
index_output = SynchronousOutput(
self.metaq(Meta.EVENT_INDEX),
self.metaq(Meta.SPAN_INDEX))
sync_qe, _, _ = await SynchronousQueryExecution.async_setup(
qe, (), (), sync_io=(event_input, span_input,
metadata_output, index_output))
def _run_operator():
merge_spans_into_events(
event_input,
span_input,
metadata_output,
index_output,
qe,
bool(self.event.partition),
bool(self.span.partition),
self.span_is_required)
await sync_qe.async_run(_run_operator)
# Span growth
class SpanFixup(AutoMultiQuery):
"""Query that fixes up "bad" span tables"""
# The input isn't necessarily a span table --- the whole point of
# this operator is to finagle an invalid span-table-like thing into
# a span table --- so we don't want a SpanTableConfig as input.
# But we do want the individual values we're aggregating to be
# span-table-compatible, and we check that upon initialization.
ts = TS_SCHEMA.qn_iattr()
duration = DURATION_SCHEMA.qn_iattr()
partition = iattr(QueryNode, nullable=True)
class Meta(AutoMultiQuery.AutoMeta):
"""Data for span fixup queries"""
TS = TS_SCHEMA
DURATION = DURATION_SCHEMA
PARTITION = lambda config: config.partition.schema.unconstrain()
@override
def compute_inputs(self):
inputs = [self.ts, self.duration]
if self.partition:
inputs.append(self.partition)
return inputs
@override
def compute_outputs(self):
Meta = self.Meta
outputs = [self.metaq(Meta.TS), self.metaq(Meta.DURATION)]
if self.partition:
outputs.append(self.metaq(Meta.PARTITION))
return outputs
@override
async def run_async(self, qe):
# TODO(dancol): simplify the operator by providing a partition
# unconditionally and just broadcasting zero if we don't have a
# "real" partition.
Meta = self.Meta
if self.partition:
span_input = SynchronousInput(
self.ts,
self.duration,
SynchronousInput.Column(
self.partition,
partition=True))
else:
span_input = SynchronousInput(self.ts, self.duration, 0)
partition_output_q = None
output_queries = [self.metaq(Meta.TS), self.metaq(Meta.DURATION)]
if self.partition:
partition_output_q = self.metaq(Meta.PARTITION)
output_queries.append(partition_output_q)
span_output = SynchronousOutput(*output_queries,
partition=partition_output_q)
sync_qe, _, _ = await SynchronousQueryExecution.async_setup(
qe, (), (), sync_io=(span_input, span_output))
def _run_operator():
span_fixup(span_input,
span_output,
qe,
bool(self.partition))
await sync_qe.async_run(_run_operator)
# Backfill
class Backfill(AutoMultiQuery):
"""Automatically fill missing data in spans"""
class Meta(AutoMultiQuery.AutoMeta):
"""Metadata output for backfill operation"""
VALUE = lambda config: config.value.schema
span_meta = iattr(SpanTableConfig)
value = iattr(QueryNode)
@override
def compute_inputs(self):
return self.span_meta.inputs + (self.value,)
@override
def compute_outputs(self):
return [self.metaq(meta) for meta in self.Meta]
@override
async def run_async(self, qe):
Meta = self.Meta
meta_input = self.span_meta.make_synchronous_input(
fill_partition=True,
extra=[
SynchronousInput.Column(self.value, masked=True)])
out = SynchronousOutput(self.metaq(Meta.VALUE),
masked=True)
sync_qe, _, _ = await SynchronousQueryExecution.async_setup(
qe, (), (), sync_io=[meta_input, out])
def _run_operator():
backfill(meta_input,
out,
qe,
self.value.schema.dtype)
await sync_qe.async_run(_run_operator)