// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
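//
// stackify_start_stop: turn a per-task stream of start/stop events
// into interned call stacks.  Each distinct stack is assigned a
// StackId the first time it is seen and its contents are emitted once
// to the stack-content callback; every input event then contributes a
// (timestamp, task, stack) row to the stack-history callback.
//
// For example, the event sequence (push A, push B, pop) on one task
// yields the history rows [A], [A, B], [A]; the stack [A] is interned
// only once, and the third row reuses its id.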
#include "stackify_start_stop.h"
#include "hash_table.h"
#include "npyiter.h"
#include "pyobj.h"
#include "pyparsetuple.h"
#include "pyparsetuplenpy.h"
#include "result_buffer.h"
#include "span.h"
namespace dctv {
// TODO(dancol): XXX: remove these typedefs
using Token = int64_t;
using StackId = Partition;
using TaskId = Partition;
using Depth = int64_t;
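// A Token names a pushed frame; StackId and TaskId are partition
// numbers; Depth is a 0-based position within a stack.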
static
unique_pyref
stackify_start_stop(PyObject*, PyObject* py_args)
{
  auto args = PARSEPYARGS_V(
      (pyref, chunk_iter)
      (pyref, history_cb)
      (pyref, stack_content_cb)
      (QueryExecution*, qe)
  )(py_args);
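  // A stack is the ordered sequence of currently-active tokens,
  // outermost frame first.  StackHasher mixes every token into a hash
  // sponge so whole stacks can key the interning table below.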
  using Stack = Vector<Token>;
  struct StackHasher final {
    using result_type = size_t;
    using argument_type = Stack;
    size_t operator()(const Stack& stack) const noexcept {
      HashSponge sponge;
      for (const auto& value : stack)
        sponge.absorb(value);
      return sponge.squeeze();
    }
  };
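  // Interning state: known_stacks maps each distinct stack to its id;
  // current_stack_by_task tracks the live stack of every task.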
  StackId next_stack_id = 0;
  HashTable<Stack, StackId, StackHasher> known_stacks;
  HashTable<TaskId, Stack> current_stack_by_task;
  auto stack_history =
      ResultBuffer<TimeStamp, TaskId, StackId>(
          addref(args.history_cb),
          args.qe);
  auto stack_content =
      ResultBuffer<TaskId, Depth, Token>(
          addref(args.stack_content_cb),
          args.qe);
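  // Look up a stack's id, allocating a fresh id and emitting the
  // stack's contents (one row per depth level) on first sight.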
  auto intern_stack = [&](const Stack& stack) -> StackId {
    StackId stack_id;
    auto it = known_stacks.find(stack);
    if (it == known_stacks.end()) {
      stack_id = next_stack_id++;
      known_stacks[stack] = stack_id;
      int depth = 0;
      for (const auto& token : stack)
        stack_content.add(stack_id, depth++, token);
    } else {
      stack_id = it->second;
    }
    return stack_id;
  };
  intern_stack(Stack());  // Intern the empty stack up front so it gets id 0.
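  // Main loop: each input row either pushes a token onto its task's
  // stack (start) or pops the innermost token (stop); a stop event on
  // an already-empty stack is tolerated and ignored.  The resulting
  // stack is interned and recorded in the history at the row's
  // timestamp.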
  for (PythonChunkIterator<TaskId, TimeStamp, Token, bool>
           iter(addref(args.chunk_iter));
       !iter.is_at_eof();
       iter.advance()) {
    auto [task_id, ts, token, end_flag] = iter.get();
    Stack& stack = current_stack_by_task[task_id];
    if (end_flag) {
      if (!stack.empty())
        stack.pop_back();
    } else {
      stack.push_back(token);
    }
    StackId stack_id = intern_stack(stack);
    stack_history.add(ts, task_id, stack_id);
  }
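  // Deliver any rows still buffered to the Python callbacks.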
  stack_content.flush();
  stack_history.flush();
  return addref(Py_None);
}
static PyMethodDef functions[] = {
  make_methoddef("stackify_start_stop",
                 wraperr<stackify_start_stop>(),
                 METH_VARARGS,
                 "Transform start and stop history into interned stacks"),
  { 0 }
};
void
init_stackify_start_stop(pyref m)
{
  register_functions(m, functions);
}
} // namespace dctv