| // Copyright (C) 2020 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| #include "span_group.h" |
| |
| #include <tuple> |
| #include <utility> |
| |
| #include "autoevent.h" |
| #include "command_stream.h" |
| #include "hash_table.h" |
| #include "optional.h" |
| #include "pyiter.h" |
| #include "pylog.h" |
| #include "pyobj.h" |
| #include "pyparsetuple.h" |
| #include "pyparsetuplenpy.h" |
| #include "result_buffer.h" |
| #include "span.h" |
| #include "span_result_buffer.h" |
| |
| namespace dctv { |
| namespace { |
| |
| using std::tie; |
| using std::string_view; |
| |
| using SgCommandOut = CommandOut<SgCommand>; |
| |
| // The span group includes a "global" partition not named by any |
| // specific partition number. PartitionId can name either that global |
| // partition or a numbered partition. "pid" in this file is an |
| // abbreviation for PartitionId and has nothing to do with POSIX |
| // process IDs (except in that higher-level code can choose to |
| // partition data by process ID --- but that's none of our |
| // business here.) |
| |
| struct PartitionId final { |
| constexpr explicit PartitionId(Partition p) |
| : partition(p), global(false) {} |
| PartitionId(const PartitionId&) noexcept = default; // NOLINT |
| bool operator==(const PartitionId& other) const noexcept { |
| return tie(this->partition, this->global) == |
| tie(other.partition, other.global); |
| } |
| bool operator!=(const PartitionId& other) const noexcept { |
| return !(*this == other); |
| } |
| const Partition partition; // NOLINT |
| const bool global; // NOLINT |
| static const PartitionId GLOBAL() { return PartitionId(); } |
| private: |
| constexpr PartitionId() : partition(), global(true) {} |
| }; |
| |
| struct PartitionStateBaseOpt { |
| protected: |
| explicit PartitionStateBaseOpt(PartitionId) {} |
| void check_partition(PartitionId) {} |
| }; |
| |
| struct PartitionStateBaseDebug { |
| protected: |
| explicit PartitionStateBaseDebug(PartitionId pid) |
| : pid(pid) |
| {} |
| void check_partition(PartitionId pid) { |
| assume(this->pid == pid); |
| } |
| private: |
| PartitionId pid; |
| }; |
| |
| using PartitionStateBase = std::conditional_t< |
| safe_mode, |
| PartitionStateBaseDebug, |
| PartitionStateBaseOpt>; |
| |
| struct PartitionState final : PartitionStateBase, NoCopy { |
| explicit PartitionState(PartitionId pid) |
| : PartitionStateBase(pid) {} |
| |
| void mark_grouper_open(TimeStamp closed_ts) { |
| assert(!this->grouper_closed_ts.has_value()); |
| this->grouper_closed_ts = closed_ts; |
| } |
| void mark_grouper_closed(TimeStamp closed_ts) { |
| assert(this->grouper_closed_ts.has_value()); |
| assert(this->grouper_closed_ts == closed_ts); |
| this->grouper_closed_ts.reset(); |
| } |
| bool is_grouper_open() const { |
| return this->grouper_closed_ts.has_value(); |
| } |
| TimeStamp get_grouper_closed_ts() const { |
| assert(this->grouper_closed_ts.has_value()); |
| return *this->grouper_closed_ts; |
| } |
| void mark_grouped_open(TimeStamp closed_ts) { |
| assert(!this->grouped_closed_ts.has_value()); |
| this->grouped_closed_ts = closed_ts; |
| } |
| void mark_grouped_closed(TimeStamp closed_ts) { |
| assert(this->grouped_closed_ts.has_value()); |
| assert(this->grouped_closed_ts == closed_ts); |
| this->grouped_closed_ts.reset(); |
| } |
| bool is_grouped_open() const { |
| return this->grouped_closed_ts.has_value(); |
| } |
| TimeStamp get_grouped_closed_ts() const { |
| assert(this->grouped_closed_ts.has_value()); |
| return *this->grouped_closed_ts; |
| } |
| void accumulate(SgCommandOut* co, PartitionId pid) { |
| this->check_partition(pid); |
| assert(this->is_grouped_open()); |
| this->have_data = true; |
| if (pid.global) |
| co->emit(SgCommand::SLURP_AND_ACCUMULATE_GLOBAL); |
| else |
| co->emit(SgCommand::SLURP_AND_ACCUMULATE, pid.partition); |
| } |
| void copy_last_to(SgCommandOut* co, |
| PartitionId pid, |
| PartitionId other_pid, |
| PartitionState* other_ps) { |
| this->check_partition(pid); |
| other_ps->check_partition(other_pid); |
| assert(pid.global); |
| assert(!other_pid.global); |
| assert(this->have_data); |
| other_ps->have_data = this->have_data; |
| co->emit(SgCommand::COPY_LAST_GLOBAL_TO, other_pid.partition); |
| } |
| void reset_data(SgCommandOut* co, |
| PartitionId pid, |
| bool keep_last = false) |
| { |
| this->check_partition(pid); |
| this->have_data = keep_last; |
| if (pid.global) |
| co->emit((keep_last |
| ? SgCommand::RESET_GLOBAL_KEEP_LAST |
| : SgCommand::RESET_GLOBAL)); |
| else |
| co->emit((keep_last |
| ? SgCommand::RESET_KEEP_LAST |
| : SgCommand::RESET), |
| pid.partition); |
| } |
| bool get_have_data() const { |
| return this->have_data; |
| } |
| void emit_aggregations(SgCommandOut* co, PartitionId pid) { |
| this->check_partition(pid); |
| assert(this->is_grouper_open()); |
| if (pid.global) |
| co->emit(SgCommand::EMIT_GLOBAL); |
| else |
| co->emit(SgCommand::EMIT, pid.partition); |
| } |
| private: |
| optional<TimeStamp> grouped_closed_ts; |
| optional<TimeStamp> grouper_closed_ts; |
| bool have_data = false; |
| }; |
| |
| unique_pyref |
| do_span_group(PyObject*, PyObject* py_args) |
| { |
| // TODO(dancol): support multiple grouped span tables? |
| |
| namespace ae = auto_event; |
| |
| auto args = PARSEPYARGS_V( |
| (pyref, unique_partition_values) |
| (pyref, metadata_grouped_source) |
| (pyref, metadata_grouper_source) |
| (pyref, metadata_out) |
| (pyref, command_out) |
| (QueryExecution*, qe) |
| (bool, grouper_is_partitioned) |
| (bool, grouped_is_partitioned) |
| (bool, grouped_is_span) |
| (bool, allow_empty_groups) |
| )(py_args); |
| |
| const bool do_log = false; |
| const bool grouper_is_partitioned = args.grouper_is_partitioned; |
| const bool grouped_is_partitioned = args.grouped_is_partitioned; |
| const bool grouped_is_span = args.grouped_is_span; |
| const bool allow_empty_groups = args.allow_empty_groups; |
| const bool permanent_partitions = args.unique_partition_values != Py_None; |
| |
| if (do_log) { |
| pylog.debug("grouper_is_partitioned=%s", grouper_is_partitioned); |
| pylog.debug("grouped_is_partitioned=%s", grouped_is_partitioned); |
| pylog.debug("grouped_is_span=%s", grouped_is_span); |
| pylog.debug("allow_empty_groups=%s", allow_empty_groups); |
| pylog.debug("permanent_partitions=%s", permanent_partitions); |
| } |
| |
| if (permanent_partitions != |
| (allow_empty_groups && |
| !grouper_is_partitioned && |
| grouped_is_partitioned)) |
| throw_invalid_query("invalid partition list usage"); |
| |
| const bool output_is_partitioned = |
| grouper_is_partitioned || grouped_is_partitioned; |
| SgCommandOut co(args.command_out.notnull().addref(), args.qe); |
| |
| // TODO(dancol): remove ordered mode |
| |
| using PartitionTable = HashTable<Partition, PartitionState>; |
| using PartitionTableIterator = |
| typename PartitionTable::iterator; |
| |
| // Main mutable state updated during traversal |
| nocopy_optional<PartitionTable> state_by_partition_u; |
| if (grouper_is_partitioned || grouped_is_partitioned) |
| state_by_partition_u.emplace(); |
| nocopy_optional<PartitionState> unpartitioned_ps; |
| if (!grouper_is_partitioned || !grouped_is_partitioned) |
| unpartitioned_ps.emplace(PartitionId::GLOBAL()); |
| |
| auto find_or_create_partition_state_u = [&](Partition partition, |
| bool* inserted=nullptr) |
| -> PartitionState& |
| { |
| assert(state_by_partition_u); |
| auto result = state_by_partition_u |
| ->try_emplace(partition, PartitionId(partition)); |
| if (inserted) |
| *inserted = result.second; |
| return result.first->second; |
| }; |
| |
| if (permanent_partitions) { // Pre-create all partitions |
| assert(state_by_partition_u); |
| pyarray_ref array = args.unique_partition_values.as<PyArrayObject>(); |
| NpyIteration1<Partition> iter(array); |
| if (!iter.is_at_eof()) { |
| do { |
| Partition partition = iter.get(); |
| find_or_create_partition_state_u(partition); |
| } while (iter.advance()); |
| } |
| } |
| |
| AUTOEVENT_DECLARE_EVENT( |
| GrouperOpen, |
| (TimeStamp, duration, ae::span_duration) |
| (Partition, partition, ae::partition) |
| ); |
| |
| AUTOEVENT_DECLARE_EVENT( |
| GrouperClosed, |
| (TimeStamp, open_ts) |
| (Partition, partition, ae::partition) |
| ); |
| |
| AUTOEVENT_DECLARE_EVENT( |
| GroupedOpen, |
| (TimeStamp, duration, ae::span_duration) |
| (Partition, partition, ae::partition) |
| ); |
| |
| AUTOEVENT_DECLARE_EVENT( |
| GroupedClosed, |
| (Partition, partition, ae::partition) |
| ); |
| |
| AUTOEVENT_DECLARE_EVENT( |
| GroupedEvent, |
| (Partition, partition, ae::partition) |
| ); |
| |
| auto closed_ts = [](TimeStamp ts, const auto& event) -> TimeStamp { |
| return ts + event.duration; |
| }; |
| |
| SpanTableResultBuffer span_out(addref(args.metadata_out), |
| args.qe, |
| output_is_partitioned); |
| |
| // In the code below, we have four combinations of partitioned and |
| // unpartitioned inputs: unpartitioned grouper and unpartitioned |
| // grouped (uu), partitioned grouper and unpartitioned grouped (pu), |
| // unpartitioned grouper and partitioned grouped (up), and |
| // partitioned grouper and partitioned grouped (pp). We define |
| // families of functions with members for each of the cases above. |
| // In some cases, the functions might bear a certain similarity to |
| // each other, but don't be tempted to combine them: that way lies |
| // madness, as, comingled, state management becomes impossible to |
| // reason about. |
| |
| // In the completely unpartitioned case (uu), we simply keep all |
| // information in unpartitioned_ps. |
| |
| // Unpartitioned grouper and partitioned grouped (up) is the |
| // "broadcast" scenario in which we replicate the grouper into each |
| // partition, storing the aggregate state in the partition table, |
| // but storing in unpartitioned_ps enough information to copy any |
| // open grouper state into a new per-partition state. |
| |
| // Partitioned grouper and unpartitioned grouped (pu) means that we |
| // essentially "replicate" the grouped data into each entry in the |
| // partition table; the overall result looks like the pp case, |
| // except that we use the unpartitioned_ps table as an intermediate |
| // buffer into which we read the grouped data before replicating |
| // them to the grouper partitions. |
| |
| // When both grouper and grouped are partitioned (pp), we just track |
| // the per-partition state in the partition table. |
| |
| // TODO(dancol): avoid emitting clear commands immediately before |
| // erase commands. It didn't matter much in the virtual aggregate |
| // case, but it's worth avoiding now that we're using |
| // command streams. |
| |
| auto maybe_delete_partition_u = [&](PartitionTableIterator it) { |
| const bool erase = !permanent_partitions && |
| !it->second.is_grouper_open() && |
| !it->second.is_grouped_open(); |
| if (!erase) |
| return std::pair(std::next(it), false); |
| co.emit(SgCommand::FORGET_PARTITION, it->first); |
| // Post-increment it so that we can return the next iterator. |
| // Abseil's hash map returns void from erase for some reason. |
| state_by_partition_u->erase(it++); |
| return std::pair(it, true); |
| }; |
| |
| auto find_partition_it_u = [&](Partition partition) { |
| assert(state_by_partition_u); |
| auto it = state_by_partition_u->find(partition); |
| assert(it != state_by_partition_u->end()); |
| return it; |
| }; |
| |
| auto for_each_partition_u = [&](auto functor) { |
| assert(state_by_partition_u); |
| for (auto& [partition, ps] : *state_by_partition_u) |
| functor(partition, ps); |
| }; |
| |
| auto for_each_partition_cleanup_u = [&](auto functor) { |
| bool erased; |
| assert(state_by_partition_u); |
| auto it = state_by_partition_u->begin(); |
| while (it != state_by_partition_u->end()) { |
| functor(it->first, it->second); |
| tie(it, erased) = maybe_delete_partition_u(it); |
| } |
| }; |
| |
| auto emit_ps = [&](TimeStamp open_ts, |
| TimeStamp closed_ts, |
| PartitionId pid, |
| PartitionState& ps) |
| { |
| span_out.add(open_ts, closed_ts - open_ts, pid.partition); |
| ps.emit_aggregations(&co, pid); |
| }; |
| |
| auto handle_grouper_open_uu = [&] |
| (TimeStamp ts, const GrouperOpen& gro) |
| { |
| const TimeStamp grouper_closed_ts = closed_ts(ts, gro); |
| assert(unpartitioned_ps); |
| unpartitioned_ps->mark_grouper_open(grouper_closed_ts); |
| }; |
| |
| auto handle_grouper_open_up = [&] |
| (TimeStamp ts, const GrouperOpen& gro) |
| { |
| const TimeStamp grouper_closed_ts = closed_ts(ts, gro); |
| assert(unpartitioned_ps); |
| unpartitioned_ps->mark_grouper_open(grouper_closed_ts); |
| for_each_partition_u([&](Partition partition, PartitionState& ps) { |
| ps.mark_grouper_open(grouper_closed_ts); |
| }); |
| }; |
| |
| auto handle_grouper_open_pu = [&] |
| (TimeStamp ts, const GrouperOpen& gro) |
| { |
| const TimeStamp grouper_closed_ts = closed_ts(ts, gro); |
| bool inserted; |
| PartitionState& ps = |
| find_or_create_partition_state_u(gro.partition, &inserted); |
| ps.mark_grouper_open(grouper_closed_ts); |
| assert(unpartitioned_ps); |
| if (inserted && unpartitioned_ps->is_grouped_open()) { |
| ps.mark_grouped_open(unpartitioned_ps->get_grouped_closed_ts()); |
| if (unpartitioned_ps->get_have_data()) |
| unpartitioned_ps->copy_last_to( |
| &co, |
| PartitionId::GLOBAL(), |
| /*other_pid=*/PartitionId(gro.partition), |
| &ps); |
| } |
| }; |
| |
| auto handle_grouper_open_pp = [&] |
| (TimeStamp ts, const GrouperOpen& gro) |
| { |
| const TimeStamp grouper_closed_ts = closed_ts(ts, gro); |
| PartitionState& ps = find_or_create_partition_state_u(gro.partition); |
| ps.mark_grouper_open(grouper_closed_ts); |
| }; |
| |
| auto handle_grouper_open = [&] |
| (auto&& ae, TimeStamp ts, const GrouperOpen& gro) |
| { |
| if (!grouper_is_partitioned) { |
| if (!grouped_is_partitioned) |
| handle_grouper_open_uu(ts, gro); |
| else |
| handle_grouper_open_up(ts, gro); |
| } else { |
| if (!grouped_is_partitioned) |
| handle_grouper_open_pu(ts, gro); |
| else |
| handle_grouper_open_pp(ts, gro); |
| } |
| |
| ae::enqueue_event( |
| ae, |
| ts + gro.duration, |
| GrouperClosed { |
| /*open_ts*/ts, |
| /*partition=*/gro.partition, |
| } |
| ); |
| }; |
| |
| auto grouper_emit_and_reset = [&]( |
| TimeStamp open_ts, |
| TimeStamp closed_ts, |
| PartitionId pid, |
| PartitionState& ps) |
| { |
| assert(ps.get_grouper_closed_ts() == closed_ts); |
| if (allow_empty_groups || ps.get_have_data()) { |
| emit_ps(open_ts, closed_ts, pid, ps); |
| if (ps.get_have_data()) { |
| bool will_continue = |
| ps.is_grouped_open() && |
| ps.get_grouped_closed_ts() > closed_ts; |
| ps.reset_data(&co, pid, /*keep_last=*/will_continue); |
| } |
| } |
| }; |
| |
| auto handle_grouper_closed_uu = [&] |
| (TimeStamp ts, const GrouperClosed& grc) |
| { |
| const TimeStamp open_ts = grc.open_ts; |
| const TimeStamp closed_ts = ts; |
| assert(unpartitioned_ps); |
| grouper_emit_and_reset(open_ts, |
| closed_ts, |
| PartitionId::GLOBAL(), |
| *unpartitioned_ps); |
| unpartitioned_ps->mark_grouper_closed(closed_ts); |
| }; |
| |
| auto handle_grouper_closed_up = [&] |
| (TimeStamp ts, const GrouperClosed& grc) |
| { |
| // Note that we emit more rows than we have grouper inputs in this |
| // case, one row per partition. |
| const TimeStamp open_ts = grc.open_ts; |
| const TimeStamp closed_ts = ts; |
| assert(unpartitioned_ps); |
| unpartitioned_ps->mark_grouper_closed(closed_ts); |
| for_each_partition_cleanup_u([&]( |
| Partition partition, PartitionState& ps) { |
| grouper_emit_and_reset(open_ts, |
| closed_ts, |
| PartitionId(partition), |
| ps); |
| ps.mark_grouper_closed(closed_ts); |
| }); |
| }; |
| |
| auto handle_grouper_closed_pu = [&] |
| (TimeStamp ts, const GrouperClosed& grc) |
| { |
| const TimeStamp open_ts = grc.open_ts; |
| const TimeStamp closed_ts = ts; |
| auto it = find_partition_it_u(grc.partition); |
| PartitionState& ps = it->second; |
| grouper_emit_and_reset(open_ts, |
| closed_ts, |
| PartitionId(grc.partition), |
| ps); |
| ps.mark_grouper_closed(closed_ts); |
| maybe_delete_partition_u(it); |
| }; |
| |
| auto handle_grouper_closed_pp = [&] |
| (TimeStamp ts, const GrouperClosed& grc) |
| { |
| const TimeStamp open_ts = grc.open_ts; |
| const TimeStamp closed_ts = ts; |
| auto it = find_partition_it_u(grc.partition); |
| PartitionState& ps = it->second; |
| grouper_emit_and_reset(open_ts, |
| closed_ts, |
| PartitionId(grc.partition), |
| ps); |
| ps.mark_grouper_closed(closed_ts); |
| maybe_delete_partition_u(it); |
| }; |
| |
| auto handle_grouper_closed = [&] |
| (auto&& ae, TimeStamp ts, const GrouperClosed& grc) |
| { |
| if (!grouper_is_partitioned) { |
| if (!grouped_is_partitioned) |
| handle_grouper_closed_uu(ts, grc); |
| else |
| handle_grouper_closed_up(ts, grc); |
| } else { |
| if (!grouped_is_partitioned) |
| handle_grouper_closed_pu(ts, grc); |
| else |
| handle_grouper_closed_pp(ts, grc); |
| } |
| }; |
| |
| // N.B. handle_grouped_open_* functions need to call accumulate |
| // exactly once in order to preserve row synchronicity. |
| |
| auto handle_grouped_open_uu = [&] |
| (TimeStamp ts, const GroupedOpen& gdo) |
| { |
| const TimeStamp grouped_closed_ts = closed_ts(ts, gdo); |
| assert(unpartitioned_ps); |
| unpartitioned_ps->mark_grouped_open(grouped_closed_ts); |
| unpartitioned_ps->accumulate(&co, PartitionId::GLOBAL()); |
| }; |
| |
| auto handle_grouped_open_up = [&] |
| (TimeStamp ts, const GroupedOpen& gdo) { |
| const TimeStamp grouped_closed_ts = closed_ts(ts, gdo); |
| bool inserted; |
| PartitionState& ps = |
| find_or_create_partition_state_u(gdo.partition, &inserted); |
| // We process GROUPER_OPEN before GROUPED_OPEN, so we might as |
| // well call mark_grouper_open here before mark_grouped_open. |
| assert(unpartitioned_ps); |
| if (inserted && unpartitioned_ps->is_grouper_open()) |
| ps.mark_grouper_open(unpartitioned_ps->get_grouper_closed_ts()); |
| ps.mark_grouped_open(grouped_closed_ts); |
| ps.accumulate(&co, PartitionId(gdo.partition)); |
| }; |
| |
| auto handle_grouped_open_pu = [&] |
| (TimeStamp ts, const GroupedOpen& gdo) |
| { |
| const TimeStamp grouped_closed_ts = closed_ts(ts, gdo); |
| assert(unpartitioned_ps); |
| unpartitioned_ps->mark_grouped_open(grouped_closed_ts); |
| unpartitioned_ps->accumulate(&co, PartitionId::GLOBAL()); |
| for_each_partition_u([&](Partition p, PartitionState& ps) { |
| ps.mark_grouped_open(grouped_closed_ts); |
| unpartitioned_ps->copy_last_to(&co, |
| PartitionId::GLOBAL(), |
| PartitionId(p), |
| &ps); |
| }); |
| }; |
| |
| auto handle_grouped_open_pp = [&] |
| (TimeStamp ts, const GroupedOpen& gdo) |
| { |
| const TimeStamp grouped_closed_ts = closed_ts(ts, gdo); |
| PartitionState& ps = find_or_create_partition_state_u(gdo.partition); |
| ps.mark_grouped_open(grouped_closed_ts); |
| ps.accumulate(&co, PartitionId(gdo.partition)); |
| }; |
| |
| auto handle_grouped_open = [&] |
| (auto&& ae, TimeStamp ts, const GroupedOpen& gdo) |
| { |
| if (!grouper_is_partitioned) { |
| if (!grouped_is_partitioned) |
| handle_grouped_open_uu(ts, gdo); |
| else |
| handle_grouped_open_up(ts, gdo); |
| } else { |
| if (!grouped_is_partitioned) |
| handle_grouped_open_pu(ts, gdo); |
| else |
| handle_grouped_open_pp(ts, gdo); |
| } |
| |
| ae::enqueue_event( |
| ae, |
| ts + gdo.duration, |
| GroupedClosed { |
| gdo.partition, |
| } |
| ); |
| }; |
| |
| auto handle_grouped_closed_uu = [&] |
| (TimeStamp ts, const GroupedClosed& gdc) |
| { |
| const TimeStamp grouped_closed_ts = ts; |
| assert(unpartitioned_ps); |
| unpartitioned_ps->mark_grouped_closed(grouped_closed_ts); |
| if (!unpartitioned_ps->is_grouper_open()) |
| unpartitioned_ps->reset_data(&co, PartitionId::GLOBAL()); |
| }; |
| |
| auto handle_grouped_closed_up = [&] |
| (TimeStamp ts, const GroupedClosed& gdc) |
| { |
| const TimeStamp grouped_closed_ts = ts; |
| auto it = find_partition_it_u(gdc.partition); |
| PartitionState& ps = it->second; |
| ps.mark_grouped_closed(grouped_closed_ts); |
| if (!ps.is_grouper_open()) |
| ps.reset_data(&co, PartitionId(it->first)); |
| maybe_delete_partition_u(it); |
| }; |
| |
| auto handle_grouped_closed_pu = [&] |
| (TimeStamp ts, const GroupedClosed& gdc) |
| { |
| const TimeStamp grouped_closed_ts = ts; |
| assert(unpartitioned_ps); |
| unpartitioned_ps->mark_grouped_closed(grouped_closed_ts); |
| unpartitioned_ps->reset_data(&co, PartitionId::GLOBAL()); |
| for_each_partition_cleanup_u([&](Partition p, PartitionState& ps) { |
| ps.mark_grouped_closed(grouped_closed_ts); |
| if (!ps.is_grouper_open()) |
| ps.reset_data(&co, PartitionId(p)); |
| }); |
| }; |
| |
| auto handle_grouped_closed_pp = [&] |
| (TimeStamp ts, const GroupedClosed& gdc) |
| { |
| const TimeStamp grouped_closed_ts = ts; |
| auto it = find_partition_it_u(gdc.partition); |
| PartitionState& ps = it->second; |
| ps.mark_grouped_closed(grouped_closed_ts); |
| if (!ps.is_grouper_open()) |
| ps.reset_data(&co, PartitionId(it->first)); |
| maybe_delete_partition_u(it); |
| }; |
| |
| auto handle_grouped_closed = [&] |
| (auto&& ae, TimeStamp ts, const GroupedClosed& gdc) |
| { |
| if (!grouper_is_partitioned) { |
| if (!grouped_is_partitioned) |
| handle_grouped_closed_uu(ts, gdc); |
| else |
| handle_grouped_closed_up(ts, gdc); |
| } else { |
| if (!grouped_is_partitioned) |
| handle_grouped_closed_pu(ts, gdc); |
| else |
| handle_grouped_closed_pp(ts, gdc); |
| } |
| }; |
| |
| auto handle_grouped_event = [&] |
| (auto&& ae, TimeStamp ts, const GroupedEvent& ge) |
| { |
| GroupedOpen gdo = { |
| /*duration=*/0, |
| /*partition=*/ge.partition, |
| }; |
| handle_grouped_open(AUTOFWD(ae), ts, gdo); |
| }; |
| |
| // We process events at the same "time" in the order given here. |
| // Order is important for correctness! |
| ae::process( |
| ae::synthetic_event<GroupedClosed>( |
| handle_grouped_closed), |
| ae::synthetic_event<GrouperClosed>( |
| handle_grouper_closed), |
| ae::input<GrouperOpen>( |
| args.metadata_grouper_source, |
| handle_grouper_open), |
| ae::input<GroupedOpen>( |
| args.metadata_grouped_source, |
| handle_grouped_open, |
| ae::enable_when(grouped_is_span)), |
| ae::input<GroupedEvent>( |
| args.metadata_grouped_source, |
| handle_grouped_event, |
| ae::enable_when(!grouped_is_span)) |
| ); |
| |
| span_out.flush(); |
| co.flush(); |
| return addref(Py_None); |
| } |
| |
| } // anonymous namespace |
| |
| |
| |
| static PyMethodDef functions[] = { |
| make_methoddef("span_group", |
| wraperr<do_span_group>(), |
| METH_VARARGS, |
| "Reshape a span table into the image of another"), |
| { 0 } |
| }; |
| |
| void |
| init_span_group(pyref m) |
| { |
| register_functions(m, functions); |
| } |
| |
| } // namespace dctv |