// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "span_group.h"
#include <tuple>
#include <utility>
#include "autoevent.h"
#include "command_stream.h"
#include "hash_table.h"
#include "optional.h"
#include "pyiter.h"
#include "pylog.h"
#include "pyobj.h"
#include "pyparsetuple.h"
#include "pyparsetuplenpy.h"
#include "result_buffer.h"
#include "span.h"
#include "span_result_buffer.h"
namespace dctv {
namespace {
using std::tie;
using std::string_view;
using SgCommandOut = CommandOut<SgCommand>;
// The span group includes a "global" partition not named by any
// specific partition number. PartitionId can name either that global
// partition or a numbered partition. In this file, "pid" abbreviates
// PartitionId and has nothing to do with POSIX process IDs (except
// that higher-level code may choose to partition data by process ID,
// but that's none of our business here).
struct PartitionId final {
constexpr explicit PartitionId(Partition p)
: partition(p), global(false) {}
PartitionId(const PartitionId&) noexcept = default; // NOLINT
bool operator==(const PartitionId& other) const noexcept {
return tie(this->partition, this->global) ==
tie(other.partition, other.global);
}
bool operator!=(const PartitionId& other) const noexcept {
return !(*this == other);
}
const Partition partition; // NOLINT
const bool global; // NOLINT
static const PartitionId GLOBAL() { return PartitionId(); }
private:
constexpr PartitionId() : partition(), global(true) {}
};
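// For illustration only (nothing below depends on this snippet): the two
// kinds of id are constructed as
//
//   PartitionId numbered = PartitionId(some_partition);
//   PartitionId global_id = PartitionId::GLOBAL();
//   assert(numbered != global_id);  // the `global` flag participates in ==
//
// where `some_partition` stands for any Partition value (a hypothetical
// variable, not one defined in this file).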
struct PartitionStateBaseOpt {
protected:
explicit PartitionStateBaseOpt(PartitionId) {}
void check_partition(PartitionId) {}
};
struct PartitionStateBaseDebug {
protected:
explicit PartitionStateBaseDebug(PartitionId pid)
: pid(pid)
{}
void check_partition(PartitionId pid) {
assume(this->pid == pid);
}
private:
PartitionId pid;
};
using PartitionStateBase = std::conditional_t<
safe_mode,
PartitionStateBaseDebug,
PartitionStateBaseOpt>;
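// In safe_mode builds, PartitionState (below) remembers the PartitionId
// it was created with, and check_partition() verifies that callers pass
// the same id back on every operation; in optimized builds both hooks
// compile to no-ops, so the checking costs nothing in release.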
struct PartitionState final : PartitionStateBase, NoCopy {
explicit PartitionState(PartitionId pid)
: PartitionStateBase(pid) {}
void mark_grouper_open(TimeStamp closed_ts) {
assert(!this->grouper_closed_ts.has_value());
this->grouper_closed_ts = closed_ts;
}
void mark_grouper_closed(TimeStamp closed_ts) {
assert(this->grouper_closed_ts.has_value());
assert(this->grouper_closed_ts == closed_ts);
this->grouper_closed_ts.reset();
}
bool is_grouper_open() const {
return this->grouper_closed_ts.has_value();
}
TimeStamp get_grouper_closed_ts() const {
assert(this->grouper_closed_ts.has_value());
return *this->grouper_closed_ts;
}
void mark_grouped_open(TimeStamp closed_ts) {
assert(!this->grouped_closed_ts.has_value());
this->grouped_closed_ts = closed_ts;
}
void mark_grouped_closed(TimeStamp closed_ts) {
assert(this->grouped_closed_ts.has_value());
assert(this->grouped_closed_ts == closed_ts);
this->grouped_closed_ts.reset();
}
bool is_grouped_open() const {
return this->grouped_closed_ts.has_value();
}
TimeStamp get_grouped_closed_ts() const {
assert(this->grouped_closed_ts.has_value());
return *this->grouped_closed_ts;
}
void accumulate(SgCommandOut* co, PartitionId pid) {
this->check_partition(pid);
assert(this->is_grouped_open());
this->have_data = true;
if (pid.global)
co->emit(SgCommand::SLURP_AND_ACCUMULATE_GLOBAL);
else
co->emit(SgCommand::SLURP_AND_ACCUMULATE, pid.partition);
}
void copy_last_to(SgCommandOut* co,
PartitionId pid,
PartitionId other_pid,
PartitionState* other_ps) {
this->check_partition(pid);
other_ps->check_partition(other_pid);
assert(pid.global);
assert(!other_pid.global);
assert(this->have_data);
other_ps->have_data = this->have_data;
co->emit(SgCommand::COPY_LAST_GLOBAL_TO, other_pid.partition);
}
void reset_data(SgCommandOut* co,
PartitionId pid,
bool keep_last = false)
{
this->check_partition(pid);
this->have_data = keep_last;
if (pid.global)
co->emit((keep_last
? SgCommand::RESET_GLOBAL_KEEP_LAST
: SgCommand::RESET_GLOBAL));
else
co->emit((keep_last
? SgCommand::RESET_KEEP_LAST
: SgCommand::RESET),
pid.partition);
}
bool get_have_data() const {
return this->have_data;
}
void emit_aggregations(SgCommandOut* co, PartitionId pid) {
this->check_partition(pid);
assert(this->is_grouper_open());
if (pid.global)
co->emit(SgCommand::EMIT_GLOBAL);
else
co->emit(SgCommand::EMIT, pid.partition);
}
private:
optional<TimeStamp> grouped_closed_ts;
optional<TimeStamp> grouper_closed_ts;
bool have_data = false;
};
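// Division of labor, as inferred from the emit calls above: PartitionState
// tracks only the control side of aggregation (which spans are open and
// whether any rows have been accumulated), while the aggregate values
// themselves live with whatever consumes the SgCommand stream. Every
// transition that affects those values emits a matching command
// (SLURP_AND_ACCUMULATE*, COPY_LAST_GLOBAL_TO, RESET*, EMIT*) so the
// consumer can replay the same transitions in order.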
unique_pyref
do_span_group(PyObject*, PyObject* py_args)
{
// TODO(dancol): support multiple grouped span tables?
namespace ae = auto_event;
auto args = PARSEPYARGS_V(
(pyref, unique_partition_values)
(pyref, metadata_grouped_source)
(pyref, metadata_grouper_source)
(pyref, metadata_out)
(pyref, command_out)
(QueryExecution*, qe)
(bool, grouper_is_partitioned)
(bool, grouped_is_partitioned)
(bool, grouped_is_span)
(bool, allow_empty_groups)
)(py_args);
const bool do_log = false;
const bool grouper_is_partitioned = args.grouper_is_partitioned;
const bool grouped_is_partitioned = args.grouped_is_partitioned;
const bool grouped_is_span = args.grouped_is_span;
const bool allow_empty_groups = args.allow_empty_groups;
const bool permanent_partitions = args.unique_partition_values != Py_None;
if (do_log) {
pylog.debug("grouper_is_partitioned=%s", grouper_is_partitioned);
pylog.debug("grouped_is_partitioned=%s", grouped_is_partitioned);
pylog.debug("grouped_is_span=%s", grouped_is_span);
pylog.debug("allow_empty_groups=%s", allow_empty_groups);
pylog.debug("permanent_partitions=%s", permanent_partitions);
}
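  // A caller-supplied partition list is accepted in exactly one
  // configuration: the "broadcast with empty groups" case (unpartitioned
  // grouper, partitioned grouped, allow_empty_groups), presumably because
  // empty groups must still yield one row per partition, which requires
  // knowing the full partition set before any data arrives.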
if (permanent_partitions !=
(allow_empty_groups &&
!grouper_is_partitioned &&
grouped_is_partitioned))
throw_invalid_query("invalid partition list usage");
const bool output_is_partitioned =
grouper_is_partitioned || grouped_is_partitioned;
SgCommandOut co(args.command_out.notnull().addref(), args.qe);
// TODO(dancol): remove ordered mode
using PartitionTable = HashTable<Partition, PartitionState>;
using PartitionTableIterator =
typename PartitionTable::iterator;
// Main mutable state updated during traversal
nocopy_optional<PartitionTable> state_by_partition_u;
if (grouper_is_partitioned || grouped_is_partitioned)
state_by_partition_u.emplace();
nocopy_optional<PartitionState> unpartitioned_ps;
if (!grouper_is_partitioned || !grouped_is_partitioned)
unpartitioned_ps.emplace(PartitionId::GLOBAL());
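  // State layout: state_by_partition_u is engaged whenever either input
  // is partitioned, and unpartitioned_ps whenever either input is
  // unpartitioned; in the mixed (up/pu) cases both are live at once. The
  // "_u" suffix appears to mark helpers that assume state_by_partition_u
  // is engaged.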
auto find_or_create_partition_state_u = [&](Partition partition,
bool* inserted=nullptr)
-> PartitionState&
{
assert(state_by_partition_u);
auto result = state_by_partition_u
->try_emplace(partition, PartitionId(partition));
if (inserted)
*inserted = result.second;
return result.first->second;
};
if (permanent_partitions) { // Pre-create all partitions
assert(state_by_partition_u);
pyarray_ref array = args.unique_partition_values.as<PyArrayObject>();
NpyIteration1<Partition> iter(array);
if (!iter.is_at_eof()) {
do {
Partition partition = iter.get();
find_or_create_partition_state_u(partition);
} while (iter.advance());
}
}
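  // Event payloads read from the two input streams. The ae::span_duration
  // and ae::partition tags presumably tell the auto_event machinery which
  // input columns populate each field.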
AUTOEVENT_DECLARE_EVENT(
GrouperOpen,
(TimeStamp, duration, ae::span_duration)
(Partition, partition, ae::partition)
);
AUTOEVENT_DECLARE_EVENT(
GrouperClosed,
(TimeStamp, open_ts)
(Partition, partition, ae::partition)
);
AUTOEVENT_DECLARE_EVENT(
GroupedOpen,
(TimeStamp, duration, ae::span_duration)
(Partition, partition, ae::partition)
);
AUTOEVENT_DECLARE_EVENT(
GroupedClosed,
(Partition, partition, ae::partition)
);
AUTOEVENT_DECLARE_EVENT(
GroupedEvent,
(Partition, partition, ae::partition)
);
auto closed_ts = [](TimeStamp ts, const auto& event) -> TimeStamp {
return ts + event.duration;
};
SpanTableResultBuffer span_out(addref(args.metadata_out),
args.qe,
output_is_partitioned);
// In the code below, we have four combinations of partitioned and
// unpartitioned inputs: unpartitioned grouper and unpartitioned
// grouped (uu), partitioned grouper and unpartitioned grouped (pu),
// unpartitioned grouper and partitioned grouped (up), and
// partitioned grouper and partitioned grouped (pp). We define
// families of functions with members for each of the cases above.
  // Some of these functions bear a certain similarity to one another,
  // but don't be tempted to combine them: that way lies madness. Once
  // the cases are commingled, the state management becomes impossible
  // to reason about.
// In the completely unpartitioned case (uu), we simply keep all
// information in unpartitioned_ps.
// Unpartitioned grouper and partitioned grouped (up) is the
// "broadcast" scenario in which we replicate the grouper into each
// partition, storing the aggregate state in the partition table,
// but storing in unpartitioned_ps enough information to copy any
// open grouper state into a new per-partition state.
// Partitioned grouper and unpartitioned grouped (pu) means that we
// essentially "replicate" the grouped data into each entry in the
  // partition table; the overall result looks like the pp case,
  // except that we use unpartitioned_ps as an intermediate buffer
  // into which we read the grouped data before replicating it to
  // the grouper partitions.
// When both grouper and grouped are partitioned (pp), we just track
// the per-partition state in the partition table.
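  // In tabular form:
  //
  //   case  grouper  grouped  partition table         unpartitioned_ps
  //   uu    unpart.  unpart.  (absent)                all state
  //   up    unpart.  part.    per-partition replicas  open-grouper state for
  //                                                   seeding new partitions
  //   pu    part.    unpart.  per-partition state     staging buffer for
  //                                                   grouped data
  //   pp    part.    part.    all state               (absent)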
// TODO(dancol): avoid emitting clear commands immediately before
// erase commands. It didn't matter much in the virtual aggregate
// case, but it's worth avoiding now that we're using
// command streams.
auto maybe_delete_partition_u = [&](PartitionTableIterator it) {
const bool erase = !permanent_partitions &&
!it->second.is_grouper_open() &&
!it->second.is_grouped_open();
if (!erase)
return std::pair(std::next(it), false);
co.emit(SgCommand::FORGET_PARTITION, it->first);
// Post-increment it so that we can return the next iterator.
// Abseil's hash map returns void from erase for some reason.
state_by_partition_u->erase(it++);
return std::pair(it, true);
};
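  // maybe_delete_partition_u returns {next iterator, erased} so that
  // callers can erase while iterating; a partition is reclaimed only
  // once neither its grouper nor its grouped span is open, and never
  // when the partition set was pinned up front via permanent_partitions.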
auto find_partition_it_u = [&](Partition partition) {
assert(state_by_partition_u);
auto it = state_by_partition_u->find(partition);
assert(it != state_by_partition_u->end());
return it;
};
auto for_each_partition_u = [&](auto functor) {
assert(state_by_partition_u);
for (auto& [partition, ps] : *state_by_partition_u)
functor(partition, ps);
};
auto for_each_partition_cleanup_u = [&](auto functor) {
bool erased;
assert(state_by_partition_u);
auto it = state_by_partition_u->begin();
while (it != state_by_partition_u->end()) {
functor(it->first, it->second);
tie(it, erased) = maybe_delete_partition_u(it);
}
};
auto emit_ps = [&](TimeStamp open_ts,
TimeStamp closed_ts,
PartitionId pid,
PartitionState& ps)
{
span_out.add(open_ts, closed_ts - open_ts, pid.partition);
ps.emit_aggregations(&co, pid);
};
auto handle_grouper_open_uu = [&]
(TimeStamp ts, const GrouperOpen& gro)
{
const TimeStamp grouper_closed_ts = closed_ts(ts, gro);
assert(unpartitioned_ps);
unpartitioned_ps->mark_grouper_open(grouper_closed_ts);
};
auto handle_grouper_open_up = [&]
(TimeStamp ts, const GrouperOpen& gro)
{
const TimeStamp grouper_closed_ts = closed_ts(ts, gro);
assert(unpartitioned_ps);
unpartitioned_ps->mark_grouper_open(grouper_closed_ts);
for_each_partition_u([&](Partition partition, PartitionState& ps) {
ps.mark_grouper_open(grouper_closed_ts);
});
};
auto handle_grouper_open_pu = [&]
(TimeStamp ts, const GrouperOpen& gro)
{
const TimeStamp grouper_closed_ts = closed_ts(ts, gro);
bool inserted;
PartitionState& ps =
find_or_create_partition_state_u(gro.partition, &inserted);
ps.mark_grouper_open(grouper_closed_ts);
assert(unpartitioned_ps);
if (inserted && unpartitioned_ps->is_grouped_open()) {
ps.mark_grouped_open(unpartitioned_ps->get_grouped_closed_ts());
if (unpartitioned_ps->get_have_data())
unpartitioned_ps->copy_last_to(
&co,
PartitionId::GLOBAL(),
/*other_pid=*/PartitionId(gro.partition),
&ps);
}
};
auto handle_grouper_open_pp = [&]
(TimeStamp ts, const GrouperOpen& gro)
{
const TimeStamp grouper_closed_ts = closed_ts(ts, gro);
PartitionState& ps = find_or_create_partition_state_u(gro.partition);
ps.mark_grouper_open(grouper_closed_ts);
};
auto handle_grouper_open = [&]
(auto&& ae, TimeStamp ts, const GrouperOpen& gro)
{
if (!grouper_is_partitioned) {
if (!grouped_is_partitioned)
handle_grouper_open_uu(ts, gro);
else
handle_grouper_open_up(ts, gro);
} else {
if (!grouped_is_partitioned)
handle_grouper_open_pu(ts, gro);
else
handle_grouper_open_pp(ts, gro);
}
ae::enqueue_event(
ae,
ts + gro.duration,
GrouperClosed {
/*open_ts*/ts,
/*partition=*/gro.partition,
}
);
};
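  // grouper_emit_and_reset flushes one grouper window: it emits a row
  // when the group is non-empty (or empty groups are allowed) and, if
  // any data was accumulated, resets it, keeping the last grouped value
  // only when a grouped span straddles the boundary, i.e. remains open
  // strictly past this window's close.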
auto grouper_emit_and_reset = [&](
TimeStamp open_ts,
TimeStamp closed_ts,
PartitionId pid,
PartitionState& ps)
{
assert(ps.get_grouper_closed_ts() == closed_ts);
if (allow_empty_groups || ps.get_have_data()) {
emit_ps(open_ts, closed_ts, pid, ps);
if (ps.get_have_data()) {
bool will_continue =
ps.is_grouped_open() &&
ps.get_grouped_closed_ts() > closed_ts;
ps.reset_data(&co, pid, /*keep_last=*/will_continue);
}
}
};
auto handle_grouper_closed_uu = [&]
(TimeStamp ts, const GrouperClosed& grc)
{
const TimeStamp open_ts = grc.open_ts;
const TimeStamp closed_ts = ts;
assert(unpartitioned_ps);
grouper_emit_and_reset(open_ts,
closed_ts,
PartitionId::GLOBAL(),
*unpartitioned_ps);
unpartitioned_ps->mark_grouper_closed(closed_ts);
};
auto handle_grouper_closed_up = [&]
(TimeStamp ts, const GrouperClosed& grc)
{
// Note that we emit more rows than we have grouper inputs in this
// case, one row per partition.
const TimeStamp open_ts = grc.open_ts;
const TimeStamp closed_ts = ts;
assert(unpartitioned_ps);
unpartitioned_ps->mark_grouper_closed(closed_ts);
for_each_partition_cleanup_u([&](
Partition partition, PartitionState& ps) {
grouper_emit_and_reset(open_ts,
closed_ts,
PartitionId(partition),
ps);
ps.mark_grouper_closed(closed_ts);
});
};
auto handle_grouper_closed_pu = [&]
(TimeStamp ts, const GrouperClosed& grc)
{
const TimeStamp open_ts = grc.open_ts;
const TimeStamp closed_ts = ts;
auto it = find_partition_it_u(grc.partition);
PartitionState& ps = it->second;
grouper_emit_and_reset(open_ts,
closed_ts,
PartitionId(grc.partition),
ps);
ps.mark_grouper_closed(closed_ts);
maybe_delete_partition_u(it);
};
auto handle_grouper_closed_pp = [&]
(TimeStamp ts, const GrouperClosed& grc)
{
const TimeStamp open_ts = grc.open_ts;
const TimeStamp closed_ts = ts;
auto it = find_partition_it_u(grc.partition);
PartitionState& ps = it->second;
grouper_emit_and_reset(open_ts,
closed_ts,
PartitionId(grc.partition),
ps);
ps.mark_grouper_closed(closed_ts);
maybe_delete_partition_u(it);
};
auto handle_grouper_closed = [&]
(auto&& ae, TimeStamp ts, const GrouperClosed& grc)
{
if (!grouper_is_partitioned) {
if (!grouped_is_partitioned)
handle_grouper_closed_uu(ts, grc);
else
handle_grouper_closed_up(ts, grc);
} else {
if (!grouped_is_partitioned)
handle_grouper_closed_pu(ts, grc);
else
handle_grouper_closed_pp(ts, grc);
}
};
// N.B. handle_grouped_open_* functions need to call accumulate
// exactly once in order to preserve row synchronicity.
auto handle_grouped_open_uu = [&]
(TimeStamp ts, const GroupedOpen& gdo)
{
const TimeStamp grouped_closed_ts = closed_ts(ts, gdo);
assert(unpartitioned_ps);
unpartitioned_ps->mark_grouped_open(grouped_closed_ts);
unpartitioned_ps->accumulate(&co, PartitionId::GLOBAL());
};
auto handle_grouped_open_up = [&]
(TimeStamp ts, const GroupedOpen& gdo) {
const TimeStamp grouped_closed_ts = closed_ts(ts, gdo);
bool inserted;
PartitionState& ps =
find_or_create_partition_state_u(gdo.partition, &inserted);
    // GROUPER_OPEN events at a given timestamp are processed before
    // GROUPED_OPEN events (see the ae::process call below), so any
    // open-grouper state a freshly created partition must inherit is
    // already recorded in unpartitioned_ps; replicate it here before
    // marking the grouped span open.
assert(unpartitioned_ps);
if (inserted && unpartitioned_ps->is_grouper_open())
ps.mark_grouper_open(unpartitioned_ps->get_grouper_closed_ts());
ps.mark_grouped_open(grouped_closed_ts);
ps.accumulate(&co, PartitionId(gdo.partition));
};
auto handle_grouped_open_pu = [&]
(TimeStamp ts, const GroupedOpen& gdo)
{
const TimeStamp grouped_closed_ts = closed_ts(ts, gdo);
assert(unpartitioned_ps);
unpartitioned_ps->mark_grouped_open(grouped_closed_ts);
unpartitioned_ps->accumulate(&co, PartitionId::GLOBAL());
for_each_partition_u([&](Partition p, PartitionState& ps) {
ps.mark_grouped_open(grouped_closed_ts);
unpartitioned_ps->copy_last_to(&co,
PartitionId::GLOBAL(),
PartitionId(p),
&ps);
});
};
auto handle_grouped_open_pp = [&]
(TimeStamp ts, const GroupedOpen& gdo)
{
const TimeStamp grouped_closed_ts = closed_ts(ts, gdo);
PartitionState& ps = find_or_create_partition_state_u(gdo.partition);
ps.mark_grouped_open(grouped_closed_ts);
ps.accumulate(&co, PartitionId(gdo.partition));
};
auto handle_grouped_open = [&]
(auto&& ae, TimeStamp ts, const GroupedOpen& gdo)
{
if (!grouper_is_partitioned) {
if (!grouped_is_partitioned)
handle_grouped_open_uu(ts, gdo);
else
handle_grouped_open_up(ts, gdo);
} else {
if (!grouped_is_partitioned)
handle_grouped_open_pu(ts, gdo);
else
handle_grouped_open_pp(ts, gdo);
}
ae::enqueue_event(
ae,
ts + gdo.duration,
GroupedClosed {
gdo.partition,
}
);
};
auto handle_grouped_closed_uu = [&]
(TimeStamp ts, const GroupedClosed& gdc)
{
const TimeStamp grouped_closed_ts = ts;
assert(unpartitioned_ps);
unpartitioned_ps->mark_grouped_closed(grouped_closed_ts);
if (!unpartitioned_ps->is_grouper_open())
unpartitioned_ps->reset_data(&co, PartitionId::GLOBAL());
};
auto handle_grouped_closed_up = [&]
(TimeStamp ts, const GroupedClosed& gdc)
{
const TimeStamp grouped_closed_ts = ts;
auto it = find_partition_it_u(gdc.partition);
PartitionState& ps = it->second;
ps.mark_grouped_closed(grouped_closed_ts);
if (!ps.is_grouper_open())
ps.reset_data(&co, PartitionId(it->first));
maybe_delete_partition_u(it);
};
auto handle_grouped_closed_pu = [&]
(TimeStamp ts, const GroupedClosed& gdc)
{
const TimeStamp grouped_closed_ts = ts;
assert(unpartitioned_ps);
unpartitioned_ps->mark_grouped_closed(grouped_closed_ts);
unpartitioned_ps->reset_data(&co, PartitionId::GLOBAL());
for_each_partition_cleanup_u([&](Partition p, PartitionState& ps) {
ps.mark_grouped_closed(grouped_closed_ts);
if (!ps.is_grouper_open())
ps.reset_data(&co, PartitionId(p));
});
};
auto handle_grouped_closed_pp = [&]
(TimeStamp ts, const GroupedClosed& gdc)
{
const TimeStamp grouped_closed_ts = ts;
auto it = find_partition_it_u(gdc.partition);
PartitionState& ps = it->second;
ps.mark_grouped_closed(grouped_closed_ts);
if (!ps.is_grouper_open())
ps.reset_data(&co, PartitionId(it->first));
maybe_delete_partition_u(it);
};
auto handle_grouped_closed = [&]
(auto&& ae, TimeStamp ts, const GroupedClosed& gdc)
{
if (!grouper_is_partitioned) {
if (!grouped_is_partitioned)
handle_grouped_closed_uu(ts, gdc);
else
handle_grouped_closed_up(ts, gdc);
} else {
if (!grouped_is_partitioned)
handle_grouped_closed_pu(ts, gdc);
else
handle_grouped_closed_pp(ts, gdc);
}
};
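  // Non-span grouped inputs arrive as point events; each one is modeled
  // as a zero-duration span so the open/closed machinery above can be
  // reused unchanged.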
auto handle_grouped_event = [&]
(auto&& ae, TimeStamp ts, const GroupedEvent& ge)
{
GroupedOpen gdo = {
/*duration=*/0,
/*partition=*/ge.partition,
};
handle_grouped_open(AUTOFWD(ae), ts, gdo);
};
// We process events at the same "time" in the order given here.
// Order is important for correctness!
ae::process(
ae::synthetic_event<GroupedClosed>(
handle_grouped_closed),
ae::synthetic_event<GrouperClosed>(
handle_grouper_closed),
ae::input<GrouperOpen>(
args.metadata_grouper_source,
handle_grouper_open),
ae::input<GroupedOpen>(
args.metadata_grouped_source,
handle_grouped_open,
ae::enable_when(grouped_is_span)),
ae::input<GroupedEvent>(
args.metadata_grouped_source,
handle_grouped_event,
ae::enable_when(!grouped_is_span))
);
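  // A note on this ordering: at equal timestamps, closes run before
  // opens, so back-to-back spans never appear to overlap. The relative
  // order of the two close handlers matters as well, since each consults
  // state the other tears down: grouper_emit_and_reset checks
  // is_grouped_open(), and the handle_grouped_closed_* handlers check
  // is_grouper_open().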
span_out.flush();
co.flush();
return addref(Py_None);
}
} // anonymous namespace
static PyMethodDef functions[] = {
make_methoddef("span_group",
wraperr<do_span_group>(),
METH_VARARGS,
"Reshape a span table into the image of another"),
{ 0 }
};
void
init_span_group(pyref m)
{
register_functions(m, functions);
}
} // namespace dctv