| /* |
| * |
| * Copyright 2015 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/ext/filters/client_channel/client_channel.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| #include <stdbool.h> |
| #include <stdio.h> |
| #include <string.h> |
| |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/log.h> |
| #include <grpc/support/string_util.h> |
| #include <grpc/support/sync.h> |
| |
| #include "src/core/ext/filters/client_channel/backup_poller.h" |
| #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" |
| #include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
| #include "src/core/ext/filters/client_channel/method_params.h" |
| #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" |
| #include "src/core/ext/filters/client_channel/resolver_registry.h" |
| #include "src/core/ext/filters/client_channel/retry_throttle.h" |
| #include "src/core/ext/filters/client_channel/subchannel.h" |
| #include "src/core/ext/filters/deadline/deadline_filter.h" |
| #include "src/core/lib/backoff/backoff.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/connected_channel.h" |
| #include "src/core/lib/channel/status_util.h" |
| #include "src/core/lib/gpr/string.h" |
| #include "src/core/lib/gprpp/inlined_vector.h" |
| #include "src/core/lib/gprpp/manual_constructor.h" |
| #include "src/core/lib/iomgr/combiner.h" |
| #include "src/core/lib/iomgr/iomgr.h" |
| #include "src/core/lib/iomgr/polling_entity.h" |
| #include "src/core/lib/profiling/timers.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/slice/slice_string_helpers.h" |
| #include "src/core/lib/surface/channel.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/error_utils.h" |
| #include "src/core/lib/transport/metadata.h" |
| #include "src/core/lib/transport/metadata_batch.h" |
| #include "src/core/lib/transport/service_config.h" |
| #include "src/core/lib/transport/static_metadata.h" |
| #include "src/core/lib/transport/status_metadata.h" |
| |
// Short aliases for types from the client channel's internal namespace.
using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ServerRetryThrottleData;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// Used as the default value of GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE in
// cc_init_channel_elem().
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
// NOTE(review): not referenced in this section; presumably the jitter
// applied to per-call retry backoff later in the file -- confirm.
#define RETRY_BACKOFF_JITTER 0.2

// Trace flag named "client_channel", constructed disabled. Every GPR_INFO
// log in this file is gated on grpc_client_channel_trace.enabled().
grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");
| |
| /************************************************************************* |
| * CHANNEL-WIDE FUNCTIONS |
| */ |
| |
// Forward declaration; channel_data keeps the head of a list of these.
struct external_connectivity_watcher;

// SliceHashTable mapping method names to the per-method parameters parsed
// from the service config (stored in channel_data::method_params_table).
typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;
| |
/* Per-channel state of the client channel filter, stored in the channel
 * element's channel_data. Initialized by cc_init_channel_elem() and torn
 * down by cc_destroy_channel_elem(). */
typedef struct client_channel_channel_data {
  /** resolver for the channel's target; created in cc_init_channel_elem() and
   * reset on disconnect or when the resolver reports shutdown */
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  /** set by start_resolving_locked(); resolution may only start once */
  bool started_resolving;
  /** result of grpc_deadline_checking_enabled() on the channel args */
  bool deadline_checking_enabled;
  /** factory from GRPC_ARG_CLIENT_CHANNEL_FACTORY; a ref is held and
   * released in cc_destroy_channel_elem() */
  grpc_client_channel_factory* client_channel_factory;
  /** value of GRPC_ARG_ENABLE_RETRIES (default true) */
  bool enable_retries;
  /** value of GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE
   * (default DEFAULT_PER_RPC_RETRY_BUFFER_SIZE) */
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting the variables below in this data structure, except
   * those documented further down as guarded by their own mutex */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - filled in by resolver->NextLocked(); null
   * after a transient resolution error */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback (runs on_resolver_result_changed_locked()) */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available (read by cc_get_channel_info()) */
  gpr_mu info_mu;
  /** name of the LB policy selected from the latest resolver result */
  grpc_core::UniquePtr<char> info_lb_policy_name;
  /** service config in JSON form */
  grpc_core::UniquePtr<char> info_service_config_json;
} channel_data;
| |
/* State behind the re-resolution closure registered with an LB policy.
 * Allocated in create_new_lb_policy_locked(). request_reresolution_locked()
 * hands the closure back to the current LB policy after each use, and frees
 * the struct once it fires for a stale policy, with an error, or after the
 * resolver is gone. */
typedef struct {
  channel_data* chand;
  /** used as an identifier, don't dereference it because the LB policy may be
   * non-existing when the callback is run */
  grpc_core::LoadBalancingPolicy* lb_policy;
  /** scheduled on chand->combiner; runs request_reresolution_locked() */
  grpc_closure closure;
} reresolution_request_args;
| |
/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher.
    Each watcher is heap-allocated by watch_lb_policy_locked(), holds a
    "watch_lb_policy" ref on the channel stack, and is freed by
    on_lb_policy_state_changed_locked(). */
typedef struct {
  channel_data* chand;
  /** run on chand->combiner when the LB policy reports a change */
  grpc_closure on_changed;
  /** last state seen; passed by pointer to NotifyOnStateChangeLocked() and
      read as the new state when on_changed runs */
  grpc_connectivity_state state;
  /** identifies the watched policy; compared with chand->lb_policy to
      detect notifications from a replaced policy */
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;

// Forward declaration: on_lb_policy_state_changed_locked() re-arms watches.
static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);
| |
| static void set_channel_connectivity_state_locked(channel_data* chand, |
| grpc_connectivity_state state, |
| grpc_error* error, |
| const char* reason) { |
| /* TODO: Improve failure handling: |
| * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. |
| * - Hand over pending picks from old policies during the switch that happens |
| * when resolver provides an update. */ |
| if (chand->lb_policy != nullptr) { |
| if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
| /* cancel picks with wait_for_ready=false */ |
| chand->lb_policy->CancelMatchingPicksLocked( |
| /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, |
| /* check= */ 0, GRPC_ERROR_REF(error)); |
| } else if (state == GRPC_CHANNEL_SHUTDOWN) { |
| /* cancel all picks */ |
| chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0, |
| GRPC_ERROR_REF(error)); |
| } |
| } |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand, |
| grpc_connectivity_state_name(state)); |
| } |
| grpc_connectivity_state_set(&chand->state_tracker, state, error, reason); |
| } |
| |
| static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { |
| lb_policy_connectivity_watcher* w = |
| static_cast<lb_policy_connectivity_watcher*>(arg); |
| /* check if the notification is for the latest policy */ |
| if (w->lb_policy == w->chand->lb_policy.get()) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand, |
| w->lb_policy, grpc_connectivity_state_name(w->state)); |
| } |
| set_channel_connectivity_state_locked(w->chand, w->state, |
| GRPC_ERROR_REF(error), "lb_changed"); |
| if (w->state != GRPC_CHANNEL_SHUTDOWN) { |
| watch_lb_policy_locked(w->chand, w->lb_policy, w->state); |
| } |
| } |
| GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy"); |
| gpr_free(w); |
| } |
| |
| static void watch_lb_policy_locked(channel_data* chand, |
| grpc_core::LoadBalancingPolicy* lb_policy, |
| grpc_connectivity_state current_state) { |
| lb_policy_connectivity_watcher* w = |
| static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w))); |
| GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); |
| w->chand = chand; |
| GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, |
| grpc_combiner_scheduler(chand->combiner)); |
| w->state = current_state; |
| w->lb_policy = lb_policy; |
| lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed); |
| } |
| |
| static void start_resolving_locked(channel_data* chand) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand); |
| } |
| GPR_ASSERT(!chand->started_resolving); |
| chand->started_resolving = true; |
| GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); |
| chand->resolver->NextLocked(&chand->resolver_result, |
| &chand->on_resolver_result_changed); |
| } |
| |
// State passed through ServiceConfig::ParseGlobalParams() to
// parse_retry_throttle_params().
typedef struct {
  // Key passed to ServerRetryThrottleMap::GetDataForServer(). Borrowed, not
  // owned: get_service_config_from_resolver_result_locked() points it into
  // the parsed server URI.
  char* server_name;
  // Output: throttle data for server_name; stays null if none was parsed.
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
} service_config_parsing_state;
| |
| static void parse_retry_throttle_params( |
| const grpc_json* field, service_config_parsing_state* parsing_state) { |
| if (strcmp(field->key, "retryThrottling") == 0) { |
| if (parsing_state->retry_throttle_data != nullptr) return; // Duplicate. |
| if (field->type != GRPC_JSON_OBJECT) return; |
| int max_milli_tokens = 0; |
| int milli_token_ratio = 0; |
| for (grpc_json* sub_field = field->child; sub_field != nullptr; |
| sub_field = sub_field->next) { |
| if (sub_field->key == nullptr) return; |
| if (strcmp(sub_field->key, "maxTokens") == 0) { |
| if (max_milli_tokens != 0) return; // Duplicate. |
| if (sub_field->type != GRPC_JSON_NUMBER) return; |
| max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); |
| if (max_milli_tokens == -1) return; |
| max_milli_tokens *= 1000; |
| } else if (strcmp(sub_field->key, "tokenRatio") == 0) { |
| if (milli_token_ratio != 0) return; // Duplicate. |
| if (sub_field->type != GRPC_JSON_NUMBER) return; |
| // We support up to 3 decimal digits. |
| size_t whole_len = strlen(sub_field->value); |
| uint32_t multiplier = 1; |
| uint32_t decimal_value = 0; |
| const char* decimal_point = strchr(sub_field->value, '.'); |
| if (decimal_point != nullptr) { |
| whole_len = static_cast<size_t>(decimal_point - sub_field->value); |
| multiplier = 1000; |
| size_t decimal_len = strlen(decimal_point + 1); |
| if (decimal_len > 3) decimal_len = 3; |
| if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, |
| &decimal_value)) { |
| return; |
| } |
| uint32_t decimal_multiplier = 1; |
| for (size_t i = 0; i < (3 - decimal_len); ++i) { |
| decimal_multiplier *= 10; |
| } |
| decimal_value *= decimal_multiplier; |
| } |
| uint32_t whole_value; |
| if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, |
| &whole_value)) { |
| return; |
| } |
| milli_token_ratio = |
| static_cast<int>((whole_value * multiplier) + decimal_value); |
| if (milli_token_ratio <= 0) return; |
| } |
| } |
| parsing_state->retry_throttle_data = |
| grpc_core::internal::ServerRetryThrottleMap::GetDataForServer( |
| parsing_state->server_name, max_milli_tokens, milli_token_ratio); |
| } |
| } |
| |
| // Invoked from the resolver NextLocked() callback when the resolver |
| // is shutting down. |
| static void on_resolver_shutdown_locked(channel_data* chand, |
| grpc_error* error) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: shutting down", chand); |
| } |
| if (chand->lb_policy != nullptr) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, |
| chand->lb_policy.get()); |
| } |
| grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), |
| chand->interested_parties); |
| chand->lb_policy.reset(); |
| } |
| if (chand->resolver != nullptr) { |
| // This should never happen; it can only be triggered by a resolver |
| // implementation spotaneously deciding to report shutdown without |
| // being orphaned. This code is included just to be defensive. |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p", |
| chand, chand->resolver.get()); |
| } |
| chand->resolver.reset(); |
| set_channel_connectivity_state_locked( |
| chand, GRPC_CHANNEL_SHUTDOWN, |
| GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
| "Resolver spontaneous shutdown", &error, 1), |
| "resolver_spontaneous_shutdown"); |
| } |
| grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, |
| GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
| "Channel disconnected", &error, 1)); |
| GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); |
| GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); |
| grpc_channel_args_destroy(chand->resolver_result); |
| chand->resolver_result = nullptr; |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // Returns the LB policy name from the resolver result. |
| static grpc_core::UniquePtr<char> |
| get_lb_policy_name_from_resolver_result_locked(channel_data* chand) { |
| // Find LB policy name in channel args. |
| const grpc_arg* channel_arg = |
| grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); |
| const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg); |
| // Special case: If at least one balancer address is present, we use |
| // the grpclb policy, regardless of what the resolver actually specified. |
| channel_arg = |
| grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); |
| if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { |
| grpc_lb_addresses* addresses = |
| static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p); |
| if (grpc_lb_addresses_contains_balancer_address(*addresses)) { |
| if (lb_policy_name != nullptr && |
| gpr_stricmp(lb_policy_name, "grpclb") != 0) { |
| gpr_log(GPR_INFO, |
| "resolver requested LB policy %s but provided at least one " |
| "balancer address -- forcing use of grpclb LB policy", |
| lb_policy_name); |
| } |
| lb_policy_name = "grpclb"; |
| } |
| } |
| // Use pick_first if nothing was specified and we didn't select grpclb |
| // above. |
| if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; |
| return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name)); |
| } |
| |
| static void request_reresolution_locked(void* arg, grpc_error* error) { |
| reresolution_request_args* args = |
| static_cast<reresolution_request_args*>(arg); |
| channel_data* chand = args->chand; |
| // If this invocation is for a stale LB policy, treat it as an LB shutdown |
| // signal. |
| if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE || |
| chand->resolver == nullptr) { |
| GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution"); |
| gpr_free(args); |
| return; |
| } |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand); |
| } |
| chand->resolver->RequestReresolutionLocked(); |
| // Give back the closure to the LB policy. |
| chand->lb_policy->SetReresolutionClosureLocked(&args->closure); |
| } |
| |
| // Creates a new LB policy, replacing any previous one. |
| // If the new policy is created successfully, sets *connectivity_state and |
| // *connectivity_error to its initial connectivity state; otherwise, |
| // leaves them unchanged. |
| static void create_new_lb_policy_locked( |
| channel_data* chand, char* lb_policy_name, |
| grpc_connectivity_state* connectivity_state, |
| grpc_error** connectivity_error) { |
| grpc_core::LoadBalancingPolicy::Args lb_policy_args; |
| lb_policy_args.combiner = chand->combiner; |
| lb_policy_args.client_channel_factory = chand->client_channel_factory; |
| lb_policy_args.args = chand->resolver_result; |
| grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy = |
| grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( |
| lb_policy_name, lb_policy_args); |
| if (GPR_UNLIKELY(new_lb_policy == nullptr)) { |
| gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); |
| } else { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand, |
| lb_policy_name, new_lb_policy.get()); |
| } |
| // Swap out the LB policy and update the fds in |
| // chand->interested_parties. |
| if (chand->lb_policy != nullptr) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, |
| chand->lb_policy.get()); |
| } |
| grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), |
| chand->interested_parties); |
| chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get()); |
| } |
| chand->lb_policy = std::move(new_lb_policy); |
| grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(), |
| chand->interested_parties); |
| // Set up re-resolution callback. |
| reresolution_request_args* args = |
| static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args))); |
| args->chand = chand; |
| args->lb_policy = chand->lb_policy.get(); |
| GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, |
| grpc_combiner_scheduler(chand->combiner)); |
| GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); |
| chand->lb_policy->SetReresolutionClosureLocked(&args->closure); |
| // Get the new LB policy's initial connectivity state and start a |
| // connectivity watch. |
| GRPC_ERROR_UNREF(*connectivity_error); |
| *connectivity_state = |
| chand->lb_policy->CheckConnectivityLocked(connectivity_error); |
| if (chand->exit_idle_when_lb_policy_arrives) { |
| chand->lb_policy->ExitIdleLocked(); |
| chand->exit_idle_when_lb_policy_arrives = false; |
| } |
| watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state); |
| } |
| } |
| |
| // Returns the service config (as a JSON string) from the resolver result. |
| // Also updates state in chand. |
| static grpc_core::UniquePtr<char> |
| get_service_config_from_resolver_result_locked(channel_data* chand) { |
| const grpc_arg* channel_arg = |
| grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); |
| const char* service_config_json = grpc_channel_arg_get_string(channel_arg); |
| if (service_config_json != nullptr) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", |
| chand, service_config_json); |
| } |
| grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = |
| grpc_core::ServiceConfig::Create(service_config_json); |
| if (service_config != nullptr) { |
| if (chand->enable_retries) { |
| channel_arg = |
| grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); |
| const char* server_uri = grpc_channel_arg_get_string(channel_arg); |
| GPR_ASSERT(server_uri != nullptr); |
| grpc_uri* uri = grpc_uri_parse(server_uri, true); |
| GPR_ASSERT(uri->path[0] != '\0'); |
| service_config_parsing_state parsing_state; |
| parsing_state.server_name = |
| uri->path[0] == '/' ? uri->path + 1 : uri->path; |
| service_config->ParseGlobalParams(parse_retry_throttle_params, |
| &parsing_state); |
| grpc_uri_destroy(uri); |
| chand->retry_throttle_data = |
| std::move(parsing_state.retry_throttle_data); |
| } |
| chand->method_params_table = service_config->CreateMethodConfigTable( |
| ClientChannelMethodParams::CreateFromJson); |
| } |
| } |
| return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json)); |
| } |
| |
// Callback invoked when a resolver result is available.
// Runs on chand->combiner as chand->on_resolver_result_changed (set up in
// cc_init_channel_elem()). `error` is borrowed; a ref is passed on when it
// is forwarded.
//
// Flow:
//   1. On error, or once the resolver is gone, hand off to
//      on_resolver_shutdown_locked() and stop; NextLocked() is not called
//      again.
//   2. With a new result, either update the current LB policy (same policy
//      name) or create a replacement.
//   3. Refresh the service-config state and the info reported by
//      cc_get_channel_info().
//   4. Set the channel's connectivity state, unless the existing LB policy's
//      watch will do it.
//   5. Run the closures waiting for a result and request the next result.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (grpc_client_channel_trace.enabled()) {
    const char* disposition =
        chand->resolver_result != nullptr
            ? ""
            : (error == GRPC_ERROR_NONE ? " (transient error)"
                                        : " (resolver shutdown)");
    gpr_log(GPR_INFO,
            "chand=%p: got resolver result: resolver_result=%p error=%s%s",
            chand, chand->resolver_result, grpc_error_string(error),
            disposition);
  }
  // Handle shutdown.
  // chand->resolver is reset on disconnect (start_transport_op_locked()), so
  // a null resolver also ends the resolution loop here.
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error));
    return;
  }
  // Data used to set the channel's connectivity state.
  // These defaults are what gets reported when there is no new result, or
  // when LB policy creation fails. create_new_lb_policy_locked() overwrites
  // them on success.
  bool set_connectivity_state = true;
  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  grpc_error* connectivity_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
  // chand->resolver_result will be null in the case of a transient
  // resolution error.  In that case, we don't have any new result to
  // process, which means that we keep using the previous result (if any).
  // NOTE(review): the channel is still set to TRANSIENT_FAILURE below in
  // this case, even when an LB policy from a previous result exists.
  if (chand->resolver_result == nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
    }
  } else {
    grpc_core::UniquePtr<char> lb_policy_name =
        get_lb_policy_name_from_resolver_result_locked(chand);
    // Check to see if we're already using the right LB policy.
    // Note: It's safe to use chand->info_lb_policy_name here without
    // taking a lock on chand->info_mu, because this function is the
    // only thing that modifies its value, and it can only be invoked
    // once at any given time.
    bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr ||
                                  gpr_stricmp(chand->info_lb_policy_name.get(),
                                              lb_policy_name.get()) != 0;
    if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
      // Continue using the same LB policy.  Update with new addresses.
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
                chand, lb_policy_name.get(), chand->lb_policy.get());
      }
      chand->lb_policy->UpdateLocked(*chand->resolver_result);
      // No need to set the channel's connectivity state; the existing
      // watch on the LB policy will take care of that.
      set_connectivity_state = false;
    } else {
      // Instantiate new LB policy.
      create_new_lb_policy_locked(chand, lb_policy_name.get(),
                                  &connectivity_state, &connectivity_error);
    }
    // Find service config.
    grpc_core::UniquePtr<char> service_config_json =
        get_service_config_from_resolver_result_locked(chand);
    // Swap out the data used by cc_get_channel_info().
    // NOTE(review): info_lb_policy_name is updated even if creating the new
    // LB policy failed, so it may name a policy that is not running.
    gpr_mu_lock(&chand->info_mu);
    chand->info_lb_policy_name = std::move(lb_policy_name);
    chand->info_service_config_json = std::move(service_config_json);
    gpr_mu_unlock(&chand->info_mu);
    // Clean up.
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
  }
  // Set the channel's connectivity state if needed.
  // Either way, connectivity_error's ref is consumed exactly once.
  if (set_connectivity_state) {
    set_channel_connectivity_state_locked(
        chand, connectivity_state, connectivity_error, "resolver_result");
  } else {
    GRPC_ERROR_UNREF(connectivity_error);
  }
  // Invoke closures that were waiting for results and renew the watch.
  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}
| |
// Handles a transport op on chand->combiner. It is scheduled by
// cc_start_transport_op(), which also took the "start_transport_op" ref
// released near the end. The op's parts are processed in this order:
//   1. register a connectivity-state watch;
//   2. send a ping;
//   3. disconnect;
//   4. reset connect backoff.
// Each handled field is cleared, and op->on_consumed is scheduled last.
static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->lb_policy == nullptr) {
      // No LB policy: fail both ping callbacks with the same error.
      grpc_error* error =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    } else {
      // The ping goes to whichever connected subchannel the LB policy picks
      // for a pick with no initial metadata and no flags.
      grpc_error* error = GRPC_ERROR_NONE;
      grpc_core::LoadBalancingPolicy::PickState pick_state;
      pick_state.initial_metadata = nullptr;
      pick_state.initial_metadata_flags = 0;
      pick_state.on_complete = nullptr;
      memset(&pick_state.subchannel_call_context, 0,
             sizeof(pick_state.subchannel_call_context));
      pick_state.user_data = nullptr;
      // Pick must return synchronously, because pick_state.on_complete is null.
      GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error));
      if (pick_state.connected_subchannel != nullptr) {
        pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
                                              op->send_ping.on_ack);
      } else {
        if (error == GRPC_ERROR_NONE) {
          error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
              "LB policy dropped call on ping");
        }
        GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
        GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
      }
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    // Only the first disconnect has an effect; afterwards resolver is null.
    if (chand->resolver != nullptr) {
      set_channel_connectivity_state_locked(
          chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      chand->resolver.reset();
      // If resolution had started, on_resolver_result_changed_locked() sees
      // the null resolver and fails the waiting closures itself.
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != nullptr) {
        grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                         chand->interested_parties);
        chand->lb_policy.reset();
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }

  if (op->reset_connect_backoff) {
    if (chand->resolver != nullptr) {
      chand->resolver->ResetBackoffLocked();
      chand->resolver->RequestReresolutionLocked();
    }
    if (chand->lb_policy != nullptr) {
      chand->lb_policy->ResetBackoffLocked();
    }
  }

  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
| |
| static void cc_start_transport_op(grpc_channel_element* elem, |
| grpc_transport_op* op) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| |
| GPR_ASSERT(op->set_accept_stream == false); |
| if (op->bind_pollset != nullptr) { |
| grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset); |
| } |
| |
| op->handler_private.extra_arg = elem; |
| GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); |
| GRPC_CLOSURE_SCHED( |
| GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked, |
| op, grpc_combiner_scheduler(chand->combiner)), |
| GRPC_ERROR_NONE); |
| } |
| |
| static void cc_get_channel_info(grpc_channel_element* elem, |
| const grpc_channel_info* info) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| gpr_mu_lock(&chand->info_mu); |
| if (info->lb_policy_name != nullptr) { |
| *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get()); |
| } |
| if (info->service_config_json != nullptr) { |
| *info->service_config_json = |
| gpr_strdup(chand->info_service_config_json.get()); |
| } |
| gpr_mu_unlock(&chand->info_mu); |
| } |
| |
/* Constructor for channel_data */
// Initializes the client channel filter's channel_data. This filter must be
// the last one in the stack (asserted).
// It reads the following channel args:
//   - GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE
//   - GRPC_ARG_ENABLE_RETRIES
//   - GRPC_ARG_CLIENT_CHANNEL_FACTORY (required, pointer)
//   - GRPC_ARG_SERVER_URI (required, string)
// It then applies any proxy mapping to the server URI and creates the
// resolver. Resolution itself is not started here.
// Returns an error if a required arg is missing or has the wrong type, or if
// resolver creation fails.
// NOTE(review): fields not assigned here (e.g. started_resolving,
// exit_idle_when_lb_policy_arrives, lb_policy) appear to rely on
// channel_data starting out zero-filled -- confirm against the channel stack
// allocator.
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
      arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
  // Record client channel factory.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  // The ref taken here is released in cc_destroy_channel_elem().
  grpc_client_channel_factory_ref(
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
  chand->client_channel_factory =
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  // A proxy mapper may substitute both the name to resolve and the args.
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
      proxy_name != nullptr ? proxy_name : arg->value.string,
      new_args != nullptr ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != nullptr) gpr_free(proxy_name);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  if (chand->resolver == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}
| |
/* Destructor for channel_data */
// Releases, in this order:
//   - the resolver (only still set if resolution never started);
//   - the client channel factory ref;
//   - the LB policy, after unlinking its pollset_set;
//   - the smart-pointer members;
//   - backup polling, the connectivity tracker and the pollset_set;
//   - the combiner ref and both mutexes.
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolver != nullptr) {
    // The only way we can get here is if we never started resolving,
    // because we take a ref to the channel stack when we start
    // resolving and do not release it until the resolver callback is
    // invoked after the resolver shuts down.
    chand->resolver.reset();
  }
  // May be null if cc_init_channel_elem() failed before recording it.
  if (chand->client_channel_factory != nullptr) {
    grpc_client_channel_factory_unref(chand->client_channel_factory);
  }
  if (chand->lb_policy != nullptr) {
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  // TODO(roth): Once we convert the filter API to C++, there will no
  // longer be any need to explicitly reset these smart pointer data members.
  chand->info_lb_policy_name.reset();
  chand->info_service_config_json.reset();
  chand->retry_throttle_data.reset();
  chand->method_params_table.reset();
  // Stop backup polling before destroying the pollset_set it was started on.
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_connectivity_state_destroy(&chand->state_tracker);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}
| |
| /************************************************************************* |
| * PER-CALL FUNCTIONS |
| */ |
| |
// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
// This is the size of call_data.pending_batches; get_batch_index() maps
// each batch to one slot in [0, MAX_PENDING_BATCHES).
#define MAX_PENDING_BATCHES 6
| |
| // Retry support: |
| // |
| // In order to support retries, we act as a proxy for stream op batches. |
| // When we get a batch from the surface, we add it to our list of pending |
| // batches, and we then use those batches to construct separate "child" |
| // batches to be started on the subchannel call. When the child batches |
| // return, we then decide which pending batches have been completed and |
| // schedule their callbacks accordingly. If a subchannel call fails and |
| // we want to retry it, we do a new pick and start again, constructing |
| // new "child" batches for the new subchannel call. |
| // |
| // Note that retries are committed when receiving data from the server |
| // (except for Trailers-Only responses). However, there may be many |
| // send ops started before receiving any data, so we may have already |
| // completed some number of send ops (and returned the completions up to |
| // the surface) by the time we realize that we need to retry. To deal |
| // with this, we cache data for send ops, so that we can replay them on a |
| // different subchannel call even after we have completed the original |
| // batches. |
| // |
| // There are two sets of data to maintain: |
| // - In call_data (in the parent channel), we maintain a list of pending |
| // ops and cached data for send ops. |
| // - In the subchannel call, we maintain state to indicate what ops have |
| // already been sent down to that call. |
| // |
| // When constructing the "child" batches, we compare those two sets of |
| // data to see which batches need to be sent to the subchannel call. |
| |
| // TODO(roth): In subsequent PRs: |
| // - add support for transparent retries (including initial metadata) |
| // - figure out how to record stats in census for retries |
| // (census filter is on top of this one) |
| // - add census stats for retries |
| |
// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
typedef struct {
  // Reference count; its initial value is passed to batch_data_create().
  gpr_refcount refs;
  // Presumably the call element passed to batch_data_create().
  grpc_call_element* elem;
  // maybe_retry() uses this to look up the attempt's retry state.
  grpc_subchannel_call* subchannel_call;  // Holds a ref.
  // The batch to use in the subchannel call.
  // Its payload field points to subchannel_call_retry_state.batch_payload.
  grpc_transport_stream_op_batch batch;
  // For intercepting on_complete.
  grpc_closure on_complete;
} subchannel_batch_data;
| |
// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object; it is fetched
// with grpc_connected_subchannel_call_get_parent_data().
typedef struct {
  // subchannel_batch_data.batch.payload points to this.
  grpc_transport_stream_op_batch_payload batch_payload;
  // For send_initial_metadata.
  // Note that we need to make a copy of the initial metadata for each
  // subchannel call instead of just referring to the copy in call_data,
  // because filters in the subchannel stack will probably add entries,
  // so we need to start in a pristine state for each attempt of the call.
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  // For send_message.
  grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
      send_message;
  // For send_trailing_metadata.
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
  // For intercepting recv_initial_metadata.
  grpc_metadata_batch recv_initial_metadata;
  grpc_closure recv_initial_metadata_ready;
  bool trailing_metadata_available;
  // For intercepting recv_message.
  grpc_closure recv_message_ready;
  grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
  // For intercepting recv_trailing_metadata.
  grpc_metadata_batch recv_trailing_metadata;
  grpc_transport_stream_stats collect_stats;
  grpc_closure recv_trailing_metadata_ready;
  // These fields indicate which ops have been started and completed on
  // this subchannel call.
  // The completed_* values decide which cached send ops
  // free_cached_send_op_data_after_commit() releases when the call commits.
  size_t started_send_message_count;
  size_t completed_send_message_count;
  size_t started_recv_message_count;
  size_t completed_recv_message_count;
  bool started_send_initial_metadata : 1;
  bool completed_send_initial_metadata : 1;
  bool started_send_trailing_metadata : 1;
  bool completed_send_trailing_metadata : 1;
  bool started_recv_initial_metadata : 1;
  bool completed_recv_initial_metadata : 1;
  bool started_recv_trailing_metadata : 1;
  bool completed_recv_trailing_metadata : 1;
  // State for callback processing.
  // Set by do_retry() once a retry has been scheduled for this attempt.
  // maybe_retry() checks it so that a batch with several callbacks
  // triggers at most one retry.
  bool retry_dispatched : 1;
  // NOTE(review): the fields below appear to hold recv callbacks (and their
  // errors) whose delivery was deferred, plus an internally started
  // recv_trailing_metadata batch. Their handling is outside this chunk;
  // confirm there.
  subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
  grpc_error* recv_initial_metadata_error;
  subchannel_batch_data* recv_message_ready_deferred_batch;
  grpc_error* recv_message_error;
  subchannel_batch_data* recv_trailing_metadata_internal_batch;
} subchannel_call_retry_state;
| |
// Pending batches stored in call data.
// Each instance is one slot of call_data.pending_batches, at the index
// returned by get_batch_index().
typedef struct {
  // The pending batch. If nullptr, this slot is empty.
  grpc_transport_stream_op_batch* batch;
  // Indicates whether payload for send ops has been cached in call data.
  // pending_batches_add() resets it to false and
  // maybe_cache_send_ops_for_batch() sets it to true.
  bool send_ops_cached;
} pending_batch;
| |
/** Call data. Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner. If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  // Per-call arena; the cached send-op data for retries is allocated here.
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;

  // Optional; maybe_retry() records successes/failures here and consults
  // it to decide whether retries are throttled.
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  // Optional; its retry_policy() (if any) drives retry decisions.
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  // Current subchannel call, if any. do_retry() unrefs and clears it
  // before scheduling the next attempt.
  grpc_subchannel_call* subchannel_call;

  // Set when we get a cancel_stream op.
  // While this is not GRPC_ERROR_NONE, maybe_retry() does not retry.
  grpc_error* cancel_error;

  grpc_core::LoadBalancingPolicy::PickState pick;
  // Also reused by do_retry() as the callback of retry_timer.
  grpc_closure pick_closure;
  grpc_closure pick_cancel_closure;

  // state needed to support channelz interception of recv trailing metadata.
  grpc_closure recv_trailing_metadata_ready_channelz;
  grpc_closure* original_recv_trailing_metadata;
  grpc_metadata_batch* recv_trailing_metadata;

  grpc_polling_entity* pollent;
  bool pollent_added_to_interested_parties;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES];
  // Whether a batch containing the corresponding send op is currently
  // pending (only maintained while retries are enabled).
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  // enable_retries may be cleared by pending_batches_add() if the retry
  // buffer limit is exceeded before the first attempt completes.
  bool enable_retries : 1;
  // Set (once) by retry_commit(); no further retries happen afterwards.
  bool retry_committed : 1;
  // Whether the previous attempt's delay came from server push-back; if so,
  // do_retry() re-initializes retry_backoff on the next backoff-based retry.
  bool last_attempt_got_server_pushback : 1;
  // Incremented by maybe_retry() and compared against max_attempts.
  int num_attempts_completed;
  // Bytes of send ops buffered for possible replay; compared with the
  // channel's per_rpc_retry_buffer_size in pending_batches_add().
  size_t bytes_buffered_for_retry;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  // Armed by do_retry() to start the next attempt.
  grpc_timer retry_timer;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata;
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  grpc_core::ManualConstructor<
      grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
      send_messages;
  // send_trailing_metadata
  bool seen_send_trailing_metadata;
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
} call_data;
| |
// Forward declarations.
// These functions are referenced before their definitions appear (for
// example, pending_batches_add() calls retry_commit(), do_retry() schedules
// start_pick_locked(), and pending_batches_resume() calls
// start_retriable_subchannel_batches() and
// maybe_intercept_recv_trailing_metadata_for_channelz()).
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
static void maybe_intercept_recv_trailing_metadata_for_channelz(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
| |
| // |
| // send op data caching |
| // |
| |
| // Caches data for send ops so that it can be retried later, if not |
| // already cached. |
| static void maybe_cache_send_ops_for_batch(call_data* calld, |
| pending_batch* pending) { |
| if (pending->send_ops_cached) return; |
| pending->send_ops_cached = true; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| // Save a copy of metadata for send_initial_metadata ops. |
| if (batch->send_initial_metadata) { |
| calld->seen_send_initial_metadata = true; |
| GPR_ASSERT(calld->send_initial_metadata_storage == nullptr); |
| grpc_metadata_batch* send_initial_metadata = |
| batch->payload->send_initial_metadata.send_initial_metadata; |
| calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc( |
| calld->arena, |
| sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count); |
| grpc_metadata_batch_copy(send_initial_metadata, |
| &calld->send_initial_metadata, |
| calld->send_initial_metadata_storage); |
| calld->send_initial_metadata_flags = |
| batch->payload->send_initial_metadata.send_initial_metadata_flags; |
| calld->peer_string = batch->payload->send_initial_metadata.peer_string; |
| } |
| // Set up cache for send_message ops. |
| if (batch->send_message) { |
| grpc_core::ByteStreamCache* cache = |
| static_cast<grpc_core::ByteStreamCache*>( |
| gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); |
| new (cache) grpc_core::ByteStreamCache( |
| std::move(batch->payload->send_message.send_message)); |
| calld->send_messages->push_back(cache); |
| } |
| // Save metadata batch for send_trailing_metadata ops. |
| if (batch->send_trailing_metadata) { |
| calld->seen_send_trailing_metadata = true; |
| GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr); |
| grpc_metadata_batch* send_trailing_metadata = |
| batch->payload->send_trailing_metadata.send_trailing_metadata; |
| calld->send_trailing_metadata_storage = |
| (grpc_linked_mdelem*)gpr_arena_alloc( |
| calld->arena, |
| sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count); |
| grpc_metadata_batch_copy(send_trailing_metadata, |
| &calld->send_trailing_metadata, |
| calld->send_trailing_metadata_storage); |
| } |
| } |
| |
| // Frees cached send_initial_metadata. |
| static void free_cached_send_initial_metadata(channel_data* chand, |
| call_data* calld) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: destroying calld->send_initial_metadata", chand, |
| calld); |
| } |
| grpc_metadata_batch_destroy(&calld->send_initial_metadata); |
| } |
| |
| // Frees cached send_message at index idx. |
| static void free_cached_send_message(channel_data* chand, call_data* calld, |
| size_t idx) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]", |
| chand, calld, idx); |
| } |
| (*calld->send_messages)[idx]->Destroy(); |
| } |
| |
| // Frees cached send_trailing_metadata. |
| static void free_cached_send_trailing_metadata(channel_data* chand, |
| call_data* calld) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: destroying calld->send_trailing_metadata", |
| chand, calld); |
| } |
| grpc_metadata_batch_destroy(&calld->send_trailing_metadata); |
| } |
| |
| // Frees cached send ops that have already been completed after |
| // committing the call. |
| static void free_cached_send_op_data_after_commit( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (retry_state->completed_send_initial_metadata) { |
| free_cached_send_initial_metadata(chand, calld); |
| } |
| for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) { |
| free_cached_send_message(chand, calld, i); |
| } |
| if (retry_state->completed_send_trailing_metadata) { |
| free_cached_send_trailing_metadata(chand, calld); |
| } |
| } |
| |
| // Frees cached send ops that were completed by the completed batch in |
| // batch_data. Used when batches are completed after the call is committed. |
| static void free_cached_send_op_data_for_completed_batch( |
| grpc_call_element* elem, subchannel_batch_data* batch_data, |
| subchannel_call_retry_state* retry_state) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (batch_data->batch.send_initial_metadata) { |
| free_cached_send_initial_metadata(chand, calld); |
| } |
| if (batch_data->batch.send_message) { |
| free_cached_send_message(chand, calld, |
| retry_state->completed_send_message_count - 1); |
| } |
| if (batch_data->batch.send_trailing_metadata) { |
| free_cached_send_trailing_metadata(chand, calld); |
| } |
| } |
| |
| // |
| // pending_batches management |
| // |
| |
| // Returns the index into calld->pending_batches to be used for batch. |
| static size_t get_batch_index(grpc_transport_stream_op_batch* batch) { |
| // Note: It is important the send_initial_metadata be the first entry |
| // here, since the code in pick_subchannel_locked() assumes it will be. |
| if (batch->send_initial_metadata) return 0; |
| if (batch->send_message) return 1; |
| if (batch->send_trailing_metadata) return 2; |
| if (batch->recv_initial_metadata) return 3; |
| if (batch->recv_message) return 4; |
| if (batch->recv_trailing_metadata) return 5; |
| GPR_UNREACHABLE_CODE(return (size_t)-1); |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| static void pending_batches_add(grpc_call_element* elem, |
| grpc_transport_stream_op_batch* batch) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| const size_t idx = get_batch_index(batch); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand, |
| calld, idx); |
| } |
| pending_batch* pending = &calld->pending_batches[idx]; |
| GPR_ASSERT(pending->batch == nullptr); |
| pending->batch = batch; |
| pending->send_ops_cached = false; |
| if (calld->enable_retries) { |
| // Update state in calld about pending batches. |
| // Also check if the batch takes us over the retry buffer limit. |
| // Note: We don't check the size of trailing metadata here, because |
| // gRPC clients do not send trailing metadata. |
| if (batch->send_initial_metadata) { |
| calld->pending_send_initial_metadata = true; |
| calld->bytes_buffered_for_retry += grpc_metadata_batch_size( |
| batch->payload->send_initial_metadata.send_initial_metadata); |
| } |
| if (batch->send_message) { |
| calld->pending_send_message = true; |
| calld->bytes_buffered_for_retry += |
| batch->payload->send_message.send_message->length(); |
| } |
| if (batch->send_trailing_metadata) { |
| calld->pending_send_trailing_metadata = true; |
| } |
| if (GPR_UNLIKELY(calld->bytes_buffered_for_retry > |
| chand->per_rpc_retry_buffer_size)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: exceeded retry buffer size, committing", |
| chand, calld); |
| } |
| subchannel_call_retry_state* retry_state = |
| calld->subchannel_call == nullptr |
| ? nullptr |
| : static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)); |
| retry_commit(elem, retry_state); |
| // If we are not going to retry and have not yet started, pretend |
| // retries are disabled so that we don't bother with retry overhead. |
| if (calld->num_attempts_completed == 0) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: disabling retries before first attempt", |
| chand, calld); |
| } |
| calld->enable_retries = false; |
| } |
| } |
| } |
| } |
| |
| static void pending_batch_clear(call_data* calld, pending_batch* pending) { |
| if (calld->enable_retries) { |
| if (pending->batch->send_initial_metadata) { |
| calld->pending_send_initial_metadata = false; |
| } |
| if (pending->batch->send_message) { |
| calld->pending_send_message = false; |
| } |
| if (pending->batch->send_trailing_metadata) { |
| calld->pending_send_trailing_metadata = false; |
| } |
| } |
| pending->batch = nullptr; |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) { |
| grpc_transport_stream_op_batch* batch = |
| static_cast<grpc_transport_stream_op_batch*>(arg); |
| call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg); |
| // Note: This will release the call combiner. |
| grpc_transport_stream_op_batch_finish_with_failure( |
| batch, GRPC_ERROR_REF(error), calld->call_combiner); |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| // If yield_call_combiner is true, assumes responsibility for yielding |
| // the call combiner. |
| static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, |
| bool yield_call_combiner) { |
| GPR_ASSERT(error != GRPC_ERROR_NONE); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| size_t num_batches = 0; |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| if (calld->pending_batches[i].batch != nullptr) ++num_batches; |
| } |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", |
| elem->channel_data, calld, num_batches, grpc_error_string(error)); |
| } |
| grpc_core::CallCombinerClosureList closures; |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch != nullptr) { |
| batch->handler_private.extra_arg = calld; |
| GRPC_CLOSURE_INIT(&batch->handler_private.closure, |
| fail_pending_batch_in_call_combiner, batch, |
| grpc_schedule_on_exec_ctx); |
| closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), |
| "pending_batches_fail"); |
| pending_batch_clear(calld, pending); |
| } |
| } |
| if (yield_call_combiner) { |
| closures.RunClosures(calld->call_combiner); |
| } else { |
| closures.RunClosuresWithoutYielding(calld->call_combiner); |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| static void resume_pending_batch_in_call_combiner(void* arg, |
| grpc_error* ignored) { |
| grpc_transport_stream_op_batch* batch = |
| static_cast<grpc_transport_stream_op_batch*>(arg); |
| grpc_subchannel_call* subchannel_call = |
| static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); |
| // Note: This will release the call combiner. |
| grpc_subchannel_call_process_op(subchannel_call, batch); |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| static void pending_batches_resume(grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (calld->enable_retries) { |
| start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE); |
| return; |
| } |
| // Retries not enabled; send down batches as-is. |
| if (grpc_client_channel_trace.enabled()) { |
| size_t num_batches = 0; |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| if (calld->pending_batches[i].batch != nullptr) ++num_batches; |
| } |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: starting %" PRIuPTR |
| " pending batches on subchannel_call=%p", |
| chand, calld, num_batches, calld->subchannel_call); |
| } |
| grpc_core::CallCombinerClosureList closures; |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch != nullptr) { |
| maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch); |
| batch->handler_private.extra_arg = calld->subchannel_call; |
| GRPC_CLOSURE_INIT(&batch->handler_private.closure, |
| resume_pending_batch_in_call_combiner, batch, |
| grpc_schedule_on_exec_ctx); |
| closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, |
| "pending_batches_resume"); |
| pending_batch_clear(calld, pending); |
| } |
| } |
| // Note: This will release the call combiner. |
| closures.RunClosures(calld->call_combiner); |
| } |
| |
| static void maybe_clear_pending_batch(grpc_call_element* elem, |
| pending_batch* pending) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| // We clear the pending batch if all of its callbacks have been |
| // scheduled and reset to nullptr. |
| if (batch->on_complete == nullptr && |
| (!batch->recv_initial_metadata || |
| batch->payload->recv_initial_metadata.recv_initial_metadata_ready == |
| nullptr) && |
| (!batch->recv_message || |
| batch->payload->recv_message.recv_message_ready == nullptr) && |
| (!batch->recv_trailing_metadata || |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready == |
| nullptr)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand, |
| calld); |
| } |
| pending_batch_clear(calld, pending); |
| } |
| } |
| |
| // Returns a pointer to the first pending batch for which predicate(batch) |
| // returns true, or null if not found. |
| template <typename Predicate> |
| static pending_batch* pending_batch_find(grpc_call_element* elem, |
| const char* log_message, |
| Predicate predicate) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch != nullptr && predicate(batch)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand, |
| calld, log_message, i); |
| } |
| return pending; |
| } |
| } |
| return nullptr; |
| } |
| |
| // |
| // retry code |
| // |
| |
| // Commits the call so that no further retry attempts will be performed. |
| static void retry_commit(grpc_call_element* elem, |
| subchannel_call_retry_state* retry_state) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (calld->retry_committed) return; |
| calld->retry_committed = true; |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld); |
| } |
| if (retry_state != nullptr) { |
| free_cached_send_op_data_after_commit(elem, retry_state); |
| } |
| } |
| |
| // Starts a retry after appropriate back-off. |
| static void do_retry(grpc_call_element* elem, |
| subchannel_call_retry_state* retry_state, |
| grpc_millis server_pushback_ms) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| GPR_ASSERT(calld->method_params != nullptr); |
| const ClientChannelMethodParams::RetryPolicy* retry_policy = |
| calld->method_params->retry_policy(); |
| GPR_ASSERT(retry_policy != nullptr); |
| // Reset subchannel call and connected subchannel. |
| if (calld->subchannel_call != nullptr) { |
| GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, |
| "client_channel_call_retry"); |
| calld->subchannel_call = nullptr; |
| } |
| if (calld->pick.connected_subchannel != nullptr) { |
| calld->pick.connected_subchannel.reset(); |
| } |
| // Compute backoff delay. |
| grpc_millis next_attempt_time; |
| if (server_pushback_ms >= 0) { |
| next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms; |
| calld->last_attempt_got_server_pushback = true; |
| } else { |
| if (calld->num_attempts_completed == 1 || |
| calld->last_attempt_got_server_pushback) { |
| calld->retry_backoff.Init( |
| grpc_core::BackOff::Options() |
| .set_initial_backoff(retry_policy->initial_backoff) |
| .set_multiplier(retry_policy->backoff_multiplier) |
| .set_jitter(RETRY_BACKOFF_JITTER) |
| .set_max_backoff(retry_policy->max_backoff)); |
| calld->last_attempt_got_server_pushback = false; |
| } |
| next_attempt_time = calld->retry_backoff->NextAttemptTime(); |
| } |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand, |
| calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now()); |
| } |
| // Schedule retry after computed delay. |
| GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem, |
| grpc_combiner_scheduler(chand->combiner)); |
| grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure); |
| // Update bookkeeping. |
| if (retry_state != nullptr) retry_state->retry_dispatched = true; |
| } |
| |
| // Returns true if the call is being retried. |
| static bool maybe_retry(grpc_call_element* elem, |
| subchannel_batch_data* batch_data, |
| grpc_status_code status, |
| grpc_mdelem* server_pushback_md) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| // Get retry policy. |
| if (calld->method_params == nullptr) return false; |
| const ClientChannelMethodParams::RetryPolicy* retry_policy = |
| calld->method_params->retry_policy(); |
| if (retry_policy == nullptr) return false; |
| // If we've already dispatched a retry from this call, return true. |
| // This catches the case where the batch has multiple callbacks |
| // (i.e., it includes either recv_message or recv_initial_metadata). |
| subchannel_call_retry_state* retry_state = nullptr; |
| if (batch_data != nullptr) { |
| retry_state = static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| if (retry_state->retry_dispatched) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, |
| calld); |
| } |
| return true; |
| } |
| } |
| // Check status. |
| if (GPR_LIKELY(status == GRPC_STATUS_OK)) { |
| if (calld->retry_throttle_data != nullptr) { |
| calld->retry_throttle_data->RecordSuccess(); |
| } |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld); |
| } |
| return false; |
| } |
| // Status is not OK. Check whether the status is retryable. |
| if (!retry_policy->retryable_status_codes.Contains(status)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: status %s not configured as retryable", chand, |
| calld, grpc_status_code_to_string(status)); |
| } |
| return false; |
| } |
| // Record the failure and check whether retries are throttled. |
| // Note that it's important for this check to come after the status |
| // code check above, since we should only record failures whose statuses |
| // match the configured retryable status codes, so that we don't count |
| // things like failures due to malformed requests (INVALID_ARGUMENT). |
| // Conversely, it's important for this to come before the remaining |
| // checks, so that we don't fail to record failures due to other factors. |
| if (calld->retry_throttle_data != nullptr && |
| !calld->retry_throttle_data->RecordFailure()) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld); |
| } |
| return false; |
| } |
| // Check whether the call is committed. |
| if (calld->retry_committed) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand, |
| calld); |
| } |
| return false; |
| } |
| // Check whether we have retries remaining. |
| ++calld->num_attempts_completed; |
| if (calld->num_attempts_completed >= retry_policy->max_attempts) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand, |
| calld, retry_policy->max_attempts); |
| } |
| return false; |
| } |
| // If the call was cancelled from the surface, don't retry. |
| if (calld->cancel_error != GRPC_ERROR_NONE) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: call cancelled from surface, not retrying", |
| chand, calld); |
| } |
| return false; |
| } |
| // Check server push-back. |
| grpc_millis server_pushback_ms = -1; |
| if (server_pushback_md != nullptr) { |
| // If the value is "-1" or any other unparseable string, we do not retry. |
| uint32_t ms; |
| if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: not retrying due to server push-back", |
| chand, calld); |
| } |
| return false; |
| } else { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms", |
| chand, calld, ms); |
| } |
| server_pushback_ms = (grpc_millis)ms; |
| } |
| } |
| do_retry(elem, retry_state, server_pushback_ms); |
| return true; |
| } |
| |
| // |
| // subchannel_batch_data |
| // |
| |
| // Creates a subchannel_batch_data object on the call's arena with the |
| // specified refcount. If set_on_complete is true, the batch's |
| // on_complete callback will be set to point to on_complete(); |
| // otherwise, the batch's on_complete callback will be null. |
| static subchannel_batch_data* batch_data_create(grpc_call_element* elem, |
| int refcount, |
| bool set_on_complete) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)); |
| subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>( |
| gpr_arena_alloc(calld->arena, sizeof(*batch_data))); |
| batch_data->elem = elem; |
| batch_data->subchannel_call = |
| GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create"); |
| batch_data->batch.payload = &retry_state->batch_payload; |
| gpr_ref_init(&batch_data->refs, refcount); |
| if (set_on_complete) { |
| GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, |
| grpc_schedule_on_exec_ctx); |
| batch_data->batch.on_complete = &batch_data->on_complete; |
| } |
| GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); |
| return batch_data; |
| } |
| |
| static void batch_data_unref(subchannel_batch_data* batch_data) { |
| if (gpr_unref(&batch_data->refs)) { |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| if (batch_data->batch.send_initial_metadata) { |
| grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); |
| } |
| if (batch_data->batch.send_trailing_metadata) { |
| grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata); |
| } |
| if (batch_data->batch.recv_initial_metadata) { |
| grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata); |
| } |
| if (batch_data->batch.recv_trailing_metadata) { |
| grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); |
| } |
| GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref"); |
| call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); |
| GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); |
| } |
| } |
| |
| // |
| // recv_initial_metadata callback handling |
| // |
| |
| // Invokes recv_initial_metadata_ready for a subchannel batch. |
| static void invoke_recv_initial_metadata_callback(void* arg, |
| grpc_error* error) { |
| subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); |
| // Find pending batch. |
| pending_batch* pending = pending_batch_find( |
| batch_data->elem, "invoking recv_initial_metadata_ready for", |
| [](grpc_transport_stream_op_batch* batch) { |
| return batch->recv_initial_metadata && |
| batch->payload->recv_initial_metadata |
| .recv_initial_metadata_ready != nullptr; |
| }); |
| GPR_ASSERT(pending != nullptr); |
| // Return metadata. |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| grpc_metadata_batch_move( |
| &retry_state->recv_initial_metadata, |
| pending->batch->payload->recv_initial_metadata.recv_initial_metadata); |
| // Update bookkeeping. |
| // Note: Need to do this before invoking the callback, since invoking |
| // the callback will result in yielding the call combiner. |
| grpc_closure* recv_initial_metadata_ready = |
| pending->batch->payload->recv_initial_metadata |
| .recv_initial_metadata_ready; |
| pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready = |
| nullptr; |
| maybe_clear_pending_batch(batch_data->elem, pending); |
| batch_data_unref(batch_data); |
| // Invoke callback. |
| GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error)); |
| } |
| |
// Intercepts the subchannel's recv_initial_metadata_ready callback for
// retries.
//
// There are three outcomes:
//  - A retry has already been dispatched: the result is ignored and the call
//    combiner is released.
//  - An error was received, or the response is Trailers-Only
//    (trailing_metadata_available), and recv_trailing_metadata has not
//    completed yet: propagation to the surface is deferred. batch_data
//    (keeping this callback's reference) and a ref on `error` are stashed in
//    retry_state so the retry decision can wait for the trailing metadata.
//    If the application has not started recv_trailing_metadata, it is
//    started internally to obtain the status.
//  - Otherwise: the call is committed and the metadata is returned to the
//    surface via invoke_recv_initial_metadata_callback().
//
// `error` is not owned by this function (it is only ref'd when stashed).
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  retry_state->completed_recv_initial_metadata = true;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing beyond
  // releasing the call combiner.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner,
        "recv_initial_metadata_ready after retry dispatched");
    return;
  }
  // If we got an error or a Trailers-Only response and have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface. We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
                    error != GRPC_ERROR_NONE) &&
                   !retry_state->completed_recv_trailing_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_initial_metadata_ready "
              "(Trailers-Only)",
              chand, calld);
    }
    // Stash the batch (retaining this callback's ref) and an owned ref on
    // the error; they are resumed or released once trailing metadata
    // arrives.
    retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
    retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(
          calld->call_combiner,
          "recv_initial_metadata_ready trailers-only or error");
    }
    return;
  }
  // Received valid initial metadata, so commit the call.
  retry_commit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  invoke_recv_initial_metadata_callback(batch_data, error);
}
| |
| // |
| // recv_message callback handling |
| // |
| |
| // Invokes recv_message_ready for a subchannel batch. |
| static void invoke_recv_message_callback(void* arg, grpc_error* error) { |
| subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); |
| // Find pending op. |
| pending_batch* pending = pending_batch_find( |
| batch_data->elem, "invoking recv_message_ready for", |
| [](grpc_transport_stream_op_batch* batch) { |
| return batch->recv_message && |
| batch->payload->recv_message.recv_message_ready != nullptr; |
| }); |
| GPR_ASSERT(pending != nullptr); |
| // Return payload. |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| *pending->batch->payload->recv_message.recv_message = |
| std::move(retry_state->recv_message); |
| // Update bookkeeping. |
| // Note: Need to do this before invoking the callback, since invoking |
| // the callback will result in yielding the call combiner. |
| grpc_closure* recv_message_ready = |
| pending->batch->payload->recv_message.recv_message_ready; |
| pending->batch->payload->recv_message.recv_message_ready = nullptr; |
| maybe_clear_pending_batch(batch_data->elem, pending); |
| batch_data_unref(batch_data); |
| // Invoke callback. |
| GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error)); |
| } |
| |
// Intercepts the subchannel's recv_message_ready callback for retries.
//
// There are three outcomes:
//  - A retry has already been dispatched: the result is ignored and the call
//    combiner is released.
//  - The received message is null or an error was received, and
//    recv_trailing_metadata has not completed yet: propagation to the
//    surface is deferred. batch_data (keeping this callback's reference) and
//    a ref on `error` are stashed in retry_state so the retry decision can
//    wait for the trailing metadata. If the application has not started
//    recv_trailing_metadata, it is started internally to obtain the status.
//  - Otherwise: the call is committed and the message is returned to the
//    surface via invoke_recv_message_callback().
//
// `error` is not owned by this function (it is only ref'd when stashed).
static void recv_message_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  ++retry_state->completed_recv_message_count;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_message op, so do nothing beyond releasing the call
  // combiner.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "recv_message_ready after retry dispatched");
    return;
  }
  // If we got an error or the payload was nullptr and we have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface. We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY(
          (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
          !retry_state->completed_recv_trailing_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_message_ready (nullptr "
              "message and recv_trailing_metadata pending)",
              chand, calld);
    }
    // Stash the batch (retaining this callback's ref) and an owned ref on
    // the error; they are resumed or released once trailing metadata
    // arrives.
    retry_state->recv_message_ready_deferred_batch = batch_data;
    retry_state->recv_message_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
    }
    return;
  }
  // Received a valid message, so commit the call.
  retry_commit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  invoke_recv_message_callback(batch_data, error);
}
| |
| // |
| // recv_trailing_metadata handling |
| // |
| |
| // Sets *status and *server_pushback_md based on md_batch and error. |
| // Only sets *server_pushback_md if server_pushback_md != nullptr. |
| static void get_call_status(grpc_call_element* elem, |
| grpc_metadata_batch* md_batch, grpc_error* error, |
| grpc_status_code* status, |
| grpc_mdelem** server_pushback_md) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (error != GRPC_ERROR_NONE) { |
| grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, |
| nullptr); |
| } else { |
| GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); |
| *status = |
| grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); |
| if (server_pushback_md != nullptr && |
| md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { |
| *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; |
| } |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // Adds recv_trailing_metadata_ready closure to closures. |
| static void add_closure_for_recv_trailing_metadata_ready( |
| grpc_call_element* elem, subchannel_batch_data* batch_data, |
| grpc_error* error, grpc_core::CallCombinerClosureList* closures) { |
| // Find pending batch. |
| pending_batch* pending = pending_batch_find( |
| elem, "invoking recv_trailing_metadata for", |
| [](grpc_transport_stream_op_batch* batch) { |
| return batch->recv_trailing_metadata && |
| batch->payload->recv_trailing_metadata |
| .recv_trailing_metadata_ready != nullptr; |
| }); |
| // If we generated the recv_trailing_metadata op internally via |
| // start_internal_recv_trailing_metadata(), then there will be no |
| // pending batch. |
| if (pending == nullptr) { |
| GRPC_ERROR_UNREF(error); |
| return; |
| } |
| // Return metadata. |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| grpc_metadata_batch_move( |
| &retry_state->recv_trailing_metadata, |
| pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); |
| // Add closure. |
| closures->Add(pending->batch->payload->recv_trailing_metadata |
| .recv_trailing_metadata_ready, |
| error, "recv_trailing_metadata_ready for pending batch"); |
| // Update bookkeeping. |
| pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
| nullptr; |
| maybe_clear_pending_batch(elem, pending); |
| } |
| |
| // Adds any necessary closures for deferred recv_initial_metadata and |
| // recv_message callbacks to closures. |
| static void add_closures_for_deferred_recv_callbacks( |
| subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, |
| grpc_core::CallCombinerClosureList* closures) { |
| if (batch_data->batch.recv_trailing_metadata) { |
| // Add closure for deferred recv_initial_metadata_ready. |
| if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != |
| nullptr)) { |
| GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, |
| invoke_recv_initial_metadata_callback, |
| retry_state->recv_initial_metadata_ready_deferred_batch, |
| grpc_schedule_on_exec_ctx); |
| closures->Add(&retry_state->recv_initial_metadata_ready, |
| retry_state->recv_initial_metadata_error, |
| "resuming recv_initial_metadata_ready"); |
| retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; |
| } |
| // Add closure for deferred recv_message_ready. |
| if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != |
| nullptr)) { |
| GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, |
| invoke_recv_message_callback, |
| retry_state->recv_message_ready_deferred_batch, |
| grpc_schedule_on_exec_ctx); |
| closures->Add(&retry_state->recv_message_ready, |
| retry_state->recv_message_error, |
| "resuming recv_message_ready"); |
| retry_state->recv_message_ready_deferred_batch = nullptr; |
| } |
| } |
| } |
| |
| // Returns true if any op in the batch was not yet started. |
| // Only looks at send ops, since recv ops are always started immediately. |
| static bool pending_batch_is_unstarted( |
| pending_batch* pending, call_data* calld, |
| subchannel_call_retry_state* retry_state) { |
| if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { |
| return false; |
| } |
| if (pending->batch->send_initial_metadata && |
| !retry_state->started_send_initial_metadata) { |
| return true; |
| } |
| if (pending->batch->send_message && |
| retry_state->started_send_message_count < calld->send_messages->size()) { |
| return true; |
| } |
| if (pending->batch->send_trailing_metadata && |
| !retry_state->started_send_trailing_metadata) { |
| return true; |
| } |
| return false; |
| } |
| |
| // For any pending batch containing an op that has not yet been started, |
| // adds the pending batch's completion closures to closures. |
| static void add_closures_to_fail_unstarted_pending_batches( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state, |
| grpc_error* error, grpc_core::CallCombinerClosureList* closures) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| if (pending_batch_is_unstarted(pending, calld, retry_state)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: failing unstarted pending batch at index " |
| "%" PRIuPTR, |
| chand, calld, i); |
| } |
| closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), |
| "failing on_complete for pending batch"); |
| pending->batch->on_complete = nullptr; |
| maybe_clear_pending_batch(elem, pending); |
| } |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // Runs necessary closures upon completion of a call attempt. |
| static void run_closures_for_completed_call(subchannel_batch_data* batch_data, |
| grpc_error* error) { |
| grpc_call_element* elem = batch_data->elem; |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| // Construct list of closures to execute. |
| grpc_core::CallCombinerClosureList closures; |
| // First, add closure for recv_trailing_metadata_ready. |
| add_closure_for_recv_trailing_metadata_ready( |
| elem, batch_data, GRPC_ERROR_REF(error), &closures); |
| // If there are deferred recv_initial_metadata_ready or recv_message_ready |
| // callbacks, add them to closures. |
| add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); |
| // Add closures to fail any pending batches that have not yet been started. |
| add_closures_to_fail_unstarted_pending_batches( |
| elem, retry_state, GRPC_ERROR_REF(error), &closures); |
| // Don't need batch_data anymore. |
| batch_data_unref(batch_data); |
| // Schedule all of the closures identified above. |
| // Note: This will release the call combiner. |
| closures.RunClosures(calld->call_combiner); |
| GRPC_ERROR_UNREF(error); |
| } |
| |
// Intercepts the subchannel's recv_trailing_metadata_ready callback for
// retries.
//
// First, this function:
//  - marks trailing metadata as completed;
//  - derives the attempt's status (and any grpc-retry-pushback-ms element)
//    from the trailing metadata or error;
//  - records success or failure on the channelz subchannel node, if there is
//    one;
//  - asks maybe_retry() whether to retry.
//
// If a retry is started, this attempt's results are discarded. The
// batch_data refs held for any deferred recv_initial_metadata_ready /
// recv_message_ready callbacks are dropped along with their stashed errors,
// followed by this callback's own ref.
//
// Otherwise, the call is committed and run_closures_for_completed_call()
// returns the results to the surface.
//
// `error` is not owned by this function; refs are passed to the callees that
// take ownership.
static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  retry_state->completed_recv_trailing_metadata = true;
  // Get the call's status and check for server pushback metadata.
  // get_call_status() takes ownership of the error ref passed to it.
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_mdelem* server_pushback_md = nullptr;
  grpc_metadata_batch* md_batch =
      batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
  get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
                  &server_pushback_md);
  // Per-attempt success/failure accounting for channelz.
  grpc_core::channelz::SubchannelNode* channelz_subchannel =
      calld->pick.connected_subchannel->channelz_subchannel();
  if (channelz_subchannel != nullptr) {
    if (status == GRPC_STATUS_OK) {
      channelz_subchannel->RecordCallSucceeded();
    } else {
      channelz_subchannel->RecordCallFailed();
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
            calld, grpc_status_code_to_string(status));
  }
  // Check if we should retry.
  if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
    // Unref batch_data for deferred recv_initial_metadata_ready or
    // recv_message_ready callbacks, if any.
    // NOTE(review): this unrefs `batch_data` rather than the stored
    // *_deferred_batch pointer, which assumes the deferred callbacks belong
    // to this same batch_data object; confirm.
    if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
      batch_data_unref(batch_data);
      GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
    }
    if (retry_state->recv_message_ready_deferred_batch != nullptr) {
      batch_data_unref(batch_data);
      GRPC_ERROR_UNREF(retry_state->recv_message_error);
    }
    // Drop this callback's own reference.
    batch_data_unref(batch_data);
    return;
  }
  // Not retrying, so commit the call.
  retry_commit(elem, retry_state);
  // Run any necessary closures (this also releases the call combiner).
  run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
}
| |
| // |
| // on_complete callback handling |
| // |
| |
| // Adds the on_complete closure for the pending batch completed in |
| // batch_data to closures. |
| static void add_closure_for_completed_pending_batch( |
| grpc_call_element* elem, subchannel_batch_data* batch_data, |
| subchannel_call_retry_state* retry_state, grpc_error* error, |
| grpc_core::CallCombinerClosureList* closures) { |
| pending_batch* pending = pending_batch_find( |
| elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) { |
| // Match the pending batch with the same set of send ops as the |
| // subchannel batch we've just completed. |
| return batch->on_complete != nullptr && |
| batch_data->batch.send_initial_metadata == |
| batch->send_initial_metadata && |
| batch_data->batch.send_message == batch->send_message && |
| batch_data->batch.send_trailing_metadata == |
| batch->send_trailing_metadata; |
| }); |
| // If batch_data is a replay batch, then there will be no pending |
| // batch to complete. |
| if (pending == nullptr) { |
| GRPC_ERROR_UNREF(error); |
| return; |
| } |
| // Add closure. |
| closures->Add(pending->batch->on_complete, error, |
| "on_complete for pending batch"); |
| pending->batch->on_complete = nullptr; |
| maybe_clear_pending_batch(elem, pending); |
| } |
| |
| // If there are any cached ops to replay or pending ops to start on the |
| // subchannel call, adds a closure to closures to invoke |
| // start_retriable_subchannel_batches(). |
| static void add_closures_for_replay_or_pending_send_ops( |
| grpc_call_element* elem, subchannel_batch_data* batch_data, |
| subchannel_call_retry_state* retry_state, |
| grpc_core::CallCombinerClosureList* closures) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| bool have_pending_send_message_ops = |
| retry_state->started_send_message_count < calld->send_messages->size(); |
| bool have_pending_send_trailing_metadata_op = |
| calld->seen_send_trailing_metadata && |
| !retry_state->started_send_trailing_metadata; |
| if (!have_pending_send_message_ops && |
| !have_pending_send_trailing_metadata_op) { |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch == nullptr || pending->send_ops_cached) continue; |
| if (batch->send_message) have_pending_send_message_ops = true; |
| if (batch->send_trailing_metadata) { |
| have_pending_send_trailing_metadata_op = true; |
| } |
| } |
| } |
| if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: starting next batch for pending send op(s)", |
| chand, calld); |
| } |
| GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure, |
| start_retriable_subchannel_batches, elem, |
| grpc_schedule_on_exec_ctx); |
| closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE, |
| "starting next batch for send_* op(s)"); |
| } |
| } |
| |
// Callback used to intercept on_complete from subchannel calls.
// Called only when retries are enabled.
//
// This function:
//  1. records in retry_state which send ops in batch_data have completed;
//  2. if the call is committed, frees the cached data for those ops;
//  3. unless a retry has already been dispatched, queues the matching
//     surface batch's on_complete, plus (if recv_trailing_metadata has not
//     completed) a closure to start any replay or pending send ops;
//  4. decrements the count of pending subchannel send batches;
//  5. drops this callback's ref on batch_data;
//  6. runs the queued closures, which yields the call combiner;
//  7. if this was the last pending send batch, drops the
//     "subchannel_send_batches" call stack ref.
//
// `error` is not owned by this function.
static void on_complete(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
    gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
            chand, calld, grpc_error_string(error), batch_str);
    gpr_free(batch_str);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // Update bookkeeping in retry_state.
  if (batch_data->batch.send_initial_metadata) {
    retry_state->completed_send_initial_metadata = true;
  }
  if (batch_data->batch.send_message) {
    ++retry_state->completed_send_message_count;
  }
  if (batch_data->batch.send_trailing_metadata) {
    retry_state->completed_send_trailing_metadata = true;
  }
  // If the call is committed, free cached data for send ops that we've just
  // completed.
  if (calld->retry_committed) {
    free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
  }
  // Construct list of closures to execute.
  grpc_core::CallCombinerClosureList closures;
  // If a retry was already dispatched, that means we saw
  // recv_trailing_metadata before this, so we do nothing here.
  // Otherwise, invoke the callback to return the result to the surface.
  if (!retry_state->retry_dispatched) {
    // Add closure for the completed pending batch, if any.
    add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
                                            GRPC_ERROR_REF(error), &closures);
    // If needed, add a callback to start any replay or pending send ops on
    // the subchannel call.
    if (!retry_state->completed_recv_trailing_metadata) {
      add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
                                                  &closures);
    }
  }
  // Track number of pending subchannel send batches and determine if this
  // was the last one. The result is captured before the call combiner is
  // yielded below (presumably so calld is not read after yielding).
  --calld->num_pending_retriable_subchannel_send_batches;
  const bool last_send_batch_complete =
      calld->num_pending_retriable_subchannel_send_batches == 0;
  // Don't need batch_data anymore.
  batch_data_unref(batch_data);
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner);
  // If this was the last subchannel send batch, unref the call stack.
  if (last_send_batch_complete) {
    GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
  }
}
| |
| // |
| // subchannel batch construction |
| // |
| |
| // Helper function used to start a subchannel batch in the call combiner. |
| static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { |
| grpc_transport_stream_op_batch* batch = |
| static_cast<grpc_transport_stream_op_batch*>(arg); |
| grpc_subchannel_call* subchannel_call = |
| static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); |
| // Note: This will release the call combiner. |
| grpc_subchannel_call_process_op(subchannel_call, batch); |
| } |
| |
| // Adds a closure to closures that will execute batch in the call combiner. |
| static void add_closure_for_subchannel_batch( |
| grpc_call_element* elem, grpc_transport_stream_op_batch* batch, |
| grpc_core::CallCombinerClosureList* closures) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| batch->handler_private.extra_arg = calld->subchannel_call; |
| GRPC_CLOSURE_INIT(&batch->handler_private.closure, |
| start_batch_in_call_combiner, batch, |
| grpc_schedule_on_exec_ctx); |
| if (grpc_client_channel_trace.enabled()) { |
| char* batch_str = grpc_transport_stream_op_batch_string(batch); |
| gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand, |
| calld, batch_str); |
| gpr_free(batch_str); |
| } |
| closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, |
| "start_subchannel_batch"); |
| } |
| |
| // Adds retriable send_initial_metadata op to batch_data. |
| static void add_retriable_send_initial_metadata_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| // Maps the number of retries to the corresponding metadata value slice. |
| static const grpc_slice* retry_count_strings[] = { |
| &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4}; |
| // We need to make a copy of the metadata batch for each attempt, since |
| // the filters in the subchannel stack may modify this batch, and we don't |
| // want those modifications to be passed forward to subsequent attempts. |
| // |
| // If we've already completed one or more attempts, add the |
| // grpc-retry-attempts header. |
| retry_state->send_initial_metadata_storage = |
| static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( |
| calld->arena, sizeof(grpc_linked_mdelem) * |
| (calld->send_initial_metadata.list.count + |
| (calld->num_attempts_completed > 0)))); |
| grpc_metadata_batch_copy(&calld->send_initial_metadata, |
| &retry_state->send_initial_metadata, |
| retry_state->send_initial_metadata_storage); |
| if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named |
| .grpc_previous_rpc_attempts != nullptr)) { |
| grpc_metadata_batch_remove(&retry_state->send_initial_metadata, |
| retry_state->send_initial_metadata.idx.named |
| .grpc_previous_rpc_attempts); |
| } |
| if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) { |
| grpc_mdelem retry_md = grpc_mdelem_from_slices( |
| GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS, |
| *retry_count_strings[calld->num_attempts_completed - 1]); |
| grpc_error* error = grpc_metadata_batch_add_tail( |
| &retry_state->send_initial_metadata, |
| &retry_state->send_initial_metadata_storage[calld->send_initial_metadata |
| .list.count], |
| retry_md); |
| if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { |
| gpr_log(GPR_ERROR, "error adding retry metadata: %s", |
| grpc_error_string(error)); |
| GPR_ASSERT(false); |
| } |
| } |
| retry_state->started_send_initial_metadata = true; |
| batch_data->batch.send_initial_metadata = true; |
| batch_data->batch.payload->send_initial_metadata.send_initial_metadata = |
| &retry_state->send_initial_metadata; |
| batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags = |
| calld->send_initial_metadata_flags; |
| batch_data->batch.payload->send_initial_metadata.peer_string = |
| calld->peer_string; |
| } |
| |
| // Adds retriable send_message op to batch_data. |
| static void add_retriable_send_message_op( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", |
| chand, calld, retry_state->started_send_message_count); |
| } |
| grpc_core::ByteStreamCache* cache = |
| (*calld->send_messages)[retry_state->started_send_message_count]; |
| ++retry_state->started_send_message_count; |
| retry_state->send_message.Init(cache); |
| batch_data->batch.send_message = true; |
| batch_data->batch.payload->send_message.send_message.reset( |
| retry_state->send_message.get()); |
| } |
| |
| // Adds retriable send_trailing_metadata op to batch_data. |
| static void add_retriable_send_trailing_metadata_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| // We need to make a copy of the metadata batch for each attempt, since |
| // the filters in the subchannel stack may modify this batch, and we don't |
| // want those modifications to be passed forward to subsequent attempts. |
| retry_state->send_trailing_metadata_storage = |
| static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( |
| calld->arena, sizeof(grpc_linked_mdelem) * |
| calld->send_trailing_metadata.list.count)); |
| grpc_metadata_batch_copy(&calld->send_trailing_metadata, |
| &retry_state->send_trailing_metadata, |
| retry_state->send_trailing_metadata_storage); |
| retry_state->started_send_trailing_metadata = true; |
| batch_data->batch.send_trailing_metadata = true; |
| batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata = |
| &retry_state->send_trailing_metadata; |
| } |
| |
| // Adds retriable recv_initial_metadata op to batch_data. |
| static void add_retriable_recv_initial_metadata_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| retry_state->started_recv_initial_metadata = true; |
| batch_data->batch.recv_initial_metadata = true; |
| grpc_metadata_batch_init(&retry_state->recv_initial_metadata); |
| batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata = |
| &retry_state->recv_initial_metadata; |
| batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available = |
| &retry_state->trailing_metadata_available; |
| GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, |
| recv_initial_metadata_ready, batch_data, |
| grpc_schedule_on_exec_ctx); |
| batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready = |
| &retry_state->recv_initial_metadata_ready; |
| } |
| |
| // Adds retriable recv_message op to batch_data. |
| static void add_retriable_recv_message_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| ++retry_state->started_recv_message_count; |
| batch_data->batch.recv_message = true; |
| batch_data->batch.payload->recv_message.recv_message = |
| &retry_state->recv_message; |
| GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready, |
| batch_data, grpc_schedule_on_exec_ctx); |
| batch_data->batch.payload->recv_message.recv_message_ready = |
| &retry_state->recv_message_ready; |
| } |
| |
| // Adds retriable recv_trailing_metadata op to batch_data. |
| static void add_retriable_recv_trailing_metadata_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| retry_state->started_recv_trailing_metadata = true; |
| batch_data->batch.recv_trailing_metadata = true; |
| grpc_metadata_batch_init(&retry_state->recv_trailing_metadata); |
| batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = |
| &retry_state->recv_trailing_metadata; |
| batch_data->batch.payload->recv_trailing_metadata.collect_stats = |
| &retry_state->collect_stats; |
| GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready, |
| recv_trailing_metadata_ready, batch_data, |
| grpc_schedule_on_exec_ctx); |
| batch_data->batch.payload->recv_trailing_metadata |
| .recv_trailing_metadata_ready = |
| &retry_state->recv_trailing_metadata_ready; |
| } |
| |
| // Helper function used to start a recv_trailing_metadata batch. This |
| // is used in the case where a recv_initial_metadata or recv_message |
| // op fails in a way that we know the call is over but when the application |
| // has not yet started its own recv_trailing_metadata op. |
| static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: call failed but recv_trailing_metadata not " |
| "started; starting it internally", |
| chand, calld); |
| } |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)); |
| // Create batch_data with 2 refs, since this batch will be unreffed twice: |
| // once for the recv_trailing_metadata_ready callback when the subchannel |
| // batch returns, and again when we actually get a recv_trailing_metadata |
| // op from the surface. |
| subchannel_batch_data* batch_data = |
| batch_data_create(elem, 2, false /* set_on_complete */); |
| add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); |
| retry_state->recv_trailing_metadata_internal_batch = batch_data; |
| // Note: This will release the call combiner. |
| grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); |
| } |
| |
| // If there are any cached send ops that need to be replayed on the |
| // current subchannel call, creates and returns a new subchannel batch |
| // to replay those ops. Otherwise, returns nullptr. |
| static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| subchannel_batch_data* replay_batch_data = nullptr; |
| // send_initial_metadata. |
| if (calld->seen_send_initial_metadata && |
| !retry_state->started_send_initial_metadata && |
| !calld->pending_send_initial_metadata) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: replaying previously completed " |
| "send_initial_metadata op", |
| chand, calld); |
| } |
| replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); |
| add_retriable_send_initial_metadata_op(calld, retry_state, |
| replay_batch_data); |
| } |
| // send_message. |
| // Note that we can only have one send_message op in flight at a time. |
| if (retry_state->started_send_message_count < calld->send_messages->size() && |
| retry_state->started_send_message_count == |
| retry_state->completed_send_message_count && |
| !calld->pending_send_message) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: replaying previously completed " |
| "send_message op", |
| chand, calld); |
| } |
| if (replay_batch_data == nullptr) { |
| replay_batch_data = |
| batch_data_create(elem, 1, true /* set_on_complete */); |
| } |
| add_retriable_send_message_op(elem, retry_state, replay_batch_data); |
| } |
| // send_trailing_metadata. |
| // Note that we only add this op if we have no more send_message ops |
| // to start, since we can't send down any more send_message ops after |
| // send_trailing_metadata. |
| if (calld->seen_send_trailing_metadata && |
| retry_state->started_send_message_count == calld->send_messages->size() && |
| !retry_state->started_send_trailing_metadata && |
| !calld->pending_send_trailing_metadata) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: replaying previously completed " |
| "send_trailing_metadata op", |
| chand, calld); |
| } |
| if (replay_batch_data == nullptr) { |
| replay_batch_data = |
| batch_data_create(elem, 1, true /* set_on_complete */); |
| } |
| add_retriable_send_trailing_metadata_op(calld, retry_state, |
| replay_batch_data); |
| } |
| return replay_batch_data; |
| } |
| |
| // Adds subchannel batches for pending batches to batches, updating |
| // *num_batches as needed. |
| static void add_subchannel_batches_for_pending_batches( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state, |
| grpc_core::CallCombinerClosureList* closures) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch == nullptr) continue; |
| // Skip any batch that either (a) has already been started on this |
| // subchannel call or (b) we can't start yet because we're still |
| // replaying send ops that need to be completed first. |
| // TODO(roth): Note that if any one op in the batch can't be sent |
| // yet due to ops that we're replaying, we don't start any of the ops |
| // in the batch. This is probably okay, but it could conceivably |
| // lead to increased latency in some cases -- e.g., we could delay |
| // starting a recv op due to it being in the same batch with a send |
| // op. If/when we revamp the callback protocol in |
| // transport_stream_op_batch, we may be able to fix this. |
| if (batch->send_initial_metadata && |
| retry_state->started_send_initial_metadata) { |
| continue; |
| } |
| if (batch->send_message && retry_state->completed_send_message_count < |
| retry_state->started_send_message_count) { |
| continue; |
| } |
| // Note that we only start send_trailing_metadata if we have no more |
| // send_message ops to start, since we can't send down any more |
| // send_message ops after send_trailing_metadata. |
| if (batch->send_trailing_metadata && |
| (retry_state->started_send_message_count + batch->send_message < |
| calld->send_messages->size() || |
| retry_state->started_send_trailing_metadata)) { |
| continue; |
| } |
| if (batch->recv_initial_metadata && |
| retry_state->started_recv_initial_metadata) { |
| continue; |
| } |
| if (batch->recv_message && retry_state->completed_recv_message_count < |
| retry_state->started_recv_message_count) { |
| continue; |
| } |
| if (batch->recv_trailing_metadata && |
| retry_state->started_recv_trailing_metadata) { |
| // If we previously completed a recv_trailing_metadata op |
| // initiated by start_internal_recv_trailing_metadata(), use the |
| // result of that instead of trying to re-start this op. |
| if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch != |
| nullptr))) { |
| // If the batch completed, then trigger the completion callback |
| // directly, so that we return the previously returned results to |
| // the application. Otherwise, just unref the internally |
| // started subchannel batch, since we'll propagate the |
| // completion when it completes. |
| if (retry_state->completed_recv_trailing_metadata) { |
| // Batches containing recv_trailing_metadata always succeed. |
| closures->Add( |
| &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE, |
| "re-executing recv_trailing_metadata_ready to propagate " |
| "internally triggered result"); |
| } else { |
| batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); |
| } |
| retry_state->recv_trailing_metadata_internal_batch = nullptr; |
| } |
| continue; |
| } |
| // If we're not retrying, just send the batch as-is. |
| if (calld->method_params == nullptr || |
| calld->method_params->retry_policy() == nullptr || |
| calld->retry_committed) { |
| add_closure_for_subchannel_batch(elem, batch, closures); |
| pending_batch_clear(calld, pending); |
| continue; |
| } |
| // Create batch with the right number of callbacks. |
| const bool has_send_ops = batch->send_initial_metadata || |
| batch->send_message || |
| batch->send_trailing_metadata; |
| const int num_callbacks = has_send_ops + batch->recv_initial_metadata + |
| batch->recv_message + |
| batch->recv_trailing_metadata; |
| subchannel_batch_data* batch_data = batch_data_create( |
| elem, num_callbacks, has_send_ops /* set_on_complete */); |
| // Cache send ops if needed. |
| maybe_cache_send_ops_for_batch(calld, pending); |
| // send_initial_metadata. |
| if (batch->send_initial_metadata) { |
| add_retriable_send_initial_metadata_op(calld, retry_state, batch_data); |
| } |
| // send_message. |
| if (batch->send_message) { |
| add_retriable_send_message_op(elem, retry_state, batch_data); |
| } |
| // send_trailing_metadata. |
| if (batch->send_trailing_metadata) { |
| add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data); |
| } |
| // recv_initial_metadata. |
| if (batch->recv_initial_metadata) { |
| // recv_flags is only used on the server side. |
| GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr); |
| add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data); |
| } |
| // recv_message. |
| if (batch->recv_message) { |
| add_retriable_recv_message_op(calld, retry_state, batch_data); |
| } |
| // recv_trailing_metadata. |
| if (batch->recv_trailing_metadata) { |
| add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); |
| } |
| add_closure_for_subchannel_batch(elem, &batch_data->batch, closures); |
| // Track number of pending subchannel send batches. |
| // If this is the first one, take a ref to the call stack. |
| if (batch->send_initial_metadata || batch->send_message || |
| batch->send_trailing_metadata) { |
| if (calld->num_pending_retriable_subchannel_send_batches == 0) { |
| GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); |
| } |
| ++calld->num_pending_retriable_subchannel_send_batches; |
| } |
| } |
| } |
| |
| // Constructs and starts whatever subchannel batches are needed on the |
| // subchannel call. |
| static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches", |
| chand, calld); |
| } |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)); |
| // Construct list of closures to execute, one for each pending batch. |
| grpc_core::CallCombinerClosureList closures; |
| // Replay previously-returned send_* ops if needed. |
| subchannel_batch_data* replay_batch_data = |
| maybe_create_subchannel_batch_for_replay(elem, retry_state); |
| if (replay_batch_data != nullptr) { |
| add_closure_for_subchannel_batch(elem, &replay_batch_data->batch, |
| &closures); |
| // Track number of pending subchannel send batches. |
| // If this is the first one, take a ref to the call stack. |
| if (calld->num_pending_retriable_subchannel_send_batches == 0) { |
| GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); |
| } |
| ++calld->num_pending_retriable_subchannel_send_batches; |
| } |
| // Now add pending batches. |
| add_subchannel_batches_for_pending_batches(elem, retry_state, &closures); |
| // Start batches on subchannel call. |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: starting %" PRIuPTR |
| " retriable batches on subchannel_call=%p", |
| chand, calld, closures.size(), calld->subchannel_call); |
| } |
| // Note: This will yield the call combiner. |
| closures.RunClosures(calld->call_combiner); |
| } |
| |
| // |
| // Channelz |
| // |
| |
| static void recv_trailing_metadata_ready_channelz(void* arg, |
| grpc_error* error) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " |
| "error=%s", |
| chand, calld, grpc_error_string(error)); |
| } |
| GPR_ASSERT(calld->recv_trailing_metadata != nullptr); |
| grpc_status_code status = GRPC_STATUS_OK; |
| grpc_metadata_batch* md_batch = calld->recv_trailing_metadata; |
| get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); |
| grpc_core::channelz::SubchannelNode* channelz_subchannel = |
| calld->pick.connected_subchannel->channelz_subchannel(); |
| GPR_ASSERT(channelz_subchannel != nullptr); |
| if (status == GRPC_STATUS_OK) { |
| channelz_subchannel->RecordCallSucceeded(); |
| } else { |
| channelz_subchannel->RecordCallFailed(); |
| } |
| calld->recv_trailing_metadata = nullptr; |
| GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); |
| } |
| |
| // If channelz is enabled, intercept recv_trailing so that we may check the |
| // status and associate it to a subchannel. |
| // Returns true if callback was intercepted, false otherwise. |
| static void maybe_intercept_recv_trailing_metadata_for_channelz( |
| grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| // only intercept payloads with recv trailing. |
| if (!batch->recv_trailing_metadata) { |
| return; |
| } |
| // only add interceptor is channelz is enabled. |
| if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { |
| return; |
| } |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "calld=%p batch=%p: intercepting recv trailing for channelz", calld, |
| batch); |
| } |
| GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, |
| recv_trailing_metadata_ready_channelz, elem, |
| grpc_schedule_on_exec_ctx); |
| // save some state needed for the interception callback. |
| GPR_ASSERT(calld->recv_trailing_metadata == nullptr); |
| calld->recv_trailing_metadata = |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata; |
| calld->original_recv_trailing_metadata = |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
| &calld->recv_trailing_metadata_ready_channelz; |
| } |
| |
| // |
| // LB pick |
| // |
| |
| static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| const size_t parent_data_size = |
| calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0; |
| const grpc_core::ConnectedSubchannel::CallArgs call_args = { |
| calld->pollent, // pollent |
| calld->path, // path |
| calld->call_start_time, // start_time |
| calld->deadline, // deadline |
| calld->arena, // arena |
| calld->pick.subchannel_call_context, // context |
| calld->call_combiner, // call_combiner |
| parent_data_size // parent_data_size |
| }; |
| grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( |
| call_args, &calld->subchannel_call); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", |
| chand, calld, calld->subchannel_call, grpc_error_string(new_error)); |
| } |
| if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) { |
| new_error = grpc_error_add_child(new_error, error); |
| pending_batches_fail(elem, new_error, true /* yield_call_combiner */); |
| } else { |
| grpc_core::channelz::SubchannelNode* channelz_subchannel = |
| calld->pick.connected_subchannel->channelz_subchannel(); |
| if (channelz_subchannel != nullptr) { |
| channelz_subchannel->RecordCallStarted(); |
| } |
| if (parent_data_size > 0) { |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)); |
| retry_state->batch_payload.context = calld->pick.subchannel_call_context; |
| } |
| pending_batches_resume(elem); |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // Invoked when a pick is completed, on both success or failure. |
| static void pick_done(void* arg, grpc_error* error) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) { |
| // Failed to create subchannel. |
| // If there was no error, this is an LB policy drop, in which case |
| // we return an error; otherwise, we may retry. |
| grpc_status_code status = GRPC_STATUS_OK; |
| grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, |
| nullptr); |
| if (error == GRPC_ERROR_NONE || !calld->enable_retries || |
| !maybe_retry(elem, nullptr /* batch_data */, status, |
| nullptr /* server_pushback_md */)) { |
| grpc_error* new_error = |
| error == GRPC_ERROR_NONE |
| ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "Call dropped by load balancing policy") |
| : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
| "Failed to create subchannel", &error, 1); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: failed to create subchannel: error=%s", |
| chand, calld, grpc_error_string(new_error)); |
| } |
| pending_batches_fail(elem, new_error, true /* yield_call_combiner */); |
| } |
| } else { |
| /* Create call on subchannel. */ |
| create_subchannel_call(elem, GRPC_ERROR_REF(error)); |
| } |
| } |
| |
| static void maybe_add_call_to_channel_interested_parties_locked( |
| grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (!calld->pollent_added_to_interested_parties) { |
| calld->pollent_added_to_interested_parties = true; |
| grpc_polling_entity_add_to_pollset_set(calld->pollent, |
| chand->interested_parties); |
| } |
| } |
| |
| static void maybe_del_call_from_channel_interested_parties_locked( |
| grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (calld->pollent_added_to_interested_parties) { |
| calld->pollent_added_to_interested_parties = false; |
| grpc_polling_entity_del_from_pollset_set(calld->pollent, |
| chand->interested_parties); |
| } |
| } |
| |
| // Invoked when a pick is completed to leave the client_channel combiner |
| // and continue processing in the call combiner. |
| // If needed, removes the call's polling entity from chand->interested_parties. |
| static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| maybe_del_call_from_channel_interested_parties_locked(elem); |
| GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, |
| grpc_schedule_on_exec_ctx); |
| GRPC_CLOSURE_SCHED(&calld->pick_closure, error); |
| } |
| |
| namespace grpc_core { |
| |
| // Performs subchannel pick via LB policy. |
| class LbPicker { |
| public: |
| // Starts a pick on chand->lb_policy. |
| static void StartLocked(grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", |
| chand, calld, chand->lb_policy.get()); |
| } |
| // If this is a retry, use the send_initial_metadata payload that |
| // we've cached; otherwise, use the pending batch. The |
| // send_initial_metadata batch will be the first pending batch in the |
| // list, as set by get_batch_index() above. |
| calld->pick.initial_metadata = |
| calld->seen_send_initial_metadata |
| ? &calld->send_initial_metadata |
| : calld->pending_batches[0] |
| .batch->payload->send_initial_metadata.send_initial_metadata; |
| calld->pick.initial_metadata_flags = |
| calld->seen_send_initial_metadata |
| ? calld->send_initial_metadata_flags |
| : calld->pending_batches[0] |
| .batch->payload->send_initial_metadata |
| .send_initial_metadata_flags; |
| GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem, |
| grpc_combiner_scheduler(chand->combiner)); |
| calld->pick.on_complete = &calld->pick_closure; |
| GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); |
| grpc_error* error = GRPC_ERROR_NONE; |
| const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error); |
| if (GPR_LIKELY(pick_done)) { |
| // Pick completed synchronously. |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", |
| chand, calld); |
| } |
| pick_done_locked(elem, error); |
| GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); |
| } else { |
| // Pick will be returned asynchronously. |
| // Add the polling entity from call_data to the channel_data's |
| // interested_parties, so that the I/O of the LB policy can be done |
| // under it. It will be removed in pick_done_locked(). |
| maybe_add_call_to_channel_interested_parties_locked(elem); |
| // Request notification on call cancellation. |
| GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); |
| grpc_call_combiner_set_notify_on_cancel( |
| calld->call_combiner, |
| GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, |
| &LbPicker::CancelLocked, elem, |
| grpc_combiner_scheduler(chand->combiner))); |
| } |
| } |
| |
| private: |
| // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. |
| // Unrefs the LB policy and invokes pick_done_locked(). |
| static void DoneLocked(void* arg, grpc_error* error) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", |
| chand, calld); |
| } |
| pick_done_locked(elem, GRPC_ERROR_REF(error)); |
| GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); |
| } |
| |
| // Note: This runs under the client_channel combiner, but will NOT be |
| // holding the call combiner. |
| static void CancelLocked(void* arg, grpc_error* error) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| // Note: chand->lb_policy may have changed since we started our pick, |
| // in which case we will be cancelling the pick on a policy other than |
| // the one we started it on. However, this will just be a no-op. |
| if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: cancelling pick from LB policy %p", chand, |
| calld, chand->lb_policy.get()); |
| } |
| chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); |
| } |
| GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); |
| } |
| }; |
| |
| } // namespace grpc_core |
| |
| // Applies service config to the call. Must be invoked once we know |
| // that the resolver has returned results to the channel. |
| static void apply_service_config_to_call_locked(grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", |
| chand, calld); |
| } |
| if (chand->retry_throttle_data != nullptr) { |
| calld->retry_throttle_data = chand->retry_throttle_data->Ref(); |
| } |
| if (chand->method_params_table != nullptr) { |
| calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup( |
| *chand->method_params_table, calld->path); |
| if (calld->method_params != nullptr) { |
| // If the deadline from the service config is shorter than the one |
| // from the client API, reset the deadline timer. |
| if (chand->deadline_checking_enabled && |
| calld->method_params->timeout() != 0) { |
| const grpc_millis per_method_deadline = |
| grpc_timespec_to_millis_round_up(calld->call_start_time) + |
| calld->method_params->timeout(); |
| if (per_method_deadline < calld->deadline) { |
| calld->deadline = per_method_deadline; |
| grpc_deadline_state_reset(elem, calld->deadline); |
| } |
| } |
| // If the service config set wait_for_ready and the application |
| // did not explicitly set it, use the value from the service config. |
| uint32_t* send_initial_metadata_flags = |
| &calld->pending_batches[0] |
| .batch->payload->send_initial_metadata |
| .send_initial_metadata_flags; |
| if (GPR_UNLIKELY( |
| calld->method_params->wait_for_ready() != |
| ClientChannelMethodParams::WAIT_FOR_READY_UNSET && |
| !(*send_initial_metadata_flags & |
| GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) { |
| if (calld->method_params->wait_for_ready() == |
| ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { |
| *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; |
| } else { |
| *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; |
| } |
| } |
| } |
| } |
| // If no retry policy, disable retries. |
| // TODO(roth): Remove this when adding support for transparent retries. |
| if (calld->method_params == nullptr || |
| calld->method_params->retry_policy() == nullptr) { |
| calld->enable_retries = false; |
| } |
| } |
| |
| // Invoked once resolver results are available. |
| static void process_service_config_and_start_lb_pick_locked( |
| grpc_call_element* elem) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| // Only get service config data on the first attempt. |
| if (GPR_LIKELY(calld->num_attempts_completed == 0)) { |
| apply_service_config_to_call_locked(elem); |
| } |
| // Start LB pick. |
| grpc_core::LbPicker::StartLocked(elem); |
| } |
| |
namespace grpc_core {

// Handles waiting for a resolver result.
// Used only for the first call on an idle channel.
//
// Instances are created with New<>() (see start_pick_locked()) and delete
// themselves. Two closures point at the object, both scheduled on the
// channel combiner:
//   - done_closure_: queued on chand->waiting_for_resolver_result_closures
//     and run as DoneLocked() when a resolver result arrives;
//   - cancel_closure_: registered via the call combiner's notify-on-cancel
//     hook and run as CancelLocked().
// Whichever of the two runs first sets finished_. The exception is the
// wait_for_ready retry path in DoneLocked(), which re-queues itself
// instead. The other closure then sees finished_ and deletes the object.
class ResolverResultWaiter {
 public:
  // Logs (if tracing is enabled), queues done_closure_ on the channel's
  // waiting list, and registers cancel_closure_ for call cancellation.
  explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    call_data* calld = static_cast<call_data*>(elem->call_data);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring pick pending resolver result",
              chand, calld);
    }
    // Add closure to be run when a resolver result is available.
    GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
                      grpc_combiner_scheduler(chand->combiner));
    AddToWaitingList();
    // Set cancellation closure, so that we abort if the call is cancelled.
    GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
                      this, grpc_combiner_scheduler(chand->combiner));
    grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
                                            &cancel_closure_);
  }

 private:
  // Adds done_closure_ to chand->waiting_for_resolver_result_closures.
  void AddToWaitingList() {
    channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
    grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
                             &done_closure_, GRPC_ERROR_NONE);
  }

  // Invoked when a resolver result is available.
  // If the call has not already been cancelled, the outcome is:
  //   - error != GRPC_ERROR_NONE: fail the pick with that error;
  //   - chand->resolver == nullptr: shutting down, fail with "Disconnected";
  //   - chand->lb_policy == nullptr: transient resolver failure; re-queue
  //     when wait_for_ready is set, otherwise fail with UNAVAILABLE;
  //   - otherwise: apply the service config and start the LB pick.
  static void DoneLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    // If CancelLocked() has already run, delete ourselves without doing
    // anything.  Note that the call stack may have already been destroyed,
    // so it's not safe to access anything in elem_.
    if (GPR_UNLIKELY(self->finished_)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "call cancelled before resolver result");
      }
      Delete(self);
      return;
    }
    // Otherwise, process the resolver result.
    grpc_call_element* elem = self->elem_;
    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    call_data* calld = static_cast<call_data*>(elem->call_data);
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
                chand, calld);
      }
      pick_done_locked(elem, GRPC_ERROR_REF(error));
    } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
      // Shutting down.
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
                calld);
      }
      pick_done_locked(elem,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
    } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
      // Transient resolver failure.
      // If call has wait_for_ready=true, try again; otherwise, fail.
      // NOTE(review): when send_initial_metadata has not been seen yet, this
      // reads the flags from pending_batches[0]. It assumes that the batch
      // carrying send_initial_metadata is stored in slot 0. Confirm against
      // pending_batches_add().
      uint32_t send_initial_metadata_flags =
          calld->seen_send_initial_metadata
              ? calld->send_initial_metadata_flags
              : calld->pending_batches[0]
                    .batch->payload->send_initial_metadata
                    .send_initial_metadata_flags;
      if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: resolver returned but no LB policy; "
                  "wait_for_ready=true; trying again",
                  chand, calld);
        }
        // Re-add ourselves to the waiting list.
        self->AddToWaitingList();
        // Return early so that we don't set finished_ to true below.
        return;
      } else {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: resolver returned but no LB policy; "
                  "wait_for_ready=false; failing",
                  chand, calld);
        }
        pick_done_locked(
            elem,
            grpc_error_set_int(
                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
      }
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
                chand, calld);
      }
      process_service_config_and_start_lb_pick_locked(elem);
    }
    // Tell CancelLocked() (which still holds a pointer to us) that the pick
    // has been handled, so it will just delete this object.
    self->finished_ = true;
  }

  // Invoked when the call is cancelled.
  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  // With a non-NONE error and DoneLocked() not yet run, fail the pick with
  // "Pick cancelled". In either case, mark finished_ so that DoneLocked()
  // deletes the object when it eventually runs.
  static void CancelLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    // If DoneLocked() has already run, delete ourselves without doing anything.
    if (GPR_LIKELY(self->finished_)) {
      Delete(self);
      return;
    }
    // If we are being cancelled, immediately invoke pick_done_locked()
    // to propagate the error back to the caller.
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      grpc_call_element* elem = self->elem_;
      channel_data* chand = static_cast<channel_data*>(elem->channel_data);
      call_data* calld = static_cast<call_data*>(elem->call_data);
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: cancelling call waiting for name "
                "resolution",
                chand, calld);
      }
      // Note: Although we are not in the call combiner here, we are
      // basically stealing the call combiner from the pending pick, so
      // it's safe to call pick_done_locked() here -- we are essentially
      // calling it here instead of calling it in DoneLocked().
      pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Pick cancelled", &error, 1));
    }
    self->finished_ = true;
  }

  // The call whose pick is deferred. It is not owned, and it may be
  // destroyed before DoneLocked() runs (see the comment there).
  grpc_call_element* elem_;
  // Runs DoneLocked() on the channel combiner.
  grpc_closure done_closure_;
  // Runs CancelLocked() on the channel combiner.
  grpc_closure cancel_closure_;
  // Set by whichever of DoneLocked()/CancelLocked() handles the call. The
  // other one then deletes this object.
  bool finished_ = false;
};

}  // namespace grpc_core
| |
| static void start_pick_locked(void* arg, grpc_error* ignored) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| GPR_ASSERT(calld->pick.connected_subchannel == nullptr); |
| GPR_ASSERT(calld->subchannel_call == nullptr); |
| if (GPR_LIKELY(chand->lb_policy != nullptr)) { |
| // We already have resolver results, so process the service config |
| // and start an LB pick. |
| process_service_config_and_start_lb_pick_locked(elem); |
| } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { |
| pick_done_locked(elem, |
| GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); |
| } else { |
| // We do not yet have an LB policy, so wait for a resolver result. |
| if (GPR_UNLIKELY(!chand->started_resolving)) { |
| start_resolving_locked(chand); |
| } |
| // Create a new waiter, which will delete itself when done. |
| grpc_core::New<grpc_core::ResolverResultWaiter>(elem); |
| // Add the polling entity from call_data to the channel_data's |
| // interested_parties, so that the I/O of the resolver can be done |
| // under it. It will be removed in pick_done_locked(). |
| maybe_add_call_to_channel_interested_parties_locked(elem); |
| } |
| } |
| |
| // |
| // filter call vtable functions |
| // |
| |
// Filter entry point for every stream op batch on a client-channel call.
// The caller is expected to hold the call combiner (see the "will release
// the call combiner" notes below).
//
// Flow:
//   1. If deadline checking is enabled, let the deadline code see the batch.
//   2. If the call was already cancelled, fail the batch with the stored
//      cancel_error.
//   3. If the batch is a cancellation, record its error. Then either fail all
//      pending batches (no subchannel call yet) or forward the cancellation
//      to the subchannel call.
//   4. Otherwise queue the batch. If a subchannel call already exists,
//      resume the pending batches on it without entering the channel
//      combiner.
//   5. With no subchannel call yet, a batch carrying send_initial_metadata
//      starts a pick on the channel combiner. Any other batch stays queued,
//      and the call combiner is yielded.
static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches.  Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      grpc_subchannel_call_process_op(calld->subchannel_call, batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    // The batch's handler_private closure is borrowed to hop onto the
    // channel combiner. start_pick_locked() receives elem, not the batch.
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}
| |
| /* Constructor for call_data */ |
| static grpc_error* cc_init_call_elem(grpc_call_element* elem, |
| const grpc_call_element_args* args) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| // Initialize data members. |
| calld->path = grpc_slice_ref_internal(args->path); |
| calld->call_start_time = args->start_time; |
| calld->deadline = args->deadline; |
| calld->arena = args->arena; |
| calld->owning_call = args->call_stack; |
| calld->call_combiner = args->call_combiner; |
| if (GPR_LIKELY(chand->deadline_checking_enabled)) { |
| grpc_deadline_state_init(elem, args->call_stack, args->call_combiner, |
| calld->deadline); |
| } |
| calld->enable_retries = chand->enable_retries; |
| calld->send_messages.Init(); |
| return GRPC_ERROR_NONE; |
| } |
| |
| /* Destructor for call_data */ |
| static void cc_destroy_call_elem(grpc_call_element* elem, |
| const grpc_call_final_info* final_info, |
| grpc_closure* then_schedule_closure) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| if (GPR_LIKELY(chand->deadline_checking_enabled)) { |
| grpc_deadline_state_destroy(elem); |
| } |
| grpc_slice_unref_internal(calld->path); |
| calld->retry_throttle_data.reset(); |
| calld->method_params.reset(); |
| GRPC_ERROR_UNREF(calld->cancel_error); |
| if (GPR_LIKELY(calld->subchannel_call != nullptr)) { |
| grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, |
| then_schedule_closure); |
| then_schedule_closure = nullptr; |
| GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, |
| "client_channel_destroy_call"); |
| } |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| GPR_ASSERT(calld->pending_batches[i].batch == nullptr); |
| } |
| if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) { |
| calld->pick.connected_subchannel.reset(); |
| } |
| for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { |
| if (calld->pick.subchannel_call_context[i].value != nullptr) { |
| calld->pick.subchannel_call_context[i].destroy( |
| calld->pick.subchannel_call_context[i].value); |
| } |
| } |
| calld->send_messages.Destroy(); |
| GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); |
| } |
| |
| static void cc_set_pollset_or_pollset_set(grpc_call_element* elem, |
| grpc_polling_entity* pollent) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| calld->pollent = pollent; |
| } |
| |
| /************************************************************************* |
| * EXPORTED SYMBOLS |
| */ |
| |
// Channel filter vtable for the client channel. Entries are positional, so
// their order must match the field order of grpc_channel_filter.
// NOTE(review): the per-entry labels below are inferred from the function
// names; confirm against the grpc_channel_filter declaration.
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,  // per-call stream op batches
    cc_start_transport_op,               // per-channel transport ops
    sizeof(call_data),                   // per-call data size
    cc_init_call_elem,                   // call_data constructor
    cc_set_pollset_or_pollset_set,       // records the call's polling entity
    cc_destroy_call_elem,                // call_data destructor
    sizeof(channel_data),                // per-channel data size
    cc_init_channel_elem,                // channel_data constructor
    cc_destroy_channel_elem,             // channel_data destructor
    cc_get_channel_info,                 // channel info query
    "client-channel",                    // filter name
};
| |
| static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { |
| channel_data* chand = static_cast<channel_data*>(arg); |
| if (chand->lb_policy != nullptr) { |
| chand->lb_policy->ExitIdleLocked(); |
| } else { |
| chand->exit_idle_when_lb_policy_arrives = true; |
| if (!chand->started_resolving && chand->resolver != nullptr) { |
| start_resolving_locked(chand); |
| } |
| } |
| GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); |
| } |
| |
| void grpc_client_channel_populate_child_refs( |
| grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels, |
| grpc_core::ChildRefsList* child_channels) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| if (chand->lb_policy != nullptr) { |
| chand->lb_policy->FillChildRefsForChannelz(child_subchannels, |
| child_channels); |
| } |
| } |
| |
| grpc_connectivity_state grpc_client_channel_check_connectivity_state( |
| grpc_channel_element* elem, int try_to_connect) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| grpc_connectivity_state out = |
| grpc_connectivity_state_check(&chand->state_tracker); |
| if (out == GRPC_CHANNEL_IDLE && try_to_connect) { |
| GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); |
| GRPC_CLOSURE_SCHED( |
| GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, |
| grpc_combiner_scheduler(chand->combiner)), |
| GRPC_ERROR_NONE); |
| } |
| return out; |
| } |
| |
| typedef struct external_connectivity_watcher { |
| channel_data* chand; |
| grpc_polling_entity pollent; |
| grpc_closure* on_complete; |
| grpc_closure* watcher_timer_init; |
| grpc_connectivity_state* state; |
| grpc_closure my_closure; |
| struct external_connectivity_watcher* next; |
| } external_connectivity_watcher; |
| |
| static external_connectivity_watcher* lookup_external_connectivity_watcher( |
| channel_data* chand, grpc_closure* on_complete) { |
| gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
| external_connectivity_watcher* w = |
| chand->external_connectivity_watcher_list_head; |
| while (w != nullptr && w->on_complete != on_complete) { |
| w = w->next; |
| } |
| gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
| return w; |
| } |
| |
| static void external_connectivity_watcher_list_append( |
| channel_data* chand, external_connectivity_watcher* w) { |
| GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); |
| |
| gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); |
| GPR_ASSERT(!w->next); |
| w->next = chand->external_connectivity_watcher_list_head; |
| chand->external_connectivity_watcher_list_head = w; |
| gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu); |
| } |
| |
| static void external_connectivity_watcher_list_remove( |
| channel_data* chand, external_connectivity_watcher* too_remove) { |
| GPR_ASSERT( |
| lookup_external_connectivity_watcher(chand, too_remove->on_complete)); |
| gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
| if (too_remove == chand->external_connectivity_watcher_list_head) { |
| chand->external_connectivity_watcher_list_head = too_remove->next; |
| gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
| return; |
| } |
| external_connectivity_watcher* w = |
| chand->external_connectivity_watcher_list_head; |
| while (w != nullptr) { |
| if (w->next == too_remove) { |
| w->next = w->next->next; |
| gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
| return; |
| } |
| w = w->next; |
| } |
| GPR_UNREACHABLE_CODE(return ); |
| } |
| |
| int grpc_client_channel_num_external_connectivity_watchers( |
| grpc_channel_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| int count = 0; |
| |
| gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
| external_connectivity_watcher* w = |
| chand->external_connectivity_watcher_list_head; |
| while (w != nullptr) { |
| count++; |
| w = w->next; |
| } |
| gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
| |
| return count; |
| } |
| |
| static void on_external_watch_complete_locked(void* arg, grpc_error* error) { |
| external_connectivity_watcher* w = |
| static_cast<external_connectivity_watcher*>(arg); |
| grpc_closure* follow_up = w->on_complete; |
| grpc_polling_entity_del_from_pollset_set(&w->pollent, |
| w->chand->interested_parties); |
| GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, |
| "external_connectivity_watcher"); |
| external_connectivity_watcher_list_remove(w->chand, w); |
| gpr_free(w); |
| GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); |
| } |
| |
| static void watch_connectivity_state_locked(void* arg, |
| grpc_error* error_ignored) { |
| external_connectivity_watcher* w = |
| static_cast<external_connectivity_watcher*>(arg); |
| external_connectivity_watcher* found = nullptr; |
| if (w->state != nullptr) { |
| external_connectivity_watcher_list_append(w->chand, w); |
| // An assumption is being made that the closure is scheduled on the exec ctx |
| // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately. |
| GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); |
| GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, |
| grpc_combiner_scheduler(w->chand->combiner)); |
| grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker, |
| w->state, &w->my_closure); |
| } else { |
| GPR_ASSERT(w->watcher_timer_init == nullptr); |
| found = lookup_external_connectivity_watcher(w->chand, w->on_complete); |
| if (found) { |
| GPR_ASSERT(found->on_complete == w->on_complete); |
| grpc_connectivity_state_notify_on_state_change( |
| &found->chand->state_tracker, nullptr, &found->my_closure); |
| } |
| grpc_polling_entity_del_from_pollset_set(&w->pollent, |
| w->chand->interested_parties); |
| GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, |
| "external_connectivity_watcher"); |
| gpr_free(w); |
| } |
| } |
| |
| void grpc_client_channel_watch_connectivity_state( |
| grpc_channel_element* elem, grpc_polling_entity pollent, |
| grpc_connectivity_state* state, grpc_closure* closure, |
| grpc_closure* watcher_timer_init) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| external_connectivity_watcher* w = |
| static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w))); |
| w->chand = chand; |
| w->pollent = pollent; |
| w->on_complete = closure; |
| w->state = state; |
| w->watcher_timer_init = watcher_timer_init; |
| grpc_polling_entity_add_to_pollset_set(&w->pollent, |
| chand->interested_parties); |
| GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, |
| "external_connectivity_watcher"); |
| GRPC_CLOSURE_SCHED( |
| GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w, |
| grpc_combiner_scheduler(chand->combiner)), |
| GRPC_ERROR_NONE); |
| } |
| |
| grpc_subchannel_call* grpc_client_channel_get_subchannel_call( |
| grpc_call_element* elem) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| return calld->subchannel_call; |
| } |