// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| #include "span_fixup.h" |
| |
| #include <tuple> |
| |
| #include "autoevent.h" |
| #include "hash_table.h" |
| #include "pyobj.h" |
| #include "pyparsetuple.h" |
| #include "pyparsetuplenpy.h" |
| #include "result_buffer.h" |
| #include "span.h" |
| #include "span_result_buffer.h" |
| |
| namespace dctv { |
| |
| static |
| unique_pyref |
| span_fixup(PyObject*, PyObject* args) |
| { |
| namespace ae = auto_event; |
| |
| PARSEPYARGS( |
| (pyref, meta_source) |
| (pyref, meta_out) |
| (QueryExecution*, qe) |
| (bool, is_partitioned) |
| )(args); |
| |
| struct PartitionState final { |
| TimeStamp open_ts = -1; |
| Index open_count = 0; |
| }; |
| |
| HashTable<Partition, PartitionState> partitions; |
| |
| auto get_or_create_ps = [&](Partition partition) |
| -> PartitionState& { |
| return partitions.try_emplace(partition).first->second; |
| }; |
| |
| auto get_ps = [&](Partition partition) -> PartitionState& { |
| auto it = partitions.find(partition); |
| assert(it != partitions.end()); |
| return it->second; |
| }; |
| |
| SpanTableResultBuffer meta_rb(addref(meta_out), qe, is_partitioned); |
| |
| AUTOEVENT_DECLARE_EVENT( |
| Open, |
| (TimeStamp, duration) |
| (Partition, partition, ae::partition) |
| ); |
| |
| AUTOEVENT_DECLARE_EVENT( |
| Close, |
| (Partition, partition, ae::partition) |
| ); |
| |
| auto handle_open = [&](auto&& ae, TimeStamp ts, const Open& open) { |
| PartitionState& ps = get_or_create_ps(open.partition); |
| if (!ps.open_count) |
| ps.open_ts = ts; |
| ps.open_count += 1; |
| ae::enqueue_event( |
| ae, |
| ts + open.duration, |
| Close { open.partition }); |
| }; |
| |
| auto handle_close = [&](auto&& ae, TimeStamp ts, const Close& close) { |
| PartitionState& ps = get_ps(close.partition); |
| assert(ps.open_count > 0); |
| ps.open_count -= 1; |
| if (!ps.open_count) |
| meta_rb.add(ps.open_ts, ts - ps.open_ts, close.partition); |
| }; |
| |
| ae::process( |
| ae::synthetic_event<Close>(handle_close), |
| ae::input<Open>(meta_source, handle_open) |
| ); |
| |
| meta_rb.flush(); |
| return addref(Py_None); |
| } |
| |
| |
| |
| static PyMethodDef functions[] = { |
| make_methoddef("span_fixup", |
| wraperr<span_fixup>(), |
| METH_VARARGS, |
| "Fix invalid span nesting"), |
| { 0 } |
| }; |
| |
| void |
| init_span_fixup(pyref m) |
| { |
| register_functions(m, functions); |
| } |
| |
| } // namespace dctv |