blob: 2b1daa357674d22423d33deae3ef1de212c78911 [file] [log] [blame]
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <cstdint>
#include <deque>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/attributes.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/memory_request.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/transport/chttp2/flow_control_fuzzer.pb.h"
// IWYU pragma: no_include <google/protobuf/repeated_ptr_field.h>
bool squelch = true;
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
namespace grpc_core {
namespace chttp2 {
namespace {
constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
gpr_timespec g_now;
gpr_timespec now_impl(gpr_clock_type clock_type) {
GPR_ASSERT(clock_type != GPR_TIMESPAN);
gpr_timespec ts = g_now;
ts.clock_type = clock_type;
return ts;
}
void InitGlobals() {
g_now = {1, 0, GPR_CLOCK_MONOTONIC};
TestOnlySetProcessEpoch(g_now);
gpr_now_impl = now_impl;
}
class FlowControlFuzzer {
public:
explicit FlowControlFuzzer(bool enable_bdp) {
ExecCtx exec_ctx;
tfc_ = absl::make_unique<TransportFlowControl>("fuzzer", enable_bdp,
&memory_owner_);
}
~FlowControlFuzzer() {
ExecCtx exec_ctx;
streams_.clear();
tfc_.reset();
memory_owner_.Release(allocated_memory_);
}
void Perform(const flow_control_fuzzer::Action& action);
void AssertNoneStuck() const;
void AssertAnnouncedOverInitialWindowSizeCorrect() const;
private:
struct StreamPayload {
uint32_t id;
uint64_t size;
};
struct SendToRemote {
bool bdp_ping = false;
absl::optional<uint32_t> initial_window_size;
uint32_t transport_window_update;
std::vector<StreamPayload> stream_window_updates;
};
struct SendFromRemote {
bool bdp_pong = false;
absl::optional<uint32_t> ack_initial_window_size;
std::vector<StreamPayload> stream_writes;
};
struct Stream {
explicit Stream(uint32_t id, TransportFlowControl* tfc) : id(id), fc(tfc) {}
uint32_t id;
StreamFlowControl fc;
int64_t queued_writes = 0;
int64_t window_delta = 0;
};
void PerformAction(FlowControlAction action, Stream* stream);
Stream* GetStream(uint32_t id) {
auto it = streams_.find(id);
if (it == streams_.end()) {
it = streams_.emplace(id, Stream{id, tfc_.get()}).first;
}
return &it->second;
}
MemoryQuotaRefPtr memory_quota_ = MakeMemoryQuota("fuzzer");
MemoryOwner memory_owner_ = memory_quota_->CreateMemoryOwner("owner");
std::unique_ptr<TransportFlowControl> tfc_;
absl::optional<uint32_t> queued_initial_window_size_;
absl::optional<uint32_t> queued_send_max_frame_size_;
bool scheduled_write_ = false;
bool sending_initial_window_size_ = false;
std::deque<SendToRemote> send_to_remote_;
std::deque<SendFromRemote> send_from_remote_;
uint32_t remote_initial_window_size_ = kDefaultWindow;
int64_t remote_transport_window_size_ = kDefaultWindow;
std::map<uint32_t, Stream> streams_;
std::queue<uint32_t> streams_to_update_;
uint64_t allocated_memory_ = 0;
Timestamp next_bdp_ping_ = Timestamp::ProcessEpoch();
};
void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) {
ExecCtx exec_ctx;
bool sending_payload = false;
switch (action.action_case()) {
case flow_control_fuzzer::Action::ACTION_NOT_SET:
break;
case flow_control_fuzzer::Action::kSetMemoryQuota: {
memory_quota_->SetSize(
Clamp(action.set_memory_quota(), uint64_t(1),
uint64_t(std::numeric_limits<int64_t>::max())));
} break;
case flow_control_fuzzer::Action::kStepTimeMs: {
g_now = gpr_time_add(
g_now, gpr_time_from_millis(Clamp(action.step_time_ms(), uint64_t(1),
kMaxAdvanceTimeMillis),
GPR_TIMESPAN));
exec_ctx.InvalidateNow();
if (exec_ctx.Now() >= next_bdp_ping_) {
scheduled_write_ = true;
}
} break;
case flow_control_fuzzer::Action::kPeriodicUpdate: {
PerformAction(tfc_->PeriodicUpdate(), nullptr);
} break;
case flow_control_fuzzer::Action::kPerformSendToRemote: {
scheduled_write_ = true;
} break;
case flow_control_fuzzer::Action::kPerformSendToRemoteWithPayload: {
scheduled_write_ = true;
sending_payload = true;
} break;
case flow_control_fuzzer::Action::kReadSendToRemote: {
if (send_to_remote_.empty()) break;
auto sent_to_remote = send_to_remote_.front();
if (sent_to_remote.initial_window_size.has_value()) {
if (!squelch) {
fprintf(stderr, "Setting initial window size to %d\n",
sent_to_remote.initial_window_size.value());
}
SendFromRemote send_from_remote;
send_from_remote.ack_initial_window_size =
sent_to_remote.initial_window_size;
for (const auto& id_stream : streams_) {
GPR_ASSERT(id_stream.second.window_delta +
*sent_to_remote.initial_window_size <=
(1u << 31) - 1);
}
remote_initial_window_size_ = *sent_to_remote.initial_window_size;
send_from_remote_.push_back(send_from_remote);
}
if (sent_to_remote.bdp_ping) {
SendFromRemote send_from_remote;
send_from_remote.bdp_pong = true;
send_from_remote_.push_back(send_from_remote);
}
for (auto stream_update : sent_to_remote.stream_window_updates) {
Stream* s = GetStream(stream_update.id);
if (!squelch) {
fprintf(stderr,
"[%" PRIu32 "]: increase window delta by %" PRIu64
" from %" PRId64 "\n",
stream_update.id, stream_update.size, s->window_delta);
}
s->window_delta += stream_update.size;
GPR_ASSERT(s->window_delta <= chttp2::kMaxWindowDelta);
}
remote_transport_window_size_ += sent_to_remote.transport_window_update;
send_to_remote_.pop_front();
} break;
case flow_control_fuzzer::Action::kReadSendFromRemote: {
if (send_from_remote_.empty()) break;
auto sent_from_remote = send_from_remote_.front();
if (sent_from_remote.ack_initial_window_size.has_value()) {
if (!squelch) {
fprintf(stderr, "Received ACK for initial window size %d\n",
*sent_from_remote.ack_initial_window_size);
}
tfc_->SetAckedInitialWindow(*sent_from_remote.ack_initial_window_size);
sending_initial_window_size_ = false;
}
if (sent_from_remote.bdp_pong) {
next_bdp_ping_ = tfc_->bdp_estimator()->CompletePing();
}
for (const auto& stream_write : sent_from_remote.stream_writes) {
Stream* stream = GetStream(stream_write.id);
if (!squelch) {
fprintf(stderr, "[%" PRIu32 "]: recv write of %" PRIu64 "\n",
stream_write.id, stream_write.size);
}
if (auto* bdp = tfc_->bdp_estimator()) {
bdp->AddIncomingBytes(stream_write.size);
}
StreamFlowControl::IncomingUpdateContext upd(&stream->fc);
GPR_ASSERT(upd.RecvData(stream_write.size).ok());
PerformAction(upd.MakeAction(), stream);
}
send_from_remote_.pop_front();
} break;
case flow_control_fuzzer::Action::kStreamWrite: {
Stream* s = GetStream(action.stream_write().id());
s->queued_writes += action.stream_write().size();
} break;
case flow_control_fuzzer::Action::kPerformSendFromRemote: {
SendFromRemote send;
for (auto& id_stream : streams_) {
auto send_amount = std::min(
{id_stream.second.queued_writes, remote_transport_window_size_,
remote_initial_window_size_ + id_stream.second.window_delta});
if (send_amount <= 0) continue;
send.stream_writes.push_back({id_stream.first, uint64_t(send_amount)});
id_stream.second.queued_writes -= send_amount;
id_stream.second.window_delta -= send_amount;
remote_transport_window_size_ -= send_amount;
}
send_from_remote_.push_back(send);
} break;
case flow_control_fuzzer::Action::kSetMinProgressSize: {
Stream* s = GetStream(action.set_min_progress_size().id());
StreamFlowControl::IncomingUpdateContext upd(&s->fc);
upd.SetMinProgressSize(action.set_min_progress_size().size());
PerformAction(upd.MakeAction(), s);
} break;
case flow_control_fuzzer::Action::kAllocateMemory: {
auto allocate = std::min(
size_t(action.allocate_memory()),
grpc_event_engine::experimental::MemoryRequest::max_allowed_size());
allocated_memory_ += allocate;
memory_owner_.Reserve(allocate);
} break;
case flow_control_fuzzer::Action::kDeallocateMemory: {
auto deallocate =
std::min(uint64_t(action.deallocate_memory()), allocated_memory_);
allocated_memory_ -= deallocate;
memory_owner_.Release(deallocate);
} break;
case flow_control_fuzzer::Action::kSetPendingSize: {
Stream* s = GetStream(action.set_min_progress_size().id());
StreamFlowControl::IncomingUpdateContext upd(&s->fc);
upd.SetPendingSize(action.set_pending_size().size());
PerformAction(upd.MakeAction(), s);
} break;
}
if (scheduled_write_) {
SendToRemote send;
if (exec_ctx.Now() >= next_bdp_ping_) {
if (auto* bdp = tfc_->bdp_estimator()) {
bdp->SchedulePing();
bdp->StartPing();
next_bdp_ping_ = Timestamp::InfFuture();
send.bdp_ping = true;
}
}
if (!sending_initial_window_size_ &&
queued_initial_window_size_.has_value()) {
sending_initial_window_size_ = true;
send.initial_window_size =
std::exchange(queued_initial_window_size_, absl::nullopt);
}
while (!streams_to_update_.empty()) {
auto* stream = GetStream(streams_to_update_.front());
streams_to_update_.pop();
send.stream_window_updates.push_back(
{stream->id, stream->fc.MaybeSendUpdate()});
}
send.transport_window_update = tfc_->MaybeSendUpdate(sending_payload);
queued_send_max_frame_size_.reset();
send_to_remote_.emplace_back(std::move(send));
scheduled_write_ = false;
}
}
void FlowControlFuzzer::PerformAction(FlowControlAction action,
Stream* stream) {
if (!squelch) {
fprintf(stderr, "[%" PRId64 "]: ACTION: %s\n",
stream == nullptr ? int64_t(-1) : int64_t(stream->id),
action.DebugString().c_str());
}
auto with_urgency = [this](FlowControlAction::Urgency urgency,
std::function<void()> f) {
switch (urgency) {
case FlowControlAction::Urgency::NO_ACTION_NEEDED:
break;
case FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
scheduled_write_ = true;
ABSL_FALLTHROUGH_INTENDED;
case FlowControlAction::Urgency::QUEUE_UPDATE:
f();
break;
}
};
with_urgency(action.send_stream_update(),
[this, stream]() { streams_to_update_.push(stream->id); });
with_urgency(action.send_transport_update(), []() {});
with_urgency(action.send_initial_window_update(), [this, &action]() {
GPR_ASSERT(action.initial_window_size() <= chttp2::kMaxInitialWindowSize);
queued_initial_window_size_ = action.initial_window_size();
});
with_urgency(action.send_max_frame_size_update(), [this, &action]() {
queued_send_max_frame_size_ = action.max_frame_size();
});
}
void FlowControlFuzzer::AssertNoneStuck() const {
GPR_ASSERT(!scheduled_write_);
// Reconcile all the values to get the view of the remote that is knowable to
// the flow control system.
std::map<uint32_t, int64_t> reconciled_stream_deltas;
int64_t reconciled_transport_window = remote_transport_window_size_;
int64_t reconciled_initial_window = remote_initial_window_size_;
for (const auto& id_stream : streams_) {
reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta;
}
// Anything that's been sent from flow control -> remote needs to be added to
// the remote.
for (const auto& send_to_remote : send_to_remote_) {
if (send_to_remote.initial_window_size.has_value()) {
reconciled_initial_window = *send_to_remote.initial_window_size;
}
reconciled_transport_window += send_to_remote.transport_window_update;
for (const auto& stream_update : send_to_remote.stream_window_updates) {
reconciled_stream_deltas[stream_update.id] += stream_update.size;
}
}
// Anything that's been sent from remote -> flow control needs to be wound
// back into the remote.
for (const auto& send_from_remote : send_from_remote_) {
for (const auto& stream_write : send_from_remote.stream_writes) {
reconciled_stream_deltas[stream_write.id] += stream_write.size;
reconciled_transport_window += stream_write.size;
}
}
// Finally, if a stream has indicated it's willing to read, the reconciled
// remote *MUST* be in a state where it could send at least one byte.
for (const auto& id_stream : streams_) {
if (id_stream.second.fc.min_progress_size() == 0) continue;
int64_t stream_window =
reconciled_stream_deltas[id_stream.first] + reconciled_initial_window;
if (stream_window <= 0 || reconciled_transport_window <= 0) {
fprintf(stderr,
"FAILED: stream %d has stream_window=%" PRId64
", transport_window=%" PRId64 ", delta=%" PRId64
", init_window_size=%" PRId64 ", min_progress_size=%" PRId64 "\n",
id_stream.first, stream_window, reconciled_transport_window,
reconciled_stream_deltas[id_stream.first],
reconciled_initial_window,
int64_t(id_stream.second.fc.min_progress_size()));
abort();
}
}
}
void FlowControlFuzzer::AssertAnnouncedOverInitialWindowSizeCorrect() const {
int64_t value_from_streams = 0;
for (const auto& id_stream : streams_) {
const auto& stream = id_stream.second;
if (stream.fc.announced_window_delta() > 0) {
value_from_streams += stream.fc.announced_window_delta();
}
}
GPR_ASSERT(value_from_streams ==
tfc_->announced_stream_total_over_incoming_window());
}
} // namespace
} // namespace chttp2
} // namespace grpc_core
DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg& msg) {
grpc_core::chttp2::InitGlobals();
grpc_core::chttp2::FlowControlFuzzer fuzzer(msg.enable_bdp());
for (const auto& action : msg.actions()) {
if (!squelch) {
fprintf(stderr, "%s\n", action.DebugString().c_str());
}
fuzzer.Perform(action);
fuzzer.AssertNoneStuck();
fuzzer.AssertAnnouncedOverInitialWindowSizeCorrect();
}
}