blob: f5644d234e723200e8e25dad2ad67cf8097e54c5 [file] [log] [blame]
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "take.h"
#include <tuple>
#include <type_traits>
#include <utility>
#include "automethod.h"
#include "block.h"
#include "npyiter.h"
#include "optional.h"
#include "pyerr.h"
#include "pyobj.h"
#include "pyparsetuple.h"
#include "pyparsetuplenpy.h"
#include "pyseq.h"
#include "query.h"
namespace dctv {
namespace {
using std::tie;
using IndexIterator = NpyIteration<1, 1>;
using ValueIterator = NpyIteration<2, 2>;
// No-op index checker used in non-safe builds: accepts any index
// sequence without validation.  Interface mirrors IndexCheckDbg so
// the two are interchangeable via the IndexCheck alias below.
struct IndexCheckOpt {
  IndexCheckOpt(Index = 0) {} // NOLINT
  void check_index(Index) {}
};
// Debug-build index checker: verifies that the index stream never
// moves backward (the -1 NULL sentinel is exempt).  LAST_INDEX holds
// the lowest index we are still willing to accept.
struct IndexCheckDbg {
  IndexCheckDbg(Index last_index = 0) // NOLINT
      : last_index(last_index) {}
  Index last_index;
  void check_index(Index index) {
    // NULL sentinel or no movement: nothing to verify.
    if (index == -1 || index == this->last_index)
      return;
    if (this->last_index > index)
      throw_invalid_query_fmt(
          "bad index sequence: last_index=%s > index=%s",
          this->last_index, index);
    this->last_index = index;
  }
};
using IndexCheck =
std::conditional_t<safe_mode, IndexCheckDbg, IndexCheckOpt>;
// Build an iterator over ARRAY that yields Index values with no
// casting allowed; operand views cannot be buffered, hence the empty
// iterator flags.
IndexIterator
make_indexit(pyarray_ref array)
{
  NpyIterConfig cfg;
  cfg.casting = NPY_NO_CASTING;
  cfg.iter_flags = 0; // Operand views can't be buffered
  unique_dtype index_dtype = type_descr<Index>();
  PyArrayObject* ops[1] = { array.notnull().get() };
  PyArray_Descr* descrs[1] = { index_dtype.get() };
  return IndexIterator({&ops[0], 1}, {&descrs[0], 1}, cfg);
}
// Build a joint iterator over VALUES and its NULL MASK, producing
// elements of DTYPE plus a bool mask flag.  When MASK is null, a
// stand-in array from make_false_array() is iterated in its place so
// the operand shape stays uniform.
ValueIterator
make_value_iterator(dtype_ref dtype,
                    pyarray_ref values,
                    pyarray_ref mask /* nullable */)
{
  constexpr int nop = 2;
  NpyIterConfig cfg;
  cfg.casting = NPY_NO_CASTING;
  cfg.iter_flags = 0; // Operand views can't be buffered
  // Keep the substitute mask alive until the iterator is built.
  unique_pyarray default_mask;
  if (!mask)
    default_mask = make_false_array();
  PyArrayObject* ops[nop] = {
    values.notnull().get(),
    mask ? mask.get() : default_mask.get(),
  };
  unique_dtype mask_dtype = type_descr<bool>();
  PyArray_Descr* descrs[nop] = {
    dtype.notnull().get(),
    mask_dtype.notnull().get(),
  };
  return ValueIterator({&ops[0], nop}, {&descrs[0], nop}, cfg);
}
// Allocate a SIZE-element bool mask array, zero-filling the first
// OUT_INDEX entries (the rows already emitted as non-NULL); the tail
// is left uninitialized for the caller to fill.
unique_pyarray
make_out_mask(QueryCache* qc,
              npy_intp size,
              npy_intp out_index)
{
  assume(out_index >= 0);
  unique_pyarray mask = make_uninit_hunk_array(
      qc, type_descr<bool>(), size);
  memset(npy_data<bool>(mask), 0, sizeof (bool) * out_index);
  return mask;
}
// Count how many leading elements of the index stream form a
// consecutive run (each element exactly one greater than the last).
// The iterator is borrowed, scanned, rewound, and handed back, so
// the caller observes no position change.
npy_intp
index_sequential_prefix_length(IndexIterator* indexit_io)
{
  if (indexit_io->is_at_eof())
    return 0;
  IndexIterator it = std::move(*indexit_io);
  FINALLY({
    it.rewind();
    *indexit_io = std::move(it);
  });
  npy_intp run = 1;
  Index prev = it.get<Index>(0);
  for (; it.advance(); run += 1) {
    Index cur = it.get<Index>(0);
    if (cur != prev + 1)
      break;
    prev = cur;
  }
  return run;
}
// Decide whether the whole remaining index stream can be served by a
// broadcast: every index must lie within [INDEX_MIN, INDEX_MAX].
// Returns {last index seen, element count} on success and {-1, 0}
// otherwise.  The iterator is rewound before being handed back.
std::pair<Index, npy_intp>
check_indexer_can_broadcast(IndexIterator* indexit_io,
                            Index index_min,
                            Index index_max)
{
  assume(0 <= index_min);
  assume(index_min <= index_max);
  if (indexit_io->is_at_eof())
    return {-1, 0};
  IndexIterator it = std::move(*indexit_io);
  FINALLY({
    it.rewind();
    *indexit_io = std::move(it);
  });
  IndexCheck check(index_min);
  Index last_seen = -1;
  npy_intp nseen = 0;
  for (;;) {
    last_seen = it.get<Index>(0);
    if (last_seen < index_min || index_max < last_seen)
      return {-1, 0};
    check.check_index(last_seen);
    nseen += 1;
    if (!it.advance())
      break;
  }
  return {last_seen, nseen};
}
// Advance the value iterator until *VALUE_INDEX_IO reaches
// WANTED_INDEX or the iterator hits EOF, whichever comes first.
// Both the iterator and the position are written back on exit (even
// on early unwind, via FINALLY).
void
fast_forward_to_value_index(Index wanted_index,
                            ValueIterator* valueit_io,
                            Index* value_index_io) noexcept
{
  if (valueit_io->is_at_eof())
    return;
  Index pos = *value_index_io;
  if (pos == wanted_index)
    return;
  assume(pos < wanted_index);
  ValueIterator it = std::move(*valueit_io);
  FINALLY({
    *valueit_io = std::move(it);
    *value_index_io = pos;
  });
  for (; pos < wanted_index && !it.is_at_eof(); pos += 1)
    it.advance();
}
// Stateful take engine exposed to Python.  The driver feeds it index
// and value blocks via step(); it emits output blocks as data becomes
// available and reports via want_index()/want_value() which input
// stream must be replenished next.
struct SequentialTaker final : BasePyObject, HasPyCtor {
  SequentialTaker(unique_obj_pyref<QueryCache> qc,
                  unique_dtype dtype,
                  bool bcast_conservative);
  // Run one step; either argument may be an empty ref.  Returns a
  // finished Block or an empty ref when more input is needed.
  unique_obj_pyref<Block> step(obj_pyref<Block> index,
                               obj_pyref<Block> value);
  inline bool want_value() const noexcept;
  inline bool want_index() const noexcept;
  static PyTypeObject pytype;
private:
  template<typename T> void step_loop(T dummy);
  unique_obj_pyref<Block> try_passthrough();
  unique_obj_pyref<Block> try_value_broadcast();
  unique_obj_pyref<QueryCache> qc;
  unique_dtype dtype;
  // Packaging these values into a separate structure lets us move it
  // to the stack during inner loop execution, avoiding memory loads.
  // (It's a poor man's restrict.)
  struct State final : IndexCheck {
    npy_intp value_index = 0;        // logical position in the value stream
    npy_intp value_index_at_reload;  // value_index when valueit was (re)built
    ValueIterator valueit;           // current value/mask iterator
    IndexIterator indexit;           // current index iterator
    npy_intp out_index = 0;          // next write position in out_data
    unique_pyarray out_data;         // partially-built output values
    unique_pyarray out_mask;         // lazily-created output NULL mask
    bool bcast_conservative;         // bound broadcasts by remaining values
    inline bool want_value() const noexcept;
    inline bool want_index() const noexcept;
  };
  State saved_state;
  static PyMethodDef pymethods[];
  static PyGetSetDef pygetset[];
};
// Construct a taker producing blocks of DTYPE_ARG from cache
// QC_ARG; BCAST_CONSERVATIVE is stashed in the saved state for
// try_value_broadcast.
SequentialTaker::SequentialTaker(unique_obj_pyref<QueryCache> qc_arg,
                                 unique_dtype dtype_arg,
                                 bool bcast_conservative)
    : qc(std::move(qc_arg)),
      dtype(std::move(dtype_arg))
{
  this->saved_state.bcast_conservative = bcast_conservative;
}
// Run one execution step.  INDEX and VALUE are optional replacement
// blocks for the two input streams; each may be supplied only when
// the matching want_*() accessor reports depletion.  Returns a
// completed output block, or an empty ref when more input is needed.
unique_obj_pyref<Block>
SequentialTaker::step(obj_pyref<Block> index, obj_pyref<Block> value)
{
  State* s = &this->saved_state;
  if (index) {
    // Accept a new index block only once the previous one is spent.
    if (!s->want_index())
      throw_pyerr_msg(PyExc_RuntimeError,
                      "index replenished but not depleted");
    if (index->has_mask())
      throw_invalid_query("index may not have NULLs");
    assert(!s->out_data);
    assert(!s->out_mask);
    // An empty index block maps directly to an empty output block.
    if (index->get_size() == 0)
      return Block::from_arrays(
          this->qc.get(),
          make_empty_pyarray(this->dtype.notnull().addref()),
          /*mask=*/unique_pyarray());
    assert(s->indexit.is_at_eof());
    s->indexit = make_indexit(index->get_data());
    assume(s->out_index == 0);
  }
  if (value) {
    if (!s->want_value())
      throw_pyerr_msg(PyExc_RuntimeError,
                      "value replenished but not yet depleted");
    // We always have a valid index because we don't know we want a
    // value block until an index tells us to pull something other
    // than -1.
    assert(!s->indexit.is_at_eof());
    assert(s->indexit.get<Index>(0) >= 0);
    if (value->get_size() == 0)
      throw_invalid_query_fmt("early value EOF at index %s "
                              "looking for index %s",
                              s->value_index,
                              s->indexit.get<Index>(0));
    s->valueit = make_value_iterator(
        this->dtype,
        value->get_data(),
        value->get_mask());
    // Remember where this value chunk starts so we can later compute
    // how much of it has been consumed (see try_passthrough).
    s->value_index_at_reload = s->value_index;
    assert(!s->want_value());
  }
  // No index data at all: ask the caller for more.
  if (this->want_index())
    return {};
  // If we want a non-NULL value, skip ahead to it before trying the
  // optimizations below.
  Index cur_index = s->indexit.get<Index>(0);
  if (cur_index >= 0) {
    s->check_index(cur_index);
    fast_forward_to_value_index(s->indexit.get<Index>(0),
                                &s->valueit,
                                &s->value_index);
    if (s->valueit.is_at_eof()) {
      // Value stream exhausted before reaching the wanted index:
      // reset the iterator and ask for a new value block.
      assert(s->want_value());
      s->valueit = ValueIterator();
      return {};
    }
  }
  assert(!s->want_index());
  assert(!s->want_value());
  // See whether various optimizations might apply.
  if (unique_obj_pyref<Block> bcasted = this->try_value_broadcast())
    return bcasted;
  if (unique_obj_pyref<Block> sequential = this->try_passthrough())
    return sequential;
  // We have everything we need to mint a block on the normal path.
  // Past this point, we need to materialize our output array.
  if (!s->out_data) {
    assume(s->out_index == 0);
    assume(!s->out_mask);
    s->out_data = make_uninit_hunk_array(
        this->qc,
        this->dtype.notnull().addref(),
        s->indexit.size());
  }
  // Do some copying, dispatched on the concrete element type.
  npy_type_dispatch(this->dtype, [&](auto dummy) {
    this->step_loop(dummy);
  });
  // We might have run out of data before being able to fill the
  // output array.
  if (!s->indexit.is_at_eof()) {
    assert(s->want_value());
    return {};
  }
  // Drain our buffers and reset per-index-block state.
  unique_pyarray out_data = std::move(s->out_data);
  unique_pyarray out_mask = std::move(s->out_mask);
  s->indexit = IndexIterator();
  s->out_index = 0;
  return Block::from_arrays(this->qc.get(),
                            std::move(out_data),
                            std::move(out_mask));
}
// Inner copy loop, specialized per element type T.  Walks the index
// stream, pulling the requested element (or a NULL for index -1) into
// the output buffers until the index block is finished or the value
// stream runs dry.
template<typename T>
void
SequentialTaker::step_loop(T)
{
  // Move the state onto the stack for the duration of the loop (see
  // the comment on State); FINALLY moves it back even on unwind.
  State s_onstack = std::move(this->saved_state);
  FINALLY(this->saved_state = std::move(s_onstack));
  State* s = &s_onstack;
  assert(!s->indexit.is_at_eof());
  T* out_data = npy_data<T>(s->out_data);
  bool* out_mask = s->out_mask
      ? npy_data<bool>(s->out_mask)
      : nullptr;
  do {
    Index index = s->indexit.template get<Index>(0);
    s->check_index(index);
    T value;
    bool mask;
    if (index < 0) {
      // Negative index means "emit NULL"; the value slot under a
      // NULL must be zero.
      value = 0;
      mask = true;
    } else {
      // Advance the value stream to the requested index.
      while (!s->valueit.is_at_eof() &&
             s->value_index < index) {
        s->valueit.advance();
        s->value_index += 1;
      }
      if (s->valueit.is_at_eof()) {
        // Ran out of values: reset the iterator so want_value()
        // turns true and let the caller replenish.
        s->valueit = ValueIterator();
        break;
      }
      value = s->valueit.get<T>(0);
      mask = s->valueit.get<bool>(1);
    }
    if (mask && !out_mask) {
      // First NULL seen: materialize the output mask, zero-filled up
      // to the current output position.
      s->out_mask = make_out_mask(
          this->qc.get(),
          s->indexit.size(),
          s->out_index);
      out_mask = npy_data<bool>(s->out_mask);
    }
    // Cannot fail past this point.
    out_data[s->out_index] = value;
    if (out_mask)
      out_mask[s->out_index] = mask;
    s->indexit.advance();
    s->out_index += 1;
  } while (!s->indexit.is_at_eof());
}
// Optimization: when the remaining index block is one consecutive run
// that the current value chunk can satisfy in full, forward a slice
// of the value chunk directly instead of copying element-by-element.
// Returns the forwarded Block, or an empty ref to fall back to the
// normal copy path.
unique_obj_pyref<Block>
SequentialTaker::try_passthrough()
{
  State* s = &this->saved_state;
  // If we've already built a partial output, keep using the normal
  // output strategy.  Below, we can assume that the size of the index
  // iterator is the number of index elements remaining.
  if (s->out_data)
    return {};
  // If we have no output array, we must be at the start of
  // the output.
  assume(s->out_index == 0);
  // Checked in caller
  assume(!s->indexit.is_at_eof());
  // We can't handle NULLs via sequential take because we require
  // values "under" NULL to be zero and can't guarantee that of the
  // value array if we were to just forward it without reading it.
  Index cur_index = s->indexit.get<Index>(0);
  if (cur_index < 0)
    return {};
  // Checked in caller
  assume(s->value_index == cur_index);
  // Figure out how many values we can take sequentially.
  npy_intp index_nseq = index_sequential_prefix_length(&s->indexit);
  npy_intp value_chunk_used = s->value_index - s->value_index_at_reload;
  assume(value_chunk_used >= 0);
  npy_intp value_chunk_total = s->valueit.size();
  assume(value_chunk_total > 0);
  npy_intp value_chunk_left = value_chunk_total - value_chunk_used;
  assume(value_chunk_left > 0);
  npy_intp nseq = std::min(index_nseq, value_chunk_left);
  assume(nseq > 0);
  // Let's do the sequential take only when we can grab a whole index
  // block, which should hopefully prevent internal BlockBuilder
  // buffering that would defeat the whole point of this optimization.
  if (nseq != s->indexit.size())
    return {};
  // Now we know we can take sequentially.  Slice out [vstart, vend)
  // of the value chunk (or reuse the whole arrays when possible).
  npy_intp vstart = value_chunk_used;
  npy_intp vend = vstart + nseq;
  bool vwhole = vstart == 0 && vend == value_chunk_total;
  unique_pyarray value_data = s->valueit.get_operand_view(0);
  unique_pyarray data_chunk = vwhole
      ? value_data.addref()
      : get_npy_slice(value_data, vstart, vend);
  unique_pyarray value_mask = s->valueit.get_operand_view(1);
  unique_pyarray mask_chunk = vwhole
      ? value_mask.addref()
      : get_npy_slice(value_mask, vstart, vend);
  maybe_drop_mask(&mask_chunk);
  unique_obj_pyref<Block> block =
      Block::from_arrays(this->qc.get(),
                         std::move(data_chunk),
                         std::move(mask_chunk));
  // Cannot fail past this point.
  // Skip the values we slurped above. Leave the last value in place
  // so that the next index hunk can reference it if needed.
  cur_index += nseq - 1;
  s->check_index(cur_index);
  fast_forward_to_value_index(cur_index,
                              &s->valueit,
                              &s->value_index);
  // We exhausted the whole index array, so we can just fix up the
  // index iterator by nullifying it.
  assume(!s->out_data);
  assume(!s->out_mask);
  assume(s->out_index == 0);
  s->indexit = IndexIterator();
  return block;
}
// If we're indexing into broadcasted data, we'll always yield the
// same output, so just return that value, but broadcasted. We can
// perform this optimization only when we're not adding NULL values
// due to our requirement that the value under NULL is zero.
unique_obj_pyref<Block>
SequentialTaker::try_value_broadcast()
{
  State* s = &this->saved_state;
  // If we've already built a partial output, keep going.
  if (s->out_data)
    return {};
  // If we have no output array, we must be at the start of
  // the output.
  assume(s->out_index == 0);
  // Checked by caller
  assume(!s->indexit.is_at_eof());
  Index cur_index = s->indexit.get<Index>(0);
  // We can't broadcast NULLs in the indexer due to the
  // zero-under-NULL constraint. (Existing NULLs in the input data
  // array work fine.)
  if (cur_index < 0)
    return {};
  // Checked in caller
  assume(s->value_index == cur_index);
  // Both the data and its mask must themselves be broadcasts (i.e.,
  // every element identical) for this shortcut to apply.
  unique_pyarray value_data_broadcaster =
      npy_get_broadcaster(s->valueit.get_operand_view(0));
  if (!value_data_broadcaster)
    return {};
  bool bcast_masked;
  {
    unique_pyarray mask_view = s->valueit.get_operand_view(1);
    unique_pyarray mask_broadcaster = npy_get_broadcaster(mask_view);
    if (!mask_broadcaster)
      return {};
    bcast_masked = npy_data<bool>(mask_broadcaster)[0];
  }
  Index final_index;
  npy_intp bcast_count;
  Index index_min = s->value_index;
  Index index_max;
  if (s->bcast_conservative) {
    // Conservative mode: only allow indexes covered by the values
    // remaining in the current chunk.
    Index it_start_index = s->value_index_at_reload;
    assume(it_start_index <= index_min);
    Index value_used = index_min - it_start_index;
    assume(value_used >= 0);
    Index value_remaining = s->valueit.size() - value_used;
    assume(value_remaining > 0);
    // NOTE(review): index_max = index_min + value_remaining admits
    // one index past the last remaining value (the max in-chunk index
    // would be index_min + value_remaining - 1).  fast_forward below
    // stops at EOF so this doesn't overrun, but confirm the inclusive
    // bound is intended.
    index_max = index_min + value_remaining;
  } else {
    index_max = std::numeric_limits<Index>::max();
  }
  tie(final_index, bcast_count) =
      check_indexer_can_broadcast(&s->indexit, index_min, index_max);
  if (final_index < 0)
    return {};
  assume(bcast_count >= 0);
  unique_pyarray bcast_data =
      npy_broadcast_to(value_data_broadcaster, bcast_count);
  unique_pyarray bcast_mask;
  if (bcast_masked)
    bcast_mask = npy_broadcast_to(make_true_array(), bcast_count);
  // Can't fail past here.
  cur_index = final_index;
  s->check_index(final_index);
  fast_forward_to_value_index(cur_index,
                              &s->valueit,
                              &s->value_index);
  // We used the whole index block, so we can just reset the
  // index state.
  s->indexit = IndexIterator();
  assume(!s->out_data);
  assume(!s->out_mask);
  assume(s->out_index == 0);
  return Block::from_arrays(
      this->qc.get(),
      std::move(bcast_data),
      std::move(bcast_mask));
}
bool
SequentialTaker::State::want_index() const noexcept
{
const State* s = this;
return s->indexit.is_at_eof();
}
// Forward to the saved state's want_index().
bool
SequentialTaker::want_index() const noexcept
{
  return this->saved_state.want_index();
}
bool
SequentialTaker::State::want_value() const noexcept
{
const State* s = this;
return (!s->indexit.is_at_eof() &&
s->indexit.get<Index>(0) >= 0 &&
s->valueit.is_at_eof());
}
// Forward to the saved state's want_value().
bool
SequentialTaker::want_value() const noexcept
{
  return this->saved_state.want_value();
}
// Python getter: expose SequentialTaker::want_index() as a bool.
unique_pyref
Taker_want_index(PyObject* self, void*)
{
  auto taker = pyref(self).as_unsafe<SequentialTaker>();
  return make_pybool(taker->want_index());
}
// Python getter: expose SequentialTaker::want_value() as a bool.
unique_pyref
Taker_want_value(PyObject* self, void*)
{
  auto taker = pyref(self).as_unsafe<SequentialTaker>();
  return make_pybool(taker->want_value());
}
// Scan an index array and report, as a Python 2-tuple of bools,
// whether it is a strictly consecutive run (each element one greater
// than the previous) and whether it contains NULLs (negative
// entries).
static
unique_pyref
study_index_for_take(pyarray_ref array)
{
  NpyIteration1<Index> it(array);
  bool sequential = true;
  bool has_nulls = false;
  if (!it.is_at_eof()) {
    Index last = it.get();
    has_nulls = last < 0;
    // NOTE(review): the loop exits early once (sequential &&
    // has_nulls); a later non-consecutive element would then leave
    // `sequential` spuriously true, and the (false, true) state keeps
    // scanning although neither flag can change.  Presumably callers
    // treat `sequential` as don't-care whenever has_nulls is set --
    // confirm; otherwise the condition should be
    // (sequential || !has_nulls).
    while (!(sequential && has_nulls) && it.advance()) {
      Index index = it.get();
      sequential = sequential && last + 1 == index;
      has_nulls = has_nulls || index < 0;
      last = index;
    }
  }
  return pytuple::of(make_pybool(sequential), make_pybool(has_nulls));
}
} // anonymous namespace
// Python-side constructor glue: parse (qc, dtype, bcast_conservative)
// from args/kwargs and build a SequentialTaker.
// NOTE(review): "bcast_conserverative" is misspelled.  PARSEPYARGS
// presumably derives the Python keyword-argument name from this
// token, so renaming it would change the Python-visible API -- fix
// only in coordination with Python callers.
template<>
unique_obj_pyref<SequentialTaker>
PythonConstructor<SequentialTaker>::make(
    PyTypeObject* type,
    pyref args,
    pyref kwargs)
{
  PARSEPYARGS(
      (QueryCache*, qc)
      (unique_dtype, dtype, no_default, convert_dtype)
      (bool, bcast_conserverative)
  )(args, kwargs);
  return unique_obj_pyref<SequentialTaker>::make(
      type,
      xaddref_unsafe(qc),
      std::move(dtype),
      bcast_conserverative);
}
// Methods exposed on dctv._native.SequentialTaker.
PyMethodDef SequentialTaker::pymethods[] = {
  AUTOMETHOD(&SequentialTaker::step,
             "step",
             "Perform an execution step",
             (OPTIONAL_ARGS_FOLLOW)
             (obj_pyref<Block>, index)
             (obj_pyref<Block>, value)
  ),
  { 0 },  // sentinel
};
// Read-only properties exposed on dctv._native.SequentialTaker.
PyGetSetDef SequentialTaker::pygetset[] = {
  make_getset("want_index",
              "Whether we want an index block",
              wraperr<Taker_want_index>()),
  make_getset("want_value",
              "Whether we want a value block",
              wraperr<Taker_want_value>()),
  { 0 },  // sentinel
};
// Python type object; the lambda fills in the method and getset
// slots on top of what make_py_type provides.
PyTypeObject SequentialTaker::pytype = make_py_type<SequentialTaker>(
    "dctv._native.SequentialTaker",
    "Implements sequential take support",
    [](PyTypeObject* t) {
      t->tp_methods = SequentialTaker::pymethods;
      t->tp_getset = SequentialTaker::pygetset;
    });
// Module-level functions contributed by this translation unit.
static PyMethodDef functions[] = {
  AUTOFUNCTION(study_index_for_take,
               "Study an index array for TakeQuery",
               (unique_pyarray, array, no_default, convert_any_array)
  ),
  { 0 }  // sentinel
};
// Module initialization hook: register this file's type and
// functions on module M.
void
init_take(pyref m)
{
  register_type(m, &SequentialTaker::pytype);
  register_functions(m, functions);
}
} // namespace dctv