blob: c023dc71c72e0f1577899f93f5cc6634c8b1161a [file] [log] [blame]
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include <stddef.h>
#include <new>
#include <string>
#include <utility>
#include "absl/status/statusor.h"
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/lame_client.h"
// Conversion between call and call stack.
// Given a pointer to a DynamicFilters::Call, yields a pointer to the
// grpc_call_stack located at `call` plus the alignment-rounded size of
// DynamicFilters::Call.
//
// The whole expansion is parenthesized so the result can be used directly as
// the operand of a postfix operator (e.g. CALL_TO_CALL_STACK(c)->member);
// without the outer parentheses `->` would bind to the inner char*
// expression instead of to the cast result.
#define CALL_TO_CALL_STACK(call)                                      \
  ((grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                                          sizeof(DynamicFilters::Call))))
// Inverse of CALL_TO_CALL_STACK: given a pointer to a grpc_call_stack, yields
// a pointer to the DynamicFilters::Call located the alignment-rounded size of
// DynamicFilters::Call before it.
//
// The body uses the macro parameter `callstack` (previously it referred to an
// unrelated identifier `call_stack`, silently ignoring the argument), and the
// whole expansion is parenthesized so it can be used as the operand of a
// postfix operator such as `->`.
#define CALL_STACK_TO_CALL(callstack)                  \
  ((DynamicFilters::Call*)(((char*)(callstack)) -      \
                           GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                               sizeof(DynamicFilters::Call))))
namespace grpc_core {
//
// DynamicFilters::Call
//
// Takes ownership of args.channel_stack and initializes the call stack
// obtained from CALL_TO_CALL_STACK(this), registering Destroy (with `this` as
// its argument) as the call stack's destroy callback.
//
// The result of grpc_call_stack_init() is written to *error. On failure the
// error is logged and the pollent is NOT attached to the call stack; on
// success args.pollent is attached.
DynamicFilters::Call::Call(Args args, grpc_error_handle* error)
    : channel_stack_(std::move(args.channel_stack)) {
  grpc_call_stack* const stack = CALL_TO_CALL_STACK(this);
  const grpc_call_element_args element_args = {
      /*call_stack=*/stack,
      /*server_transport_data=*/nullptr,
      /*context=*/args.context,
      /*path=*/args.path,
      /*start_time=*/args.start_time,
      /*deadline=*/args.deadline,
      /*arena=*/args.arena,
      /*call_combiner=*/args.call_combiner,
  };
  auto* const filters_stack = channel_stack_->channel_stack_.get();
  // Initial refcount of 1; Destroy runs with `this` as its argument.
  *error =
      grpc_call_stack_init(filters_stack, 1, Destroy, this, &element_args);
  if (GPR_UNLIKELY(!error->ok())) {
    gpr_log(GPR_ERROR, "error: %s", StatusToString(*error).c_str());
  } else {
    grpc_call_stack_set_pollset_or_pollset_set(stack, args.pollent);
  }
}
// Hands `batch` to element 0 of this call's call stack by invoking that
// element's filter start_transport_stream_op_batch hook, after passing the
// element and batch to GRPC_CALL_LOG_OP.
void DynamicFilters::Call::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  grpc_call_element* first_element =
      grpc_call_stack_element(CALL_TO_CALL_STACK(this), 0);
  GRPC_CALL_LOG_OP(GPR_INFO, first_element, batch);
  const auto* first_filter = first_element->filter;
  first_filter->start_transport_stream_op_batch(first_element, batch);
}
// Records `closure` in after_call_stack_destroy_, which Destroy later hands to
// grpc_call_stack_destroy().
//
// Preconditions (asserted): no closure has been recorded yet, and `closure`
// is non-null.
void DynamicFilters::Call::SetAfterCallStackDestroy(grpc_closure* closure) {
  // May only be set once.
  GPR_ASSERT(after_call_stack_destroy_ == nullptr);
  // A null closure is never a valid argument.
  GPR_ASSERT(closure != nullptr);
  this->after_call_stack_destroy_ = closure;
}
// Takes a new ref on the call (via IncrementRefCount()) and returns it wrapped
// in a RefCountedPtr.
RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref() {
  this->IncrementRefCount();
  RefCountedPtr<Call> new_ref(this);
  return new_ref;
}
// Same as Ref(), but forwards `location` and `reason` to the
// IncrementRefCount(location, reason) overload.
RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref(
    const DebugLocation& location, const char* reason) {
  this->IncrementRefCount(location, reason);
  RefCountedPtr<Call> new_ref(this);
  return new_ref;
}
void DynamicFilters::Call::Unref() {
GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), "dynamic-filters-unref");
}
void DynamicFilters::Call::Unref(const DebugLocation& /*location*/,
const char* reason) {
GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), reason);
}
// Destruction callback for the call stack (the constructor passes this
// function, with the Call as its argument, to grpc_call_stack_init()).
// `arg` is the DynamicFilters::Call to tear down; the error argument is
// ignored. The statement order below is significant.
void DynamicFilters::Call::Destroy(void* arg, grpc_error_handle /*error*/) {
DynamicFilters::Call* self = static_cast<DynamicFilters::Call*>(arg);
// Copy out the members that are still needed after the destructor has run:
// the closure to pass to grpc_call_stack_destroy(), and a ref to the
// DynamicFilters (moved out of the call into a local).
grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
RefCountedPtr<DynamicFilters> channel_stack = std::move(self->channel_stack_);
// Run the Call destructor explicitly. Only the destructor is invoked here;
// no memory is released by this statement.
self->~Call();
// Destroy the call stack. This should be after destroying the call, because
// after_call_stack_destroy, if not null, will free the call arena.
// Only the address of `self` is used here (to locate the call stack); no
// member of the destroyed object is read.
grpc_call_stack_destroy(CALL_TO_CALL_STACK(self), nullptr,
after_call_stack_destroy);
// The local `channel_stack` ref is released automatically when it goes out
// of scope here. This should be after destroying the call stack, because
// destroying the call stack needs access to the channel stack.
}
void DynamicFilters::Call::IncrementRefCount() {
GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), "");
}
void DynamicFilters::Call::IncrementRefCount(const DebugLocation& /*location*/,
const char* reason) {
GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), reason);
}
//
// DynamicFilters
//
namespace {
// Builds a GRPC_CLIENT_DYNAMIC channel stack named "DynamicFilters" with
// `args` as its channel args and `filters` appended in the order given.
// Returns whatever ChannelStackBuilderImpl::Build() returns.
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> CreateChannelStack(
    const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters) {
  ChannelStackBuilderImpl stack_builder("DynamicFilters", GRPC_CLIENT_DYNAMIC);
  stack_builder.SetChannelArgs(args);
  for (const grpc_channel_filter* current : filters) {
    stack_builder.AppendFilter(current);
  }
  return stack_builder.Build();
}
} // namespace
// Creates a DynamicFilters object wrapping a channel stack built from
// `filters`. If building that stack fails, a stack containing only the lame
// client filter is built instead, with the original failure status attached
// to the channel args so the lame filter can report it.
RefCountedPtr<DynamicFilters> DynamicFilters::Create(
    const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters) {
  auto stack = CreateChannelStack(args, std::move(filters));
  if (stack.ok()) {
    // Common case: the requested filters produced a usable stack.
    return MakeRefCounted<DynamicFilters>(std::move(stack.value()));
  }
  // Fallback: keep a copy of the failure and build a lame-client-only stack.
  auto error = stack.status();
  const ChannelArgs lame_args = args.Set(MakeLameClientErrorArg(&error));
  stack = CreateChannelStack(lame_args, {&LameClientFilter::kFilter});
  return MakeRefCounted<DynamicFilters>(std::move(stack.value()));
}
// Allocates, from args.arena, a single region holding a Call (rounded up to
// alignment) followed by channel_stack_->call_stack_size bytes, and
// constructs the Call in place at the start of that region.
//
// Any error from constructing the call is reported through *error; the call
// pointer is returned regardless.
RefCountedPtr<DynamicFilters::Call> DynamicFilters::CreateCall(
    DynamicFilters::Call::Args args, grpc_error_handle* error) {
  const size_t call_bytes = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(Call));
  const size_t total_bytes = call_bytes + channel_stack_->call_stack_size;
  // The arena must be read before `args` is moved into the constructor.
  void* storage = args.arena->Alloc(total_bytes);
  Call* new_call = static_cast<Call*>(storage);
  new (new_call) Call(std::move(args), error);
  return RefCountedPtr<Call>(new_call);
}
} // namespace grpc_core