blob: 45dd0e7d3b9f4072f71dcdf03c2f6b601de40e71 [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/http/httpcli.h"
#include <string.h>
#include <string>
#include "absl/functional/bind_front.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_core {
namespace {
grpc_httpcli_get_override g_get_override;
grpc_httpcli_post_override g_post_override;
} // namespace
OrphanablePtr<HttpRequest> HttpRequest::Get(
URI uri, const grpc_channel_args* channel_args,
grpc_polling_entity* pollent, const grpc_http_request* request,
Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
RefCountedPtr<grpc_channel_credentials> channel_creds) {
absl::optional<std::function<void()>> test_only_generate_response;
if (g_get_override != nullptr) {
test_only_generate_response = [request, uri, deadline, on_done,
response]() {
// Note that capturing request here assumes it will remain alive
// until after Start is called. This avoids making a copy as this
// code path is only used for test mocks.
g_get_override(request, uri.authority().c_str(), uri.path().c_str(),
deadline, on_done, response);
};
}
std::string name =
absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path());
const grpc_slice request_text = grpc_httpcli_format_get_request(
request, uri.authority().c_str(), uri.path().c_str());
return MakeOrphanable<HttpRequest>(
std::move(uri), request_text, response, deadline, channel_args, on_done,
pollent, name.c_str(), std::move(test_only_generate_response),
std::move(channel_creds));
}
OrphanablePtr<HttpRequest> HttpRequest::Post(
URI uri, const grpc_channel_args* channel_args,
grpc_polling_entity* pollent, const grpc_http_request* request,
Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
RefCountedPtr<grpc_channel_credentials> channel_creds) {
absl::optional<std::function<void()>> test_only_generate_response;
if (g_post_override != nullptr) {
test_only_generate_response = [request, uri, deadline, on_done,
response]() {
g_post_override(request, uri.authority().c_str(), uri.path().c_str(),
request->body, request->body_length, deadline, on_done,
response);
};
}
std::string name =
absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path());
const grpc_slice request_text = grpc_httpcli_format_post_request(
request, uri.authority().c_str(), uri.path().c_str());
return MakeOrphanable<HttpRequest>(
std::move(uri), request_text, response, deadline, channel_args, on_done,
pollent, name.c_str(), std::move(test_only_generate_response),
std::move(channel_creds));
}
void HttpRequest::SetOverride(grpc_httpcli_get_override get,
grpc_httpcli_post_override post) {
g_get_override = get;
g_post_override = post;
}
HttpRequest::HttpRequest(
URI uri, const grpc_slice& request_text, grpc_http_response* response,
Timestamp deadline, const grpc_channel_args* channel_args,
grpc_closure* on_done, grpc_polling_entity* pollent, const char* name,
absl::optional<std::function<void()>> test_only_generate_response,
RefCountedPtr<grpc_channel_credentials> channel_creds)
: uri_(std::move(uri)),
request_text_(request_text),
deadline_(deadline),
channel_args_(CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(channel_args)),
channel_creds_(std::move(channel_creds)),
on_done_(on_done),
resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)),
pollent_(pollent),
pollset_set_(grpc_pollset_set_create()),
test_only_generate_response_(std::move(test_only_generate_response)) {
grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
grpc_slice_buffer_init(&incoming_);
grpc_slice_buffer_init(&outgoing_);
grpc_iomgr_register_object(&iomgr_obj_, name);
GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&continue_on_read_after_schedule_on_exec_ctx_,
ContinueOnReadAfterScheduleOnExecCtx, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&continue_done_write_after_schedule_on_exec_ctx_,
ContinueDoneWriteAfterScheduleOnExecCtx, this,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(pollent);
grpc_polling_entity_add_to_pollset_set(pollent, pollset_set_);
// Create the DNS resolver. We'll start resolving when Start is called.
dns_request_ = GetDNSResolver()->ResolveName(
uri_.authority(), uri_.scheme(), pollset_set_,
absl::bind_front(&HttpRequest::OnResolved, this));
}
HttpRequest::~HttpRequest() {
grpc_channel_args_destroy(channel_args_);
grpc_http_parser_destroy(&parser_);
if (own_endpoint_ && ep_ != nullptr) {
grpc_endpoint_destroy(ep_);
}
grpc_slice_unref_internal(request_text_);
grpc_iomgr_unregister_object(&iomgr_obj_);
grpc_slice_buffer_destroy_internal(&incoming_);
grpc_slice_buffer_destroy_internal(&outgoing_);
GRPC_ERROR_UNREF(overall_error_);
grpc_pollset_set_destroy(pollset_set_);
}
void HttpRequest::Start() {
MutexLock lock(&mu_);
if (test_only_generate_response_.has_value()) {
test_only_generate_response_.value()();
return;
}
Ref().release(); // ref held by pending DNS resolution
dns_request_->Start();
}
void HttpRequest::Orphan() {
{
MutexLock lock(&mu_);
GPR_ASSERT(!cancelled_);
cancelled_ = true;
dns_request_.reset(); // cancel potentially pending DNS resolution
if (connecting_) {
// gRPC's TCP connection establishment API doesn't currently have
// a mechanism for cancellation. So invoke the user callback now. The TCP
// connection will eventually complete (at least within its deadline), and
// we'll simply unref ourselves at that point.
// TODO(apolcyn): fix this to cancel the TCP connection attempt when
// an API to do so exists.
Finish(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"HTTP request cancelled during TCP connection establishment",
&overall_error_, 1));
}
if (handshake_mgr_ != nullptr) {
handshake_mgr_->Shutdown(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"HTTP request cancelled during security handshake"));
}
if (own_endpoint_ && ep_ != nullptr) {
grpc_endpoint_shutdown(
ep_, GRPC_ERROR_CREATE_FROM_STATIC_STRING("HTTP request cancelled"));
}
}
Unref();
}
void HttpRequest::AppendError(grpc_error_handle error) {
if (overall_error_ == GRPC_ERROR_NONE) {
overall_error_ =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed HTTP/1 client request");
}
const grpc_resolved_address* addr = &addresses_[next_address_ - 1];
std::string addr_text = grpc_sockaddr_to_uri(addr);
overall_error_ = grpc_error_add_child(
overall_error_,
grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_text));
}
void HttpRequest::OnReadInternal(grpc_error_handle error) {
for (size_t i = 0; i < incoming_.count; i++) {
if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
have_read_byte_ = 1;
grpc_error_handle err =
grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr);
if (err != GRPC_ERROR_NONE) {
Finish(err);
return;
}
}
}
if (cancelled_) {
Finish(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"HTTP1 request cancelled during read", &overall_error_, 1));
} else if (error == GRPC_ERROR_NONE) {
DoRead();
} else if (!have_read_byte_) {
NextAddress(GRPC_ERROR_REF(error));
} else {
Finish(grpc_http_parser_eof(&parser_));
}
}
void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx(
void* arg, grpc_error_handle error) {
RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(arg));
MutexLock lock(&req->mu_);
if (error == GRPC_ERROR_NONE && !req->cancelled_) {
req->OnWritten();
} else {
req->NextAddress(GRPC_ERROR_REF(error));
}
}
void HttpRequest::StartWrite() {
grpc_slice_ref_internal(request_text_);
grpc_slice_buffer_add(&outgoing_, request_text_);
Ref().release(); // ref held by pending write
grpc_endpoint_write(ep_, &outgoing_, &done_write_, nullptr);
}
void HttpRequest::OnHandshakeDone(void* arg, grpc_error_handle error) {
auto* args = static_cast<HandshakerArgs*>(arg);
RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(args->user_data));
MutexLock lock(&req->mu_);
req->own_endpoint_ = true;
if (error != GRPC_ERROR_NONE || req->cancelled_) {
gpr_log(GPR_ERROR, "Secure transport setup failed: %s",
grpc_error_std_string(error).c_str());
req->NextAddress(GRPC_ERROR_REF(error));
return;
}
grpc_channel_args_destroy(args->args);
grpc_slice_buffer_destroy_internal(args->read_buffer);
gpr_free(args->read_buffer);
req->ep_ = args->endpoint;
req->StartWrite();
}
void HttpRequest::OnConnected(void* arg, grpc_error_handle error) {
RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(arg));
MutexLock lock(&req->mu_);
req->connecting_ = false;
req->own_endpoint_ = true;
if (req->cancelled_) {
// since we were cancelled while connecting, Finish has already
// been called.
return;
}
if (!req->ep_) {
req->NextAddress(GRPC_ERROR_REF(error));
return;
}
// TODO(yihuaz): treating nullptr channel_creds_ as insecure is
// a hack used to support the port server client (a test utility) in
// unsecure builds (when no definition of grpc_insecure_credentials_create
// exists). We can remove this hack and unconditionally assume a valid
// channel_creds_ object after unsecure builds are deleted, in
// https://github.com/grpc/grpc/pull/25586.
if (req->channel_creds_ == nullptr) {
gpr_log(GPR_DEBUG,
"HTTP request skipping handshake because creds are null");
req->StartWrite();
return;
}
// Create the security connector using the credentials and target name.
grpc_channel_args* new_args_from_connector = nullptr;
RefCountedPtr<grpc_channel_security_connector> sc =
req->channel_creds_->create_security_connector(
nullptr /*call_creds*/, req->uri_.authority().c_str(),
req->channel_args_, &new_args_from_connector);
if (sc == nullptr) {
req->Finish(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"failed to create security connector", &req->overall_error_, 1));
return;
}
grpc_arg security_connector_arg = grpc_security_connector_to_arg(sc.get());
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != nullptr ? new_args_from_connector
: req->channel_args_,
&security_connector_arg, 1);
grpc_channel_args_destroy(new_args_from_connector);
// Start the handshake
req->handshake_mgr_ = MakeRefCounted<HandshakeManager>();
CoreConfiguration::Get().handshaker_registry().AddHandshakers(
HANDSHAKER_CLIENT, new_args, req->pollset_set_,
req->handshake_mgr_.get());
req->Ref().release(); // ref held by pending handshake
grpc_endpoint* ep = req->ep_;
req->ep_ = nullptr;
req->own_endpoint_ = false;
req->handshake_mgr_->DoHandshake(ep, new_args, req->deadline_,
/*acceptor=*/nullptr, OnHandshakeDone,
/*user_data=*/req.get());
sc.reset(DEBUG_LOCATION, "httpcli");
grpc_channel_args_destroy(new_args);
}
void HttpRequest::NextAddress(grpc_error_handle error) {
if (error != GRPC_ERROR_NONE) {
AppendError(error);
}
if (cancelled_) {
Finish(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"HTTP request was cancelled", &overall_error_, 1));
return;
}
if (next_address_ == addresses_.size()) {
Finish(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed HTTP requests to all targets", &overall_error_, 1));
return;
}
const grpc_resolved_address* addr = &addresses_[next_address_++];
GRPC_CLOSURE_INIT(&connected_, OnConnected, this, grpc_schedule_on_exec_ctx);
connecting_ = true;
own_endpoint_ = false;
Ref().release(); // ref held by pending connect
grpc_tcp_client_connect(&connected_, &ep_, pollset_set_, channel_args_, addr,
deadline_);
}
void HttpRequest::OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) {
RefCountedPtr<HttpRequest> unreffer(this);
MutexLock lock(&mu_);
dns_request_.reset();
if (!addresses_or.ok()) {
Finish(absl_status_to_grpc_error(addresses_or.status()));
return;
}
if (cancelled_) {
Finish(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"cancelled during DNS resolution"));
return;
}
addresses_ = std::move(*addresses_or);
next_address_ = 0;
NextAddress(GRPC_ERROR_NONE);
}
} // namespace grpc_core