// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
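//
// BlockBuilder (declared in block_builder.h) turns arbitrarily-sized
// chunks of incoming array data, with optional masks, into fixed-size
// Block objects, passing suitably-sized blocks through without
// copying when it can.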
#include "block_builder.h"
#include <algorithm>
#include <tuple>
#include <utility>
#include "block.h"
#include "hunk.h"
#include "npy.h"
#include "pyerrfmt.h"
#include "query.h"
namespace dctv {
using std::tie;
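// Check that the data array is zero at every position where the mask
// is set; raise AssertionError otherwise.  Called only in safe mode.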
static
void
check_mask_zero(pyarray_ref data, pyarray_ref mask)
{
assume(data);
assume(mask);
unique_pyref data_masked = get_item(data, mask);
unique_pyarray data_masked_eq_zero = py_xeq(data_masked, make_pylong(0))
.addref_as<PyArrayObject>();
if (!npy_all(data_masked_eq_zero))
throw_pyerr_fmt(PyExc_AssertionError,
"data under mask must be zero");
}
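// Normalize OBJ (a bare ndarray, a masked array, a Block, or anything
// convertible to a 1-D array of DTYPE) into a (data, mask) pair of
// base 1-D arrays; the mask is null when OBJ carries no mask.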
static
std::pair<unique_pyarray, unique_pyarray>
normalize_block_data(pyref obj, dtype_ref dtype)
{
// Note that we don't need to do anything special to deal with Hunk
// or HunkPinner objects given directly as input: these objects
// shouldn't be "leaking" into user query code.
unique_pyref data_object;
unique_pyref mask_object;
if (isinstance_exact(obj, &PyArray_Type)) {
data_object = addref(obj);
} else if (pyarray_ref ma = maybe_ma(obj); ma) {
data_object = npy_get_data_of_ma(ma);
mask_object = npy_get_mask_of_ma(ma);
} else if (isinstance_exact(obj, &Block::pytype)) {
Block* block = obj.as_unsafe<Block>().get();
data_object = addref(block->get_data());
if (block->has_mask())
mask_object = addref(block->get_mask());
} else {
data_object = as_base_1d_pyarray(obj, dtype.addref());
}
unique_pyarray data_array =
collapse_useless_views(
as_base_1d_pyarray(data_object, dtype.addref()));
assert(npy_dtype(data_array) == dtype);
unique_pyarray mask_array = mask_object
? collapse_useless_views(
as_base_1d_pyarray(mask_object, type_descr<bool>()))
: unique_pyarray();
if (mask_array) {
assert(npy_dtype(mask_array) == type_descr<bool>());
if (npy_size1d(data_array) != npy_size1d(mask_array))
throw_pyerr_fmt(PyExc_AssertionError,
"data and mask size mismatch");
if (safe_mode)
check_mask_zero(data_array, mask_array);
}
return { std::move(data_array), std::move(mask_array) };
}
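// Construct a builder that cuts its input into blocks of the query
// cache's default block size and hands each finished block to
// OUTPUT_FUNCTION.  Typical usage, as a sketch only (the exact
// OutputFunction type is declared in block_builder.h):
//
//   BlockBuilder builder(qc, dtype.addref(),
//                        [&](auto block) { consume(std::move(block)); });
//   builder.add(chunk, /*is_eof=*/false);
//   builder.add(last_chunk, /*is_eof=*/true);  // flushes any partial block
//
// Here consume() stands in for whatever downstream code wants the
// finished blocks.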
BlockBuilder::BlockBuilder(QueryCache* qc,
unique_dtype dtype,
OutputFunction output_function)
: deliver_block(std::move(output_function)),
qc(qc),
dtype(std::move(dtype).notnull()),
block_size(block_size_from_qc(qc)),
maximum_block_size(this->block_size)
{
assume(qc);
}
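// Change the target block size (and the largest block we will pass
// through untouched).  If enough data is already buffered to fill a
// block of the new size, flush it; otherwise resize the partially
// filled backing hunks in place to the new size.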
void
BlockBuilder::set_block_size(npy_intp block_size,
npy_intp maximum_block_size)
{
assume(block_size > 0);
assume(block_size <= maximum_block_size);
npy_intp partial_size = this->partial_size;
if (partial_size >= block_size) {
// TODO(dancol): do something smarter and carve the block into
// smaller chunks?
this->flush_partial_block();
assume(this->partial_size == 0);
} else if (this->partial_data) {
// TODO(dancol): this dance is silly and ugly
assume(partial_size > 0);
unique_pyarray data = std::move(this->partial_data.notnull());
unique_pyarray mask = std::move(this->partial_mask);
this->partial_size = 0; // Preserve invariant if we throw
unique_hunk data_hunk = Hunk::retrieve_backing_hunk(data).notnull();
data.reset(); // Unpins
data_hunk->resize(block_size);
data = data_hunk->inflate();
if (mask) {
unique_hunk mask_hunk = Hunk::retrieve_backing_hunk(mask).notnull();
mask.reset(); // Unpins
mask_hunk->resize(block_size);
mask = mask_hunk->inflate();
}
this->partial_data = std::move(data);
if (mask)
this->partial_mask = std::move(mask);
this->partial_size = partial_size;
} else {
assume(partial_size == 0);
}
this->block_size = block_size;
this->maximum_block_size = maximum_block_size;
}
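// Add one chunk of data to the stream.  Fast paths hand a
// suitably-sized Block or array straight to the output callback;
// otherwise the chunk is copied into the partially-built block,
// which is flushed each time it fills up.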
void
BlockBuilder::add_1(pyref data, bool is_eof)
{
assert(data);
Block* data_as_block = isinstance_exact(data, &Block::pytype)
? data.as_unsafe<Block>().get()
: nullptr;
const npy_intp block_size = this->block_size;
const npy_intp maximum_block_size = this->maximum_block_size;
assert(block_size > 0);
assert(maximum_block_size >= block_size);
// The two fast paths below fire when (among other situations)
// operator B writes a big array followed by EOF and operator A is
// read_all()ing it.  In this case, we want to forward the buffer
// directly from the writer to the reader instead of reblocking it.
// Fastest path: just pass through an existing block if the dtype
// matches, if we don't have any lingering partial data from
// previous chunks, and if we have a full block or if this block is
// the last one.
if (data_as_block &&
this->partial_size == 0 &&
data_as_block->get_dtype() == this->dtype &&
data_as_block->get_size() > 0 &&
((data_as_block->get_size() >= block_size &&
data_as_block->get_size() <= maximum_block_size)
|| (is_eof && data_as_block->get_size() < block_size)))
{
this->deliver_block(data.addref_as_unsafe<Block>());
return;
}
unique_pyarray added_data, added_mask;
tie(added_data, added_mask) = normalize_block_data(data, this->dtype);
assert(added_data);
assert(npy_dtype(added_data) == this->dtype);
if (added_mask) {
assert(npy_dtype(added_mask) == type_descr<bool>());
assert(npy_size1d(added_mask) == npy_size1d(added_data));
}
const npy_intp total_entries = npy_size1d(added_data);
if (total_entries == 0)
return; // Nothing to do
assume(total_entries > 0);
// Still-fast path: if the caller gave us arrays that are exactly
// the right size and we don't have to deal with partial data, just
// mint a block out of the given arrays instead of copying them into
// local storage.
if (((total_entries >= block_size &&
total_entries <= maximum_block_size) ||
(is_eof && total_entries < block_size))
&& this->partial_size == 0) {
maybe_drop_mask(&added_mask);
this->deliver_block(
Block::from_arrays(this->qc,
std::move(added_data),
std::move(added_mask)));
return;
}
// If we got a mask as input and don't already have a block mask,
// just drop the mask if it'd be redundant. If we already have a
// partial_mask, copy added_mask into it even if added_mask is
// all-zero, since we'd have to initialize partial_mask anyway.
if (!this->partial_mask)
maybe_drop_mask(&added_mask);
// If we have a mask in this chunk data and didn't have a mask
// before when we accumulated partial data, materialize the partial
// mask now, backfilling it with zero.
if (added_mask && this->partial_size && !this->partial_mask) {
assert(npy_size1d(this->partial_data) == this->block_size);
this->partial_mask =
this->make_block_array(type_descr<bool>(), this->block_size);
set_slice(this->partial_mask,
0, this->partial_size,
Py_False);
}
// TODO(dancol): if the input is backed by a megahunk, just operate
// sequentially on the sub-hunks.
// Actually copy into reblocking buffer.
for (npy_intp entries_added = 0; entries_added < total_entries;) {
const npy_intp chunk_size = std::min(total_entries - entries_added,
block_size - this->partial_size);
// The whole point of this loop is to break the input into
// block-sized chunks, so make sure we've done that.
assert(0 < chunk_size && chunk_size <= this->block_size);
// Except for the first iteration through this loop, we should
// never have lingering partial data.
assert(this->partial_size == 0 || entries_added == 0);
// If we have no partial data, we shouldn't have lingering partial
// data backing arrays. Below, we're going to copy into a new
// block, so we need new partial arrays. We don't have to worry
// about uselessly copying in the case where we have exactly
// right-sized arrays because we handled that case before we
// entered the loop.
if (this->partial_size == 0) {
assert(!this->partial_data);
assert(!this->partial_mask);
// If we're at EOF and going to be ending the block anyway, we
// might as well avoid creating a huge array that's just going
// to go to waste.
// TODO(dancol): add generic rows-to-EOF tracking?
npy_intp nsz = this->block_size;
npy_intp entries_left = total_entries - entries_added;
if (is_eof && entries_left < nsz)
nsz = entries_left;
this->partial_data =
this->make_block_array(this->dtype.addref(), nsz);
if (added_mask)
this->partial_mask =
this->make_block_array(type_descr<bool>(), nsz);
}
// Actually copy into our under-construction arrays
assert(npy_dtype(this->partial_data) == this->dtype);
set_slice(this->partial_data,
this->partial_size,
this->partial_size + chunk_size,
get_slice(added_data,
entries_added,
entries_added + chunk_size));
unique_pyref mask_fill;
if (added_mask) {
assert(this->partial_mask);
mask_fill = get_slice(added_mask,
entries_added,
entries_added + chunk_size);
} else if (this->partial_mask) {
// *This* chunk doesn't have a mask, but we've previously
// created a mask, and we deliberately didn't initialize it.
// Fill it here.
mask_fill = addref(Py_False);
}
if (mask_fill)
set_slice(this->partial_mask,
this->partial_size,
this->partial_size + chunk_size,
mask_fill);
entries_added += chunk_size;
this->partial_size += chunk_size;
assert(this->partial_size <= this->block_size);
if (this->partial_size == this->block_size)
this->flush_partial_block();
}
}
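// Emit the accumulated partial data as a (possibly short) Block:
// shrink the backing hunks to the actual size, freeze them, and hand
// the resulting Block to the output callback.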
void
BlockBuilder::flush_partial_block()
{
assume(this->partial_size);
assume(this->partial_size <= this->block_size);
// We throw away our array references below, hopefully unpinning the
// data and mask hunks by side effect. TODO(dancol): on something
// with non-immediate GC like PyPy, we can't rely on the HunkPinner
// destructor running immediately and so we might need an explicit
// unpin-now-dammit mechanism.
QueryCache* qc = this->qc;
unique_pyarray data = std::move(this->partial_data.notnull());
unique_pyarray mask = std::move(this->partial_mask);
npy_intp partial_size = this->partial_size;
this->partial_size = 0;
unique_hunk data_hunk = Hunk::retrieve_backing_hunk(data).notnull();
data.reset(); // Unpins
data_hunk->resize(partial_size);
data_hunk->freeze();
unique_hunk mask_hunk;
if (mask) {
mask_hunk = Hunk::retrieve_backing_hunk(mask);
mask.reset(); // Unpins
mask_hunk->resize(partial_size);
mask_hunk->freeze();
}
deliver_block(make_pyobj<Block>(std::move(data_hunk),
std::move(mask_hunk)));
}
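// Public entry point: feed the data (if any) into the builder and,
// when IS_EOF is set, flush any remaining partial block and mark the
// stream finished so that further data is rejected.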
void
BlockBuilder::add(pyref data, bool is_eof)
{
if (this->is_eof()) {
if (data)
throw_pyerr_msg(PyExc_RuntimeError,
"sending data into dead stream");
return;
}
if (data)
this->add_1(data, is_eof);
if (is_eof) {
if (this->partial_size)
this->flush_partial_block();
this->block_size = 0;
assert(this->is_eof());
}
}
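// Allocate an uninitialized hunk-backed 1-D array of SIZE elements
// (at most one block's worth) to serve as partial-block storage.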
unique_pyarray
BlockBuilder::make_block_array(unique_dtype dtype, npy_intp size)
{
assume(size <= this->block_size);
return make_uninit_hunk_array(qc, std::move(dtype), size);
}
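// Cyclic-GC traversal: visit the Python objects this builder owns.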
int
BlockBuilder::py_traverse(visitproc visit, void* arg) const noexcept
{
Py_VISIT(this->dtype.get());
Py_VISIT(this->partial_data.get());
Py_VISIT(this->partial_mask.get());
return 0;
}
} // namespace dctv