Revert "Revert "Promise pipe redux (#28319)" (#28364)" (#28400)
* Revert "Revert "Promise pipe redux (#28319)" (#28364)"
This reverts commit 3df2a4eceabbbcc5522c70ad3da837f3b46c285a.
* rephrase to be hopefully more readable to msvc
* golfing
diff --git a/BUILD b/BUILD
index 4dd7d77..e03cd6a 100644
--- a/BUILD
+++ b/BUILD
@@ -1372,6 +1372,7 @@
],
deps = [
"activity",
+ "arena",
"gpr_platform",
"intra_activity_waiter",
],
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 408e906..e02b006 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -10678,53 +10678,23 @@
if(gRPC_BUILD_TESTS)
add_executable(for_each_test
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/gpr/alloc.cc
- src/core/lib/gpr/atm.cc
- src/core/lib/gpr/cpu_iphone.cc
- src/core/lib/gpr/cpu_linux.cc
- src/core/lib/gpr/cpu_posix.cc
- src/core/lib/gpr/cpu_windows.cc
- src/core/lib/gpr/env_linux.cc
- src/core/lib/gpr/env_posix.cc
- src/core/lib/gpr/env_windows.cc
- src/core/lib/gpr/log.cc
- src/core/lib/gpr/log_android.cc
- src/core/lib/gpr/log_linux.cc
- src/core/lib/gpr/log_posix.cc
- src/core/lib/gpr/log_windows.cc
- src/core/lib/gpr/murmur_hash.cc
- src/core/lib/gpr/string.cc
- src/core/lib/gpr/string_posix.cc
- src/core/lib/gpr/string_util_windows.cc
- src/core/lib/gpr/string_windows.cc
- src/core/lib/gpr/sync.cc
- src/core/lib/gpr/sync_abseil.cc
- src/core/lib/gpr/sync_posix.cc
- src/core/lib/gpr/sync_windows.cc
- src/core/lib/gpr/time.cc
- src/core/lib/gpr/time_posix.cc
- src/core/lib/gpr/time_precise.cc
- src/core/lib/gpr/time_windows.cc
- src/core/lib/gpr/tmpfile_msys.cc
- src/core/lib/gpr/tmpfile_posix.cc
- src/core/lib/gpr/tmpfile_windows.cc
- src/core/lib/gpr/wrap_memcpy.cc
- src/core/lib/gprpp/examine_stack.cc
- src/core/lib/gprpp/fork.cc
- src/core/lib/gprpp/global_config_env.cc
- src/core/lib/gprpp/host_port.cc
- src/core/lib/gprpp/mpscq.cc
- src/core/lib/gprpp/stat_posix.cc
- src/core/lib/gprpp/stat_windows.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/thd_posix.cc
- src/core/lib/gprpp/thd_windows.cc
- src/core/lib/gprpp/time_util.cc
- src/core/lib/profiling/basic_timers.cc
- src/core/lib/profiling/stap_timers.cc
+ src/core/lib/debug/trace.cc
+ src/core/lib/event_engine/memory_allocator.cc
+ src/core/lib/iomgr/combiner.cc
+ src/core/lib/iomgr/error.cc
+ src/core/lib/iomgr/exec_ctx.cc
+ src/core/lib/iomgr/executor.cc
+ src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
+ src/core/lib/resource_quota/arena.cc
+ src/core/lib/resource_quota/memory_quota.cc
+ src/core/lib/resource_quota/resource_quota.cc
+ src/core/lib/resource_quota/thread_quota.cc
+ src/core/lib/resource_quota/trace.cc
+ src/core/lib/slice/slice.cc
+ src/core/lib/slice/slice_refcount.cc
+ src/core/lib/slice/slice_string_helpers.cc
+ src/core/lib/slice/static_slice.cc
test/core/promise/for_each_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@@ -10752,21 +10722,10 @@
target_link_libraries(for_each_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::base
- absl::core_headers
absl::flat_hash_set
- absl::memory
- absl::random_random
- absl::status
absl::statusor
- absl::cord
- absl::str_format
- absl::strings
- absl::synchronization
- absl::time
- absl::optional
absl::variant
- upb
+ gpr
)
@@ -13349,53 +13308,23 @@
if(gRPC_BUILD_TESTS)
add_executable(pipe_test
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/gpr/alloc.cc
- src/core/lib/gpr/atm.cc
- src/core/lib/gpr/cpu_iphone.cc
- src/core/lib/gpr/cpu_linux.cc
- src/core/lib/gpr/cpu_posix.cc
- src/core/lib/gpr/cpu_windows.cc
- src/core/lib/gpr/env_linux.cc
- src/core/lib/gpr/env_posix.cc
- src/core/lib/gpr/env_windows.cc
- src/core/lib/gpr/log.cc
- src/core/lib/gpr/log_android.cc
- src/core/lib/gpr/log_linux.cc
- src/core/lib/gpr/log_posix.cc
- src/core/lib/gpr/log_windows.cc
- src/core/lib/gpr/murmur_hash.cc
- src/core/lib/gpr/string.cc
- src/core/lib/gpr/string_posix.cc
- src/core/lib/gpr/string_util_windows.cc
- src/core/lib/gpr/string_windows.cc
- src/core/lib/gpr/sync.cc
- src/core/lib/gpr/sync_abseil.cc
- src/core/lib/gpr/sync_posix.cc
- src/core/lib/gpr/sync_windows.cc
- src/core/lib/gpr/time.cc
- src/core/lib/gpr/time_posix.cc
- src/core/lib/gpr/time_precise.cc
- src/core/lib/gpr/time_windows.cc
- src/core/lib/gpr/tmpfile_msys.cc
- src/core/lib/gpr/tmpfile_posix.cc
- src/core/lib/gpr/tmpfile_windows.cc
- src/core/lib/gpr/wrap_memcpy.cc
- src/core/lib/gprpp/examine_stack.cc
- src/core/lib/gprpp/fork.cc
- src/core/lib/gprpp/global_config_env.cc
- src/core/lib/gprpp/host_port.cc
- src/core/lib/gprpp/mpscq.cc
- src/core/lib/gprpp/stat_posix.cc
- src/core/lib/gprpp/stat_windows.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/thd_posix.cc
- src/core/lib/gprpp/thd_windows.cc
- src/core/lib/gprpp/time_util.cc
- src/core/lib/profiling/basic_timers.cc
- src/core/lib/profiling/stap_timers.cc
+ src/core/lib/debug/trace.cc
+ src/core/lib/event_engine/memory_allocator.cc
+ src/core/lib/iomgr/combiner.cc
+ src/core/lib/iomgr/error.cc
+ src/core/lib/iomgr/exec_ctx.cc
+ src/core/lib/iomgr/executor.cc
+ src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
+ src/core/lib/resource_quota/arena.cc
+ src/core/lib/resource_quota/memory_quota.cc
+ src/core/lib/resource_quota/resource_quota.cc
+ src/core/lib/resource_quota/thread_quota.cc
+ src/core/lib/resource_quota/trace.cc
+ src/core/lib/slice/slice.cc
+ src/core/lib/slice/slice_refcount.cc
+ src/core/lib/slice/slice_string_helpers.cc
+ src/core/lib/slice/static_slice.cc
test/core/promise/pipe_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@@ -13423,20 +13352,9 @@
target_link_libraries(pipe_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::base
- absl::core_headers
- absl::memory
- absl::random_random
- absl::status
absl::statusor
- absl::cord
- absl::str_format
- absl::strings
- absl::synchronization
- absl::time
- absl::optional
absl::variant
- upb
+ gpr
)
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 25c32fe..bcb9330 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -5839,38 +5839,21 @@
build: test
language: c++
headers:
- - src/core/ext/upb-generated/google/protobuf/any.upb.h
- - src/core/ext/upb-generated/google/rpc/status.upb.h
- - src/core/lib/gpr/alloc.h
- - src/core/lib/gpr/env.h
- - src/core/lib/gpr/murmur_hash.h
- - src/core/lib/gpr/spinlock.h
- - src/core/lib/gpr/string.h
- - src/core/lib/gpr/string_windows.h
- - src/core/lib/gpr/time_precise.h
- - src/core/lib/gpr/tls.h
- - src/core/lib/gpr/tmpfile.h
- - src/core/lib/gpr/useful.h
+ - src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- - src/core/lib/gprpp/construct_destruct.h
- - src/core/lib/gprpp/debug_location.h
- - src/core/lib/gprpp/examine_stack.h
- - src/core/lib/gprpp/fork.h
- - src/core/lib/gprpp/global_config.h
- - src/core/lib/gprpp/global_config_custom.h
- - src/core/lib/gprpp/global_config_env.h
- - src/core/lib/gprpp/global_config_generic.h
- - src/core/lib/gprpp/host_port.h
- - src/core/lib/gprpp/manual_constructor.h
- - src/core/lib/gprpp/memory.h
- - src/core/lib/gprpp/mpscq.h
- - src/core/lib/gprpp/stat.h
- - src/core/lib/gprpp/status_helper.h
- - src/core/lib/gprpp/sync.h
- - src/core/lib/gprpp/thd.h
- - src/core/lib/gprpp/time_util.h
- - src/core/lib/profiling/timers.h
+ - src/core/lib/gprpp/cpp_impl_of.h
+ - src/core/lib/gprpp/dual_ref_counted.h
+ - src/core/lib/gprpp/orphanable.h
+ - src/core/lib/gprpp/ref_counted.h
+ - src/core/lib/gprpp/ref_counted_ptr.h
+ - src/core/lib/iomgr/closure.h
+ - src/core/lib/iomgr/combiner.h
+ - src/core/lib/iomgr/error.h
+ - src/core/lib/iomgr/error_internal.h
+ - src/core/lib/iomgr/exec_ctx.h
+ - src/core/lib/iomgr/executor.h
+ - src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
@@ -5879,81 +5862,55 @@
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
+ - src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
+ - src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/observable.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
+ - src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/wait_set.h
+ - src/core/lib/resource_quota/arena.h
+ - src/core/lib/resource_quota/memory_quota.h
+ - src/core/lib/resource_quota/resource_quota.h
+ - src/core/lib/resource_quota/thread_quota.h
+ - src/core/lib/resource_quota/trace.h
+ - src/core/lib/slice/slice.h
+ - src/core/lib/slice/slice_internal.h
+ - src/core/lib/slice/slice_refcount.h
+ - src/core/lib/slice/slice_refcount_base.h
+ - src/core/lib/slice/slice_string_helpers.h
+ - src/core/lib/slice/slice_utils.h
+ - src/core/lib/slice/static_slice.h
- test/core/promise/test_wakeup_schedulers.h
src:
- - src/core/ext/upb-generated/google/protobuf/any.upb.c
- - src/core/ext/upb-generated/google/rpc/status.upb.c
- - src/core/lib/gpr/alloc.cc
- - src/core/lib/gpr/atm.cc
- - src/core/lib/gpr/cpu_iphone.cc
- - src/core/lib/gpr/cpu_linux.cc
- - src/core/lib/gpr/cpu_posix.cc
- - src/core/lib/gpr/cpu_windows.cc
- - src/core/lib/gpr/env_linux.cc
- - src/core/lib/gpr/env_posix.cc
- - src/core/lib/gpr/env_windows.cc
- - src/core/lib/gpr/log.cc
- - src/core/lib/gpr/log_android.cc
- - src/core/lib/gpr/log_linux.cc
- - src/core/lib/gpr/log_posix.cc
- - src/core/lib/gpr/log_windows.cc
- - src/core/lib/gpr/murmur_hash.cc
- - src/core/lib/gpr/string.cc
- - src/core/lib/gpr/string_posix.cc
- - src/core/lib/gpr/string_util_windows.cc
- - src/core/lib/gpr/string_windows.cc
- - src/core/lib/gpr/sync.cc
- - src/core/lib/gpr/sync_abseil.cc
- - src/core/lib/gpr/sync_posix.cc
- - src/core/lib/gpr/sync_windows.cc
- - src/core/lib/gpr/time.cc
- - src/core/lib/gpr/time_posix.cc
- - src/core/lib/gpr/time_precise.cc
- - src/core/lib/gpr/time_windows.cc
- - src/core/lib/gpr/tmpfile_msys.cc
- - src/core/lib/gpr/tmpfile_posix.cc
- - src/core/lib/gpr/tmpfile_windows.cc
- - src/core/lib/gpr/wrap_memcpy.cc
- - src/core/lib/gprpp/examine_stack.cc
- - src/core/lib/gprpp/fork.cc
- - src/core/lib/gprpp/global_config_env.cc
- - src/core/lib/gprpp/host_port.cc
- - src/core/lib/gprpp/mpscq.cc
- - src/core/lib/gprpp/stat_posix.cc
- - src/core/lib/gprpp/stat_windows.cc
- - src/core/lib/gprpp/status_helper.cc
- - src/core/lib/gprpp/thd_posix.cc
- - src/core/lib/gprpp/thd_windows.cc
- - src/core/lib/gprpp/time_util.cc
- - src/core/lib/profiling/basic_timers.cc
- - src/core/lib/profiling/stap_timers.cc
+ - src/core/lib/debug/trace.cc
+ - src/core/lib/event_engine/memory_allocator.cc
+ - src/core/lib/iomgr/combiner.cc
+ - src/core/lib/iomgr/error.cc
+ - src/core/lib/iomgr/exec_ctx.cc
+ - src/core/lib/iomgr/executor.cc
+ - src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
+ - src/core/lib/resource_quota/arena.cc
+ - src/core/lib/resource_quota/memory_quota.cc
+ - src/core/lib/resource_quota/resource_quota.cc
+ - src/core/lib/resource_quota/thread_quota.cc
+ - src/core/lib/resource_quota/trace.cc
+ - src/core/lib/slice/slice.cc
+ - src/core/lib/slice/slice_refcount.cc
+ - src/core/lib/slice/slice_string_helpers.cc
+ - src/core/lib/slice/static_slice.cc
- test/core/promise/for_each_test.cc
deps:
- - absl/base:base
- - absl/base:core_headers
- absl/container:flat_hash_set
- - absl/memory:memory
- - absl/random:random
- - absl/status:status
- absl/status:statusor
- - absl/strings:cord
- - absl/strings:str_format
- - absl/strings:strings
- - absl/synchronization:synchronization
- - absl/time:time
- - absl/types:optional
- absl/types:variant
- - upb
+ - gpr
uses_polling: false
- name: generic_end2end_test
gtest: true
@@ -6969,38 +6926,21 @@
build: test
language: c++
headers:
- - src/core/ext/upb-generated/google/protobuf/any.upb.h
- - src/core/ext/upb-generated/google/rpc/status.upb.h
- - src/core/lib/gpr/alloc.h
- - src/core/lib/gpr/env.h
- - src/core/lib/gpr/murmur_hash.h
- - src/core/lib/gpr/spinlock.h
- - src/core/lib/gpr/string.h
- - src/core/lib/gpr/string_windows.h
- - src/core/lib/gpr/time_precise.h
- - src/core/lib/gpr/tls.h
- - src/core/lib/gpr/tmpfile.h
- - src/core/lib/gpr/useful.h
+ - src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- - src/core/lib/gprpp/construct_destruct.h
- - src/core/lib/gprpp/debug_location.h
- - src/core/lib/gprpp/examine_stack.h
- - src/core/lib/gprpp/fork.h
- - src/core/lib/gprpp/global_config.h
- - src/core/lib/gprpp/global_config_custom.h
- - src/core/lib/gprpp/global_config_env.h
- - src/core/lib/gprpp/global_config_generic.h
- - src/core/lib/gprpp/host_port.h
- - src/core/lib/gprpp/manual_constructor.h
- - src/core/lib/gprpp/memory.h
- - src/core/lib/gprpp/mpscq.h
- - src/core/lib/gprpp/stat.h
- - src/core/lib/gprpp/status_helper.h
- - src/core/lib/gprpp/sync.h
- - src/core/lib/gprpp/thd.h
- - src/core/lib/gprpp/time_util.h
- - src/core/lib/profiling/timers.h
+ - src/core/lib/gprpp/cpp_impl_of.h
+ - src/core/lib/gprpp/dual_ref_counted.h
+ - src/core/lib/gprpp/orphanable.h
+ - src/core/lib/gprpp/ref_counted.h
+ - src/core/lib/gprpp/ref_counted_ptr.h
+ - src/core/lib/iomgr/closure.h
+ - src/core/lib/iomgr/combiner.h
+ - src/core/lib/iomgr/error.h
+ - src/core/lib/iomgr/error_internal.h
+ - src/core/lib/iomgr/exec_ctx.h
+ - src/core/lib/iomgr/executor.h
+ - src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
@@ -7009,77 +6949,52 @@
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
+ - src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
+ - src/core/lib/promise/loop.h
+ - src/core/lib/promise/map.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
+ - src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
+ - src/core/lib/resource_quota/arena.h
+ - src/core/lib/resource_quota/memory_quota.h
+ - src/core/lib/resource_quota/resource_quota.h
+ - src/core/lib/resource_quota/thread_quota.h
+ - src/core/lib/resource_quota/trace.h
+ - src/core/lib/slice/slice.h
+ - src/core/lib/slice/slice_internal.h
+ - src/core/lib/slice/slice_refcount.h
+ - src/core/lib/slice/slice_refcount_base.h
+ - src/core/lib/slice/slice_string_helpers.h
+ - src/core/lib/slice/slice_utils.h
+ - src/core/lib/slice/static_slice.h
- test/core/promise/test_wakeup_schedulers.h
src:
- - src/core/ext/upb-generated/google/protobuf/any.upb.c
- - src/core/ext/upb-generated/google/rpc/status.upb.c
- - src/core/lib/gpr/alloc.cc
- - src/core/lib/gpr/atm.cc
- - src/core/lib/gpr/cpu_iphone.cc
- - src/core/lib/gpr/cpu_linux.cc
- - src/core/lib/gpr/cpu_posix.cc
- - src/core/lib/gpr/cpu_windows.cc
- - src/core/lib/gpr/env_linux.cc
- - src/core/lib/gpr/env_posix.cc
- - src/core/lib/gpr/env_windows.cc
- - src/core/lib/gpr/log.cc
- - src/core/lib/gpr/log_android.cc
- - src/core/lib/gpr/log_linux.cc
- - src/core/lib/gpr/log_posix.cc
- - src/core/lib/gpr/log_windows.cc
- - src/core/lib/gpr/murmur_hash.cc
- - src/core/lib/gpr/string.cc
- - src/core/lib/gpr/string_posix.cc
- - src/core/lib/gpr/string_util_windows.cc
- - src/core/lib/gpr/string_windows.cc
- - src/core/lib/gpr/sync.cc
- - src/core/lib/gpr/sync_abseil.cc
- - src/core/lib/gpr/sync_posix.cc
- - src/core/lib/gpr/sync_windows.cc
- - src/core/lib/gpr/time.cc
- - src/core/lib/gpr/time_posix.cc
- - src/core/lib/gpr/time_precise.cc
- - src/core/lib/gpr/time_windows.cc
- - src/core/lib/gpr/tmpfile_msys.cc
- - src/core/lib/gpr/tmpfile_posix.cc
- - src/core/lib/gpr/tmpfile_windows.cc
- - src/core/lib/gpr/wrap_memcpy.cc
- - src/core/lib/gprpp/examine_stack.cc
- - src/core/lib/gprpp/fork.cc
- - src/core/lib/gprpp/global_config_env.cc
- - src/core/lib/gprpp/host_port.cc
- - src/core/lib/gprpp/mpscq.cc
- - src/core/lib/gprpp/stat_posix.cc
- - src/core/lib/gprpp/stat_windows.cc
- - src/core/lib/gprpp/status_helper.cc
- - src/core/lib/gprpp/thd_posix.cc
- - src/core/lib/gprpp/thd_windows.cc
- - src/core/lib/gprpp/time_util.cc
- - src/core/lib/profiling/basic_timers.cc
- - src/core/lib/profiling/stap_timers.cc
+ - src/core/lib/debug/trace.cc
+ - src/core/lib/event_engine/memory_allocator.cc
+ - src/core/lib/iomgr/combiner.cc
+ - src/core/lib/iomgr/error.cc
+ - src/core/lib/iomgr/exec_ctx.cc
+ - src/core/lib/iomgr/executor.cc
+ - src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
+ - src/core/lib/resource_quota/arena.cc
+ - src/core/lib/resource_quota/memory_quota.cc
+ - src/core/lib/resource_quota/resource_quota.cc
+ - src/core/lib/resource_quota/thread_quota.cc
+ - src/core/lib/resource_quota/trace.cc
+ - src/core/lib/slice/slice.cc
+ - src/core/lib/slice/slice_refcount.cc
+ - src/core/lib/slice/slice_string_helpers.cc
+ - src/core/lib/slice/static_slice.cc
- test/core/promise/pipe_test.cc
deps:
- - absl/base:base
- - absl/base:core_headers
- - absl/memory:memory
- - absl/random:random
- - absl/status:status
- absl/status:statusor
- - absl/strings:cord
- - absl/strings:str_format
- - absl/strings:strings
- - absl/synchronization:synchronization
- - absl/time:time
- - absl/types:optional
- absl/types:variant
- - upb
+ - gpr
uses_polling: false
- name: poll_test
gtest: true
diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h
index 5403aa8..792084c 100644
--- a/src/core/lib/promise/activity.h
+++ b/src/core/lib/promise/activity.h
@@ -24,6 +24,7 @@
#include <atomic>
#include <functional>
#include <memory>
+#include <type_traits>
#include <utility>
#include "absl/base/thread_annotations.h"
@@ -267,6 +268,8 @@
template <typename Context>
class ContextHolder {
public:
+ using ContextType = Context;
+
explicit ContextHolder(Context value) : value_(std::move(value)) {}
Context* GetContext() { return &value_; }
@@ -277,6 +280,8 @@
template <typename Context>
class ContextHolder<Context*> {
public:
+ using ContextType = Context;
+
explicit ContextHolder(Context* value) : value_(value) {}
Context* GetContext() { return value_; }
@@ -284,11 +289,35 @@
Context* value_;
};
-template <typename... Contexts>
-class EnterContexts : public promise_detail::Context<Contexts>... {
+template <typename Context, typename Deleter>
+class ContextHolder<std::unique_ptr<Context, Deleter>> {
public:
- explicit EnterContexts(Contexts*... contexts)
- : promise_detail::Context<Contexts>(contexts)... {}
+ using ContextType = Context;
+
+ explicit ContextHolder(std::unique_ptr<Context, Deleter> value)
+ : value_(std::move(value)) {}
+ Context* GetContext() { return value_.get(); }
+
+ private:
+ std::unique_ptr<Context, Deleter> value_;
+};
+
+template <typename HeldContext>
+using ContextTypeFromHeld = typename ContextHolder<HeldContext>::ContextType;
+
+template <typename... Contexts>
+class ActivityContexts : public ContextHolder<Contexts>... {
+ public:
+ explicit ActivityContexts(Contexts&&... contexts)
+ : ContextHolder<Contexts>(std::forward<Contexts>(contexts))... {}
+
+ class ScopedContext : public Context<ContextTypeFromHeld<Contexts>>... {
+ public:
+ explicit ScopedContext(ActivityContexts* contexts)
+ : Context<ContextTypeFromHeld<Contexts>>(
+ static_cast<ContextHolder<Contexts>*>(contexts)
+ ->GetContext())... {}
+ };
};
// Implementation details for an Activity of an arbitrary type of promise.
@@ -303,15 +332,14 @@
// invoked, and that a given activity will not be concurrently scheduled again
// until its RunScheduledWakeup() has been invoked.
template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
-class PromiseActivity final
- : public Activity,
- private promise_detail::ContextHolder<Contexts>... {
+class PromiseActivity final : public Activity,
+ private ActivityContexts<Contexts...> {
public:
using Factory = PromiseFactory<void, F>;
PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
- OnDone on_done, Contexts... contexts)
+ OnDone on_done, Contexts&&... contexts)
: Activity(),
- ContextHolder<Contexts>(std::move(contexts))...,
+ ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...),
wakeup_scheduler_(std::move(wakeup_scheduler)),
on_done_(std::move(on_done)) {
// Lock, construct an initial promise from the factory, and step it.
@@ -361,6 +389,8 @@
}
private:
+ using typename ActivityContexts<Contexts...>::ScopedContext;
+
// Wakeup this activity. Arrange to poll the activity again at a convenient
// time: this could be inline if it's deemed safe, or it could be by passing
// the activity to an external threadpool to run. If the activity is already
@@ -412,28 +442,27 @@
}
// The main body of a step: set the current activity, and any contexts, and
- // then run the main polling loop. Contained in a function by itself in order
- // to keep the scoping rules a little easier in Step().
+ // then run the main polling loop. Contained in a function by itself in
+ // order to keep the scoping rules a little easier in Step().
absl::optional<absl::Status> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ScopedActivity scoped_activity(this);
- EnterContexts<Contexts...> contexts(
- static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
+ ScopedContext contexts(this);
return StepLoop();
}
- // Similarly to RunStep, but additionally construct the promise from a promise
- // factory before entering the main loop. Called once from the constructor.
+ // Similarly to RunStep, but additionally construct the promise from a
+ // promise factory before entering the main loop. Called once from the
+ // constructor.
absl::optional<absl::Status> Start(Factory promise_factory)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ScopedActivity scoped_activity(this);
- EnterContexts<Contexts...> contexts(
- static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
+ ScopedContext contexts(this);
Construct(&promise_holder_.promise, promise_factory.Once());
return StepLoop();
}
- // Until there are no wakeups from within and the promise is incomplete: poll
- // the promise.
+ // Until there are no wakeups from within and the promise is incomplete:
+ // poll the promise.
absl::optional<absl::Status> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
GPR_ASSERT(is_current());
while (true) {
@@ -486,12 +515,12 @@
typename... Contexts>
ActivityPtr MakeActivity(Factory promise_factory,
WakeupScheduler wakeup_scheduler, OnDone on_done,
- Contexts... contexts) {
+ Contexts&&... contexts) {
return ActivityPtr(
new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone,
Contexts...>(
std::move(promise_factory), std::move(wakeup_scheduler),
- std::move(on_done), std::move(contexts)...));
+ std::move(on_done), std::forward<Contexts>(contexts)...));
}
} // namespace grpc_core
diff --git a/src/core/lib/promise/pipe.h b/src/core/lib/promise/pipe.h
index fb53ff1..c2de5da 100644
--- a/src/core/lib/promise/pipe.h
+++ b/src/core/lib/promise/pipe.h
@@ -31,10 +31,13 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
+#include <grpc/support/log.h>
+
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/intra_activity_waiter.h"
#include "src/core/lib/promise/poll.h"
+#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
@@ -52,39 +55,114 @@
template <typename T>
class Next;
-template <class T>
-class Promise {
- public:
- virtual Poll<bool> Step(T* output) = 0;
- virtual void Stop() = 0;
-
- protected:
- inline virtual ~Promise() = default;
-};
-
-struct alignas(alignof(void*)) Scratch {
- uint8_t scratch[32];
-};
-
+// Center sits between a sender and a receiver to provide a one-deep buffer of
+// Ts
template <typename T>
-class FilterInterface {
+class Center {
public:
- FilterInterface() = default;
- FilterInterface(const FilterInterface&) = delete;
- FilterInterface& operator=(const FilterInterface&) = delete;
- virtual Promise<T>* Step(T* p, Scratch* scratch_space) = 0;
- virtual void UpdateReceiver(PipeReceiver<T>* receiver) = 0;
+ // Initialize with one send ref (held by PipeSender) and one recv ref (held by
+ // PipeReceiver)
+ Center() {
+ send_refs_ = 1;
+ recv_refs_ = 1;
+ has_value_ = false;
+ }
- protected:
- inline virtual ~FilterInterface() {}
- static void SetReceiverIndex(PipeReceiver<T>* receiver, int idx,
- FilterInterface* p);
- char AllocIndex(PipeReceiver<T>* receiver);
+ // Add one ref to the send side of this object, and return this.
+ Center* RefSend() {
+ send_refs_++;
+ return this;
+ }
+
+ // Add one ref to the recv side of this object, and return this.
+ Center* RefRecv() {
+ recv_refs_++;
+ return this;
+ }
+
+ // Drop a send side ref
+ // If no send refs remain, wake due to send closure
+ // If no refs remain, destroy this object
+ void UnrefSend() {
+ GPR_DEBUG_ASSERT(send_refs_ > 0);
+ send_refs_--;
+ if (0 == send_refs_) {
+ on_full_.Wake();
+ on_empty_.Wake();
+ if (0 == recv_refs_) {
+ this->~Center();
+ }
+ }
+ }
+
+ // Drop a recv side ref
+ // If no recv refs remain, wake due to recv closure
+ // If no refs remain, destroy this object
+ void UnrefRecv() {
+ GPR_DEBUG_ASSERT(recv_refs_ > 0);
+ recv_refs_--;
+ if (0 == recv_refs_) {
+ on_full_.Wake();
+ on_empty_.Wake();
+ if (0 == send_refs_) {
+ this->~Center();
+ } else if (has_value_) {
+ ResetValue();
+ }
+ }
+ }
+
+ // Try to push *value into the pipe.
+ // Return Pending if there is no space.
+ // Return true if the value was pushed.
+ // Return false if the recv end is closed.
+ Poll<bool> Push(T* value) {
+ GPR_DEBUG_ASSERT(send_refs_ != 0);
+ if (recv_refs_ == 0) return false;
+ if (has_value_) return on_empty_.pending();
+ has_value_ = true;
+ value_ = std::move(*value);
+ on_full_.Wake();
+ return true;
+ }
+
+ // Try to receive a value from the pipe.
+ // Return Pending if there is no value.
+ // Return the value if one was retrieved.
+ // Return nullopt if the send end is closed and no value had been pushed.
+ Poll<absl::optional<T>> Next() {
+ GPR_DEBUG_ASSERT(recv_refs_ != 0);
+ if (!has_value_) {
+ if (send_refs_ == 0) return absl::nullopt;
+ return on_full_.pending();
+ }
+ has_value_ = false;
+ on_empty_.Wake();
+ return std::move(value_);
+ }
+
+ private:
+ void ResetValue() {
+ // Fancy dance to move out of value in the off chance that we reclaim some
+ // memory earlier.
+ [](T) {}(std::move(value_));
+ has_value_ = false;
+ }
+ T value_;
+ // Number of sending objects.
+ // 0 => send is closed.
+ // 1 ref each for PipeSender and Push.
+ uint8_t send_refs_ : 2;
+ // Number of receiving objects.
+ // 0 => recv is closed.
+ // 1 ref each for PipeReceiver and Next.
+ uint8_t recv_refs_ : 2;
+ // True iff there is a value in the pipe.
+ bool has_value_ : 1;
+ IntraActivityWaiter on_empty_;
+ IntraActivityWaiter on_full_;
};
-template <typename T, typename F>
-class Filter;
-
} // namespace pipe_detail
// Send end of a Pipe.
@@ -94,43 +172,18 @@
PipeSender(const PipeSender&) = delete;
PipeSender& operator=(const PipeSender&) = delete;
- PipeSender(PipeSender&& other) noexcept
- : receiver_(other.receiver_), push_(other.push_) {
- if (receiver_ != nullptr) {
- receiver_->sender_ = this;
- other.receiver_ = nullptr;
- }
- if (push_ != nullptr) {
- push_->sender_ = this;
- other.push_ = nullptr;
- }
+ PipeSender(PipeSender&& other) noexcept : center_(other.center_) {
+ other.center_ = nullptr;
}
PipeSender& operator=(PipeSender&& other) noexcept {
- if (receiver_ != nullptr) {
- receiver_->sender_ = nullptr;
- }
- if (push_ != nullptr) {
- push_->sender_ = nullptr;
- }
- receiver_ = other.receiver_;
- if (receiver_ != nullptr) {
- receiver_->sender_ = this;
- other.receiver_ = nullptr;
- }
- if (push_ != nullptr) {
- push_->sender_ = this;
- other.push_ = nullptr;
- }
+ if (center_ != nullptr) center_->UnrefSend();
+ center_ = other.center_;
+ other.center_ = nullptr;
return *this;
}
~PipeSender() {
- if (receiver_ != nullptr) {
- receiver_->MarkClosed();
- }
- if (push_ != nullptr) {
- push_->sender_ = nullptr;
- }
+ if (center_ != nullptr) center_->UnrefSend();
}
// Send a single message along the pipe.
@@ -139,21 +192,10 @@
// receiver is either closed or able to receive another message.
pipe_detail::Push<T> Push(T value);
- // Attach a promise factory based filter to this pipe.
- // The overall promise returned from this will be active until the pipe is
- // closed. If this promise is cancelled before the pipe is closed, the pipe
- // will close. The filter will be run _after_ any other registered filters.
- template <typename F>
- pipe_detail::Filter<T, F> Filter(F f);
-
private:
friend struct Pipe<T>;
- friend class PipeReceiver<T>;
- friend class pipe_detail::Next<T>;
- friend class pipe_detail::Push<T>;
- explicit PipeSender(PipeReceiver<T>* receiver) : receiver_(receiver) {}
- PipeReceiver<T>* receiver_;
- pipe_detail::Push<T>* push_ = nullptr;
+ explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
+ pipe_detail::Center<T>* center_;
};
// Receive end of a Pipe.
@@ -163,56 +205,17 @@
PipeReceiver(const PipeReceiver&) = delete;
PipeReceiver& operator=(const PipeReceiver&) = delete;
- PipeReceiver(PipeReceiver&& other) noexcept
- : sender_(other.sender_),
- next_(other.next_),
- filters_(std::move(other.filters_)),
- pending_(std::move(other.pending_)),
- waiting_to_send_(std::move(other.waiting_to_send_)),
- waiting_to_receive_(other.waiting_to_receive_) {
- if (sender_ != nullptr) {
- sender_->receiver_ = this;
- other.sender_ = nullptr;
- }
- if (next_ != nullptr) {
- next_->receiver_ = this;
- other.next_ = nullptr;
- }
- for (auto filter : filters_) {
- filter->UpdateReceiver(this);
- }
+ PipeReceiver(PipeReceiver&& other) noexcept : center_(other.center_) {
+ other.center_ = nullptr;
}
PipeReceiver& operator=(PipeReceiver&& other) noexcept {
- if (sender_ != nullptr) {
- sender_->receiver_ = nullptr;
- }
- if (next_ != nullptr) {
- next_->receiver_ = nullptr;
- }
- sender_ = other.sender_;
- next_ = other.next_;
- filters_ = std::move(other.filters_);
- for (auto filter : filters_) {
- filter->UpdateReceiver(this);
- }
- pending_ = std::move(other.pending_);
- waiting_to_send_ = std::move(other.waiting_to_send_);
- waiting_to_receive_ = std::move(other.waiting_to_receive_);
- if (sender_ != nullptr) {
- sender_->receiver_ = this;
- other.sender_ = nullptr;
- }
- if (next_ != nullptr) {
- next_->receiver_ = this;
- other.next_ = nullptr;
- }
+ if (center_ != nullptr) center_->UnrefRecv();
+ center_ = other.center_;
+ other.center_ = nullptr;
return *this;
}
~PipeReceiver() {
- MarkClosed();
- if (next_ != nullptr) {
- next_->receiver_ = nullptr;
- }
+ if (center_ != nullptr) center_->UnrefRecv();
}
// Receive a single message from the pipe.
@@ -222,44 +225,10 @@
// available.
pipe_detail::Next<T> Next();
- // Attach a promise factory based filter to this pipe.
- // The overall promise returned from this will be active until the pipe is
- // closed. If this promise is cancelled before the pipe is closed, the pipe
- // will close. The filter will be run _after_ any other registered filters.
- template <typename F>
- pipe_detail::Filter<T, F> Filter(F f);
-
private:
friend struct Pipe<T>;
- friend class PipeSender<T>;
- friend class pipe_detail::Next<T>;
- friend class pipe_detail::Push<T>;
- friend class pipe_detail::FilterInterface<T>;
- explicit PipeReceiver(PipeSender<T>* sender) : sender_(sender) {}
- PipeSender<T>* sender_;
- pipe_detail::Next<T>* next_ = nullptr;
- absl::InlinedVector<pipe_detail::FilterInterface<T>*, 12> filters_;
- absl::optional<T> pending_;
- IntraActivityWaiter waiting_to_send_;
- IntraActivityWaiter waiting_to_receive_;
-
- void MarkClosed() {
- if (sender_ == nullptr) {
- return;
- }
-
- sender_->receiver_ = nullptr;
-
- waiting_to_receive_.Wake();
- waiting_to_send_.Wake();
- sender_ = nullptr;
-
- for (auto* filter : filters_) {
- if (filter != nullptr) {
- filter->UpdateReceiver(nullptr);
- }
- }
- }
+ explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
+ pipe_detail::Center<T>* center_;
};
namespace pipe_detail {
@@ -271,55 +240,28 @@
Push(const Push&) = delete;
Push& operator=(const Push&) = delete;
Push(Push&& other) noexcept
- : sender_(other.sender_), push_(std::move(other.push_)) {
- if (sender_ != nullptr) {
- sender_->push_ = this;
- other.sender_ = nullptr;
- }
+ : center_(other.center_), push_(std::move(other.push_)) {
+ other.center_ = nullptr;
}
Push& operator=(Push&& other) noexcept {
- if (sender_ != nullptr) {
- sender_->push_ = nullptr;
- }
- sender_ = other.sender_;
+ if (center_ != nullptr) center_->UnrefSend();
+ center_ = other.center_;
+ other.center_ = nullptr;
push_ = std::move(other.push_);
- if (sender_ != nullptr) {
- sender_->push_ = this;
- other.sender_ = nullptr;
- }
return *this;
}
~Push() {
- if (sender_ != nullptr) {
- assert(sender_->push_ == this);
- sender_->push_ = nullptr;
- }
+ if (center_ != nullptr) center_->UnrefSend();
}
- Poll<bool> operator()() {
- auto* receiver = sender_->receiver_;
- if (receiver == nullptr) {
- return false;
- }
- if (receiver->pending_.has_value()) {
- return receiver->waiting_to_send_.pending();
- }
- receiver->pending_ = std::move(push_);
- receiver->waiting_to_receive_.Wake();
- sender_->push_ = nullptr;
- sender_ = nullptr;
- return true;
- }
+ Poll<bool> operator()() { return center_->Push(&push_); }
private:
friend class PipeSender<T>;
- Push(PipeSender<T>* sender, T push)
- : sender_(sender), push_(std::move(push)) {
- assert(sender_->push_ == nullptr);
- sender_->push_ = this;
- }
- PipeSender<T>* sender_;
+ explicit Push(pipe_detail::Center<T>* center, T push)
+ : center_(center), push_(std::move(push)) {}
+ Center<T>* center_;
T push_;
};
@@ -329,262 +271,56 @@
public:
Next(const Next&) = delete;
Next& operator=(const Next&) = delete;
- Next(Next&& other) noexcept
- : receiver_(other.receiver_),
- next_filter_(other.next_filter_),
- current_promise_(nullptr) {
- assert(other.current_promise_ == nullptr);
- if (receiver_ != nullptr) {
- receiver_->next_ = this;
- other.receiver_ = nullptr;
- }
+ Next(Next&& other) noexcept : center_(other.center_) {
+ other.center_ = nullptr;
}
Next& operator=(Next&& other) noexcept {
- assert(current_promise_ == nullptr);
- assert(other.current_promise_ == nullptr);
- if (receiver_ != nullptr) {
- receiver_->next_ = nullptr;
- }
- receiver_ = other.receiver_;
- next_filter_ = other.next_filter_;
- if (receiver_ != nullptr) {
- receiver_->next_ = this;
- other.receiver_ = nullptr;
- }
+ if (center_ != nullptr) center_->UnrefRecv();
+ center_ = other.center_;
+ other.center_ = nullptr;
return *this;
}
~Next() {
- if (receiver_ != nullptr) {
- assert(receiver_->next_ == this);
- receiver_->next_ = nullptr;
- }
- if (current_promise_ != nullptr) {
- current_promise_->Stop();
- }
+ if (center_ != nullptr) center_->UnrefRecv();
}
- Poll<absl::optional<T>> operator()() {
- if (receiver_->pending_.has_value()) {
- auto* pending = &*receiver_->pending_;
- if (current_promise_ != nullptr) {
- auto r = current_promise_->Step(pending);
- if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
- current_promise_->Stop();
- current_promise_ = nullptr;
- if (!*p) {
- receiver_->MarkClosed();
- return absl::optional<T>();
- }
- } else {
- return Pending();
- }
- }
- while (true) {
- if (next_filter_ >= receiver_->filters_.size()) {
- auto result = absl::optional<T>(std::move(*pending));
- receiver_->pending_.reset();
- receiver_->waiting_to_send_.Wake();
- receiver_->next_ = nullptr;
- receiver_ = nullptr;
- return result;
- }
- auto* filter = receiver_->filters_[next_filter_];
- current_promise_ = filter ? filter->Step(pending, &scratch_) : nullptr;
- next_filter_++;
- if (current_promise_ ==
- reinterpret_cast<Promise<T>*>(uintptr_t(false))) {
- current_promise_ = nullptr;
- receiver_->MarkClosed();
- return absl::optional<T>();
- } else if (current_promise_ ==
- reinterpret_cast<Promise<T>*>(uintptr_t(true))) {
- current_promise_ = nullptr;
- } else {
- return Pending();
- }
- }
- }
- if (receiver_->sender_ == nullptr) {
- return absl::optional<T>();
- }
- return receiver_->waiting_to_receive_.pending();
- }
+ Poll<absl::optional<T>> operator()() { return center_->Next(); }
private:
friend class PipeReceiver<T>;
- explicit Next(PipeReceiver<T>* receiver) : receiver_(receiver) {
- assert(receiver_->next_ == nullptr);
- receiver_->next_ = this;
- }
- PipeReceiver<T>* receiver_;
- size_t next_filter_ = 0;
- Promise<T>* current_promise_ = nullptr;
- Scratch scratch_;
+ explicit Next(pipe_detail::Center<T>* center) : center_(center) {}
+ Center<T>* center_;
};
-template <typename T, typename F>
-class Filter final : private FilterInterface<T> {
- public:
- Filter(PipeReceiver<T>* receiver, F f)
- : active_{receiver, promise_detail::PromiseFactory<T, F>(std::move(f))},
- index_(this->AllocIndex(receiver)){};
- explicit Filter(absl::Status already_finished)
- : done_(std::move(already_finished)) {}
- ~Filter() {
- if (index_ != kTombstoneIndex) {
- this->SetReceiverIndex(active_.receiver, index_, nullptr);
- active_.~Active();
- } else {
- done_.~Status();
- }
- }
- Filter(Filter&& other) noexcept : index_(other.index_) {
- if (index_ != kTombstoneIndex) {
- new (&active_) Active(std::move(other.active_));
- other.active_.~Active();
- new (&other.done_) absl::Status(absl::OkStatus());
- other.index_ = kTombstoneIndex;
- this->SetReceiverIndex(active_.receiver, index_, this);
- } else {
- new (&done_) absl::Status(std::move(other.done_));
- }
- }
-
- Filter(const Filter&) = delete;
- Filter& operator=(const Filter&) = delete;
-
- Poll<absl::Status> operator()() {
- if (index_ == kTombstoneIndex) {
- return std::move(done_);
- }
- return Pending();
- }
-
- private:
- static constexpr char kTombstoneIndex = -1;
- struct Active {
- GPR_NO_UNIQUE_ADDRESS PipeReceiver<T>* receiver;
- GPR_NO_UNIQUE_ADDRESS promise_detail::PromiseFactory<T, F> factory;
- };
- union {
- GPR_NO_UNIQUE_ADDRESS Active active_;
- GPR_NO_UNIQUE_ADDRESS absl::Status done_;
- };
- GPR_NO_UNIQUE_ADDRESS char index_;
-
- class PromiseImpl final : public ::grpc_core::pipe_detail::Promise<T> {
- using PF = typename promise_detail::PromiseFactory<T, F>::Promise;
-
- public:
- PromiseImpl(PF f, Filter* filter) : f_(std::move(f)), filter_(filter) {}
-
- Poll<bool> Step(T* output) final {
- auto r = f_();
- if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
- if (p->ok()) {
- *output = std::move(**p);
- return true;
- } else {
- filter_->SetReceiverIndex(filter_->active_.receiver, filter_->index_,
- nullptr);
- filter_->active_.~Active();
- filter_->index_ = kTombstoneIndex;
- new (&filter_->done_) absl::Status(std::move(p->status()));
- Activity::WakeupCurrent();
- return false;
- }
- } else {
- return Pending();
- }
- }
-
- void Stop() final { this->~PromiseImpl(); }
-
- private:
- PF f_;
- Filter* filter_;
- };
-
- Promise<T>* Step(T* p, Scratch* scratch) final {
- if (index_ != kTombstoneIndex) {
- PromiseImpl promise(active_.factory.Repeated(std::move(*p)), this);
- auto r = promise.Step(p);
- if (auto* result = absl::get_if<kPollReadyIdx>(&r)) {
- return reinterpret_cast<Promise<T>*>(uintptr_t(*result));
- }
- static_assert(sizeof(promise) <= sizeof(Scratch),
- "scratch size too small");
- static_assert(alignof(decltype(promise)) <= alignof(Scratch),
- "bad alignment");
- return new (scratch) decltype(promise)(std::move(promise));
- } else {
- return nullptr;
- }
- }
-
- void UpdateReceiver(PipeReceiver<T>* receiver) final {
- if (index_ != kTombstoneIndex) {
- if (receiver == nullptr) {
- active_.~Active();
- index_ = kTombstoneIndex;
- new (&done_) absl::Status(absl::OkStatus());
- } else {
- active_.receiver = receiver;
- }
- Activity::WakeupCurrent();
- }
- }
-};
-
-template <typename T>
-void FilterInterface<T>::SetReceiverIndex(PipeReceiver<T>* receiver, int idx,
- FilterInterface* p) {
- receiver->filters_[idx] = p;
-}
-
-template <typename T>
-char FilterInterface<T>::AllocIndex(PipeReceiver<T>* receiver) {
- auto r = receiver->filters_.size();
- receiver->filters_.push_back(this);
- return r;
-}
-
} // namespace pipe_detail
template <typename T>
pipe_detail::Push<T> PipeSender<T>::Push(T value) {
- return pipe_detail::Push<T>(this, std::move(value));
+ return pipe_detail::Push<T>(center_->RefSend(), std::move(value));
}
template <typename T>
pipe_detail::Next<T> PipeReceiver<T>::Next() {
- return pipe_detail::Next<T>(this);
-}
-
-template <typename T>
-template <typename F>
-pipe_detail::Filter<T, F> PipeSender<T>::Filter(F f) {
- if (receiver_) {
- return pipe_detail::Filter<T, F>(receiver_, std::move(f));
- } else {
- return pipe_detail::Filter<T, F>(absl::OkStatus());
- }
-}
-
-template <typename T>
-template <typename F>
-pipe_detail::Filter<T, F> PipeReceiver<T>::Filter(F f) {
- return pipe_detail::Filter<T, F>(this, std::move(f));
+ return pipe_detail::Next<T>(center_->RefRecv());
}
// A Pipe is an intra-Activity communications channel that transmits T's from
// one end to the other.
// It is only safe to use a Pipe within the context of a single Activity.
// No synchronization is performed internally.
+// The primary Pipe data structure is allocated from an arena, so the activity
+// must have an arena as part of its context.
+// By performing that allocation we can ensure stable pointer to shared data
+// allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
+// implementation.
+// This type has been optimized with the expectation that there are relatively
+// few pipes per activity. If this assumption does not hold then a design
+// allowing inline filtering of pipe contents (instead of connecting pipes with
+// polling code) would likely be more appropriate.
template <typename T>
struct Pipe {
- Pipe() : sender(&receiver), receiver(&sender) {}
+ Pipe() : Pipe(GetContext<Arena>()->New<pipe_detail::Center<T>>()) {}
Pipe(const Pipe&) = delete;
Pipe& operator=(const Pipe&) = delete;
Pipe(Pipe&&) noexcept = default;
@@ -592,6 +328,10 @@
PipeSender<T> sender;
PipeReceiver<T> receiver;
+
+ private:
+ explicit Pipe(pipe_detail::Center<T>* center)
+ : sender(center), receiver(center) {}
};
} // namespace grpc_core
diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD
index 4106114..9f36385 100644
--- a/test/core/promise/BUILD
+++ b/test/core/promise/BUILD
@@ -260,6 +260,7 @@
"//:map",
"//:observable",
"//:pipe",
+ "//:resource_quota",
"//:seq",
"//test/core/util:grpc_suppressions",
],
@@ -276,6 +277,7 @@
"//:join",
"//:pipe",
"//:promise",
+ "//:resource_quota",
"//:seq",
"//test/core/util:grpc_suppressions",
],
diff --git a/test/core/promise/for_each_test.cc b/test/core/promise/for_each_test.cc
index d668c3d..ebf1477 100644
--- a/test/core/promise/for_each_test.cc
+++ b/test/core/promise/for_each_test.cc
@@ -22,6 +22,7 @@
#include "src/core/lib/promise/observable.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::Mock;
@@ -30,22 +31,25 @@
namespace grpc_core {
+static auto* g_memory_allocator = new MemoryAllocator(
+ ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
+
TEST(ForEachTest, SendThriceWithPipe) {
- Pipe<int> pipe;
int num_received = 0;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
- [&pipe, &num_received] {
+ [&num_received] {
+ Pipe<int> pipe;
+ auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
+ absl::make_unique<PipeSender<int>>(std::move(pipe.sender)));
return Map(
Join(
// Push 3 things into a pipe -- 1, 2, then 3 -- then close.
- Seq(
- pipe.sender.Push(1),
- [&pipe] { return pipe.sender.Push(2); },
- [&pipe] { return pipe.sender.Push(3); },
- [&pipe] {
- auto drop = std::move(pipe.sender);
+ Seq((*sender)->Push(1), [sender] { return (*sender)->Push(2); },
+ [sender] { return (*sender)->Push(3); },
+ [sender] {
+ sender->reset();
return absl::OkStatus();
}),
// Use a ForEach loop to read them out and verify all values are
@@ -59,7 +63,8 @@
JustElem<1>());
},
NoWakeupScheduler(),
- [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+ [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+ MakeScopedArena(1024, g_memory_allocator));
Mock::VerifyAndClearExpectations(&on_done);
EXPECT_EQ(num_received, 3);
}
diff --git a/test/core/promise/pipe_test.cc b/test/core/promise/pipe_test.cc
index ba38fbe..a202580 100644
--- a/test/core/promise/pipe_test.cc
+++ b/test/core/promise/pipe_test.cc
@@ -22,6 +22,7 @@
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::MockFunction;
@@ -29,12 +30,15 @@
namespace grpc_core {
+static auto* g_memory_allocator = new MemoryAllocator(
+ ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
+
TEST(PipeTest, CanSendAndReceive) {
- Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
- [&pipe] {
+ [] {
+ Pipe<int> pipe;
return Seq(
// Concurrently: send 42 into the pipe, and receive from the pipe.
Join(pipe.sender.Push(42), pipe.receiver.Next()),
@@ -46,15 +50,16 @@
});
},
NoWakeupScheduler(),
- [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+ [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+ MakeScopedArena(1024, g_memory_allocator));
}
TEST(PipeTest, CanReceiveAndSend) {
- Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
- [&pipe] {
+ [] {
+ Pipe<int> pipe;
return Seq(
// Concurrently: receive from the pipe, and send 42 into the pipe.
Join(pipe.receiver.Next(), pipe.sender.Push(42)),
@@ -66,28 +71,29 @@
});
},
NoWakeupScheduler(),
- [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+ [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+ MakeScopedArena(1024, g_memory_allocator));
}
TEST(PipeTest, CanSeeClosedOnSend) {
- Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
- auto sender = std::move(pipe.sender);
- auto receiver =
- absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver));
EXPECT_CALL(on_done, Call(absl::OkStatus()));
- // Push 42 onto the pipe - this will the pipe's one-deep send buffer.
- EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
MakeActivity(
- [&sender, &receiver] {
+ [] {
+ Pipe<int> pipe;
+ auto sender = std::move(pipe.sender);
+ // Push 42 onto the pipe - this will the pipe's one-deep send buffer.
+ EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
+ auto receiver = std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
+ absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver)));
return Seq(
// Concurrently:
// - push 43 into the sender, which will stall because the buffer is
// full
// - and close the receiver, which will fail the pending send.
Join(sender.Push(43),
- [&receiver] {
- receiver.reset();
+ [receiver] {
+ receiver->reset();
return absl::OkStatus();
}),
// Verify both that the send failed and that we executed the close.
@@ -97,17 +103,19 @@
});
},
NoWakeupScheduler(),
- [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+ [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+ MakeScopedArena(1024, g_memory_allocator));
}
TEST(PipeTest, CanSeeClosedOnReceive) {
- Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
- auto sender = absl::make_unique<PipeSender<int>>(std::move(pipe.sender));
- auto receiver = std::move(pipe.receiver);
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
- [&sender, &receiver] {
+ [] {
+ Pipe<int> pipe;
+ auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
+ absl::make_unique<PipeSender<int>>(std::move(pipe.sender)));
+ auto receiver = std::move(pipe.receiver);
return Seq(
// Concurrently:
// - wait for a received value (will stall forever since we push
@@ -115,8 +123,8 @@
// - close the sender, which will signal the receiver to return an
// end-of-stream.
Join(receiver.Next(),
- [&sender] {
- sender.reset();
+ [sender] {
+ sender->reset();
return absl::OkStatus();
}),
// Verify we received end-of-stream and closed the sender.
@@ -127,49 +135,8 @@
});
},
NoWakeupScheduler(),
- [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
-}
-
-TEST(PipeTest, CanFilter) {
- Pipe<int> pipe;
- StrictMock<MockFunction<void(absl::Status)>> on_done;
- EXPECT_CALL(on_done, Call(absl::OkStatus()));
- MakeActivity(
- [&pipe] {
- // Setup some filters here, carefully getting ordering correct by doing
- // so outside of the Join() since C++ does not define execution order
- // between arguments.
- // TODO(ctiller): A future change to Pipe will specify an ordering
- // between filters added to sender and receiver, at which point these
- // should move back.
- auto doubler = pipe.receiver.Filter(
- [](int p) { return absl::StatusOr<int>(p * 2); });
- auto adder = pipe.sender.Filter(
- [](int p) { return absl::StatusOr<int>(p + 1); });
- return Seq(
- // Concurrently:
- // - push 42 into the pipe
- // - wait for a value to be received, and filter it by doubling it
- // - wait for a value to be received, and filter it by adding one to
- // it
- // - wait for a value to be received and close the pipe.
- Join(pipe.sender.Push(42), std::move(doubler), std::move(adder),
- Seq(pipe.receiver.Next(),
- [&pipe](absl::optional<int> i) {
- auto x = std::move(pipe.receiver);
- return i;
- })),
- // Verify all of the above happened correctly.
- [](std::tuple<bool, absl::Status, absl::Status, absl::optional<int>>
- result) {
- EXPECT_EQ(result, std::make_tuple(true, absl::OkStatus(),
- absl::OkStatus(),
- absl::optional<int>(85)));
- return absl::OkStatus();
- });
- },
- NoWakeupScheduler(),
- [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+ [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+ MakeScopedArena(1024, g_memory_allocator));
}
} // namespace grpc_core