blob: f6c04131edf685138aaf7eb329eaa8ba5ea123a8 [file] [log] [blame]
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "dctv.h"
#include <variant>
#include "npy.h"
#include "pyutil.h"
#include "query.h"
#include "simple_variant.h"
namespace dctv {
struct IoSummary;
// Common base of every IO request type.  Each operation declared here
// has a do-nothing default; derived request types redeclare the ones
// they implement and tag them VARIANT_OVERRIDE.
//
// N.B. We want to preserve aggregate initialization!
struct IoRequest : NoCopy {
  // Default traversal: ignores both arguments and returns 0.
  VARIANT_VIRTUAL int py_traverse(visitproc, void*) const noexcept {
    return 0;
  }

  // Default setup: nothing to do.
  VARIANT_VIRTUAL void setup(const OperatorContext* oc) const {}

  // Default undo of setup(): nothing to do.
  VARIANT_VIRTUAL void undo_setup() const noexcept {}

  // Default summary: leaves *summary untouched.
  VARIANT_VIRTUAL void summarize(IoSummary* summary) const noexcept {}

  // Default write attempt: ignores `force` and returns false.
  VARIANT_VIRTUAL bool try_write(bool /*force*/) {
    return false;
  }

  // Default action: hands back Python's None.
  VARIANT_VIRTUAL unique_pyref do_it() {
    return addref(Py_None);
  }
};
// IO request that refers to an InputChannel together with a minimum
// and a maximum number of wanted rows.  The member functions are only
// declared here; their definitions live outside this block.
struct IoInput : IoRequest {
  inline IoInput(unique_obj_pyref<InputChannel> channel,
                 npy_intp min_wanted_rows,
                 npy_intp max_wanted_rows);

  // Operations redeclared on top of the IoRequest defaults.
  int py_traverse(visitproc visit, void* arg) const noexcept
      VARIANT_OVERRIDE;
  void setup(const OperatorContext* oc) const VARIANT_OVERRIDE;
  void undo_setup() const noexcept VARIANT_OVERRIDE;
  void summarize(IoSummary* summary) const noexcept VARIANT_OVERRIDE;
  unique_pyref do_it() VARIANT_OVERRIDE;

 private:
  // Channel this request refers to.
  unique_obj_pyref<InputChannel> channel;
  // Row-count bounds passed to the constructor.
  // NOTE(review): exact semantics (inclusive? in rows of what?) are
  // defined by the implementation -- confirm there.
  npy_intp min_wanted_rows;
  npy_intp max_wanted_rows;
};
// Input request built from a channel alone (no explicit row bounds at
// the call site) that supplies its own do_it().
// NOTE(review): presumably reads a single value from the channel --
// confirm against the definition of do_it().
struct IoInputScalar : IoInput {
  explicit inline IoInputScalar(unique_obj_pyref<InputChannel> channel);

  unique_pyref do_it() VARIANT_OVERRIDE;
};
// Leaf refinement of IoInputScalar: reuses its constructors and
// supplies its own do_it().
// NOTE(review): presumably yields the scalar as an integer -- confirm
// against the definition of do_it().
struct IoInputInt final : IoInputScalar {
  using IoInputScalar::IoInputScalar;

  unique_pyref do_it() VARIANT_OVERRIDE;
};
// IO request that pairs an OutputChannel with a Python object `data`.
// The member functions are only declared here; their definitions live
// outside this block.
struct IoOutput : IoRequest {
  inline IoOutput(unique_obj_pyref<OutputChannel> channel,
                  unique_pyref data);

  // Operations redeclared on top of the IoRequest defaults.
  bool try_write(bool force) VARIANT_OVERRIDE;
  int py_traverse(visitproc visit, void* arg) const noexcept
      VARIANT_OVERRIDE;
  void summarize(IoSummary* summary) const noexcept VARIANT_OVERRIDE;

 protected:
  // Write helper that additionally takes an `is_eof` flag; protected so
  // that derived request types can reach it.
  bool try_write_1(bool force, bool is_eof);

 private:
  // Channel this request refers to.
  unique_obj_pyref<OutputChannel> channel;
  // Python object supplied to the constructor alongside the channel.
  unique_pyref data;
};
// Leaf refinement of IoOutput: reuses its constructors and supplies
// its own try_write().
// NOTE(review): presumably goes through try_write_1() with is_eof set
// -- confirm against the definition.
struct IoOutputEof final : IoOutput {
  using IoOutput::IoOutput;

  bool try_write(bool force) VARIANT_OVERRIDE;
};
// IO request that refers only to an OutputChannel and supplies its own
// try_write() and summarize().
// NOTE(review): going by the name, this asks the channel to resize its
// buffer -- confirm against the definition of try_write().
struct IoResizeBuffer final : IoRequest {
  explicit inline IoResizeBuffer(unique_obj_pyref<OutputChannel> channel);

  bool try_write(bool force) VARIANT_OVERRIDE;
  void summarize(IoSummary* summary) const noexcept VARIANT_OVERRIDE;

 private:
  // Channel this request refers to.
  unique_obj_pyref<OutputChannel> channel;
};
// IO request that refers only to an OutputChannel and supplies its own
// try_write() and summarize().
// NOTE(review): going by the name, this performs a final flush of the
// channel -- confirm against the definition of try_write().
struct IoTerminalFlush final : IoRequest {
  explicit inline IoTerminalFlush(unique_obj_pyref<OutputChannel> channel);

  bool try_write(bool force) VARIANT_OVERRIDE;
  void summarize(IoSummary* summary) const noexcept VARIANT_OVERRIDE;

 private:
  // Channel this request refers to.
  unique_obj_pyref<OutputChannel> channel;
};
// Request type that redeclares nothing: every operation falls through
// to the IoRequest default.
struct IoDummy final : IoRequest {
  using IoRequest::IoRequest;
};
// Part of each operation's logic is encoded in the request type, and
// polymorphism is done with typecases over this variant.  Why not?
// The set of types is closed, and this way we avoid vtables, which
// don't mix with the BasePyObject system.
using IoSpecVariant = SimpleVariant<
    IoInput,
    IoInputScalar,
    IoInputInt,
    IoOutput,
    IoOutputEof,
    IoResizeBuffer,
    IoTerminalFlush,
    IoDummy>;
// One IO request of any of the kinds listed in IoSpecVariant.  The
// variant is a private base: only its constructors, its assignment
// operators and the operations forwarded below are exposed.
struct IoSpec final : private IoSpecVariant {
  using IoSpecVariant::IoSpecVariant;
  using IoSpecVariant::operator=;

  // Names the common base of all alternatives.
  using variant_base = IoRequest;

  // Operations forwarded via VARIANT_FORWARD.
  VARIANT_FORWARD(py_traverse);
  VARIANT_FORWARD(setup);
  VARIANT_FORWARD(undo_setup);
  VARIANT_FORWARD(summarize);
  VARIANT_FORWARD(try_write);
  VARIANT_FORWARD(do_it);
};
// Compute the score an operator should get if it wants to carry out
// the IO work described by IO_SPECS[0..NR_SPECS).  When NR_SPECS is
// zero the result is a RUNNABLE score, because doing nothing always
// succeeds.
Score compute_io_score(
    Score base_score,
    const IoSpec* io_specs,
    size_t nr_specs);
// Perform the writes in IO_SPECS[0..NR_SPECS) incrementally: each
// entry that we manage to perform is replaced with a dummy IoSpec.
// Returns true while work remains, and false once every write in the
// batch is complete.
bool do_writes_incremental(
    IoSpec* io_specs,
    size_t nr_specs);
// Run operator setup for the batch of IO operations in
// IO_SPECS[0..NR_SPECS) on behalf of operator context OC.
void io_setup(
    OperatorContext* oc,
    IoSpec* io_specs,
    size_t nr_specs);
// Initialization hook for this component; intentionally empty.
// NOTE(review): `m` is presumably the Python module object being
// initialized -- confirm against the caller.
inline void init_io_spec(pyref m) {
  // Nothing to register.
}
} // namespace dctv
#include "io_spec-inl.h"