blob: 468a291627505e4b84ceac8eca4ab1db9d77d069 [file] [log] [blame]
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <functional>
#include <initializer_list>
#include <memory>
#include <new>
#include <string>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/byte_buffer.h>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/end2end/data/ssl_test_data.h"
#include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/util/passthru_endpoint.h"
// IWYU pragma: no_include <google/protobuf/repeated_ptr_field.h>
// Applicable when simulating channel actions. Prevents overflows.
static constexpr uint64_t kMaxWaitMs = grpc_core::Duration::Hours(1).millis();
// Applicable when simulating channel actions. Prevents overflows.
static constexpr uint64_t kMaxAddNReadableBytes = (2 * 1024 * 1024); // 2GB
// Applicable when simulating channel actions. Prevents overflows.
static constexpr uint64_t kMaxAddNWritableBytes = (2 * 1024 * 1024); // 2GB
////////////////////////////////////////////////////////////////////////////////
// logging
bool squelch = true;
bool leak_check = true;
static void dont_log(gpr_log_func_args* /*args*/) {}
////////////////////////////////////////////////////////////////////////////////
// global state
static grpc_server* g_server;
static grpc_channel* g_channel;
static grpc_resource_quota* g_resource_quota;
static std::vector<grpc_passthru_endpoint_channel_action> g_channel_actions;
static std::atomic<bool> g_channel_force_delete{false};
////////////////////////////////////////////////////////////////////////////////
// dns resolution
typedef struct addr_req {
grpc_timer timer;
char* addr;
grpc_closure* on_done;
std::unique_ptr<grpc_core::ServerAddressList>* addresses;
} addr_req;
static void finish_resolve(void* arg, grpc_error_handle error) {
addr_req* r = static_cast<addr_req*>(arg);
if (GRPC_ERROR_IS_NONE(error) && 0 == strcmp(r->addr, "server")) {
*r->addresses = absl::make_unique<grpc_core::ServerAddressList>();
grpc_resolved_address fake_resolved_address;
GPR_ASSERT(
grpc_parse_ipv4_hostport("1.2.3.4:5", &fake_resolved_address, false));
(*r->addresses)
->emplace_back(fake_resolved_address, grpc_core::ChannelArgs());
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, GRPC_ERROR_NONE);
} else {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolution failed", &error, 1));
}
gpr_free(r->addr);
delete r;
}
namespace {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
class FuzzerDNSResolver : public grpc_core::DNSResolver {
public:
class FuzzerDNSRequest {
public:
FuzzerDNSRequest(
absl::string_view name,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done)
: name_(std::string(name)), on_done_(std::move(on_done)) {
grpc_timer_init(
&timer_,
grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
GRPC_CLOSURE_CREATE(FinishResolve, this, grpc_schedule_on_exec_ctx));
}
private:
static void FinishResolve(void* arg, grpc_error_handle error) {
FuzzerDNSRequest* self = static_cast<FuzzerDNSRequest*>(arg);
if (GRPC_ERROR_IS_NONE(error) && self->name_ == "server") {
std::vector<grpc_resolved_address> addrs;
grpc_resolved_address addr;
addr.len = 0;
addrs.push_back(addr);
self->on_done_(std::move(addrs));
} else {
self->on_done_(absl::UnknownError("Resolution failed"));
}
delete self;
}
const std::string name_;
const std::function<void(
absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done_;
grpc_timer timer_;
};
// Gets the singleton instance, possibly creating it first
static FuzzerDNSResolver* GetOrCreate() {
static FuzzerDNSResolver* instance = new FuzzerDNSResolver();
return instance;
}
TaskHandle LookupHostname(
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_resolved,
absl::string_view name, absl::string_view /* default_port */,
grpc_core::Duration /* timeout */,
grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) override {
new FuzzerDNSRequest(name, std::move(on_resolved));
return kNullHandle;
}
absl::StatusOr<std::vector<grpc_resolved_address>> LookupHostnameBlocking(
absl::string_view /* name */,
absl::string_view /* default_port */) override {
GPR_ASSERT(0);
}
TaskHandle LookupSRV(
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_resolved,
absl::string_view /* name */, grpc_core::Duration /* timeout */,
grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) override {
GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError(
"The Fuzzing DNS resolver does not support looking up SRV records"));
});
return {-1, -1};
};
TaskHandle LookupTXT(
std::function<void(absl::StatusOr<std::string>)> on_resolved,
absl::string_view /* name */, grpc_core::Duration /* timeout */,
grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) override {
// Not supported
GetDefaultEventEngine()->Run([on_resolved] {
on_resolved(absl::UnimplementedError(
"The Fuzing DNS resolver does not support looking up TXT records"));
});
return {-1, -1};
};
// FuzzerDNSResolver does not support cancellation.
bool Cancel(TaskHandle /*handle*/) override { return false; }
};
} // namespace
grpc_ares_request* my_dns_lookup_ares(
const char* /*dns_server*/, const char* addr, const char* /*default_port*/,
grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
int /*query_timeout*/) {
addr_req* r = new addr_req();
r->addr = gpr_strdup(addr);
r->on_done = on_done;
r->addresses = addresses;
grpc_timer_init(
&r->timer,
grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
GRPC_CLOSURE_CREATE(finish_resolve, r, grpc_schedule_on_exec_ctx));
return nullptr;
}
static void my_cancel_ares_request(grpc_ares_request* request) {
GPR_ASSERT(request == nullptr);
}
////////////////////////////////////////////////////////////////////////////////
// client connection
static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
gpr_timespec deadline);
typedef struct {
grpc_timer timer;
grpc_closure* closure;
grpc_endpoint** ep;
gpr_timespec deadline;
} future_connect;
static void do_connect(void* arg, grpc_error_handle error) {
future_connect* fc = static_cast<future_connect*>(arg);
if (!GRPC_ERROR_IS_NONE(error)) {
*fc->ep = nullptr;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, fc->closure, GRPC_ERROR_REF(error));
} else if (g_server != nullptr) {
grpc_endpoint* client;
grpc_endpoint* server;
grpc_passthru_endpoint_create(&client, &server, nullptr, true);
*fc->ep = client;
start_scheduling_grpc_passthru_endpoint_channel_effects(client,
g_channel_actions);
grpc_core::Server* core_server = grpc_core::Server::FromC(g_server);
grpc_transport* transport = grpc_create_chttp2_transport(
core_server->channel_args(), server, false);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"SetupTransport",
core_server->SetupTransport(transport, nullptr,
core_server->channel_args(), nullptr)));
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, fc->closure, GRPC_ERROR_NONE);
} else {
sched_connect(fc->closure, fc->ep, fc->deadline);
}
gpr_free(fc);
}
static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
gpr_timespec deadline) {
if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) < 0) {
*ep = nullptr;
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connect deadline exceeded"));
return;
}
future_connect* fc = static_cast<future_connect*>(gpr_malloc(sizeof(*fc)));
fc->closure = closure;
fc->ep = ep;
fc->deadline = deadline;
grpc_timer_init(
&fc->timer,
grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx));
}
static int64_t my_tcp_client_connect(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* /*interested_parties*/,
const grpc_event_engine::experimental::EndpointConfig& /*config*/,
const grpc_resolved_address* /*addr*/, grpc_core::Timestamp deadline) {
sched_connect(closure, ep, deadline.as_timespec(GPR_CLOCK_MONOTONIC));
return 0;
}
static bool my_tcp_cancel_connect(int64_t /*connection_handle*/) {
return false;
}
grpc_tcp_client_vtable fuzz_tcp_client_vtable = {my_tcp_client_connect,
my_tcp_cancel_connect};
////////////////////////////////////////////////////////////////////////////////
// test driver
class Validator {
public:
explicit Validator(std::function<void(bool)> impl) : impl_(std::move(impl)) {}
virtual ~Validator() {}
void Run(bool success) {
impl_(success);
delete this;
}
private:
std::function<void(bool)> impl_;
};
Validator* MakeValidator(std::function<void(bool)> impl) {
return new Validator(std::move(impl));
}
static Validator* AssertSuccessAndDecrement(int* counter) {
return MakeValidator([counter](bool success) {
GPR_ASSERT(success);
--*counter;
});
}
static Validator* Decrement(int* counter) {
return MakeValidator([counter](bool) { --*counter; });
}
static Validator* ValidateConnectivityWatch(gpr_timespec deadline,
int* counter) {
return MakeValidator([deadline, counter](bool success) {
if (!success) {
auto now = gpr_now(deadline.clock_type);
GPR_ASSERT(gpr_time_cmp(now, deadline) >= 0);
}
--*counter;
});
}
static void free_non_null(void* p) {
GPR_ASSERT(p != nullptr);
gpr_free(p);
}
enum class CallType { CLIENT, SERVER, PENDING_SERVER, TOMBSTONED };
class Call : public std::enable_shared_from_this<Call> {
public:
explicit Call(CallType type) : type_(type) {
grpc_metadata_array_init(&recv_initial_metadata_);
grpc_metadata_array_init(&recv_trailing_metadata_);
grpc_call_details_init(&call_details_);
}
~Call();
CallType type() const { return type_; }
bool done() const {
if ((type_ == CallType::TOMBSTONED || call_closed_) && pending_ops_ == 0) {
return true;
}
if (call_ == nullptr && type() != CallType::PENDING_SERVER) return true;
return false;
}
void Shutdown() {
if (call_ != nullptr) {
grpc_call_cancel(call_, nullptr);
type_ = CallType::TOMBSTONED;
}
}
void SetCall(grpc_call* call) {
GPR_ASSERT(call_ == nullptr);
call_ = call;
}
grpc_call* call() const { return call_; }
void RequestCall(grpc_server* server, grpc_completion_queue* cq) {
auto* v = FinishedRequestCall();
grpc_call_error error = grpc_server_request_call(
server, &call_, &call_details_, &recv_initial_metadata_, cq, cq, v);
if (error != GRPC_CALL_OK) {
v->Run(false);
}
}
void* Allocate(size_t size) {
void* p = gpr_malloc(size);
free_pointers_.push_back(p);
return p;
}
template <typename T>
T* AllocArray(size_t elems) {
return static_cast<T*>(Allocate(sizeof(T) * elems));
}
template <typename T>
T* NewCopy(T value) {
T* p = AllocArray<T>(1);
new (p) T(value);
return p;
}
template <typename T>
grpc_slice ReadSlice(const T& s) {
grpc_slice slice = grpc_slice_from_cpp_string(s.value());
unref_slices_.push_back(slice);
return slice;
}
template <typename M>
grpc_metadata_array ReadMetadata(const M& metadata) {
grpc_metadata* m = AllocArray<grpc_metadata>(metadata.size());
for (int i = 0; i < metadata.size(); ++i) {
m[i].key = ReadSlice(metadata[i].key());
m[i].value = ReadSlice(metadata[i].value());
}
return grpc_metadata_array{static_cast<size_t>(metadata.size()),
static_cast<size_t>(metadata.size()), m};
}
absl::optional<grpc_op> ReadOp(
const api_fuzzer::BatchOp& batch_op, bool* batch_is_ok,
uint8_t* batch_ops, std::vector<std::function<void()>>* unwinders) {
grpc_op op;
memset(&op, 0, sizeof(op));
switch (batch_op.op_case()) {
case api_fuzzer::BatchOp::OP_NOT_SET:
/* invalid value */
return {};
case api_fuzzer::BatchOp::kSendInitialMetadata:
if (sent_initial_metadata_) {
*batch_is_ok = false;
} else {
sent_initial_metadata_ = true;
op.op = GRPC_OP_SEND_INITIAL_METADATA;
*batch_ops |= 1 << GRPC_OP_SEND_INITIAL_METADATA;
auto ary = ReadMetadata(batch_op.send_initial_metadata().metadata());
op.data.send_initial_metadata.count = ary.count;
op.data.send_initial_metadata.metadata = ary.metadata;
}
break;
case api_fuzzer::BatchOp::kSendMessage:
op.op = GRPC_OP_SEND_MESSAGE;
if (send_message_ != nullptr) {
*batch_is_ok = false;
} else {
*batch_ops |= 1 << GRPC_OP_SEND_MESSAGE;
std::vector<grpc_slice> slices;
for (const auto& m : batch_op.send_message().message()) {
slices.push_back(ReadSlice(m));
}
send_message_ = op.data.send_message.send_message =
grpc_raw_byte_buffer_create(slices.data(), slices.size());
unwinders->push_back([this]() {
grpc_byte_buffer_destroy(send_message_);
send_message_ = nullptr;
});
}
break;
case api_fuzzer::BatchOp::kSendCloseFromClient:
op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
*batch_ops |= 1 << GRPC_OP_SEND_CLOSE_FROM_CLIENT;
break;
case api_fuzzer::BatchOp::kSendStatusFromServer: {
op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
*batch_ops |= 1 << GRPC_OP_SEND_STATUS_FROM_SERVER;
auto ary = ReadMetadata(batch_op.send_status_from_server().metadata());
op.data.send_status_from_server.trailing_metadata_count = ary.count;
op.data.send_status_from_server.trailing_metadata = ary.metadata;
op.data.send_status_from_server.status = static_cast<grpc_status_code>(
batch_op.send_status_from_server().status_code());
op.data.send_status_from_server.status_details =
batch_op.send_status_from_server().has_status_details()
? NewCopy(ReadSlice(
batch_op.send_status_from_server().status_details()))
: nullptr;
} break;
case api_fuzzer::BatchOp::kReceiveInitialMetadata:
if (enqueued_recv_initial_metadata_) {
*batch_is_ok = false;
} else {
enqueued_recv_initial_metadata_ = true;
op.op = GRPC_OP_RECV_INITIAL_METADATA;
*batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA;
op.data.recv_initial_metadata.recv_initial_metadata =
&recv_initial_metadata_;
}
break;
case api_fuzzer::BatchOp::kReceiveMessage:
// Allow only one active pending_recv_message_op to exist. Otherwise if
// the previous enqueued recv_message_op is not complete by the time
// we get here, then under certain conditions, enqueing this op will
// over-write the internal call->receiving_buffer maintained by grpc
// leading to a memory leak.
if (call_closed_ || pending_recv_message_op_) {
*batch_is_ok = false;
} else {
op.op = GRPC_OP_RECV_MESSAGE;
*batch_ops |= 1 << GRPC_OP_RECV_MESSAGE;
pending_recv_message_op_ = true;
op.data.recv_message.recv_message = &recv_message_;
unwinders->push_back([this]() { pending_recv_message_op_ = false; });
}
break;
case api_fuzzer::BatchOp::kReceiveStatusOnClient:
op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op.data.recv_status_on_client.status = &status_;
op.data.recv_status_on_client.trailing_metadata =
&recv_trailing_metadata_;
op.data.recv_status_on_client.status_details = &recv_status_details_;
*batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT;
break;
case api_fuzzer::BatchOp::kReceiveCloseOnServer:
op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
*batch_ops |= 1 << GRPC_OP_RECV_CLOSE_ON_SERVER;
op.data.recv_close_on_server.cancelled = &cancelled_;
break;
}
op.reserved = nullptr;
op.flags = batch_op.flags();
return op;
}
Validator* FinishedBatchValidator(uint8_t has_ops) {
++pending_ops_;
auto self = shared_from_this();
return MakeValidator([self, has_ops](bool /*success*/) {
--self->pending_ops_;
if (has_ops & (1u << GRPC_OP_RECV_MESSAGE)) {
self->pending_recv_message_op_ = false;
if (self->recv_message_ != nullptr) {
grpc_byte_buffer_destroy(self->recv_message_);
self->recv_message_ = nullptr;
}
}
if ((has_ops & (1u << GRPC_OP_SEND_MESSAGE))) {
grpc_byte_buffer_destroy(self->send_message_);
self->send_message_ = nullptr;
}
if ((has_ops & (1u << GRPC_OP_RECV_STATUS_ON_CLIENT)) ||
(has_ops & (1u << GRPC_OP_RECV_CLOSE_ON_SERVER))) {
self->call_closed_ = true;
}
});
}
Validator* FinishedRequestCall() {
++pending_ops_;
auto self = shared_from_this();
return MakeValidator([self](bool success) {
GPR_ASSERT(self->pending_ops_ > 0);
--self->pending_ops_;
if (success) {
GPR_ASSERT(self->call_ != nullptr);
self->type_ = CallType::SERVER;
} else {
self->type_ = CallType::TOMBSTONED;
}
});
}
private:
CallType type_;
grpc_call* call_ = nullptr;
grpc_byte_buffer* recv_message_ = nullptr;
grpc_status_code status_;
grpc_metadata_array recv_initial_metadata_{0, 0, nullptr};
grpc_metadata_array recv_trailing_metadata_{0, 0, nullptr};
grpc_slice recv_status_details_ = grpc_empty_slice();
// set by receive close on server, unset here to trigger
// msan if misused
int cancelled_;
int pending_ops_ = 0;
bool sent_initial_metadata_ = false;
bool enqueued_recv_initial_metadata_ = false;
grpc_call_details call_details_{};
grpc_byte_buffer* send_message_ = nullptr;
bool call_closed_ = false;
bool pending_recv_message_op_ = false;
std::vector<void*> free_pointers_;
std::vector<grpc_slice> unref_slices_;
};
static std::vector<std::shared_ptr<Call>> g_calls;
static size_t g_active_call = 0;
static Call* ActiveCall() {
while (!g_calls.empty()) {
if (g_active_call >= g_calls.size()) {
g_active_call = 0;
}
if (g_calls[g_active_call] != nullptr && !g_calls[g_active_call]->done()) {
return g_calls[g_active_call].get();
}
g_calls.erase(g_calls.begin() + g_active_call);
}
return nullptr;
}
Call::~Call() {
if (call_ != nullptr) {
grpc_call_unref(call_);
}
grpc_slice_unref(recv_status_details_);
grpc_call_details_destroy(&call_details_);
for (auto* p : free_pointers_) {
gpr_free(p);
}
for (auto s : unref_slices_) {
grpc_slice_unref(s);
}
if (recv_message_ != nullptr) {
grpc_byte_buffer_destroy(recv_message_);
recv_message_ = nullptr;
}
grpc_metadata_array_destroy(&recv_initial_metadata_);
grpc_metadata_array_destroy(&recv_trailing_metadata_);
}
template <typename ChannelArgContainer>
grpc_channel_args* ReadArgs(const ChannelArgContainer& args) {
grpc_channel_args* res =
static_cast<grpc_channel_args*>(gpr_malloc(sizeof(grpc_channel_args)));
res->args =
static_cast<grpc_arg*>(gpr_malloc(sizeof(grpc_arg) * args.size()));
int j = 0;
for (int i = 0; i < args.size(); i++) {
switch (args[i].value_case()) {
case api_fuzzer::ChannelArg::kStr:
res->args[j].type = GRPC_ARG_STRING;
res->args[j].value.string = gpr_strdup(args[i].str().c_str());
break;
case api_fuzzer::ChannelArg::kI:
res->args[j].type = GRPC_ARG_INTEGER;
res->args[j].value.integer = args[i].i();
break;
case api_fuzzer::ChannelArg::kResourceQuota:
if (args[i].key() != GRPC_ARG_RESOURCE_QUOTA) continue;
grpc_resource_quota_ref(g_resource_quota);
res->args[j].type = GRPC_ARG_POINTER;
res->args[j].value.pointer.p = g_resource_quota;
res->args[j].value.pointer.vtable = grpc_resource_quota_arg_vtable();
break;
case api_fuzzer::ChannelArg::VALUE_NOT_SET:
res->args[j].type = GRPC_ARG_INTEGER;
res->args[j].value.integer = 0;
break;
}
res->args[j].key = gpr_strdup(args[i].key().c_str());
++j;
}
res->num_args = j;
return res;
}
static const char* ReadCredArtifact(
const api_fuzzer::CredArtifact& artifact,
std::initializer_list<const char*> builtins) {
switch (artifact.type_case()) {
case api_fuzzer::CredArtifact::kCustom:
return artifact.custom().c_str();
case api_fuzzer::CredArtifact::kBuiltin:
if (artifact.builtin() < 0) return nullptr;
if (artifact.builtin() < static_cast<int>(builtins.size())) {
return *(builtins.begin() + artifact.builtin());
}
return nullptr;
case api_fuzzer::CredArtifact::TYPE_NOT_SET:
return nullptr;
}
}
static grpc_channel_credentials* ReadSslChannelCreds(
const api_fuzzer::SslChannelCreds& creds) {
const char* root_certs =
creds.has_root_certs()
? ReadCredArtifact(creds.root_certs(), {test_root_cert})
: nullptr;
const char* private_key =
creds.has_private_key()
? ReadCredArtifact(creds.private_key(),
{test_server1_key, test_self_signed_client_key,
test_signed_client_key})
: nullptr;
const char* certs =
creds.has_certs()
? ReadCredArtifact(creds.certs(),
{test_server1_cert, test_self_signed_client_cert,
test_signed_client_cert})
: nullptr;
grpc_ssl_pem_key_cert_pair key_cert_pair = {private_key, certs};
return grpc_ssl_credentials_create(
root_certs,
private_key != nullptr && certs != nullptr ? &key_cert_pair : nullptr,
nullptr, nullptr);
}
static grpc_call_credentials* ReadCallCreds(
const api_fuzzer::CallCreds& creds) {
switch (creds.type_case()) {
case api_fuzzer::CallCreds::TYPE_NOT_SET:
return nullptr;
case api_fuzzer::CallCreds::kNull:
return nullptr;
case api_fuzzer::CallCreds::kCompositeCallCreds: {
grpc_call_credentials* out = nullptr;
for (const auto& child_creds :
creds.composite_call_creds().call_creds()) {
grpc_call_credentials* child = ReadCallCreds(child_creds);
if (child != nullptr) {
if (out == nullptr) {
out = child;
} else {
auto* composed =
grpc_composite_call_credentials_create(out, child, nullptr);
grpc_call_credentials_release(child);
grpc_call_credentials_release(out);
out = composed;
}
}
}
return out;
}
case api_fuzzer::CallCreds::kAccessToken:
return grpc_access_token_credentials_create(creds.access_token().c_str(),
nullptr);
case api_fuzzer::CallCreds::kIam:
return grpc_google_iam_credentials_create(
creds.iam().auth_token().c_str(), creds.iam().auth_selector().c_str(),
nullptr);
/* TODO(ctiller): more cred types here */
}
}
static grpc_channel_credentials* ReadChannelCreds(
const api_fuzzer::ChannelCreds& creds) {
switch (creds.type_case()) {
case api_fuzzer::ChannelCreds::TYPE_NOT_SET:
return nullptr;
case api_fuzzer::ChannelCreds::kSslChannelCreds:
return ReadSslChannelCreds(creds.ssl_channel_creds());
case api_fuzzer::ChannelCreds::kCompositeChannelCreds: {
const auto& comp = creds.composite_channel_creds();
grpc_channel_credentials* c1 =
comp.has_channel_creds() ? ReadChannelCreds(comp.channel_creds())
: nullptr;
grpc_call_credentials* c2 =
comp.has_call_creds() ? ReadCallCreds(comp.call_creds()) : nullptr;
if (c1 != nullptr && c2 != nullptr) {
grpc_channel_credentials* out =
grpc_composite_channel_credentials_create(c1, c2, nullptr);
grpc_channel_credentials_release(c1);
grpc_call_credentials_release(c2);
return out;
} else if (c1 != nullptr) {
return c1;
} else if (c2 != nullptr) {
grpc_call_credentials_release(c2);
return nullptr;
} else {
return nullptr;
}
GPR_UNREACHABLE_CODE(return nullptr);
}
case api_fuzzer::ChannelCreds::kNull:
return nullptr;
}
}
DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
grpc_event_engine::experimental::ResetDefaultEventEngine();
grpc_test_only_set_slice_hash_seed(0);
if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) {
gpr_set_log_function(dont_log);
}
grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable);
grpc_event_engine::experimental::SetDefaultEventEngineFactory(
[actions = msg.event_engine_actions()]() {
return absl::make_unique<
grpc_event_engine::experimental::FuzzingEventEngine>(
grpc_event_engine::experimental::FuzzingEventEngine::Options(),
actions);
});
grpc_event_engine::experimental::GetDefaultEventEngine();
grpc_init();
grpc_timer_manager_set_threading(false);
{
grpc_core::ExecCtx exec_ctx;
grpc_core::Executor::SetThreadingAll(false);
}
grpc_core::SetDNSResolver(FuzzerDNSResolver::GetOrCreate());
grpc_dns_lookup_hostname_ares = my_dns_lookup_ares;
grpc_cancel_ares_request = my_cancel_ares_request;
GPR_ASSERT(g_channel == nullptr);
GPR_ASSERT(g_server == nullptr);
bool server_shutdown = false;
int pending_server_shutdowns = 0;
int pending_channel_watches = 0;
int pending_pings = 0;
g_resource_quota = grpc_resource_quota_create("api_fuzzer");
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
int action_index = 0;
auto no_more_actions = [&]() { action_index = msg.actions_size(); };
auto poll_cq = [&]() -> bool {
grpc_event ev = grpc_completion_queue_next(
cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
switch (ev.type) {
case GRPC_OP_COMPLETE: {
static_cast<Validator*>(ev.tag)->Run(ev.success);
break;
}
case GRPC_QUEUE_TIMEOUT:
break;
case GRPC_QUEUE_SHUTDOWN:
return true;
}
return false;
};
while (action_index < msg.actions_size() || g_channel != nullptr ||
g_server != nullptr || pending_channel_watches > 0 ||
pending_pings > 0 || ActiveCall() != nullptr) {
static_cast<grpc_event_engine::experimental::FuzzingEventEngine*>(
grpc_event_engine::experimental::GetDefaultEventEngine())
->Tick();
if (action_index == msg.actions_size()) {
static_cast<grpc_event_engine::experimental::FuzzingEventEngine*>(
grpc_event_engine::experimental::GetDefaultEventEngine())
->FuzzingDone();
if (g_channel != nullptr) {
grpc_channel_destroy(g_channel);
g_channel = nullptr;
}
if (g_server != nullptr) {
if (!server_shutdown) {
grpc_server_shutdown_and_notify(
g_server, cq,
AssertSuccessAndDecrement(&pending_server_shutdowns));
server_shutdown = true;
pending_server_shutdowns++;
} else if (pending_server_shutdowns == 0) {
grpc_server_destroy(g_server);
g_server = nullptr;
}
}
for (auto& call : g_calls) {
if (call == nullptr) continue;
if (call->type() == CallType::PENDING_SERVER) continue;
call->Shutdown();
}
grpc_timer_manager_tick();
GPR_ASSERT(!poll_cq());
continue;
}
grpc_timer_manager_tick();
if (g_channel_force_delete.exchange(false) && g_channel) {
grpc_channel_destroy(g_channel);
g_channel = nullptr;
g_channel_actions.clear();
}
const api_fuzzer::Action& action = msg.actions(action_index);
action_index++;
switch (action.type_case()) {
case api_fuzzer::Action::TYPE_NOT_SET:
no_more_actions();
break;
// tickle completion queue
case api_fuzzer::Action::kPollCq: {
GPR_ASSERT(!poll_cq());
break;
}
// create an insecure channel
case api_fuzzer::Action::kCreateChannel: {
if (!action.create_channel().channel_actions_size() ||
g_channel != nullptr) {
no_more_actions();
} else {
grpc_channel_args* args =
ReadArgs(action.create_channel().channel_args());
grpc_channel_credentials* creds =
action.create_channel().has_channel_creds()
? ReadChannelCreds(action.create_channel().channel_creds())
: grpc_insecure_credentials_create();
g_channel = grpc_channel_create(
action.create_channel().target().c_str(), creds, args);
grpc_channel_credentials_release(creds);
g_channel_actions.clear();
for (int i = 0; i < action.create_channel().channel_actions_size();
i++) {
const api_fuzzer::ChannelAction& channel_action =
action.create_channel().channel_actions(i);
g_channel_actions.push_back({
std::min(channel_action.wait_ms(), kMaxWaitMs),
std::min(channel_action.add_n_bytes_writable(),
kMaxAddNWritableBytes),
std::min(channel_action.add_n_bytes_readable(),
kMaxAddNReadableBytes),
});
}
GPR_ASSERT(g_channel != nullptr);
g_channel_force_delete = false;
{
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(args);
}
}
break;
}
// destroy a channel
case api_fuzzer::Action::kCloseChannel: {
if (g_channel != nullptr) {
grpc_channel_destroy(g_channel);
g_channel = nullptr;
} else {
no_more_actions();
}
break;
}
// bring up a server
case api_fuzzer::Action::kCreateServer: {
if (g_server == nullptr) {
grpc_channel_args* args =
ReadArgs(action.create_server().channel_args());
g_server = grpc_server_create(args, nullptr);
GPR_ASSERT(g_server != nullptr);
{
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(args);
}
grpc_server_register_completion_queue(g_server, cq, nullptr);
grpc_server_start(g_server);
server_shutdown = false;
GPR_ASSERT(pending_server_shutdowns == 0);
} else {
no_more_actions();
}
break;
}
// begin server shutdown
case api_fuzzer::Action::kShutdownServer: {
if (g_server != nullptr) {
grpc_server_shutdown_and_notify(
g_server, cq,
AssertSuccessAndDecrement(&pending_server_shutdowns));
pending_server_shutdowns++;
server_shutdown = true;
} else {
no_more_actions();
}
break;
}
// cancel all calls if shutdown
case api_fuzzer::Action::kCancelAllCallsIfShutdown: {
if (g_server != nullptr && server_shutdown) {
grpc_server_cancel_all_calls(g_server);
} else {
no_more_actions();
}
break;
}
// destroy server
case api_fuzzer::Action::kDestroyServerIfReady: {
if (g_server != nullptr && server_shutdown &&
pending_server_shutdowns == 0) {
grpc_server_destroy(g_server);
g_server = nullptr;
} else {
no_more_actions();
}
break;
}
// check connectivity
case api_fuzzer::Action::kCheckConnectivity: {
if (g_channel != nullptr) {
grpc_channel_check_connectivity_state(g_channel,
action.check_connectivity());
} else {
no_more_actions();
}
break;
}
// watch connectivity
case api_fuzzer::Action::kWatchConnectivity: {
if (g_channel != nullptr) {
grpc_connectivity_state st =
grpc_channel_check_connectivity_state(g_channel, 0);
if (st != GRPC_CHANNEL_SHUTDOWN) {
gpr_timespec deadline =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(action.watch_connectivity(),
GPR_TIMESPAN));
grpc_channel_watch_connectivity_state(
g_channel, st, deadline, cq,
ValidateConnectivityWatch(deadline, &pending_channel_watches));
pending_channel_watches++;
}
} else {
no_more_actions();
}
break;
}
// create a call
case api_fuzzer::Action::kCreateCall: {
bool ok = true;
if (g_channel == nullptr) ok = false;
// If the active call is a server call, then use it as the parent call
// to exercise the propagation logic.
Call* parent_call = ActiveCall();
if (parent_call != nullptr && parent_call->type() != CallType::SERVER) {
parent_call = nullptr;
}
g_calls.emplace_back(new Call(CallType::CLIENT));
grpc_slice method =
g_calls.back()->ReadSlice(action.create_call().method());
if (GRPC_SLICE_LENGTH(method) == 0) {
ok = false;
}
grpc_slice host =
g_calls.back()->ReadSlice(action.create_call().host());
gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(action.create_call().timeout(), GPR_TIMESPAN));
if (ok) {
g_calls.back()->SetCall(grpc_channel_create_call(
g_channel, parent_call == nullptr ? nullptr : parent_call->call(),
action.create_call().propagation_mask(), cq, method, &host,
deadline, nullptr));
} else {
g_calls.pop_back();
no_more_actions();
}
break;
}
// switch the 'current' call
case api_fuzzer::Action::kChangeActiveCall: {
g_active_call++;
ActiveCall();
break;
}
// queue some ops on a call
case api_fuzzer::Action::kQueueBatch: {
auto* active_call = ActiveCall();
if (active_call == nullptr ||
active_call->type() == CallType::PENDING_SERVER ||
active_call->call() == nullptr) {
no_more_actions();
break;
}
const auto& batch = action.queue_batch().operations();
if (batch.size() > 6) {
no_more_actions();
break;
}
std::vector<grpc_op> ops;
bool ok = true;
uint8_t has_ops = 0;
std::vector<std::function<void()>> unwinders;
for (const auto& batch_op : batch) {
auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders);
if (!op.has_value()) continue;
ops.push_back(*op);
}
if (g_channel == nullptr) ok = false;
if (ok) {
auto* v = active_call->FinishedBatchValidator(has_ops);
grpc_call_error error = grpc_call_start_batch(
active_call->call(), ops.data(), ops.size(), v, nullptr);
if (error != GRPC_CALL_OK) {
v->Run(false);
}
} else {
no_more_actions();
for (auto& unwind : unwinders) {
unwind();
}
}
break;
}
// cancel current call
case api_fuzzer::Action::kCancelCall: {
auto* active_call = ActiveCall();
if (active_call != nullptr && active_call->call() != nullptr) {
grpc_call_cancel(active_call->call(), nullptr);
} else {
no_more_actions();
}
break;
}
// get a calls peer
case api_fuzzer::Action::kGetPeer: {
auto* active_call = ActiveCall();
if (active_call != nullptr && active_call->call() != nullptr) {
free_non_null(grpc_call_get_peer(active_call->call()));
} else {
no_more_actions();
}
break;
}
// get a channels target
case api_fuzzer::Action::kGetTarget: {
if (g_channel != nullptr) {
free_non_null(grpc_channel_get_target(g_channel));
} else {
no_more_actions();
}
break;
}
// send a ping on a channel
case api_fuzzer::Action::kPing: {
if (g_channel != nullptr) {
pending_pings++;
grpc_channel_ping(g_channel, cq, Decrement(&pending_pings), nullptr);
} else {
no_more_actions();
}
break;
}
// enable a tracer
case api_fuzzer::Action::kEnableTracer: {
grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1);
break;
}
// disable a tracer
case api_fuzzer::Action::kDisableTracer: {
grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0);
break;
}
// request a server call
case api_fuzzer::Action::kRequestCall: {
if (g_server == nullptr) {
no_more_actions();
break;
}
g_calls.emplace_back(new Call(CallType::PENDING_SERVER));
g_calls.back()->RequestCall(g_server, cq);
break;
}
// destroy a call
case api_fuzzer::Action::kDestroyCall: {
auto* active_call = ActiveCall();
if (active_call != nullptr &&
active_call->type() != CallType::PENDING_SERVER &&
active_call->call() != nullptr) {
g_calls[g_active_call]->Shutdown();
} else {
no_more_actions();
}
break;
}
// resize the buffer pool
case api_fuzzer::Action::kResizeResourceQuota: {
grpc_resource_quota_resize(g_resource_quota,
action.resize_resource_quota());
break;
}
}
}
GPR_ASSERT(g_channel == nullptr);
GPR_ASSERT(g_server == nullptr);
GPR_ASSERT(ActiveCall() == nullptr);
GPR_ASSERT(g_calls.empty());
grpc_completion_queue_shutdown(cq);
GPR_ASSERT(poll_cq());
grpc_completion_queue_destroy(cq);
grpc_resource_quota_unref(g_resource_quota);
grpc_shutdown_blocking();
}