blob: 607f727559b30b87e4dd59b04ce34f6c9651fd1f [file] [log] [blame]
//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/client_filter.h"
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
#include "opencensus/tags/tag_map.h"
#include "opencensus/trace/span.h"
#include "opencensus/trace/span_context.h"
#include "opencensus/trace/status_code.h"
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/opencensus.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/measures.h"
namespace grpc {
namespace internal {
constexpr uint32_t
OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen;
constexpr uint32_t
OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
//
// CensusClientChannelData
//
grpc_error_handle CensusClientChannelData::Init(
grpc_channel_element* /*elem*/, grpc_channel_element_args* args) {
OpenCensusExporterRegistry::Get().RunRegistryPostInit();
tracing_enabled_ = grpc_core::ChannelArgs::FromC(args->channel_args)
.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY)
.value_or(true);
return absl::OkStatus();
}
//
// CensusClientChannelData::CensusClientCallData
//
grpc_error_handle CensusClientChannelData::CensusClientCallData::Init(
grpc_call_element* elem, const grpc_call_element_args* args) {
tracer_ = args->arena->New<OpenCensusCallTracer>(
args, (static_cast<CensusClientChannelData*>(elem->channel_data))
->tracing_enabled_);
GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer_;
args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) {
(static_cast<OpenCensusCallTracer*>(tracer))->~OpenCensusCallTracer();
};
return absl::OkStatus();
}
void CensusClientChannelData::CensusClientCallData::StartTransportStreamOpBatch(
grpc_call_element* elem, TransportStreamOpBatch* op) {
// Note that we are generating the overall call context here instead of in
// the constructor of `OpenCensusCallTracer` due to the semantics of
// `grpc_census_call_set_context` which allows the application to set the
// census context for a call anytime before the first call to
// `grpc_call_start_batch`.
if (op->op()->send_initial_metadata && OpenCensusTracingEnabled() &&
(static_cast<CensusClientChannelData*>(elem->channel_data))
->tracing_enabled_) {
tracer_->GenerateContext();
}
grpc_call_next_op(elem, op->op());
}
//
// OpenCensusCallTracer::OpenCensusCallAttemptTracer
//
OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
OpenCensusCallTracer* parent, uint64_t attempt_num,
bool is_transparent_retry, bool arena_allocated)
: parent_(parent),
arena_allocated_(arena_allocated),
context_(parent_->CreateCensusContextForCallAttempt()),
start_time_(absl::Now()) {
if (OpenCensusTracingEnabled() && parent_->tracing_enabled_) {
context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
}
if (OpenCensusStatsEnabled()) {
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
::opencensus::stats::Record({{RpcClientStartedRpcs(), 1}}, tags);
}
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
if (OpenCensusTracingEnabled() && parent_->tracing_enabled_) {
char tracing_buf[kMaxTraceContextLen];
size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf,
kMaxTraceContextLen);
if (tracing_len > 0) {
send_initial_metadata->Set(
grpc_core::GrpcTraceBinMetadata(),
grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
}
}
if (OpenCensusStatsEnabled()) {
grpc_slice tags = grpc_empty_slice();
// TODO(unknown): Add in tagging serialization.
size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
if (encoded_tags_len > 0) {
send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
grpc_core::Slice(tags));
}
}
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
const grpc_core::SliceBuffer& /*send_message*/) {
++sent_message_count_;
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
const grpc_core::SliceBuffer& /*recv_message*/) {
++recv_message_count_;
}
namespace {
void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
if (OpenCensusStatsEnabled()) {
absl::optional<grpc_core::Slice> grpc_server_stats_bin =
b->Take(grpc_core::GrpcServerStatsBinMetadata());
if (grpc_server_stats_bin.has_value()) {
ServerStatsDeserialize(
reinterpret_cast<const char*>(grpc_server_stats_bin->data()),
grpc_server_stats_bin->size(), elapsed_time);
}
}
}
} // namespace
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) {
status_code_ = status.code();
if (recv_trailing_metadata == nullptr || transport_stream_stats == nullptr) {
return;
}
if (OpenCensusStatsEnabled()) {
uint64_t elapsed_time = 0;
FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time);
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
std::string final_status = absl::StatusCodeToString(status_code_);
tags.emplace_back(ClientStatusTagKey(), final_status);
::opencensus::stats::Record(
{{RpcClientSentBytesPerRpc(),
static_cast<double>(transport_stream_stats->outgoing.data_bytes)},
{RpcClientReceivedBytesPerRpc(),
static_cast<double>(transport_stream_stats->incoming.data_bytes)},
{RpcClientServerLatency(),
ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time))}},
tags);
}
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
grpc_error_handle /*cancel_error*/) {
status_code_ = absl::StatusCode::kCancelled;
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
const gpr_timespec& /*latency*/) {
if (OpenCensusStatsEnabled()) {
double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
tags.emplace_back(ClientStatusTagKey(), StatusCodeToString(status_code_));
::opencensus::stats::Record(
{{RpcClientRoundtripLatency(), latency_ms},
{RpcClientSentMessagesPerRpc(), sent_message_count_},
{RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
tags);
grpc_core::MutexLock lock(&parent_->mu_);
if (--parent_->num_active_rpcs_ == 0) {
parent_->time_at_last_attempt_end_ = absl::Now();
}
}
if (OpenCensusTracingEnabled() && parent_->tracing_enabled_) {
if (status_code_ != absl::StatusCode::kOk) {
context_.Span().SetStatus(
static_cast<opencensus::trace::StatusCode>(status_code_),
StatusCodeToString(status_code_));
}
context_.EndSpan();
}
if (arena_allocated_) {
this->~OpenCensusCallAttemptTracer();
} else {
delete this;
}
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordAnnotation(
absl::string_view annotation) {
// If tracing is disabled, the following will be a no-op.
context_.AddSpanAnnotation(annotation, {});
}
//
// OpenCensusCallTracer
//
OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args,
bool tracing_enabled)
: call_context_(args->context),
path_(grpc_slice_ref(args->path)),
method_(GetMethod(path_)),
arena_(args->arena),
tracing_enabled_(tracing_enabled) {}
OpenCensusCallTracer::~OpenCensusCallTracer() {
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
if (OpenCensusStatsEnabled()) {
tags.emplace_back(ClientMethodTagKey(), std::string(method_));
::opencensus::stats::Record(
{{RpcClientRetriesPerCall(), retries_ - 1}, // exclude first attempt
{RpcClientTransparentRetriesPerCall(), transparent_retries_},
{RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}},
tags);
}
if (OpenCensusTracingEnabled() && tracing_enabled_) {
context_.EndSpan();
}
}
void OpenCensusCallTracer::GenerateContext() {
auto* parent_context = reinterpret_cast<CensusContext*>(
call_context_[GRPC_CONTEXT_TRACING].value);
GenerateClientContext(absl::StrCat("Sent.", method_), &context_,
(parent_context == nullptr) ? nullptr : parent_context);
}
OpenCensusCallTracer::OpenCensusCallAttemptTracer*
OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
// We allocate the first attempt on the arena and all subsequent attempts on
// the heap, so that in the common case we don't require a heap allocation,
// nor do we unnecessarily grow the arena.
bool is_first_attempt = true;
uint64_t attempt_num;
{
grpc_core::MutexLock lock(&mu_);
if (transparent_retries_ != 0 || retries_ != 0) {
is_first_attempt = false;
if (OpenCensusStatsEnabled() && num_active_rpcs_ == 0) {
retry_delay_ += absl::Now() - time_at_last_attempt_end_;
}
}
attempt_num = retries_;
if (is_transparent_retry) {
++transparent_retries_;
} else {
++retries_;
}
++num_active_rpcs_;
}
if (is_first_attempt) {
return arena_->New<OpenCensusCallAttemptTracer>(
this, attempt_num, is_transparent_retry, true /* arena_allocated */);
}
return new OpenCensusCallAttemptTracer(
this, attempt_num, is_transparent_retry, false /* arena_allocated */);
}
void OpenCensusCallTracer::RecordAnnotation(absl::string_view annotation) {
// If tracing is disabled, the following will be a no-op.
context_.AddSpanAnnotation(annotation, {});
}
CensusContext OpenCensusCallTracer::CreateCensusContextForCallAttempt() {
if (!OpenCensusTracingEnabled() || !tracing_enabled_) return CensusContext();
GPR_DEBUG_ASSERT(context_.Context().IsValid());
return CensusContext(absl::StrCat("Attempt.", method_), &(context_.Span()),
context_.tags());
}
} // namespace internal
} // namespace grpc