| // Copyright (C) 2020 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| #include "block_builder.h" |
| |
| #include <algorithm> |
| #include <tuple> |
| #include <utility> |
| |
| #include "block.h" |
| #include "hunk.h" |
| #include "npy.h" |
| #include "pyerrfmt.h" |
| #include "query.h" |
| |
| namespace dctv { |
| |
| using std::tie; |
| |
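| // Sanity-check helper used in safe_mode: verify that every element of |
| // DATA selected by MASK is zero, raising AssertionError otherwise, so |
| // that masked-out entries never carry stray payload. |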
| static |
| void |
| check_mask_zero(pyarray_ref data, pyarray_ref mask) |
| { |
| assume(data); |
| assume(mask); |
| |
| unique_pyref data_masked = get_item(data, mask); |
| unique_pyarray data_masked_eq_zero = py_xeq(data_masked, make_pylong(0)) |
| .addref_as<PyArrayObject>(); |
| if (!npy_all(data_masked_eq_zero)) |
| throw_pyerr_fmt(PyExc_AssertionError, |
| "data under mask must be zero"); |
| } |
| |
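| // Normalize an arbitrary block-data input OBJ (a bare ndarray, a numpy |
| // masked array, a Block, or anything coercible to a 1-D array of |
| // DTYPE) into a (data, mask) pair of flat base arrays of DTYPE.  The |
| // returned mask is null if OBJ carried no mask. |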
| static |
| std::pair<unique_pyarray, unique_pyarray> |
| normalize_block_data(pyref obj, dtype_ref dtype) |
| { |
| // Note that we don't need to do anything special to deal with Hunk |
| // or HunkPinner objects given directly as input: these objects |
| // shouldn't be "leaking" into user query code. |
| |
| unique_pyref data_object; |
| unique_pyref mask_object; |
| |
| if (isinstance_exact(obj, &PyArray_Type)) { |
| data_object = addref(obj); |
| } else if (pyarray_ref ma = maybe_ma(obj); ma) { |
| data_object = npy_get_data_of_ma(ma); |
| mask_object = npy_get_mask_of_ma(ma); |
| } else if (isinstance_exact(obj, &Block::pytype)) { |
| Block* block = obj.as_unsafe<Block>().get(); |
| data_object = addref(block->get_data()); |
| if (block->has_mask()) |
| mask_object = addref(block->get_mask()); |
| } else { |
| data_object = as_base_1d_pyarray(obj, dtype.addref()); |
| } |
| |
| unique_pyarray data_array = |
| collapse_useless_views( |
| as_base_1d_pyarray(data_object, dtype.addref())); |
| assert(npy_dtype(data_array) == dtype); |
| |
| unique_pyarray mask_array = mask_object |
| ? collapse_useless_views( |
| as_base_1d_pyarray(mask_object, type_descr<bool>())) |
| : unique_pyarray(); |
| |
| if (mask_array) { |
| assert(npy_dtype(mask_array) == type_descr<bool>()); |
| if (npy_size1d(data_array) != npy_size1d(mask_array)) |
| throw_pyerr_fmt(PyExc_AssertionError, |
| "data and mask size mismatch"); |
| if (safe_mode) |
| check_mask_zero(data_array, mask_array); |
| } |
| |
| return { std::move(data_array), std::move(mask_array) }; |
| } |
| |
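| // Illustrative use of a BlockBuilder (a sketch, not code from this |
| // project; the exact OutputFunction signature and the availability of |
| // a live QueryCache* qc are assumptions here): |
| // |
| //   std::vector<unique_pyref> out; |
| //   BlockBuilder builder(qc, type_descr<int64_t>(), |
| //                        [&](unique_pyref block) { |
| //                          out.push_back(std::move(block)); |
| //                        }); |
| //   builder.add(first_chunk, /*is_eof=*/false); |
| //   builder.add(last_chunk, /*is_eof=*/true);  // flushes any partial |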
| BlockBuilder::BlockBuilder(QueryCache* qc, |
| unique_dtype dtype, |
| OutputFunction output_function) |
| : deliver_block(std::move(output_function)), |
| qc(qc), |
| dtype(std::move(dtype).notnull()), |
| block_size(block_size_from_qc(qc)), |
| maximum_block_size(this->block_size) |
| { |
| assume(qc); |
| } |
| |
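| // Change the target block size mid-stream.  If the data accumulated |
| // so far already fills a block of the new size, flush it; otherwise, |
| // resize the backing hunks of any partial data (and mask) in place so |
| // accumulation can continue at the new size. |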
| void |
| BlockBuilder::set_block_size(npy_intp block_size, |
| npy_intp maximum_block_size) |
| { |
| assume(block_size > 0); |
| assume(block_size <= maximum_block_size); |
| npy_intp partial_size = this->partial_size; |
| if (partial_size >= block_size) { |
| // TODO(dancol): do something smarter and carve the block into |
| // smaller chunks? |
| this->flush_partial_block(); |
| assume(this->partial_size == 0); |
| } else if (this->partial_data) { |
| // TODO(dancol): this dance is silly and ugly |
| assume(partial_size > 0); |
| unique_pyarray data = std::move(this->partial_data.notnull()); |
| unique_pyarray mask = std::move(this->partial_mask); |
| this->partial_size = 0; // Preserve invariant if we throw |
| unique_hunk data_hunk = Hunk::retrieve_backing_hunk(data).notnull(); |
| data.reset(); // Unpins |
| data_hunk->resize(block_size); |
| data = data_hunk->inflate(); |
| if (mask) { |
| unique_hunk mask_hunk = Hunk::retrieve_backing_hunk(mask).notnull(); |
| mask.reset(); // Unpins |
| mask_hunk->resize(block_size); |
| mask = mask_hunk->inflate(); |
| } |
| this->partial_data = std::move(data); |
| if (mask) |
| this->partial_mask = std::move(mask); |
| this->partial_size = partial_size; |
| } else { |
| assume(partial_size == 0); |
| } |
| this->block_size = block_size; |
| this->maximum_block_size = maximum_block_size; |
| } |
| |
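| // Core ingestion step: normalize DATA, try the zero-copy fast paths, |
| // and otherwise reblock it through partial_data/partial_mask, |
| // delivering completed blocks along the way. |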
| void |
| BlockBuilder::add_1(pyref data, bool is_eof) |
| { |
| assert(data); |
| |
| Block* data_as_block = isinstance_exact(data, &Block::pytype) |
| ? data.as_unsafe<Block>().get() |
| : nullptr; |
| |
| const npy_intp block_size = this->block_size; |
| const npy_intp maximum_block_size = this->maximum_block_size; |
| assert(block_size > 0); |
| assert(maximum_block_size >= block_size); |
| |
| // The two fast paths below fire when (among other situations) |
| // operator A is read_all()ing a big buffer and operator B writes a |
| // big array followed by EOF.  In this case, we want to forward the |
| // buffer from B to A directly instead of copying it. |
| |
| // Fastest path: just pass through an existing block if the dtype |
| // matches, if we don't have any lingering partial data from previous |
| // chunks, and if the block is non-empty and either acceptably sized |
| // (between block_size and maximum_block_size) or short but final |
| // (at EOF). |
| if (data_as_block && |
| this->partial_size == 0 && |
| data_as_block->get_dtype() == this->dtype && |
| data_as_block->get_size() > 0 && |
| ((data_as_block->get_size() >= block_size && |
| data_as_block->get_size() <= maximum_block_size) |
| || (is_eof && data_as_block->get_size() < block_size))) |
| { |
| this->deliver_block(data.addref_as_unsafe<Block>()); |
| return; |
| } |
| |
| unique_pyarray added_data, added_mask; |
| tie(added_data, added_mask) = normalize_block_data(data, this->dtype); |
| assert(added_data); |
| assert(npy_dtype(added_data) == this->dtype); |
| if (added_mask) { |
| assert(npy_dtype(added_mask) == type_descr<bool>()); |
| assert(npy_size1d(added_mask) == npy_size1d(added_data)); |
| } |
| |
| const npy_intp total_entries = npy_size1d(added_data); |
| if (total_entries == 0) |
| return; // Nothing to do |
| assume(total_entries > 0); |
| |
| // Still-fast path: if the caller gave us arrays that are already an |
| // acceptable block size (or a short final chunk at EOF) and we don't |
| // have to deal with partial data, just mint a block out of the given |
| // arrays instead of copying them into local storage. |
| if (((total_entries >= block_size && |
| total_entries <= maximum_block_size) || |
| (is_eof && total_entries < block_size)) |
| && this->partial_size == 0) { |
| maybe_drop_mask(&added_mask); |
| this->deliver_block( |
| Block::from_arrays(this->qc, |
| std::move(added_data), |
| std::move(added_mask))); |
| return; |
| } |
| |
| // If we got a mask as input and don't already have a block mask, |
| // just drop the mask if it'd be redundant. If we already have a |
| // partial_mask, copy added_mask into it even if added_mask is |
| // all-zero, since we'd have to initialize partial_mask anyway. |
| if (!this->partial_mask) |
| maybe_drop_mask(&added_mask); |
| |
| // If this chunk's data has a mask but we didn't have one when we |
| // accumulated the existing partial data, materialize the partial |
| // mask now, backfilling it with zero (i.e., unmasked). |
| if (added_mask && this->partial_size && !this->partial_mask) { |
| assert(npy_size1d(this->partial_data) == this->block_size); |
| this->partial_mask = |
| this->make_block_array(type_descr<bool>(), this->block_size); |
| set_slice(this->partial_mask, |
| 0, this->partial_size, |
| Py_False); |
| } |
| |
| // TODO(dancol): if the input is backed by a megahunk, just operate |
| // sequentially on the sub-hunks. |
| |
| // Actually copy into the reblocking buffer. |
| for (npy_intp entries_added = 0; entries_added < total_entries;) { |
| const npy_intp chunk_size = std::min(total_entries - entries_added, |
| block_size - this->partial_size); |
| // The whole point of this loop is to break the input into |
| // block-sized chunks, so make sure we've done that. |
| assert(0 < chunk_size && chunk_size <= this->block_size); |
| |
| // Except for the first iteration through this loop, we should |
| // never have lingering partial data. |
| assert(this->partial_size == 0 || entries_added == 0); |
| |
| // If we have no partial data, we shouldn't have lingering partial |
| // data backing arrays. Below, we're going to copy into a new |
| // block, so we need new partial arrays. We don't have to worry |
| // about uselessly copying in the case where we have exactly |
| // right-sized arrays because we handled that case before we |
| // entered the loop. |
| if (this->partial_size == 0) { |
| assert(!this->partial_data); |
| assert(!this->partial_mask); |
| // If we're at EOF and going to be ending the block anyway, we |
| // might as well avoid creating a huge array that's just going |
| // to go to waste. |
| // TODO(dancol): add generic rows-to-EOF tracking? |
| npy_intp nsz = this->block_size; |
| npy_intp entries_left = total_entries - entries_added; |
| if (is_eof && entries_left < nsz) |
| nsz = entries_left; |
| this->partial_data = |
| this->make_block_array(this->dtype.addref(), nsz); |
| if (added_mask) |
| this->partial_mask = |
| this->make_block_array(type_descr<bool>(), nsz); |
| } |
| |
| // Actually copy into our under-construction arrays. |
| assert(npy_dtype(this->partial_data) == this->dtype); |
| set_slice(this->partial_data, |
| this->partial_size, |
| this->partial_size + chunk_size, |
| get_slice(added_data, |
| entries_added, |
| entries_added + chunk_size)); |
| unique_pyref mask_fill; |
| |
| if (added_mask) { |
| assert(this->partial_mask); |
| mask_fill = get_slice(added_mask, |
| entries_added, |
| entries_added + chunk_size); |
| } else if (this->partial_mask) { |
| // *This* chunk doesn't have a mask, but we've previously |
| // created a mask, and we deliberately didn't initialize it. |
| // Fill it here. |
| mask_fill = addref(Py_False); |
| } |
| |
| if (mask_fill) |
| set_slice(this->partial_mask, |
| this->partial_size, |
| this->partial_size + chunk_size, |
| mask_fill); |
| |
| entries_added += chunk_size; |
| this->partial_size += chunk_size; |
| assert(this->partial_size <= this->block_size); |
| if (this->partial_size == this->block_size) |
| this->flush_partial_block(); |
| } |
| } |
| |
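| // Turn the accumulated partial data (and mask, if any) into a frozen, |
| // right-sized Block and hand it to deliver_block. |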
| void |
| BlockBuilder::flush_partial_block() |
| { |
| assume(this->partial_size); |
| assume(this->partial_size <= this->block_size); |
| |
| // We throw away our array references below, hopefully unpinning the |
| // data and mask hunks by side effect. TODO(dancol): on something |
| // with non-immediate GC like PyPy, we can't rely on the HunkPinner |
| // destructor running immediately and so we might need an explicit |
| // unpin-now-dammit mechanism. |
| |
| QueryCache* qc = this->qc; |
| unique_pyarray data = std::move(this->partial_data.notnull()); |
| unique_pyarray mask = std::move(this->partial_mask); |
| npy_intp partial_size = this->partial_size; |
| this->partial_size = 0; |
| |
| unique_hunk data_hunk = Hunk::retrieve_backing_hunk(data).notnull(); |
| data.reset(); // Unpins |
| data_hunk->resize(partial_size); |
| data_hunk->freeze(); |
| unique_hunk mask_hunk; |
| if (mask) { |
| mask_hunk = Hunk::retrieve_backing_hunk(mask); |
| mask.reset(); // Unpins |
| mask_hunk->resize(partial_size); |
| mask_hunk->freeze(); |
| } |
| |
| deliver_block(make_pyobj<Block>(std::move(data_hunk), |
| std::move(mask_hunk))); |
| } |
| |
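| // Public entry point: feed DATA into the builder, or signal EOF.  On |
| // EOF, flush any partial block and mark the stream dead by zeroing |
| // block_size. |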
| void |
| BlockBuilder::add(pyref data, bool is_eof) |
| { |
| if (this->is_eof()) { |
| if (data) |
| throw_pyerr_msg(PyExc_RuntimeError, |
| "sending data into dead stream"); |
| return; |
| } |
| |
| if (data) |
| this->add_1(data, is_eof); |
| |
| if (is_eof) { |
| if (this->partial_size) |
| this->flush_partial_block(); |
| this->block_size = 0; |
| assert(this->is_eof()); |
| } |
| } |
| |
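| // Allocate an uninitialized hunk-backed array of SIZE elements of |
| // DTYPE for use as a reblocking buffer. |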
| unique_pyarray |
| BlockBuilder::make_block_array(unique_dtype dtype, npy_intp size) |
| { |
| assume(size <= this->block_size); |
| return make_uninit_hunk_array(qc, std::move(dtype), size); |
| } |
| |
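| // CPython GC support: visit the Python objects this builder keeps |
| // alive. |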
| int |
| BlockBuilder::py_traverse(visitproc visit, void* arg) const noexcept |
| { |
| Py_VISIT(this->dtype.get()); |
| Py_VISIT(this->partial_data.get()); |
| Py_VISIT(this->partial_mask.get()); |
| return 0; |
| } |
| |
| } // namespace dctv |