blob: fc150313ce176306e3d971c873453ebf509c5776 [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include <string.h>
#include <grpc/grpc.h>
#include <grpc/grpc_posix.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/resolver/resolver_registry.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
#include <fcntl.h>
#include "src/core/lib/iomgr/tcp_client_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
namespace grpc_core {
Chttp2Connector::Chttp2Connector() {
GRPC_CLOSURE_INIT(&connected_, Connected, this, grpc_schedule_on_exec_ctx);
}
Chttp2Connector::~Chttp2Connector() {
if (endpoint_ != nullptr) {
grpc_endpoint_destroy(endpoint_);
}
}
void Chttp2Connector::Connect(const Args& args, Result* result,
grpc_closure* notify) {
grpc_endpoint** ep;
{
MutexLock lock(&mu_);
GPR_ASSERT(notify_ == nullptr);
args_ = args;
result_ = result;
notify_ = notify;
GPR_ASSERT(!connecting_);
connecting_ = true;
GPR_ASSERT(endpoint_ == nullptr);
ep = &endpoint_;
}
// In some implementations, the closure can be flushed before
// grpc_tcp_client_connect() returns, and since the closure requires access
// to mu_, this can result in a deadlock (see
// https://github.com/grpc/grpc/issues/16427 for details).
// grpc_tcp_client_connect() will fill endpoint_ with proper contents, and we
// make sure that we still exist at that point by taking a ref.
Ref().release(); // Ref held by callback.
grpc_tcp_client_connect(&connected_, ep, args.interested_parties,
args.channel_args, args.address, args.deadline);
}
void Chttp2Connector::Shutdown(grpc_error_handle error) {
MutexLock lock(&mu_);
shutdown_ = true;
if (handshake_mgr_ != nullptr) {
handshake_mgr_->Shutdown(GRPC_ERROR_REF(error));
}
// If handshaking is not yet in progress, shutdown the endpoint.
// Otherwise, the handshaker will do this for us.
if (!connecting_ && endpoint_ != nullptr) {
grpc_endpoint_shutdown(endpoint_, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
void Chttp2Connector::Connected(void* arg, grpc_error_handle error) {
Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
bool unref = false;
{
MutexLock lock(&self->mu_);
GPR_ASSERT(self->connecting_);
self->connecting_ = false;
if (error != GRPC_ERROR_NONE || self->shutdown_) {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
} else {
error = GRPC_ERROR_REF(error);
}
if (self->endpoint_ != nullptr) {
grpc_endpoint_shutdown(self->endpoint_, GRPC_ERROR_REF(error));
}
self->result_->Reset();
grpc_closure* notify = self->notify_;
self->notify_ = nullptr;
ExecCtx::Run(DEBUG_LOCATION, notify, error);
unref = true;
} else {
GPR_ASSERT(self->endpoint_ != nullptr);
self->StartHandshakeLocked();
}
}
if (unref) self->Unref();
}
void Chttp2Connector::StartHandshakeLocked() {
handshake_mgr_ = MakeRefCounted<HandshakeManager>();
CoreConfiguration::Get().handshaker_registry().AddHandshakers(
HANDSHAKER_CLIENT, args_.channel_args, args_.interested_parties,
handshake_mgr_.get());
grpc_endpoint_add_to_pollset_set(endpoint_, args_.interested_parties);
handshake_mgr_->DoHandshake(endpoint_, args_.channel_args, args_.deadline,
nullptr /* acceptor */, OnHandshakeDone, this);
endpoint_ = nullptr; // Endpoint handed off to handshake manager.
}
namespace {
void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
grpc_error_handle error) {
grpc_closure* c = *closure;
*closure = nullptr;
ExecCtx::Run(location, c, error);
}
} // namespace
void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
auto* args = static_cast<HandshakerArgs*>(arg);
Chttp2Connector* self = static_cast<Chttp2Connector*>(args->user_data);
{
MutexLock lock(&self->mu_);
if (error != GRPC_ERROR_NONE || self->shutdown_) {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
if (args->endpoint != nullptr) {
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error));
grpc_endpoint_destroy(args->endpoint);
grpc_channel_args_destroy(args->args);
grpc_slice_buffer_destroy_internal(args->read_buffer);
gpr_free(args->read_buffer);
}
} else {
error = GRPC_ERROR_REF(error);
}
self->result_->Reset();
NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
} else if (args->endpoint != nullptr) {
self->result_->transport =
grpc_create_chttp2_transport(args->args, args->endpoint, true);
self->result_->socket_node =
grpc_chttp2_transport_get_socket_node(self->result_->transport);
self->result_->channel_args = args->args;
GPR_ASSERT(self->result_->transport != nullptr);
self->endpoint_ = args->endpoint;
self->Ref().release(); // Ref held by OnReceiveSettings()
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
grpc_schedule_on_exec_ctx);
self->Ref().release(); // Ref held by OnTimeout()
grpc_chttp2_transport_start_reading(self->result_->transport,
args->read_buffer,
&self->on_receive_settings_, nullptr);
GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&self->timer_, self->args_.deadline, &self->on_timeout_);
} else {
// If the handshaking succeeded but there is no endpoint, then the
// handshaker may have handed off the connection to some external
// code. Just verify that exit_early flag is set.
GPR_DEBUG_ASSERT(args->exit_early);
NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
}
self->handshake_mgr_.reset();
}
self->Unref();
}
void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
{
MutexLock lock(&self->mu_);
if (!self->notify_error_.has_value()) {
grpc_endpoint_delete_from_pollset_set(self->endpoint_,
self->args_.interested_parties);
if (error != GRPC_ERROR_NONE) {
// Transport got an error while waiting on SETTINGS frame.
// TODO(yashykt): The following two lines should be moved to
// SubchannelConnector::Result::Reset()
grpc_transport_destroy(self->result_->transport);
grpc_channel_args_destroy(self->result_->channel_args);
self->result_->Reset();
}
self->MaybeNotify(GRPC_ERROR_REF(error));
grpc_timer_cancel(&self->timer_);
} else {
// OnTimeout() was already invoked. Call Notify() again so that notify_
// can be invoked.
self->MaybeNotify(GRPC_ERROR_NONE);
}
}
self->Unref();
}
void Chttp2Connector::OnTimeout(void* arg, grpc_error_handle /*error*/) {
Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
{
MutexLock lock(&self->mu_);
if (!self->notify_error_.has_value()) {
// The transport did not receive the settings frame in time. Destroy the
// transport.
grpc_endpoint_delete_from_pollset_set(self->endpoint_,
self->args_.interested_parties);
// TODO(yashykt): The following two lines should be moved to
// SubchannelConnector::Result::Reset()
grpc_transport_destroy(self->result_->transport);
grpc_channel_args_destroy(self->result_->channel_args);
self->result_->Reset();
self->MaybeNotify(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"connection attempt timed out before receiving SETTINGS frame"));
} else {
// OnReceiveSettings() was already invoked. Call Notify() again so that
// notify_ can be invoked.
self->MaybeNotify(GRPC_ERROR_NONE);
}
}
self->Unref();
}
void Chttp2Connector::MaybeNotify(grpc_error_handle error) {
if (notify_error_.has_value()) {
GRPC_ERROR_UNREF(error);
NullThenSchedClosure(DEBUG_LOCATION, &notify_, notify_error_.value());
// Clear state for a new Connect().
// Clear out the endpoint_, since it is the responsibility of
// the transport to shut it down.
endpoint_ = nullptr;
notify_error_.reset();
} else {
notify_error_ = error;
}
}
namespace {
class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
public:
RefCountedPtr<Subchannel> CreateSubchannel(
const grpc_resolved_address& address,
const grpc_channel_args* args) override {
grpc_channel_args* new_args = GetSecureNamingChannelArgs(args);
if (new_args == nullptr) {
gpr_log(GPR_ERROR,
"Failed to create channel args during subchannel creation.");
return nullptr;
}
RefCountedPtr<Subchannel> s = Subchannel::Create(
MakeOrphanable<Chttp2Connector>(), address, new_args);
grpc_channel_args_destroy(new_args);
return s;
}
private:
static grpc_channel_args* GetSecureNamingChannelArgs(
const grpc_channel_args* args) {
grpc_channel_credentials* channel_credentials =
grpc_channel_credentials_find_in_args(args);
if (channel_credentials == nullptr) {
gpr_log(GPR_ERROR,
"Can't create subchannel: channel credentials missing for secure "
"channel.");
return nullptr;
}
// Make sure security connector does not already exist in args.
if (grpc_security_connector_find_in_args(args) != nullptr) {
gpr_log(GPR_ERROR,
"Can't create subchannel: security connector already present in "
"channel args.");
return nullptr;
}
// Find the authority to use in the security connector.
const char* authority =
grpc_channel_args_find_string(args, GRPC_ARG_DEFAULT_AUTHORITY);
GPR_ASSERT(authority != nullptr);
// Create the security connector using the credentials and target name.
grpc_channel_args* new_args_from_connector = nullptr;
RefCountedPtr<grpc_channel_security_connector>
subchannel_security_connector =
channel_credentials->create_security_connector(
/*call_creds=*/nullptr, authority, args,
&new_args_from_connector);
if (subchannel_security_connector == nullptr) {
gpr_log(GPR_ERROR,
"Failed to create secure subchannel for secure name '%s'",
authority);
return nullptr;
}
grpc_arg new_security_connector_arg =
grpc_security_connector_to_arg(subchannel_security_connector.get());
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != nullptr ? new_args_from_connector : args,
&new_security_connector_arg, 1);
subchannel_security_connector.reset(DEBUG_LOCATION, "lb_channel_create");
grpc_channel_args_destroy(new_args_from_connector);
return new_args;
}
};
grpc_channel* CreateChannel(const char* target, const grpc_channel_args* args,
grpc_error_handle* error) {
if (target == nullptr) {
gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
if (error != nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel target is NULL");
}
return nullptr;
}
// Add channel arg containing the server URI.
std::string canonical_target =
CoreConfiguration::Get().resolver_registry().AddDefaultPrefixIfNeeded(
target);
grpc_arg arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVER_URI),
const_cast<char*>(canonical_target.c_str()));
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
grpc_channel* channel = grpc_channel_create_internal(
target, new_args, GRPC_CLIENT_CHANNEL, nullptr, error);
grpc_channel_args_destroy(new_args);
return channel;
}
} // namespace
} // namespace grpc_core
namespace {
grpc_core::Chttp2SecureClientChannelFactory* g_factory;
gpr_once g_factory_once = GPR_ONCE_INIT;
void FactoryInit() {
g_factory = new grpc_core::Chttp2SecureClientChannelFactory();
}
} // namespace
// Create a secure client channel:
// Asynchronously: - resolve target
// - connect to it (trying alternatives as presented)
// - perform handshakes
grpc_channel* grpc_channel_create(const char* target,
grpc_channel_credentials* creds,
const grpc_channel_args* args) {
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_secure_channel_create(target=%s, creds=%p, args=%p)", 3,
(target, (void*)creds, (void*)args));
args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
grpc_channel* channel = nullptr;
grpc_error_handle error = GRPC_ERROR_NONE;
if (creds != nullptr) {
// Add channel args containing the client channel factory and channel
// credentials.
gpr_once_init(&g_factory_once, FactoryInit);
grpc_arg channel_factory_arg =
grpc_core::ClientChannelFactory::CreateChannelArg(g_factory);
grpc_arg args_to_add[] = {channel_factory_arg,
grpc_channel_credentials_to_arg(creds)};
const char* arg_to_remove = channel_factory_arg.key;
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, &arg_to_remove, 1, args_to_add, GPR_ARRAY_SIZE(args_to_add));
new_args = creds->update_arguments(new_args);
// Create channel.
channel = grpc_core::CreateChannel(target, new_args, &error);
// Clean up.
grpc_channel_args_destroy(new_args);
}
grpc_channel_args_destroy(args);
if (channel == nullptr) {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
channel = grpc_lame_client_channel_create(
target, status, "Failed to create secure client channel");
}
return channel;
}
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
grpc_channel_credentials* creds,
const grpc_channel_args* args) {
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE(
"grpc_channel_create_from_fd(target=%p, fd=%d, creds=%p, args=%p)", 4,
(target, fd, creds, args));
// For now, we only support insecure channel credentials.
if (creds == nullptr ||
strcmp(creds->type(), GRPC_CREDENTIALS_TYPE_INSECURE) != 0) {
return grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL,
"Failed to create client channel due to invalid creds");
}
grpc_arg default_authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
const_cast<char*>("test.authority"));
args = grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
const grpc_channel_args* final_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
grpc_channel_args_destroy(args);
int flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
grpc_fd_create(fd, "client", true), final_args, "fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
GPR_ASSERT(transport);
grpc_error_handle error = GRPC_ERROR_NONE;
grpc_channel* channel = grpc_channel_create_internal(
target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport, &error);
grpc_channel_args_destroy(final_args);
if (channel != nullptr) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_core::ExecCtx::Get()->Flush();
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
channel = grpc_lame_client_channel_create(
target, status, "Failed to create client channel");
}
return channel;
}
#else // !GPR_SUPPORT_CHANNELS_FROM_FD
grpc_channel* grpc_channel_create_from_fd(const char* /* target */,
int /* fd */,
grpc_channel_credentials* /* creds*/,
const grpc_channel_args* /* args */) {
GPR_ASSERT(0);
return nullptr;
}
#endif // GPR_SUPPORT_CHANNELS_FROM_FD