blob: 546b46d074e01b726645e4844c019129ab376f93 [file] [log] [blame]
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
#define GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <algorithm>
#include <new>
#include <string>
#include <utility>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/trace.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
// Tracks a list of maps of T -> optional<T> via promises.
// When Run, runs each transformation in order, and resolves to the ulimate
// result.
// If a map resolves to nullopt, the chain is terminated and the result is
// nullopt.
// Maps can also have synchronous cleanup functions, which are guaranteed to be
// called at the termination of each run through the chain.
template <typename T>
class InterceptorList {
private:
// A map of T -> T via promises.
class Map {
public:
explicit Map(DebugLocation from) : from_(from) {}
// Construct a promise to transform x into some other value at memory.
virtual void MakePromise(T x, void* memory) = 0;
// Destroy a promise constructed at memory.
virtual void Destroy(void* memory) = 0;
// Poll a promise constructed at memory.
// Resolves to an optional<T> -- if nullopt it means terminate the chain and
// resolve.
virtual Poll<absl::optional<T>> PollOnce(void* memory) = 0;
virtual ~Map() = default;
// Update the next pointer stored with this map.
// This is only valid to call once, and only before the map is used.
void SetNext(Map* next) {
GPR_DEBUG_ASSERT(next_ == nullptr);
next_ = next;
}
// Access the creation location for this map (for debug tracing).
DebugLocation from() const { return from_; }
// Access the next map in the chain (or nullptr if this is the last map).
Map* next() const { return next_; }
private:
GPR_NO_UNIQUE_ADDRESS const DebugLocation from_;
Map* next_ = nullptr;
};
public:
// The result of Run: a promise that will execute the entire chain.
class RunPromise {
public:
RunPromise(size_t memory_required, Map* factory, absl::optional<T> value) {
if (!value.has_value() || factory == nullptr) {
is_immediately_resolved_ = true;
Construct(&result_, std::move(value));
} else {
is_immediately_resolved_ = false;
Construct(&async_resolution_, memory_required);
factory->MakePromise(std::move(*value), async_resolution_.space.get());
async_resolution_.current_factory = factory;
}
}
~RunPromise() {
if (is_immediately_resolved_) {
Destruct(&result_);
} else {
if (async_resolution_.current_factory != nullptr) {
async_resolution_.current_factory->Destroy(
async_resolution_.space.get());
}
Destruct(&async_resolution_);
}
}
RunPromise(const RunPromise&) = delete;
RunPromise& operator=(const RunPromise&) = delete;
RunPromise(RunPromise&& other) noexcept
: is_immediately_resolved_(other.is_immediately_resolved_) {
if (is_immediately_resolved_) {
Construct(&result_, std::move(other.result_));
} else {
Construct(&async_resolution_, std::move(other.async_resolution_));
}
}
RunPromise& operator=(RunPromise&& other) noexcept = delete;
Poll<absl::optional<T>> operator()() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "InterceptorList::RunPromise: %s",
DebugString().c_str());
}
if (is_immediately_resolved_) return std::move(result_);
while (true) {
auto r = async_resolution_.current_factory->PollOnce(
async_resolution_.space.get());
if (auto* p = r.value_if_ready()) {
async_resolution_.current_factory->Destroy(
async_resolution_.space.get());
async_resolution_.current_factory =
async_resolution_.current_factory->next();
if (async_resolution_.current_factory == nullptr || !p->has_value()) {
return std::move(*p);
}
async_resolution_.current_factory->MakePromise(
std::move(**p), async_resolution_.space.get());
continue;
}
return Pending{};
}
}
private:
std::string DebugString() const {
if (is_immediately_resolved_) {
return absl::StrFormat("Result:has_value:%d", result_.has_value());
} else {
return absl::StrCat(
"Running:",
async_resolution_.current_factory == nullptr
? "END"
: ([p = async_resolution_.current_factory->from()]() {
return absl::StrCat(p.file(), ":", p.line());
})()
.c_str());
}
}
struct AsyncResolution {
explicit AsyncResolution(size_t max_size)
: space(GetContext<Arena>()->MakePooledArray<char>(max_size)) {}
AsyncResolution(const AsyncResolution&) = delete;
AsyncResolution& operator=(const AsyncResolution&) = delete;
AsyncResolution(AsyncResolution&& other) noexcept
: current_factory(std::exchange(other.current_factory, nullptr)),
space(std::move(other.space)) {}
Map* current_factory;
Arena::PoolPtr<char[]> space;
};
union {
AsyncResolution async_resolution_;
absl::optional<T> result_;
};
// If true, the result_ union is valid, otherwise async_resolution_ is.
// Indicates whether the promise resolved immediately at construction or if
// additional steps were needed.
bool is_immediately_resolved_;
};
InterceptorList() = default;
InterceptorList(const InterceptorList&) = delete;
InterceptorList& operator=(const InterceptorList&) = delete;
~InterceptorList() { DeleteFactories(); }
RunPromise Run(absl::optional<T> initial_value) {
return RunPromise(promise_memory_required_, first_map_,
std::move(initial_value));
}
// Append a new map to the end of the chain.
template <typename Fn>
void AppendMap(Fn fn, DebugLocation from) {
Append(MakeMapToAdd(
std::move(fn), [] {}, from));
}
// Prepend a new map to the beginning of the chain.
template <typename Fn>
void PrependMap(Fn fn, DebugLocation from) {
Prepend(MakeMapToAdd(
std::move(fn), [] {}, from));
}
// Append a new map to the end of the chain, with a cleanup function to be
// called at the end of run promise execution.
template <typename Fn, typename CleanupFn>
void AppendMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) {
Append(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from));
}
// Prepend a new map to the beginning of the chain, with a cleanup function to
// be called at the end of run promise execution.
template <typename Fn, typename CleanupFn>
void PrependMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) {
Prepend(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from));
}
protected:
// Clear the interceptor list
void ResetInterceptorList() {
DeleteFactories();
first_map_ = nullptr;
last_map_ = nullptr;
promise_memory_required_ = 0;
}
private:
template <typename Fn, typename CleanupFn>
class MapImpl final : public Map {
public:
using PromiseFactory = promise_detail::RepeatedPromiseFactory<T, Fn>;
using Promise = typename PromiseFactory::Promise;
explicit MapImpl(Fn fn, CleanupFn cleanup_fn, DebugLocation from)
: Map(from), fn_(std::move(fn)), cleanup_fn_(std::move(cleanup_fn)) {}
~MapImpl() override { cleanup_fn_(); }
void MakePromise(T x, void* memory) override {
new (memory) Promise(fn_.Make(std::move(x)));
}
void Destroy(void* memory) override {
static_cast<Promise*>(memory)->~Promise();
}
Poll<absl::optional<T>> PollOnce(void* memory) override {
return poll_cast<absl::optional<T>>((*static_cast<Promise*>(memory))());
}
private:
GPR_NO_UNIQUE_ADDRESS PromiseFactory fn_;
GPR_NO_UNIQUE_ADDRESS CleanupFn cleanup_fn_;
};
template <typename Fn, typename CleanupFn>
Map* MakeMapToAdd(Fn fn, CleanupFn cleanup_fn, DebugLocation from) {
using FactoryType = MapImpl<Fn, CleanupFn>;
promise_memory_required_ = std::max(promise_memory_required_,
sizeof(typename FactoryType::Promise));
return GetContext<Arena>()->New<FactoryType>(std::move(fn),
std::move(cleanup_fn), from);
}
void Append(Map* f) {
if (first_map_ == nullptr) {
first_map_ = f;
last_map_ = f;
} else {
last_map_->SetNext(f);
last_map_ = f;
}
}
void Prepend(Map* f) {
if (first_map_ == nullptr) {
first_map_ = f;
last_map_ = f;
} else {
f->SetNext(first_map_);
first_map_ = f;
}
}
void DeleteFactories() {
for (auto* f = first_map_; f != nullptr;) {
auto* next = f->next();
f->~Map();
f = next;
}
}
// The first map in the chain.
Map* first_map_ = nullptr;
// The last map in the chain.
Map* last_map_ = nullptr;
// The amount of memory required to store the largest promise in the chain.
size_t promise_memory_required_ = 0;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H