| // Copyright (C) 2020 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| #include "take.h" |
| |
| #include <tuple> |
| #include <type_traits> |
| #include <utility> |
| |
| #include "automethod.h" |
| #include "block.h" |
| #include "npyiter.h" |
| #include "optional.h" |
| #include "pyerr.h" |
| #include "pyobj.h" |
| #include "pyparsetuple.h" |
| #include "pyparsetuplenpy.h" |
| #include "pyseq.h" |
| #include "query.h" |
| |
| namespace dctv { |
| namespace { |
| |
using std::tie;

// NpyIteration<1, 1>: single-operand iteration, used for the index
// array.  NpyIteration<2, 2>: two-operand iteration, used for the
// value array paired with its NULL mask.
using IndexIterator = NpyIteration<1, 1>;
using ValueIterator = NpyIteration<2, 2>;
| |
// No-op index checker used in optimized builds: accepts any index and
// does nothing, so all checking compiles away.
struct IndexCheckOpt {
  IndexCheckOpt(Index = 0) {} // NOLINT
  void check_index(Index) {}
};
| |
// Debug index checker: remembers the last index accepted and throws
// an invalid-query error if a later index moves backwards.  Indices
// equal to -1 (the NULL marker) are exempt from the check and do not
// update the remembered position.
struct IndexCheckDbg {
  IndexCheckDbg(Index last_index = 0) // NOLINT
      : last_index(last_index) {}
  Index last_index;  // Most recent non-NULL index accepted
  void check_index(Index index) {
    if (index != -1 && this->last_index != index) {
      if (this->last_index > index)
        throw_invalid_query_fmt(
            "bad index sequence: last_index=%s > index=%s",
            this->last_index, index);
      this->last_index = index;
    }
  }
};
| |
// In safe mode every index step is verified for monotonicity; in
// optimized builds the checker is a no-op.
using IndexCheck =
    std::conditional_t<safe_mode, IndexCheckDbg, IndexCheckOpt>;
| |
| IndexIterator |
| make_indexit(pyarray_ref array) |
| { |
| NpyIterConfig config; |
| config.casting = NPY_NO_CASTING; |
| config.iter_flags = 0; // Operand views can't be buffered |
| unique_dtype dtype_buf = type_descr<Index>(); |
| PyArrayObject* refs[1] = { array.notnull().get() }; |
| PyArray_Descr* dtypes[1] = { dtype_buf.get() }; |
| return IndexIterator({&refs[0], 1}, {&dtypes[0], 1}, config); |
| } |
| |
| ValueIterator |
| make_value_iterator(dtype_ref dtype, |
| pyarray_ref values, |
| pyarray_ref mask /* nullable */) |
| { |
| constexpr int nop = 2; |
| NpyIterConfig config; |
| config.casting = NPY_NO_CASTING; |
| config.iter_flags = 0; // Operand views can't be buffered |
| unique_pyarray mask_fill; |
| if (!mask) |
| mask_fill = make_false_array(); |
| PyArrayObject* inputs[2] = { |
| values.notnull().get(), |
| mask ? mask.get() : mask_fill.get(), |
| }; |
| unique_dtype bool_dtype = type_descr<bool>(); |
| PyArray_Descr* dtypes[nop] = { |
| dtype.notnull().get(), |
| bool_dtype.notnull().get(), |
| }; |
| return ValueIterator({&inputs[0], nop}, {&dtypes[0], nop}, config); |
| } |
| |
| unique_pyarray |
| make_out_mask(QueryCache* qc, |
| npy_intp size, |
| npy_intp out_index) |
| { |
| unique_pyarray out_mask = make_uninit_hunk_array( |
| qc, type_descr<bool>(), size); |
| assume(out_index >= 0); |
| memset(npy_data<bool>(out_mask), 0, sizeof (bool) * out_index); |
| return out_mask; |
| } |
| |
| npy_intp |
| index_sequential_prefix_length(IndexIterator* indexit_io) |
| { |
| npy_intp prefix_length = 0; |
| if (!indexit_io->is_at_eof()) { |
| IndexIterator it = std::move(*indexit_io); |
| FINALLY({ |
| it.rewind(); |
| *indexit_io = std::move(it); |
| }); |
| Index last = it.get<Index>(0); |
| prefix_length += 1; |
| while (it.advance()) { |
| Index cur = it.get<Index>(0); |
| if (last + 1 != cur) |
| break; |
| last = cur; |
| prefix_length += 1; |
| } |
| } |
| return prefix_length; |
| } |
| |
// Determine whether every remaining index in *indexit_io falls within
// [index_min, index_max], i.e. whether a single broadcast value could
// satisfy the whole index block.  Returns {final_index, count} -- the
// last index read and the number of indices scanned -- or {-1, 0}
// when the iterator is empty or some index escapes the range.  (A -1
// NULL index always escapes, since index_min >= 0.)  The iterator is
// rewound on exit, so the caller sees its position unchanged.
std::pair<Index, npy_intp>
check_indexer_can_broadcast(IndexIterator* indexit_io,
                            Index index_min,
                            Index index_max)
{
  assume(0 <= index_min);
  assume(index_min <= index_max);
  if (indexit_io->is_at_eof())
    return {-1, 0};
  // Scan on a stack-local iterator; FINALLY rewinds and restores it
  // even on early return or throw.
  IndexIterator it = std::move(*indexit_io);
  FINALLY({
    it.rewind();
    *indexit_io = std::move(it);
  });
  // In safe mode this also verifies the indices are monotone.
  IndexCheck check(index_min);
  Index index;
  npy_intp count = 0;
  do {
    index = it.get<Index>(0);
    if (!(index_min <= index && index <= index_max))
      return {-1, 0};
    check.check_index(index);
    count += 1;
  } while (it.advance());
  return {index, count};
}
| |
| void |
| fast_forward_to_value_index(Index wanted_index, |
| ValueIterator* valueit_io, |
| Index* value_index_io) noexcept |
| { |
| if (valueit_io->is_at_eof()) |
| return; |
| Index value_index = *value_index_io; |
| if (value_index == wanted_index) |
| return; |
| assume(value_index < wanted_index); |
| ValueIterator it = std::move(*valueit_io); |
| FINALLY({ |
| *valueit_io = std::move(it); |
| *value_index_io = value_index; |
| }); |
| while (value_index < wanted_index && !it.is_at_eof()) { |
| value_index++; |
| it.advance(); |
| } |
| } |
| |
// Streaming "take" operator: consumes blocks of indices and blocks of
// values and emits blocks of values[index], where a negative index
// produces a NULL output row.  Exposed to Python as
// dctv._native.SequentialTaker.
struct SequentialTaker final : BasePyObject, HasPyCtor {
  SequentialTaker(unique_obj_pyref<QueryCache> qc,
                  unique_dtype dtype,
                  bool bcast_conservative);
  // Feed a new index and/or value block (either may be null) and
  // return the next output block, or a null ref when more input is
  // needed (consult want_index()/want_value() to see which).
  unique_obj_pyref<Block> step(obj_pyref<Block> index,
                               obj_pyref<Block> value);
  inline bool want_value() const noexcept;
  inline bool want_index() const noexcept;

  static PyTypeObject pytype;
 private:
  template<typename T> void step_loop(T dummy);
  unique_obj_pyref<Block> try_passthrough();
  unique_obj_pyref<Block> try_value_broadcast();

  unique_obj_pyref<QueryCache> qc;
  unique_dtype dtype;
  // Packaging these values into a separate structure lets us move it
  // to the stack during inner loop execution, avoiding memory loads.
  // (It's a poor man's restrict.)
  struct State final : IndexCheck {
    npy_intp value_index = 0;        // Logical position of valueit
    npy_intp value_index_at_reload;  // value_index when valueit loaded
    ValueIterator valueit;           // Current value chunk (or EOF)
    IndexIterator indexit;           // Current index chunk (or EOF)
    npy_intp out_index = 0;          // Next write slot in out_data
    unique_pyarray out_data;         // Materialized output (lazy)
    unique_pyarray out_mask;         // Output NULL mask (lazy)
    bool bcast_conservative;         // Limit broadcast to loaded chunk
    inline bool want_value() const noexcept;
    inline bool want_index() const noexcept;
  };
  State saved_state;

  static PyMethodDef pymethods[];
  static PyGetSetDef pygetset[];
};
| |
| SequentialTaker::SequentialTaker(unique_obj_pyref<QueryCache> qc, |
| unique_dtype dtype, |
| bool bcast_conservative) |
| : qc(std::move(qc)), |
| dtype(std::move(dtype)) |
| { |
| this->saved_state.bcast_conservative = bcast_conservative; |
| } |
| |
// Run one step of the take state machine.  `index` and/or `value` may
// be non-null to replenish the corresponding exhausted iterator; it
// is a RuntimeError to replenish one that is not exhausted.  Returns
// the next output block, or a null ref when more input is required.
unique_obj_pyref<Block>
SequentialTaker::step(obj_pyref<Block> index, obj_pyref<Block> value)
{
  State* s = &this->saved_state;

  if (index) {
    if (!s->want_index())
      throw_pyerr_msg(PyExc_RuntimeError,
                      "index replenished but not depleted");
    if (index->has_mask())
      throw_invalid_query("index may not have NULLs");
    assert(!s->out_data);
    assert(!s->out_mask);
    // An empty index block maps directly to an empty output block.
    if (index->get_size() == 0)
      return Block::from_arrays(
          this->qc.get(),
          make_empty_pyarray(this->dtype.notnull().addref()),
          /*mask=*/unique_pyarray());
    assert(s->indexit.is_at_eof());
    s->indexit = make_indexit(index->get_data());
    assume(s->out_index == 0);
  }

  if (value) {
    if (!s->want_value())
      throw_pyerr_msg(PyExc_RuntimeError,
                      "value replenished but not yet depleted");
    // We always have a valid index because we don't know we want a
    // value block until an index tells us to pull something other
    // than -1.
    assert(!s->indexit.is_at_eof());
    assert(s->indexit.get<Index>(0) >= 0);
    if (value->get_size() == 0)
      throw_invalid_query_fmt("early value EOF at index %s "
                              "looking for index %s",
                              s->value_index,
                              s->indexit.get<Index>(0));
    s->valueit = make_value_iterator(
        this->dtype,
        value->get_data(),
        value->get_mask());
    // Remember where this chunk starts so chunk-relative offsets can
    // be computed later (try_passthrough / try_value_broadcast).
    s->value_index_at_reload = s->value_index;
    assert(!s->want_value());
  }

  // Nothing to do until an index block arrives.
  if (this->want_index())
    return {};

  // If we want a non-NULL value, skip ahead to it before trying the
  // optimizations below.
  Index cur_index = s->indexit.get<Index>(0);
  if (cur_index >= 0) {
    s->check_index(cur_index);
    fast_forward_to_value_index(s->indexit.get<Index>(0),
                                &s->valueit,
                                &s->value_index);
    if (s->valueit.is_at_eof()) {
      // Needed value lies in a future chunk: reset the iterator so
      // want_value() reports true and wait for more input.
      assert(s->want_value());
      s->valueit = ValueIterator();
      return {};
    }
  }

  assert(!s->want_index());
  assert(!s->want_value());

  // See whether various optimizations might apply.
  if (unique_obj_pyref<Block> bcasted = this->try_value_broadcast())
    return bcasted;
  if (unique_obj_pyref<Block> sequential = this->try_passthrough())
    return sequential;

  // We have everything we need to mint a block on the normal path.
  // Past this point, we need to materialize our output array.
  if (!s->out_data) {
    assume(s->out_index == 0);
    assume(!s->out_mask);
    s->out_data = make_uninit_hunk_array(
        this->qc,
        this->dtype.notnull().addref(),
        s->indexit.size());
  }

  // Do some copying, dispatched on the concrete element type.
  npy_type_dispatch(this->dtype, [&](auto dummy) {
    this->step_loop(dummy);
  });

  // We might have run out of data before being able to fill the
  // output array.
  if (!s->indexit.is_at_eof()) {
    assert(s->want_value());
    return {};
  }

  // Drain our buffers.
  unique_pyarray out_data = std::move(s->out_data);
  unique_pyarray out_mask = std::move(s->out_mask);
  s->indexit = IndexIterator();
  s->out_index = 0;
  return Block::from_arrays(this->qc.get(),
                            std::move(out_data),
                            std::move(out_mask));
}
| |
// Inner copy loop: for each remaining index, write values[index] (or
// a zero value with mask=true for a negative/NULL index) into the
// output buffers.  T is the element type chosen by npy_type_dispatch.
// Breaks early -- leaving the state wanting a value block -- if the
// value iterator runs dry mid-block.
template<typename T>
void
SequentialTaker::step_loop(T)
{
  // Move the state onto the stack for the duration of the loop (see
  // the comment on State); FINALLY moves it back even on throw.
  State s_onstack = std::move(this->saved_state);
  FINALLY(this->saved_state = std::move(s_onstack));
  State* s = &s_onstack;
  assert(!s->indexit.is_at_eof());
  T* out_data = npy_data<T>(s->out_data);
  bool* out_mask = s->out_mask
      ? npy_data<bool>(s->out_mask)
      : nullptr;
  do {
    Index index = s->indexit.template get<Index>(0);
    s->check_index(index);
    T value;
    bool mask;
    if (index < 0) {
      // NULL output row: the value under NULL must be zero.
      value = 0;
      mask = true;
    } else {
      // Advance the value stream up to the requested index.
      while (!s->valueit.is_at_eof() &&
             s->value_index < index) {
        s->valueit.advance();
        s->value_index += 1;
      }
      if (s->valueit.is_at_eof()) {
        // Out of values: reset the iterator so want_value() reports
        // true; we resume from this index on the next step().
        s->valueit = ValueIterator();
        break;
      }
      value = s->valueit.get<T>(0);
      mask = s->valueit.get<bool>(1);
    }
    if (mask && !out_mask) {
      // First NULL of this output block: materialize the mask now,
      // zero-filling the prefix already written.
      s->out_mask = make_out_mask(
          this->qc.get(),
          s->indexit.size(),
          s->out_index);
      out_mask = npy_data<bool>(s->out_mask);
    }
    // Cannot fail past this point.
    out_data[s->out_index] = value;
    if (out_mask)
      out_mask[s->out_index] = mask;
    s->indexit.advance();
    s->out_index += 1;
  } while (!s->indexit.is_at_eof());
}
| |
// Optimization: when the entire remaining index block is one
// consecutive run that lies inside the currently-loaded value chunk,
// return a slice of (or the whole) value chunk directly instead of
// copying element by element.  Returns a null ref when the
// optimization does not apply.
unique_obj_pyref<Block>
SequentialTaker::try_passthrough()
{
  State* s = &this->saved_state;
  // If we've already built a partial output, keep using the normal
  // output strategy.  Below, we can assume that the size of the index
  // iterator is the number of index elements remaining.
  if (s->out_data)
    return {};

  // If we have no output array, we must be at the start of
  // the output.
  assume(s->out_index == 0);

  // Checked in caller
  assume(!s->indexit.is_at_eof());

  // We can't handle NULLs via sequential take because we require
  // values "under" NULL to be zero and can't guarantee that of the
  // value array if we were to just forward it without reading it.
  Index cur_index = s->indexit.get<Index>(0);
  if (cur_index < 0)
    return {};

  // Checked in caller
  assume(s->value_index == cur_index);

  // Figure out how many values we can take sequentially.
  npy_intp index_nseq = index_sequential_prefix_length(&s->indexit);
  npy_intp value_chunk_used = s->value_index - s->value_index_at_reload;
  assume(value_chunk_used >= 0);
  npy_intp value_chunk_total = s->valueit.size();
  assume(value_chunk_total > 0);
  npy_intp value_chunk_left = value_chunk_total - value_chunk_used;
  assume(value_chunk_left > 0);
  npy_intp nseq = std::min(index_nseq, value_chunk_left);
  assume(nseq > 0);

  // Let's do the sequential take only when we can grab a whole index
  // block, which should hopefully prevent internal BlockBuilder
  // buffering that would defeat the whole point of this optimization.
  if (nseq != s->indexit.size())
    return {};

  // Now we know we can take sequentially.  Slice out the chunk range
  // [vstart, vend), or forward the whole chunk when it matches.
  npy_intp vstart = value_chunk_used;
  npy_intp vend = vstart + nseq;
  bool vwhole = vstart == 0 && vend == value_chunk_total;
  unique_pyarray value_data = s->valueit.get_operand_view(0);
  unique_pyarray data_chunk = vwhole
      ? value_data.addref()
      : get_npy_slice(value_data, vstart, vend);
  unique_pyarray value_mask = s->valueit.get_operand_view(1);
  unique_pyarray mask_chunk = vwhole
      ? value_mask.addref()
      : get_npy_slice(value_mask, vstart, vend);

  // An all-false mask can be dropped entirely.
  maybe_drop_mask(&mask_chunk);
  unique_obj_pyref<Block> block =
      Block::from_arrays(this->qc.get(),
                         std::move(data_chunk),
                         std::move(mask_chunk));

  // Cannot fail past this point.

  // Skip the values we slurped above.  Leave the last value in place
  // so that the next index hunk can reference it if needed.
  cur_index += nseq - 1;
  s->check_index(cur_index);
  fast_forward_to_value_index(cur_index,
                              &s->valueit,
                              &s->value_index);

  // We exhausted the whole index array, so we can just fix up the
  // index iterator by nullifying it.
  assume(!s->out_data);
  assume(!s->out_mask);
  assume(s->out_index == 0);
  s->indexit = IndexIterator();

  return block;
}
| |
// If we're indexing into broadcasted data, we'll always yield the
// same output, so just return that value, but broadcasted.  We can
// perform this optimization only when we're not adding NULL values
// due to our requirement that the value under NULL is zero.  Returns
// a null ref when the optimization does not apply.
unique_obj_pyref<Block>
SequentialTaker::try_value_broadcast()
{
  State* s = &this->saved_state;

  // If we've already built a partial output, keep going.
  if (s->out_data)
    return {};

  // If we have no output array, we must be at the start of
  // the output.
  assume(s->out_index == 0);

  // Checked by caller
  assume(!s->indexit.is_at_eof());

  Index cur_index = s->indexit.get<Index>(0);

  // We can't broadcast NULLs in the indexer due to the
  // zero-under-NULL constraint.  (Existing NULLs in the input data
  // array work fine.)
  if (cur_index < 0)
    return {};

  // Checked in caller
  assume(s->value_index == cur_index);

  // Both the data and the mask must themselves be broadcasts of a
  // single element for the output to be constant.
  unique_pyarray value_data_broadcaster =
      npy_get_broadcaster(s->valueit.get_operand_view(0));
  if (!value_data_broadcaster)
    return {};

  bool bcast_masked;
  {
    unique_pyarray mask_view = s->valueit.get_operand_view(1);
    unique_pyarray mask_broadcaster = npy_get_broadcaster(mask_view);
    if (!mask_broadcaster)
      return {};
    bcast_masked = npy_data<bool>(mask_broadcaster)[0];
  }

  Index final_index;
  npy_intp bcast_count;

  // In conservative mode, only indices covered by the loaded value
  // chunk may be satisfied by the broadcast; otherwise any index is
  // acceptable.
  // NOTE(review): index_max = index_min + value_remaining looks one
  // past the last index actually inside the chunk
  // (index_min + value_remaining - 1) -- confirm whether referencing
  // the first element of the next, unseen chunk is intended here.
  Index index_min = s->value_index;
  Index index_max;
  if (s->bcast_conservative) {
    Index it_start_index = s->value_index_at_reload;
    assume(it_start_index <= index_min);
    Index value_used = index_min - it_start_index;
    assume(value_used >= 0);
    Index value_remaining = s->valueit.size() - value_used;
    assume(value_remaining > 0);
    index_max = index_min + value_remaining;
  } else {
    index_max = std::numeric_limits<Index>::max();
  }

  tie(final_index, bcast_count) =
      check_indexer_can_broadcast(&s->indexit, index_min, index_max);

  // A final_index of -1 means the index block cannot be satisfied by
  // a broadcast.
  if (final_index < 0)
    return {};

  assume(bcast_count >= 0);

  unique_pyarray bcast_data =
      npy_broadcast_to(value_data_broadcaster, bcast_count);
  unique_pyarray bcast_mask;
  if (bcast_masked)
    bcast_mask = npy_broadcast_to(make_true_array(), bcast_count);

  // Can't fail past here.

  cur_index = final_index;
  s->check_index(final_index);
  fast_forward_to_value_index(cur_index,
                              &s->valueit,
                              &s->value_index);

  // We used the whole index block, so we can just reset the
  // index state.
  s->indexit = IndexIterator();
  assume(!s->out_data);
  assume(!s->out_mask);
  assume(s->out_index == 0);

  return Block::from_arrays(
      this->qc.get(),
      std::move(bcast_data),
      std::move(bcast_mask));
}
| |
| bool |
| SequentialTaker::State::want_index() const noexcept |
| { |
| const State* s = this; |
| return s->indexit.is_at_eof(); |
| } |
| |
// See State::want_index().
bool
SequentialTaker::want_index() const noexcept
{
  return this->saved_state.want_index();
}
| |
| bool |
| SequentialTaker::State::want_value() const noexcept |
| { |
| const State* s = this; |
| return (!s->indexit.is_at_eof() && |
| s->indexit.get<Index>(0) >= 0 && |
| s->valueit.is_at_eof()); |
| } |
| |
// See State::want_value().
bool
SequentialTaker::want_value() const noexcept
{
  return this->saved_state.want_value();
}
| |
| unique_pyref |
| Taker_want_index(PyObject* self, void*) |
| { |
| return make_pybool( |
| pyref(self).as_unsafe<SequentialTaker>() |
| ->want_index()); |
| } |
| |
| unique_pyref |
| Taker_want_value(PyObject* self, void*) |
| { |
| return make_pybool( |
| pyref(self).as_unsafe<SequentialTaker>() |
| ->want_value()); |
| } |
| |
| static |
| unique_pyref |
| study_index_for_take(pyarray_ref array) |
| { |
| NpyIteration1<Index> it(array); |
| bool sequential = true; |
| bool has_nulls = false; |
| if (!it.is_at_eof()) { |
| Index last = it.get(); |
| has_nulls = last < 0; |
| while (!(sequential && has_nulls) && it.advance()) { |
| Index index = it.get(); |
| sequential = sequential && last + 1 == index; |
| has_nulls = has_nulls || index < 0; |
| last = index; |
| } |
| } |
| return pytuple::of(make_pybool(sequential), make_pybool(has_nulls)); |
| } |
| |
| } // anonymous namespace |
| |
| |
// Python-side constructor: parses (qc, dtype, bcast_conserverative)
// from args/kwargs and builds the native SequentialTaker.
//
// NOTE(review): "bcast_conserverative" is a misspelling of
// "conservative"; PARSEPYARGS presumably derives the Python keyword
// name from this token, so correcting it would change the
// Python-visible API -- confirm callers before renaming.
template<>
unique_obj_pyref<SequentialTaker>
PythonConstructor<SequentialTaker>::make(
    PyTypeObject* type,
    pyref args,
    pyref kwargs)
{
  PARSEPYARGS(
      (QueryCache*, qc)
      (unique_dtype, dtype, no_default, convert_dtype)
      (bool, bcast_conserverative)
  )(args, kwargs);
  return unique_obj_pyref<SequentialTaker>::make(
      type,
      xaddref_unsafe(qc),
      std::move(dtype),
      bcast_conserverative);
}
| |
// Python method table: exposes step() with two optional Block
// arguments.
PyMethodDef SequentialTaker::pymethods[] = {
  AUTOMETHOD(&SequentialTaker::step,
             "step",
             "Perform an execution step",
             (OPTIONAL_ARGS_FOLLOW)
             (obj_pyref<Block>, index)
             (obj_pyref<Block>, value)
  ),
  { 0 },  // Sentinel
};
| |
// Python attribute table: read-only want_index / want_value flags.
PyGetSetDef SequentialTaker::pygetset[] = {
  make_getset("want_index",
              "Whether we want an index block",
              wraperr<Taker_want_index>()),
  make_getset("want_value",
              "Whether we want a value block",
              wraperr<Taker_want_value>()),
  { 0 },  // Sentinel
};
| |
// Python type object for SequentialTaker, wired to the method and
// getset tables above.
PyTypeObject SequentialTaker::pytype = make_py_type<SequentialTaker>(
    "dctv._native.SequentialTaker",
    "Implements sequential take support",
    [](PyTypeObject* t) {
      t->tp_methods = SequentialTaker::pymethods;
      t->tp_getset = SequentialTaker::pygetset;
    });
| |
// Module-level function table.
static PyMethodDef functions[] = {
  AUTOFUNCTION(study_index_for_take,
               "Study an index array for TakeQuery",
               (unique_pyarray, array, no_default, convert_any_array)
  ),
  { 0 }  // Sentinel
};
| |
| |
// Module initialization: register the SequentialTaker type and the
// free functions on module `m`.
void
init_take(pyref m)
{
  register_type(m, &SequentialTaker::pytype);
  register_functions(m, functions);
}
| |
| } // namespace dctv |