blob: ec8e9fc14a581a8c39f743e1b08fdd55ee25d93b [file] [log] [blame]
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "span_fixup.h"
#include <tuple>
#include "autoevent.h"
#include "hash_table.h"
#include "pyobj.h"
#include "pyparsetuple.h"
#include "pyparsetuplenpy.h"
#include "result_buffer.h"
#include "span.h"
#include "span_result_buffer.h"
namespace dctv {
static
unique_pyref
span_fixup(PyObject*, PyObject* args)
{
namespace ae = auto_event;
PARSEPYARGS(
(pyref, meta_source)
(pyref, meta_out)
(QueryExecution*, qe)
(bool, is_partitioned)
)(args);
struct PartitionState final {
TimeStamp open_ts = -1;
Index open_count = 0;
};
HashTable<Partition, PartitionState> partitions;
auto get_or_create_ps = [&](Partition partition)
-> PartitionState& {
return partitions.try_emplace(partition).first->second;
};
auto get_ps = [&](Partition partition) -> PartitionState& {
auto it = partitions.find(partition);
assert(it != partitions.end());
return it->second;
};
SpanTableResultBuffer meta_rb(addref(meta_out), qe, is_partitioned);
AUTOEVENT_DECLARE_EVENT(
Open,
(TimeStamp, duration)
(Partition, partition, ae::partition)
);
AUTOEVENT_DECLARE_EVENT(
Close,
(Partition, partition, ae::partition)
);
auto handle_open = [&](auto&& ae, TimeStamp ts, const Open& open) {
PartitionState& ps = get_or_create_ps(open.partition);
if (!ps.open_count)
ps.open_ts = ts;
ps.open_count += 1;
ae::enqueue_event(
ae,
ts + open.duration,
Close { open.partition });
};
auto handle_close = [&](auto&& ae, TimeStamp ts, const Close& close) {
PartitionState& ps = get_ps(close.partition);
assert(ps.open_count > 0);
ps.open_count -= 1;
if (!ps.open_count)
meta_rb.add(ps.open_ts, ts - ps.open_ts, close.partition);
};
ae::process(
ae::synthetic_event<Close>(handle_close),
ae::input<Open>(meta_source, handle_open)
);
meta_rb.flush();
return addref(Py_None);
}
static PyMethodDef functions[] = {
make_methoddef("span_fixup",
wraperr<span_fixup>(),
METH_VARARGS,
"Fix invalid span nesting"),
{ 0 }
};
void
init_span_fixup(pyref m)
{
register_functions(m, functions);
}
} // namespace dctv