reviewer feedback
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 38032d5..9da9a7a 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1507,9 +1507,7 @@
if (op->send_message) {
GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
- if (t->channelz_socket != nullptr) {
- t->channelz_socket->RecordMessageSent();
- }
+ t->num_messages_in_next_write++;
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
op->payload->send_message.send_message->length());
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
@@ -1627,9 +1625,6 @@
if (op->recv_message) {
GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
- if (t->channelz_socket != nullptr) {
- t->channelz_socket->RecordMessageRecieved();
- }
size_t before = 0;
GPR_ASSERT(s->recv_message_ready == nullptr);
GPR_ASSERT(!s->pending_byte_stream);
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc
index 15de879..933b32c 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_data.cc
@@ -192,6 +192,9 @@
GPR_ASSERT(stream_out != nullptr);
GPR_ASSERT(p->parsing_frame == nullptr);
p->frame_size |= (static_cast<uint32_t>(*cur));
+ if (t->channelz_socket != nullptr) {
+ t->channelz_socket->RecordMessageReceived();
+ }
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
message_flags = 0;
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index bf0dfa9..ff26dd9 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -474,6 +474,7 @@
grpc_chttp2_keepalive_state keepalive_state;
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
+ uint32_t num_messages_in_next_write;
};
typedef enum {
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 5beaf54..d533989 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -633,6 +633,11 @@
GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
grpc_chttp2_stream* s;
+ if (t->channelz_socket != nullptr) {
+ t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
+ }
+ t->num_messages_in_next_write = 0;
+
while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
if (s->sending_bytes != 0) {
update_list(t, s, static_cast<int64_t>(s->sending_bytes),
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index e1ab2ea..df5c99f 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -62,7 +62,7 @@
CallCountingHelper::~CallCountingHelper() {}
void CallCountingHelper::RecordCallStarted() {
- gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1);
+ gpr_atm_no_barrier_fetch_add(&calls_started_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
@@ -182,31 +182,32 @@
}
// ask CallCountingHelper to populate trace and call count data.
call_counter_.PopulateCallCounts(json);
- json = top_level_json;
return top_level_json;
}
+SocketNode::SocketNode() : BaseNode(EntityType::kSocket) {}
+
void SocketNode::RecordStreamStartedFromLocal() {
- gpr_atm_no_barrier_fetch_add(&streams_started_, (gpr_atm)1);
+ gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_local_stream_created_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
void SocketNode::RecordStreamStartedFromRemote() {
- gpr_atm_no_barrier_fetch_add(&streams_started_, (gpr_atm)1);
+ gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_remote_stream_created_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
-void SocketNode::RecordMessageSent() {
- gpr_atm_no_barrier_fetch_add(&messages_sent_, (gpr_atm)1);
+void SocketNode::RecordMessagesSent(uint32_t num_sent) {
+ gpr_atm_no_barrier_fetch_add(&messages_sent_, static_cast<gpr_atm>(num_sent));
gpr_atm_no_barrier_store(&last_message_sent_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
-void SocketNode::RecordMessageRecieved() {
- gpr_atm_no_barrier_fetch_add(&messages_recieved_, (gpr_atm)1);
- gpr_atm_no_barrier_store(&last_message_recieved_millis_,
+void SocketNode::RecordMessageReceived() {
+ gpr_atm_no_barrier_fetch_add(&messages_received_, static_cast<gpr_atm>(1));
+ gpr_atm_no_barrier_store(&last_message_received_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
@@ -242,47 +243,44 @@
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "streamsFailed", streams_failed_);
}
+ gpr_timespec ts;
if (messages_sent_ != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "messagesSent", messages_sent_);
- }
- if (messages_recieved_ != 0) {
- json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "messagesRecieved", messages_recieved_);
- }
- if (keepalives_sent_ != 0) {
- json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "keepAlivesSent", keepalives_sent_);
- }
- gpr_timespec ts;
- if (streams_started_ != 0 && last_local_stream_created_millis_ != 0) {
- ts = grpc_millis_to_timespec(last_local_stream_created_millis_,
- GPR_CLOCK_REALTIME);
- json_iterator = grpc_json_create_child(
- json_iterator, json, "lastLocalStreamCreatedTimestamp",
- gpr_format_timespec(ts), GRPC_JSON_STRING, true);
- }
- if (streams_started_ != 0 && last_remote_stream_created_millis_ != 0) {
- ts = grpc_millis_to_timespec(last_remote_stream_created_millis_,
- GPR_CLOCK_REALTIME);
- json_iterator = grpc_json_create_child(
- json_iterator, json, "lastRemoteStreamCreatedTimestamp",
- gpr_format_timespec(ts), GRPC_JSON_STRING, true);
- }
- if (messages_sent_ != 0) {
ts = grpc_millis_to_timespec(last_message_sent_millis_, GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastMessageSentTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
}
- if (messages_recieved_ != 0) {
- ts = grpc_millis_to_timespec(last_message_recieved_millis_,
+ if (messages_received_ != 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "messagesReceived", messages_received_);
+ ts = grpc_millis_to_timespec(last_message_received_millis_,
GPR_CLOCK_REALTIME);
json_iterator = grpc_json_create_child(
- json_iterator, json, "lastMessageRecievedTimestamp",
+ json_iterator, json, "lastMessageReceivedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
}
- json = top_level_json;
+ if (keepalives_sent_ != 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "keepAlivesSent", keepalives_sent_);
+ }
+ if (streams_started_ != 0) {
+ if (last_local_stream_created_millis_ != 0) {
+ ts = grpc_millis_to_timespec(last_local_stream_created_millis_,
+ GPR_CLOCK_REALTIME);
+ json_iterator = grpc_json_create_child(
+ json_iterator, json, "lastLocalStreamCreatedTimestamp",
+ gpr_format_timespec(ts), GRPC_JSON_STRING, true);
+ }
+ if (last_remote_stream_created_millis_ != 0) {
+ ts = grpc_millis_to_timespec(last_remote_stream_created_millis_,
+ GPR_CLOCK_REALTIME);
+ json_iterator = grpc_json_create_child(
+ json_iterator, json, "lastRemoteStreamCreatedTimestamp",
+ gpr_format_timespec(ts), GRPC_JSON_STRING, true);
+ }
+ }
return top_level_json;
}
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index 2486820..b7ae101 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -92,10 +92,10 @@
void RecordCallStarted();
void RecordCallFailed() {
- gpr_atm_no_barrier_fetch_add(&calls_failed_, (gpr_atm(1)));
+ gpr_atm_no_barrier_fetch_add(&calls_failed_, static_cast<gpr_atm>(1));
}
void RecordCallSucceeded() {
- gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1)));
+ gpr_atm_no_barrier_fetch_add(&calls_succeeded_, static_cast<gpr_atm>(1));
}
// Common rendering of the call count data and last_call_started_timestamp.
@@ -199,7 +199,7 @@
// Handles channelz bookkeeping for sockets
class SocketNode : public BaseNode {
public:
- SocketNode() : BaseNode(EntityType::kSocket) {}
+ SocketNode();
~SocketNode() override {}
grpc_json* RenderJson() override;
@@ -207,15 +207,15 @@
void RecordStreamStartedFromLocal();
void RecordStreamStartedFromRemote();
void RecordStreamSucceeded() {
- gpr_atm_no_barrier_fetch_add(&streams_succeeded_, (gpr_atm(1)));
+ gpr_atm_no_barrier_fetch_add(&streams_succeeded_, static_cast<gpr_atm>(1));
}
void RecordStreamFailed() {
- gpr_atm_no_barrier_fetch_add(&streams_failed_, (gpr_atm(1)));
+ gpr_atm_no_barrier_fetch_add(&streams_failed_, static_cast<gpr_atm>(1));
}
- void RecordMessageSent();
- void RecordMessageRecieved();
+ void RecordMessagesSent(uint32_t num_sent);
+ void RecordMessageReceived();
void RecordKeepaliveSent() {
- gpr_atm_no_barrier_fetch_add(&keepalives_sent_, (gpr_atm(1)));
+ gpr_atm_no_barrier_fetch_add(&keepalives_sent_, static_cast<gpr_atm>(1));
}
private:
@@ -223,12 +223,13 @@
gpr_atm streams_succeeded_ = 0;
gpr_atm streams_failed_ = 0;
gpr_atm messages_sent_ = 0;
- gpr_atm messages_recieved_ = 0;
+ gpr_atm messages_received_ = 0;
gpr_atm keepalives_sent_ = 0;
gpr_atm last_local_stream_created_millis_ = 0;
gpr_atm last_remote_stream_created_millis_ = 0;
gpr_atm last_message_sent_millis_ = 0;
- gpr_atm last_message_recieved_millis_ = 0;
+ gpr_atm last_message_received_millis_ = 0;
+ UniquePtr<char> peer_string_;
};
// Creation functions
diff --git a/test/core/end2end/tests/channelz.cc b/test/core/end2end/tests/channelz.cc
index 3ebaea2..bb99045 100644
--- a/test/core/end2end/tests/channelz.cc
+++ b/test/core/end2end/tests/channelz.cc
@@ -202,12 +202,12 @@
size_t i;
static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
char* output;
- const size_t output_size = 1024 * 1024;
- output = static_cast<char*>(gpr_malloc(output_size));
- for (i = 0; i < output_size - 1; ++i) {
+ const size_t kOutputSize = 1024 * 1024;
+ output = static_cast<char*>(gpr_malloc(kOutputSize));
+ for (i = 0; i < kOutputSize - 1; ++i) {
output[i] = chars[rand() % static_cast<int>(sizeof(chars) - 1)];
}
- output[output_size - 1] = '\0';
+ output[kOutputSize - 1] = '\0';
grpc_slice out = grpc_slice_from_copied_string(output);
gpr_free(output);
return out;
@@ -430,10 +430,32 @@
GPR_ASSERT(nullptr == strstr(json, "\"severity\":\"CT_INFO\""));
gpr_free(json);
+ // TODO(ncteisen): add logic to query for socket id once child socket support
+ // is in place. For now, we hardcode uuid=5, which we know is a socket.
+ json = grpc_channelz_get_socket(5);
+ GPR_ASSERT(json != nullptr);
+ gpr_log(GPR_INFO, "%s", json);
+ GPR_ASSERT(nullptr != strstr(json, "\"socketId\":\"5\""));
+ GPR_ASSERT(nullptr != strstr(json, "\"streamsStarted\":\"2\""));
+ GPR_ASSERT(nullptr != strstr(json, "\"streamsSucceeded\":\"2\""));
+ // no messaged sent yet.
+ GPR_ASSERT(nullptr == strstr(json, "\"messagesSent\""));
+ GPR_ASSERT(nullptr == strstr(json, "\"messagesReceived\""));
+ gpr_free(json);
+
// one successful request with payload to test socket data
- // TODO(ncteisen): add some programatic spot checks on the socket json.
run_one_request_with_payload(config, f);
+ json = grpc_channelz_get_socket(5);
+ GPR_ASSERT(json != nullptr);
+ gpr_log(GPR_INFO, "%s", json);
+ GPR_ASSERT(nullptr != strstr(json, "\"socketId\":\"5\""));
+ GPR_ASSERT(nullptr != strstr(json, "\"streamsStarted\":\"3\""));
+ GPR_ASSERT(nullptr != strstr(json, "\"streamsSucceeded\":\"3\""));
+ GPR_ASSERT(nullptr != strstr(json, "\"messagesSent\":\"1\""));
+ GPR_ASSERT(nullptr != strstr(json, "\"messagesReceived\":\"1\""));
+ gpr_free(json);
+
end_test(&f);
config.tear_down_data(&f);
}