blob: c7c3b89a3f1a16ae17886e0c78a693c9c1e38085 [file] [log] [blame]
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "dctv.h"
#include <utility>
#include "block_builder.h"
#include "npy.h"
#include "pyutil.h"
#include "query.h"
#include "query_key.h"
#include "vector.h"
namespace dctv {
struct OutputChannelSpec final : BasePyObject,
SupportsGc,
HasPyCtor
{
explicit OutputChannelSpec(QueryKey query);
QueryKey query;
bool strict_backpressure = false;
static unique_obj_pyref<OutputChannelSpec> from_py(pyref py_spec);
int py_traverse(visitproc visit, void* arg) const noexcept;
static PyTypeObject pytype;
};
struct OutputChannel final : BasePyObject,
SupportsGc,
SupportsWeakRefs,
HasRepr
{
explicit OutputChannel(OperatorContext* oc,
QueryCache* qc,
const OutputChannelSpec& spec);
~OutputChannel() noexcept;
void add_data(pyref thing, bool is_eof);
void add_sink(InputChannel* sink);
void remove_sink(const InputChannel* input_channel) noexcept;
void flush();
void close() noexcept;
bool has_backpressure() const noexcept;
bool has_flush_backpressure() const noexcept;
bool has_buffer_resize_backpressure() const noexcept;
inline bool needs_flush() const noexcept;
void on_backpressure_change();
void resize_buffer_now();
inline bool is_dynamic_buffer_size_enabled() const noexcept;
inline void enable_dynamic_block_size();
inline bool is_disconnected() const;
inline OperatorContext* get_owner() const noexcept;
int py_traverse(visitproc visit, void* arg) const noexcept;
explicit operator String() const;
const QueryKey query;
static PyTypeObject pytype;
private:
unique_pyref py_write(pyref args) const;
void on_block_produced(unique_obj_pyref<Block> block);
// Returns (block_size, maximum_block_size)
std::pair<npy_intp, npy_intp> compute_correct_block_size() const;
OperatorContext* oc = nullptr;
Vector<InputChannel*> sinks;
BlockBuilder block_builder;
const bool strict_backpressure;
bool dynamic_block_size;
bool block_size_dirty;
static PyMethodDef pymethods[];
static PyMemberDef pymembers[];
};
void init_output_channel(pyref m);
} // namespace dctv
#include "output_channel-inl.h"