Gcp Observability: Lazily initialize channels post-init (#32091)
* Gcp Observability: Lazily initialize channels post-init
* IWYU and fix build deps
* Run RegistryPostInit for client census filters too
* Remove unused function
diff --git a/src/cpp/ext/filters/census/channel_filter.cc b/src/cpp/ext/filters/census/channel_filter.cc
index 228cd48..0a7f6b0 100644
--- a/src/cpp/ext/filters/census/channel_filter.cc
+++ b/src/cpp/ext/filters/census/channel_filter.cc
@@ -22,11 +22,16 @@
#include "absl/status/status.h"
+#include "src/cpp/ext/filters/census/grpc_plugin.h"
+
namespace grpc {
+namespace internal {
grpc_error_handle CensusChannelData::Init(grpc_channel_element* /*elem*/,
grpc_channel_element_args* /*args*/) {
+ OpenCensusExporterRegistry::Get().RunRegistryPostInit();
return absl::OkStatus();
}
+} // namespace internal
} // namespace grpc
diff --git a/src/cpp/ext/filters/census/channel_filter.h b/src/cpp/ext/filters/census/channel_filter.h
index 32a1ed3..a68619e 100644
--- a/src/cpp/ext/filters/census/channel_filter.h
+++ b/src/cpp/ext/filters/census/channel_filter.h
@@ -27,6 +27,7 @@
#include "src/cpp/common/channel_filter.h"
namespace grpc {
+namespace internal {
class CensusChannelData : public ChannelData {
public:
@@ -34,6 +35,7 @@
grpc_channel_element_args* args) override;
};
+} // namespace internal
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H
diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc
index 0dfb63d..607f727 100644
--- a/src/cpp/ext/filters/census/client_filter.cc
+++ b/src/cpp/ext/filters/census/client_filter.cc
@@ -59,6 +59,7 @@
#include "src/cpp/ext/filters/census/measures.h"
namespace grpc {
+namespace internal {
constexpr uint32_t
OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen;
@@ -71,6 +72,7 @@
grpc_error_handle CensusClientChannelData::Init(
grpc_channel_element* /*elem*/, grpc_channel_element_args* args) {
+ OpenCensusExporterRegistry::Get().RunRegistryPostInit();
tracing_enabled_ = grpc_core::ChannelArgs::FromC(args->channel_args)
.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY)
.value_or(true);
@@ -330,4 +332,5 @@
context_.tags());
}
+} // namespace internal
} // namespace grpc
diff --git a/src/cpp/ext/filters/census/client_filter.h b/src/cpp/ext/filters/census/client_filter.h
index a2ce18c..75e5a03 100644
--- a/src/cpp/ext/filters/census/client_filter.h
+++ b/src/cpp/ext/filters/census/client_filter.h
@@ -28,6 +28,7 @@
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"
namespace grpc {
+namespace internal {
class CensusClientChannelData : public ChannelData {
public:
@@ -53,6 +54,7 @@
bool tracing_enabled_ = true;
};
+} // namespace internal
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H
diff --git a/src/cpp/ext/filters/census/context.cc b/src/cpp/ext/filters/census/context.cc
index 907bb4d..51b15f6 100644
--- a/src/cpp/ext/filters/census/context.cc
+++ b/src/cpp/ext/filters/census/context.cc
@@ -92,13 +92,14 @@
size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf,
size_t buf_size) {
- return RpcServerStatsEncoding::Encode(server_elapsed_time, buf, buf_size);
+ return internal::RpcServerStatsEncoding::Encode(server_elapsed_time, buf,
+ buf_size);
}
size_t ServerStatsDeserialize(const char* buf, size_t buf_size,
uint64_t* server_elapsed_time) {
- return RpcServerStatsEncoding::Decode(absl::string_view(buf, buf_size),
- server_elapsed_time);
+ return internal::RpcServerStatsEncoding::Decode(
+ absl::string_view(buf, buf_size), server_elapsed_time);
}
uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info) {
diff --git a/src/cpp/ext/filters/census/grpc_plugin.cc b/src/cpp/ext/filters/census/grpc_plugin.cc
index 946474a..e96dd9f 100644
--- a/src/cpp/ext/filters/census/grpc_plugin.cc
+++ b/src/cpp/ext/filters/census/grpc_plugin.cc
@@ -42,11 +42,13 @@
namespace grpc {
void RegisterOpenCensusPlugin() {
- RegisterChannelFilter<CensusClientChannelData,
- CensusClientChannelData::CensusClientCallData>(
+ RegisterChannelFilter<
+ internal::CensusClientChannelData,
+ internal::CensusClientChannelData::CensusClientCallData>(
"opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */,
nullptr /* condition function */);
- RegisterChannelFilter<CensusChannelData, CensusServerCallData>(
+ RegisterChannelFilter<internal::CensusChannelData,
+ internal::CensusServerCallData>(
"opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */,
nullptr /* condition function */);
@@ -168,8 +170,21 @@
} // namespace experimental
+namespace internal {
+
+namespace {
std::atomic<bool> g_open_census_stats_enabled(true);
std::atomic<bool> g_open_census_tracing_enabled(true);
+} // namespace
+
+//
+// OpenCensusExporterRegistry
+//
+
+OpenCensusExporterRegistry& OpenCensusExporterRegistry::Get() {
+ static OpenCensusExporterRegistry* registry = new OpenCensusExporterRegistry;
+ return *registry;
+}
void EnableOpenCensusStats(bool enable) {
g_open_census_stats_enabled = enable;
@@ -187,4 +202,6 @@
return g_open_census_tracing_enabled.load(std::memory_order_relaxed);
}
+} // namespace internal
+
} // namespace grpc
diff --git a/src/cpp/ext/filters/census/grpc_plugin.h b/src/cpp/ext/filters/census/grpc_plugin.h
index 9675622..61e741e 100644
--- a/src/cpp/ext/filters/census/grpc_plugin.h
+++ b/src/cpp/ext/filters/census/grpc_plugin.h
@@ -21,6 +21,13 @@
#include <grpc/support/port_platform.h>
+#include <algorithm>
+#include <functional>
+#include <utility>
+#include <vector>
+
+#include "absl/base/call_once.h"
+
#include <grpcpp/opencensus.h>
namespace grpc {
@@ -116,6 +123,8 @@
using experimental::ServerServerLatencyHour; // NOLINT
using experimental::ServerStartedRpcsHour; // NOLINT
+namespace internal {
+
// Enables/Disables OpenCensus stats/tracing. It's only safe to do at the start
// of a program, before any channels/servers are built.
void EnableOpenCensusStats(bool enable);
@@ -124,6 +133,38 @@
bool OpenCensusStatsEnabled();
bool OpenCensusTracingEnabled();
+// Registers a function that would initialize an OpenCensus exporter. This
+// function would be run before the first time the OpenCensus plugin is run.
+class OpenCensusExporterRegistry {
+ public:
+ static OpenCensusExporterRegistry& Get();
+
+ // Registers the functions to be run post-init.
+ void Register(std::function<void()> f) {
+ exporter_registry_.push_back(std::move(f));
+ }
+
+ // Runs the registry post-init exactly once. Protected with an absl::CallOnce.
+ void RunRegistryPostInit() {
+ absl::call_once(
+ once_, &OpenCensusExporterRegistry::RunRegistryPostInitHelper, this);
+ }
+
+ private:
+ void RunRegistryPostInitHelper() {
+ for (const auto& f : exporter_registry_) {
+ f();
+ }
+ }
+
+ OpenCensusExporterRegistry() = default;
+
+ std::vector<std::function<void()>> exporter_registry_;
+ absl::once_flag once_;
+};
+
+} // namespace internal
+
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H
diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h
index 99c1603..63aa1a5 100644
--- a/src/cpp/ext/filters/census/open_census_call_tracer.h
+++ b/src/cpp/ext/filters/census/open_census_call_tracer.h
@@ -56,6 +56,7 @@
#define GRPC_ARG_ENABLE_OBSERVABILITY "grpc.experimental.enable_observability"
namespace grpc {
+namespace internal {
class OpenCensusCallTracer : public grpc_core::CallTracer {
public:
@@ -132,6 +133,7 @@
uint64_t num_active_rpcs_ ABSL_GUARDED_BY(&mu_) = 0;
};
-}; // namespace grpc
+} // namespace internal
+} // namespace grpc
#endif // GRPC_INTERNAL_CPP_EXT_FILTERS_OPEN_CENSUS_CALL_TRACER_H
diff --git a/src/cpp/ext/filters/census/rpc_encoding.cc b/src/cpp/ext/filters/census/rpc_encoding.cc
index b8a1313..dff1a39 100644
--- a/src/cpp/ext/filters/census/rpc_encoding.cc
+++ b/src/cpp/ext/filters/census/rpc_encoding.cc
@@ -21,6 +21,7 @@
#include "src/cpp/ext/filters/census/rpc_encoding.h"
namespace grpc {
+namespace internal {
constexpr size_t RpcServerStatsEncoding::kRpcServerStatsSize;
constexpr size_t RpcServerStatsEncoding::kEncodeDecodeFailure;
@@ -29,4 +30,5 @@
constexpr size_t RpcServerStatsEncoding::kVersionIdOffset;
constexpr size_t RpcServerStatsEncoding::kVersionId;
+} // namespace internal
} // namespace grpc
diff --git a/src/cpp/ext/filters/census/rpc_encoding.h b/src/cpp/ext/filters/census/rpc_encoding.h
index ae11fa2..9857e4a 100644
--- a/src/cpp/ext/filters/census/rpc_encoding.h
+++ b/src/cpp/ext/filters/census/rpc_encoding.h
@@ -28,6 +28,7 @@
#include "absl/strings/string_view.h"
namespace grpc {
+namespace internal {
// TODO(unknown): This may not be needed. Check to see if opencensus requires
// a trailing server response.
@@ -108,6 +109,7 @@
RpcServerStatsEncoding operator=(RpcServerStatsEncoding&&) = delete;
};
+} // namespace internal
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H
diff --git a/src/cpp/ext/filters/census/server_filter.cc b/src/cpp/ext/filters/census/server_filter.cc
index 50d7963..631b15d 100644
--- a/src/cpp/ext/filters/census/server_filter.cc
+++ b/src/cpp/ext/filters/census/server_filter.cc
@@ -44,6 +44,7 @@
#include "src/cpp/ext/filters/census/measures.h"
namespace grpc {
+namespace internal {
constexpr uint32_t CensusServerCallData::kMaxServerStatsLen;
@@ -193,4 +194,5 @@
}
}
+} // namespace internal
} // namespace grpc
diff --git a/src/cpp/ext/filters/census/server_filter.h b/src/cpp/ext/filters/census/server_filter.h
index 059229a..f482fb3 100644
--- a/src/cpp/ext/filters/census/server_filter.h
+++ b/src/cpp/ext/filters/census/server_filter.h
@@ -44,6 +44,7 @@
#include "src/cpp/common/channel_filter.h"
namespace grpc {
+namespace internal {
// A CallData class will be created for every grpc call within a channel. It is
// used to store data and methods specific to that call. CensusServerCallData is
@@ -108,6 +109,7 @@
char stats_buf_[kMaxServerStatsLen];
};
+} // namespace internal
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H
diff --git a/src/cpp/ext/gcp/BUILD b/src/cpp/ext/gcp/BUILD
index 5be21b8..c7acd26 100644
--- a/src/cpp/ext/gcp/BUILD
+++ b/src/cpp/ext/gcp/BUILD
@@ -100,6 +100,7 @@
"observability_logging_sink.h",
],
external_deps = [
+ "absl/base",
"absl/strings",
"absl/strings:str_format",
"absl/types:optional",
diff --git a/src/cpp/ext/gcp/observability.cc b/src/cpp/ext/gcp/observability.cc
index 5566ab2..c5ce26c 100644
--- a/src/cpp/ext/gcp/observability.cc
+++ b/src/cpp/ext/gcp/observability.cc
@@ -88,36 +88,46 @@
}
grpc::RegisterOpenCensusPlugin();
RegisterOpenCensusViewsForGcpObservability();
- ChannelArguments args;
- args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
if (config->cloud_trace.has_value()) {
- opencensus::trace::TraceConfig::SetCurrentTraceParams(
- {kMaxAttributes, kMaxAnnotations, kMaxMessageEvents, kMaxLinks,
- opencensus::trace::ProbabilitySampler(
- config->cloud_trace->sampling_rate)});
- opencensus::exporters::trace::StackdriverOptions trace_opts;
- trace_opts.project_id = config->project_id;
- trace_opts.trace_service_stub =
- ::google::devtools::cloudtrace::v2::TraceService::NewStub(
- CreateCustomChannel(kGoogleStackdriverTraceAddress,
- GoogleDefaultCredentials(), args));
- opencensus::exporters::trace::StackdriverExporter::Register(
- std::move(trace_opts));
+ grpc::internal::OpenCensusExporterRegistry::Get().Register(
+ [cloud_trace = config->cloud_trace.value(),
+ project_id = config->project_id]() mutable {
+ opencensus::trace::TraceConfig::SetCurrentTraceParams(
+ {kMaxAttributes, kMaxAnnotations, kMaxMessageEvents, kMaxLinks,
+ opencensus::trace::ProbabilitySampler(
+ cloud_trace.sampling_rate)});
+ opencensus::exporters::trace::StackdriverOptions trace_opts;
+ trace_opts.project_id = std::move(project_id);
+ ChannelArguments args;
+ args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
+ trace_opts.trace_service_stub =
+ ::google::devtools::cloudtrace::v2::TraceService::NewStub(
+ CreateCustomChannel(kGoogleStackdriverTraceAddress,
+ GoogleDefaultCredentials(), args));
+ opencensus::exporters::trace::StackdriverExporter::Register(
+ std::move(trace_opts));
+ });
} else {
// Disable OpenCensus tracing
- EnableOpenCensusTracing(false);
+ grpc::internal::EnableOpenCensusTracing(false);
}
if (config->cloud_monitoring.has_value()) {
- opencensus::exporters::stats::StackdriverOptions stats_opts;
- stats_opts.project_id = config->project_id;
- stats_opts.metric_service_stub =
- google::monitoring::v3::MetricService::NewStub(CreateCustomChannel(
- kGoogleStackdriverStatsAddress, GoogleDefaultCredentials(), args));
- opencensus::exporters::stats::StackdriverExporter::Register(
- std::move(stats_opts));
+ grpc::internal::OpenCensusExporterRegistry::Get().Register(
+ [project_id = config->project_id]() mutable {
+ opencensus::exporters::stats::StackdriverOptions stats_opts;
+ stats_opts.project_id = std::move(project_id);
+ ChannelArguments args;
+ args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
+ stats_opts.metric_service_stub =
+ google::monitoring::v3::MetricService::NewStub(
+ CreateCustomChannel(kGoogleStackdriverStatsAddress,
+ GoogleDefaultCredentials(), args));
+ opencensus::exporters::stats::StackdriverExporter::Register(
+ std::move(stats_opts));
+ });
} else {
// Disable OpenCensus stats
- EnableOpenCensusStats(false);
+ grpc::internal::EnableOpenCensusStats(false);
}
if (config->cloud_logging.has_value()) {
grpc::internal::RegisterLoggingFilter(
diff --git a/src/cpp/ext/gcp/observability_logging_sink.cc b/src/cpp/ext/gcp/observability_logging_sink.cc
index bef9e68..912b442 100644
--- a/src/cpp/ext/gcp/observability_logging_sink.cc
+++ b/src/cpp/ext/gcp/observability_logging_sink.cc
@@ -56,25 +56,10 @@
for (auto& server_rpc_event_config : logging_config.server_rpc_events) {
server_configs_.emplace_back(server_rpc_event_config);
}
- std::string endpoint;
- absl::optional<std::string> endpoint_env =
- grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
- if (endpoint_env.has_value() && !endpoint_env->empty()) {
- endpoint = std::move(*endpoint_env);
- } else {
- endpoint = "logging.googleapis.com";
- }
- ChannelArguments args;
- // Disable observability for RPCs on this channel
- args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
- // Set keepalive time to 24 hrs to effectively disable keepalive ping, but
- // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect.
- args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 24 * 60 * 60 * 1000 /* 24 hours */);
- args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */);
- stub_ = google::logging::v2::LoggingServiceV2::NewStub(
- CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args));
absl::optional<std::string> authority_env =
grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
+ absl::optional<std::string> endpoint_env =
+ grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
if (authority_env.has_value() && !authority_env->empty()) {
authority_ = std::move(*endpoint_env);
}
@@ -232,6 +217,25 @@
}
void ObservabilityLoggingSink::LogEntry(Entry entry) {
+ absl::call_once(once_, [this]() {
+ std::string endpoint;
+ absl::optional<std::string> endpoint_env =
+ grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
+ if (endpoint_env.has_value() && !endpoint_env->empty()) {
+ endpoint = std::move(*endpoint_env);
+ } else {
+ endpoint = "logging.googleapis.com";
+ }
+ ChannelArguments args;
+ // Disable observability for RPCs on this channel
+ args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
+ // Set keepalive time to 24 hrs to effectively disable keepalive ping, but
+ // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect.
+ args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 24 * 60 * 60 * 1000 /* 24 hours */);
+ args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */);
+ stub_ = google::logging::v2::LoggingServiceV2::NewStub(
+ CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args));
+ });
struct CallContext {
ClientContext context;
google::logging::v2::WriteLogEntriesRequest request;
diff --git a/src/cpp/ext/gcp/observability_logging_sink.h b/src/cpp/ext/gcp/observability_logging_sink.h
index 229bef2..6b2e14e 100644
--- a/src/cpp/ext/gcp/observability_logging_sink.h
+++ b/src/cpp/ext/gcp/observability_logging_sink.h
@@ -29,11 +29,10 @@
#include <google/protobuf/struct.pb.h>
+#include "absl/base/call_once.h"
#include "absl/strings/string_view.h"
#include "google/logging/v2/logging.grpc.pb.h"
-#include <grpcpp/channel.h>
-
#include "src/cpp/ext/filters/logging/logging_sink.h"
#include "src/cpp/ext/gcp/observability_config.h"
@@ -71,9 +70,9 @@
std::vector<Configuration> client_configs_;
std::vector<Configuration> server_configs_;
std::string project_id_;
- std::shared_ptr<grpc::Channel> channel_;
- std::unique_ptr<google::logging::v2::LoggingServiceV2::StubInterface> stub_;
std::string authority_;
+ absl::once_flag once_;
+ std::unique_ptr<google::logging::v2::LoggingServiceV2::StubInterface> stub_;
};
// Exposed for just for testing purposes
diff --git a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc
index 22877a8..8f6c4f2 100644
--- a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc
+++ b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc
@@ -865,7 +865,7 @@
// Test the working of EnableOpenCensusStats.
TEST_F(StatsPluginEnd2EndTest, TestGlobalEnableOpenCensusStats) {
- EnableOpenCensusStats(false);
+ grpc::internal::EnableOpenCensusStats(false);
View client_started_rpcs_view(ClientStartedRpcsCumulative());
View server_started_rpcs_view(ServerStartedRpcsCumulative());
@@ -889,12 +889,12 @@
EXPECT_TRUE(client_completed_rpcs_view.GetData().int_data().empty());
EXPECT_TRUE(server_completed_rpcs_view.GetData().int_data().empty());
- EnableOpenCensusStats(true);
+ grpc::internal::EnableOpenCensusStats(true);
}
// Test the working of EnableOpenCensusTracing.
TEST_F(StatsPluginEnd2EndTest, TestGlobalEnableOpenCensusTracing) {
- EnableOpenCensusTracing(false);
+ grpc::internal::EnableOpenCensusTracing(false);
{
// Client spans are ended when the ClientContext's destructor is invoked.
@@ -924,7 +924,7 @@
// No span should be exported
ASSERT_EQ(0, recorded_spans.size());
- EnableOpenCensusTracing(true);
+ grpc::internal::EnableOpenCensusTracing(true);
}
// This test verifies that users depending on src/cpp/ext/filters/census header