Revert "Revert "Promise pipe redux (#28319)" (#28364)" (#28400)

* Revert "Revert "Promise pipe redux (#28319)" (#28364)"

This reverts commit 3df2a4eceabbbcc5522c70ad3da837f3b46c285a.

* rephrase to be hopefully more readable to msvc

* golfing
diff --git a/BUILD b/BUILD
index 4dd7d77..e03cd6a 100644
--- a/BUILD
+++ b/BUILD
@@ -1372,6 +1372,7 @@
     ],
     deps = [
         "activity",
+        "arena",
         "gpr_platform",
         "intra_activity_waiter",
     ],
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 408e906..e02b006 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -10678,53 +10678,23 @@
 if(gRPC_BUILD_TESTS)
 
 add_executable(for_each_test
-  src/core/ext/upb-generated/google/protobuf/any.upb.c
-  src/core/ext/upb-generated/google/rpc/status.upb.c
-  src/core/lib/gpr/alloc.cc
-  src/core/lib/gpr/atm.cc
-  src/core/lib/gpr/cpu_iphone.cc
-  src/core/lib/gpr/cpu_linux.cc
-  src/core/lib/gpr/cpu_posix.cc
-  src/core/lib/gpr/cpu_windows.cc
-  src/core/lib/gpr/env_linux.cc
-  src/core/lib/gpr/env_posix.cc
-  src/core/lib/gpr/env_windows.cc
-  src/core/lib/gpr/log.cc
-  src/core/lib/gpr/log_android.cc
-  src/core/lib/gpr/log_linux.cc
-  src/core/lib/gpr/log_posix.cc
-  src/core/lib/gpr/log_windows.cc
-  src/core/lib/gpr/murmur_hash.cc
-  src/core/lib/gpr/string.cc
-  src/core/lib/gpr/string_posix.cc
-  src/core/lib/gpr/string_util_windows.cc
-  src/core/lib/gpr/string_windows.cc
-  src/core/lib/gpr/sync.cc
-  src/core/lib/gpr/sync_abseil.cc
-  src/core/lib/gpr/sync_posix.cc
-  src/core/lib/gpr/sync_windows.cc
-  src/core/lib/gpr/time.cc
-  src/core/lib/gpr/time_posix.cc
-  src/core/lib/gpr/time_precise.cc
-  src/core/lib/gpr/time_windows.cc
-  src/core/lib/gpr/tmpfile_msys.cc
-  src/core/lib/gpr/tmpfile_posix.cc
-  src/core/lib/gpr/tmpfile_windows.cc
-  src/core/lib/gpr/wrap_memcpy.cc
-  src/core/lib/gprpp/examine_stack.cc
-  src/core/lib/gprpp/fork.cc
-  src/core/lib/gprpp/global_config_env.cc
-  src/core/lib/gprpp/host_port.cc
-  src/core/lib/gprpp/mpscq.cc
-  src/core/lib/gprpp/stat_posix.cc
-  src/core/lib/gprpp/stat_windows.cc
-  src/core/lib/gprpp/status_helper.cc
-  src/core/lib/gprpp/thd_posix.cc
-  src/core/lib/gprpp/thd_windows.cc
-  src/core/lib/gprpp/time_util.cc
-  src/core/lib/profiling/basic_timers.cc
-  src/core/lib/profiling/stap_timers.cc
+  src/core/lib/debug/trace.cc
+  src/core/lib/event_engine/memory_allocator.cc
+  src/core/lib/iomgr/combiner.cc
+  src/core/lib/iomgr/error.cc
+  src/core/lib/iomgr/exec_ctx.cc
+  src/core/lib/iomgr/executor.cc
+  src/core/lib/iomgr/iomgr_internal.cc
   src/core/lib/promise/activity.cc
+  src/core/lib/resource_quota/arena.cc
+  src/core/lib/resource_quota/memory_quota.cc
+  src/core/lib/resource_quota/resource_quota.cc
+  src/core/lib/resource_quota/thread_quota.cc
+  src/core/lib/resource_quota/trace.cc
+  src/core/lib/slice/slice.cc
+  src/core/lib/slice/slice_refcount.cc
+  src/core/lib/slice/slice_string_helpers.cc
+  src/core/lib/slice/static_slice.cc
   test/core/promise/for_each_test.cc
   third_party/googletest/googletest/src/gtest-all.cc
   third_party/googletest/googlemock/src/gmock-all.cc
@@ -10752,21 +10722,10 @@
 target_link_libraries(for_each_test
   ${_gRPC_PROTOBUF_LIBRARIES}
   ${_gRPC_ALLTARGETS_LIBRARIES}
-  absl::base
-  absl::core_headers
   absl::flat_hash_set
-  absl::memory
-  absl::random_random
-  absl::status
   absl::statusor
-  absl::cord
-  absl::str_format
-  absl::strings
-  absl::synchronization
-  absl::time
-  absl::optional
   absl::variant
-  upb
+  gpr
 )
 
 
@@ -13349,53 +13308,23 @@
 if(gRPC_BUILD_TESTS)
 
 add_executable(pipe_test
-  src/core/ext/upb-generated/google/protobuf/any.upb.c
-  src/core/ext/upb-generated/google/rpc/status.upb.c
-  src/core/lib/gpr/alloc.cc
-  src/core/lib/gpr/atm.cc
-  src/core/lib/gpr/cpu_iphone.cc
-  src/core/lib/gpr/cpu_linux.cc
-  src/core/lib/gpr/cpu_posix.cc
-  src/core/lib/gpr/cpu_windows.cc
-  src/core/lib/gpr/env_linux.cc
-  src/core/lib/gpr/env_posix.cc
-  src/core/lib/gpr/env_windows.cc
-  src/core/lib/gpr/log.cc
-  src/core/lib/gpr/log_android.cc
-  src/core/lib/gpr/log_linux.cc
-  src/core/lib/gpr/log_posix.cc
-  src/core/lib/gpr/log_windows.cc
-  src/core/lib/gpr/murmur_hash.cc
-  src/core/lib/gpr/string.cc
-  src/core/lib/gpr/string_posix.cc
-  src/core/lib/gpr/string_util_windows.cc
-  src/core/lib/gpr/string_windows.cc
-  src/core/lib/gpr/sync.cc
-  src/core/lib/gpr/sync_abseil.cc
-  src/core/lib/gpr/sync_posix.cc
-  src/core/lib/gpr/sync_windows.cc
-  src/core/lib/gpr/time.cc
-  src/core/lib/gpr/time_posix.cc
-  src/core/lib/gpr/time_precise.cc
-  src/core/lib/gpr/time_windows.cc
-  src/core/lib/gpr/tmpfile_msys.cc
-  src/core/lib/gpr/tmpfile_posix.cc
-  src/core/lib/gpr/tmpfile_windows.cc
-  src/core/lib/gpr/wrap_memcpy.cc
-  src/core/lib/gprpp/examine_stack.cc
-  src/core/lib/gprpp/fork.cc
-  src/core/lib/gprpp/global_config_env.cc
-  src/core/lib/gprpp/host_port.cc
-  src/core/lib/gprpp/mpscq.cc
-  src/core/lib/gprpp/stat_posix.cc
-  src/core/lib/gprpp/stat_windows.cc
-  src/core/lib/gprpp/status_helper.cc
-  src/core/lib/gprpp/thd_posix.cc
-  src/core/lib/gprpp/thd_windows.cc
-  src/core/lib/gprpp/time_util.cc
-  src/core/lib/profiling/basic_timers.cc
-  src/core/lib/profiling/stap_timers.cc
+  src/core/lib/debug/trace.cc
+  src/core/lib/event_engine/memory_allocator.cc
+  src/core/lib/iomgr/combiner.cc
+  src/core/lib/iomgr/error.cc
+  src/core/lib/iomgr/exec_ctx.cc
+  src/core/lib/iomgr/executor.cc
+  src/core/lib/iomgr/iomgr_internal.cc
   src/core/lib/promise/activity.cc
+  src/core/lib/resource_quota/arena.cc
+  src/core/lib/resource_quota/memory_quota.cc
+  src/core/lib/resource_quota/resource_quota.cc
+  src/core/lib/resource_quota/thread_quota.cc
+  src/core/lib/resource_quota/trace.cc
+  src/core/lib/slice/slice.cc
+  src/core/lib/slice/slice_refcount.cc
+  src/core/lib/slice/slice_string_helpers.cc
+  src/core/lib/slice/static_slice.cc
   test/core/promise/pipe_test.cc
   third_party/googletest/googletest/src/gtest-all.cc
   third_party/googletest/googlemock/src/gmock-all.cc
@@ -13423,20 +13352,9 @@
 target_link_libraries(pipe_test
   ${_gRPC_PROTOBUF_LIBRARIES}
   ${_gRPC_ALLTARGETS_LIBRARIES}
-  absl::base
-  absl::core_headers
-  absl::memory
-  absl::random_random
-  absl::status
   absl::statusor
-  absl::cord
-  absl::str_format
-  absl::strings
-  absl::synchronization
-  absl::time
-  absl::optional
   absl::variant
-  upb
+  gpr
 )
 
 
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 25c32fe..bcb9330 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -5839,38 +5839,21 @@
   build: test
   language: c++
   headers:
-  - src/core/ext/upb-generated/google/protobuf/any.upb.h
-  - src/core/ext/upb-generated/google/rpc/status.upb.h
-  - src/core/lib/gpr/alloc.h
-  - src/core/lib/gpr/env.h
-  - src/core/lib/gpr/murmur_hash.h
-  - src/core/lib/gpr/spinlock.h
-  - src/core/lib/gpr/string.h
-  - src/core/lib/gpr/string_windows.h
-  - src/core/lib/gpr/time_precise.h
-  - src/core/lib/gpr/tls.h
-  - src/core/lib/gpr/tmpfile.h
-  - src/core/lib/gpr/useful.h
+  - src/core/lib/debug/trace.h
   - src/core/lib/gprpp/atomic_utils.h
   - src/core/lib/gprpp/bitset.h
-  - src/core/lib/gprpp/construct_destruct.h
-  - src/core/lib/gprpp/debug_location.h
-  - src/core/lib/gprpp/examine_stack.h
-  - src/core/lib/gprpp/fork.h
-  - src/core/lib/gprpp/global_config.h
-  - src/core/lib/gprpp/global_config_custom.h
-  - src/core/lib/gprpp/global_config_env.h
-  - src/core/lib/gprpp/global_config_generic.h
-  - src/core/lib/gprpp/host_port.h
-  - src/core/lib/gprpp/manual_constructor.h
-  - src/core/lib/gprpp/memory.h
-  - src/core/lib/gprpp/mpscq.h
-  - src/core/lib/gprpp/stat.h
-  - src/core/lib/gprpp/status_helper.h
-  - src/core/lib/gprpp/sync.h
-  - src/core/lib/gprpp/thd.h
-  - src/core/lib/gprpp/time_util.h
-  - src/core/lib/profiling/timers.h
+  - src/core/lib/gprpp/cpp_impl_of.h
+  - src/core/lib/gprpp/dual_ref_counted.h
+  - src/core/lib/gprpp/orphanable.h
+  - src/core/lib/gprpp/ref_counted.h
+  - src/core/lib/gprpp/ref_counted_ptr.h
+  - src/core/lib/iomgr/closure.h
+  - src/core/lib/iomgr/combiner.h
+  - src/core/lib/iomgr/error.h
+  - src/core/lib/iomgr/error_internal.h
+  - src/core/lib/iomgr/exec_ctx.h
+  - src/core/lib/iomgr/executor.h
+  - src/core/lib/iomgr/iomgr_internal.h
   - src/core/lib/promise/activity.h
   - src/core/lib/promise/context.h
   - src/core/lib/promise/detail/basic_join.h
@@ -5879,81 +5862,55 @@
   - src/core/lib/promise/detail/promise_like.h
   - src/core/lib/promise/detail/status.h
   - src/core/lib/promise/detail/switch.h
+  - src/core/lib/promise/exec_ctx_wakeup_scheduler.h
   - src/core/lib/promise/for_each.h
   - src/core/lib/promise/intra_activity_waiter.h
   - src/core/lib/promise/join.h
+  - src/core/lib/promise/loop.h
   - src/core/lib/promise/map.h
   - src/core/lib/promise/observable.h
   - src/core/lib/promise/pipe.h
   - src/core/lib/promise/poll.h
+  - src/core/lib/promise/race.h
   - src/core/lib/promise/seq.h
   - src/core/lib/promise/wait_set.h
+  - src/core/lib/resource_quota/arena.h
+  - src/core/lib/resource_quota/memory_quota.h
+  - src/core/lib/resource_quota/resource_quota.h
+  - src/core/lib/resource_quota/thread_quota.h
+  - src/core/lib/resource_quota/trace.h
+  - src/core/lib/slice/slice.h
+  - src/core/lib/slice/slice_internal.h
+  - src/core/lib/slice/slice_refcount.h
+  - src/core/lib/slice/slice_refcount_base.h
+  - src/core/lib/slice/slice_string_helpers.h
+  - src/core/lib/slice/slice_utils.h
+  - src/core/lib/slice/static_slice.h
   - test/core/promise/test_wakeup_schedulers.h
   src:
-  - src/core/ext/upb-generated/google/protobuf/any.upb.c
-  - src/core/ext/upb-generated/google/rpc/status.upb.c
-  - src/core/lib/gpr/alloc.cc
-  - src/core/lib/gpr/atm.cc
-  - src/core/lib/gpr/cpu_iphone.cc
-  - src/core/lib/gpr/cpu_linux.cc
-  - src/core/lib/gpr/cpu_posix.cc
-  - src/core/lib/gpr/cpu_windows.cc
-  - src/core/lib/gpr/env_linux.cc
-  - src/core/lib/gpr/env_posix.cc
-  - src/core/lib/gpr/env_windows.cc
-  - src/core/lib/gpr/log.cc
-  - src/core/lib/gpr/log_android.cc
-  - src/core/lib/gpr/log_linux.cc
-  - src/core/lib/gpr/log_posix.cc
-  - src/core/lib/gpr/log_windows.cc
-  - src/core/lib/gpr/murmur_hash.cc
-  - src/core/lib/gpr/string.cc
-  - src/core/lib/gpr/string_posix.cc
-  - src/core/lib/gpr/string_util_windows.cc
-  - src/core/lib/gpr/string_windows.cc
-  - src/core/lib/gpr/sync.cc
-  - src/core/lib/gpr/sync_abseil.cc
-  - src/core/lib/gpr/sync_posix.cc
-  - src/core/lib/gpr/sync_windows.cc
-  - src/core/lib/gpr/time.cc
-  - src/core/lib/gpr/time_posix.cc
-  - src/core/lib/gpr/time_precise.cc
-  - src/core/lib/gpr/time_windows.cc
-  - src/core/lib/gpr/tmpfile_msys.cc
-  - src/core/lib/gpr/tmpfile_posix.cc
-  - src/core/lib/gpr/tmpfile_windows.cc
-  - src/core/lib/gpr/wrap_memcpy.cc
-  - src/core/lib/gprpp/examine_stack.cc
-  - src/core/lib/gprpp/fork.cc
-  - src/core/lib/gprpp/global_config_env.cc
-  - src/core/lib/gprpp/host_port.cc
-  - src/core/lib/gprpp/mpscq.cc
-  - src/core/lib/gprpp/stat_posix.cc
-  - src/core/lib/gprpp/stat_windows.cc
-  - src/core/lib/gprpp/status_helper.cc
-  - src/core/lib/gprpp/thd_posix.cc
-  - src/core/lib/gprpp/thd_windows.cc
-  - src/core/lib/gprpp/time_util.cc
-  - src/core/lib/profiling/basic_timers.cc
-  - src/core/lib/profiling/stap_timers.cc
+  - src/core/lib/debug/trace.cc
+  - src/core/lib/event_engine/memory_allocator.cc
+  - src/core/lib/iomgr/combiner.cc
+  - src/core/lib/iomgr/error.cc
+  - src/core/lib/iomgr/exec_ctx.cc
+  - src/core/lib/iomgr/executor.cc
+  - src/core/lib/iomgr/iomgr_internal.cc
   - src/core/lib/promise/activity.cc
+  - src/core/lib/resource_quota/arena.cc
+  - src/core/lib/resource_quota/memory_quota.cc
+  - src/core/lib/resource_quota/resource_quota.cc
+  - src/core/lib/resource_quota/thread_quota.cc
+  - src/core/lib/resource_quota/trace.cc
+  - src/core/lib/slice/slice.cc
+  - src/core/lib/slice/slice_refcount.cc
+  - src/core/lib/slice/slice_string_helpers.cc
+  - src/core/lib/slice/static_slice.cc
   - test/core/promise/for_each_test.cc
   deps:
-  - absl/base:base
-  - absl/base:core_headers
   - absl/container:flat_hash_set
-  - absl/memory:memory
-  - absl/random:random
-  - absl/status:status
   - absl/status:statusor
-  - absl/strings:cord
-  - absl/strings:str_format
-  - absl/strings:strings
-  - absl/synchronization:synchronization
-  - absl/time:time
-  - absl/types:optional
   - absl/types:variant
-  - upb
+  - gpr
   uses_polling: false
 - name: generic_end2end_test
   gtest: true
@@ -6969,38 +6926,21 @@
   build: test
   language: c++
   headers:
-  - src/core/ext/upb-generated/google/protobuf/any.upb.h
-  - src/core/ext/upb-generated/google/rpc/status.upb.h
-  - src/core/lib/gpr/alloc.h
-  - src/core/lib/gpr/env.h
-  - src/core/lib/gpr/murmur_hash.h
-  - src/core/lib/gpr/spinlock.h
-  - src/core/lib/gpr/string.h
-  - src/core/lib/gpr/string_windows.h
-  - src/core/lib/gpr/time_precise.h
-  - src/core/lib/gpr/tls.h
-  - src/core/lib/gpr/tmpfile.h
-  - src/core/lib/gpr/useful.h
+  - src/core/lib/debug/trace.h
   - src/core/lib/gprpp/atomic_utils.h
   - src/core/lib/gprpp/bitset.h
-  - src/core/lib/gprpp/construct_destruct.h
-  - src/core/lib/gprpp/debug_location.h
-  - src/core/lib/gprpp/examine_stack.h
-  - src/core/lib/gprpp/fork.h
-  - src/core/lib/gprpp/global_config.h
-  - src/core/lib/gprpp/global_config_custom.h
-  - src/core/lib/gprpp/global_config_env.h
-  - src/core/lib/gprpp/global_config_generic.h
-  - src/core/lib/gprpp/host_port.h
-  - src/core/lib/gprpp/manual_constructor.h
-  - src/core/lib/gprpp/memory.h
-  - src/core/lib/gprpp/mpscq.h
-  - src/core/lib/gprpp/stat.h
-  - src/core/lib/gprpp/status_helper.h
-  - src/core/lib/gprpp/sync.h
-  - src/core/lib/gprpp/thd.h
-  - src/core/lib/gprpp/time_util.h
-  - src/core/lib/profiling/timers.h
+  - src/core/lib/gprpp/cpp_impl_of.h
+  - src/core/lib/gprpp/dual_ref_counted.h
+  - src/core/lib/gprpp/orphanable.h
+  - src/core/lib/gprpp/ref_counted.h
+  - src/core/lib/gprpp/ref_counted_ptr.h
+  - src/core/lib/iomgr/closure.h
+  - src/core/lib/iomgr/combiner.h
+  - src/core/lib/iomgr/error.h
+  - src/core/lib/iomgr/error_internal.h
+  - src/core/lib/iomgr/exec_ctx.h
+  - src/core/lib/iomgr/executor.h
+  - src/core/lib/iomgr/iomgr_internal.h
   - src/core/lib/promise/activity.h
   - src/core/lib/promise/context.h
   - src/core/lib/promise/detail/basic_join.h
@@ -7009,77 +6949,52 @@
   - src/core/lib/promise/detail/promise_like.h
   - src/core/lib/promise/detail/status.h
   - src/core/lib/promise/detail/switch.h
+  - src/core/lib/promise/exec_ctx_wakeup_scheduler.h
   - src/core/lib/promise/intra_activity_waiter.h
   - src/core/lib/promise/join.h
+  - src/core/lib/promise/loop.h
+  - src/core/lib/promise/map.h
   - src/core/lib/promise/pipe.h
   - src/core/lib/promise/poll.h
   - src/core/lib/promise/promise.h
+  - src/core/lib/promise/race.h
   - src/core/lib/promise/seq.h
+  - src/core/lib/resource_quota/arena.h
+  - src/core/lib/resource_quota/memory_quota.h
+  - src/core/lib/resource_quota/resource_quota.h
+  - src/core/lib/resource_quota/thread_quota.h
+  - src/core/lib/resource_quota/trace.h
+  - src/core/lib/slice/slice.h
+  - src/core/lib/slice/slice_internal.h
+  - src/core/lib/slice/slice_refcount.h
+  - src/core/lib/slice/slice_refcount_base.h
+  - src/core/lib/slice/slice_string_helpers.h
+  - src/core/lib/slice/slice_utils.h
+  - src/core/lib/slice/static_slice.h
   - test/core/promise/test_wakeup_schedulers.h
   src:
-  - src/core/ext/upb-generated/google/protobuf/any.upb.c
-  - src/core/ext/upb-generated/google/rpc/status.upb.c
-  - src/core/lib/gpr/alloc.cc
-  - src/core/lib/gpr/atm.cc
-  - src/core/lib/gpr/cpu_iphone.cc
-  - src/core/lib/gpr/cpu_linux.cc
-  - src/core/lib/gpr/cpu_posix.cc
-  - src/core/lib/gpr/cpu_windows.cc
-  - src/core/lib/gpr/env_linux.cc
-  - src/core/lib/gpr/env_posix.cc
-  - src/core/lib/gpr/env_windows.cc
-  - src/core/lib/gpr/log.cc
-  - src/core/lib/gpr/log_android.cc
-  - src/core/lib/gpr/log_linux.cc
-  - src/core/lib/gpr/log_posix.cc
-  - src/core/lib/gpr/log_windows.cc
-  - src/core/lib/gpr/murmur_hash.cc
-  - src/core/lib/gpr/string.cc
-  - src/core/lib/gpr/string_posix.cc
-  - src/core/lib/gpr/string_util_windows.cc
-  - src/core/lib/gpr/string_windows.cc
-  - src/core/lib/gpr/sync.cc
-  - src/core/lib/gpr/sync_abseil.cc
-  - src/core/lib/gpr/sync_posix.cc
-  - src/core/lib/gpr/sync_windows.cc
-  - src/core/lib/gpr/time.cc
-  - src/core/lib/gpr/time_posix.cc
-  - src/core/lib/gpr/time_precise.cc
-  - src/core/lib/gpr/time_windows.cc
-  - src/core/lib/gpr/tmpfile_msys.cc
-  - src/core/lib/gpr/tmpfile_posix.cc
-  - src/core/lib/gpr/tmpfile_windows.cc
-  - src/core/lib/gpr/wrap_memcpy.cc
-  - src/core/lib/gprpp/examine_stack.cc
-  - src/core/lib/gprpp/fork.cc
-  - src/core/lib/gprpp/global_config_env.cc
-  - src/core/lib/gprpp/host_port.cc
-  - src/core/lib/gprpp/mpscq.cc
-  - src/core/lib/gprpp/stat_posix.cc
-  - src/core/lib/gprpp/stat_windows.cc
-  - src/core/lib/gprpp/status_helper.cc
-  - src/core/lib/gprpp/thd_posix.cc
-  - src/core/lib/gprpp/thd_windows.cc
-  - src/core/lib/gprpp/time_util.cc
-  - src/core/lib/profiling/basic_timers.cc
-  - src/core/lib/profiling/stap_timers.cc
+  - src/core/lib/debug/trace.cc
+  - src/core/lib/event_engine/memory_allocator.cc
+  - src/core/lib/iomgr/combiner.cc
+  - src/core/lib/iomgr/error.cc
+  - src/core/lib/iomgr/exec_ctx.cc
+  - src/core/lib/iomgr/executor.cc
+  - src/core/lib/iomgr/iomgr_internal.cc
   - src/core/lib/promise/activity.cc
+  - src/core/lib/resource_quota/arena.cc
+  - src/core/lib/resource_quota/memory_quota.cc
+  - src/core/lib/resource_quota/resource_quota.cc
+  - src/core/lib/resource_quota/thread_quota.cc
+  - src/core/lib/resource_quota/trace.cc
+  - src/core/lib/slice/slice.cc
+  - src/core/lib/slice/slice_refcount.cc
+  - src/core/lib/slice/slice_string_helpers.cc
+  - src/core/lib/slice/static_slice.cc
   - test/core/promise/pipe_test.cc
   deps:
-  - absl/base:base
-  - absl/base:core_headers
-  - absl/memory:memory
-  - absl/random:random
-  - absl/status:status
   - absl/status:statusor
-  - absl/strings:cord
-  - absl/strings:str_format
-  - absl/strings:strings
-  - absl/synchronization:synchronization
-  - absl/time:time
-  - absl/types:optional
   - absl/types:variant
-  - upb
+  - gpr
   uses_polling: false
 - name: poll_test
   gtest: true
diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h
index 5403aa8..792084c 100644
--- a/src/core/lib/promise/activity.h
+++ b/src/core/lib/promise/activity.h
@@ -24,6 +24,7 @@
 #include <atomic>
 #include <functional>
 #include <memory>
+#include <type_traits>
 #include <utility>
 
 #include "absl/base/thread_annotations.h"
@@ -267,6 +268,8 @@
 template <typename Context>
 class ContextHolder {
  public:
+  using ContextType = Context;
+
   explicit ContextHolder(Context value) : value_(std::move(value)) {}
   Context* GetContext() { return &value_; }
 
@@ -277,6 +280,8 @@
 template <typename Context>
 class ContextHolder<Context*> {
  public:
+  using ContextType = Context;
+
   explicit ContextHolder(Context* value) : value_(value) {}
   Context* GetContext() { return value_; }
 
@@ -284,11 +289,35 @@
   Context* value_;
 };
 
-template <typename... Contexts>
-class EnterContexts : public promise_detail::Context<Contexts>... {
+template <typename Context, typename Deleter>
+class ContextHolder<std::unique_ptr<Context, Deleter>> {
  public:
-  explicit EnterContexts(Contexts*... contexts)
-      : promise_detail::Context<Contexts>(contexts)... {}
+  using ContextType = Context;
+
+  explicit ContextHolder(std::unique_ptr<Context, Deleter> value)
+      : value_(std::move(value)) {}
+  Context* GetContext() { return value_.get(); }
+
+ private:
+  std::unique_ptr<Context, Deleter> value_;
+};
+
+template <typename HeldContext>
+using ContextTypeFromHeld = typename ContextHolder<HeldContext>::ContextType;
+
+template <typename... Contexts>
+class ActivityContexts : public ContextHolder<Contexts>... {
+ public:
+  explicit ActivityContexts(Contexts&&... contexts)
+      : ContextHolder<Contexts>(std::forward<Contexts>(contexts))... {}
+
+  class ScopedContext : public Context<ContextTypeFromHeld<Contexts>>... {
+   public:
+    explicit ScopedContext(ActivityContexts* contexts)
+        : Context<ContextTypeFromHeld<Contexts>>(
+              static_cast<ContextHolder<Contexts>*>(contexts)
+                  ->GetContext())... {}
+  };
 };
 
 // Implementation details for an Activity of an arbitrary type of promise.
@@ -303,15 +332,14 @@
 // invoked, and that a given activity will not be concurrently scheduled again
 // until its RunScheduledWakeup() has been invoked.
 template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
-class PromiseActivity final
-    : public Activity,
-      private promise_detail::ContextHolder<Contexts>... {
+class PromiseActivity final : public Activity,
+                              private ActivityContexts<Contexts...> {
  public:
   using Factory = PromiseFactory<void, F>;
   PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
-                  OnDone on_done, Contexts... contexts)
+                  OnDone on_done, Contexts&&... contexts)
       : Activity(),
-        ContextHolder<Contexts>(std::move(contexts))...,
+        ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...),
         wakeup_scheduler_(std::move(wakeup_scheduler)),
         on_done_(std::move(on_done)) {
     // Lock, construct an initial promise from the factory, and step it.
@@ -361,6 +389,8 @@
   }
 
  private:
+  using typename ActivityContexts<Contexts...>::ScopedContext;
+
   // Wakeup this activity. Arrange to poll the activity again at a convenient
   // time: this could be inline if it's deemed safe, or it could be by passing
   // the activity to an external threadpool to run. If the activity is already
@@ -412,28 +442,27 @@
   }
 
   // The main body of a step: set the current activity, and any contexts, and
-  // then run the main polling loop. Contained in a function by itself in order
-  // to keep the scoping rules a little easier in Step().
+  // then run the main polling loop. Contained in a function by itself in
+  // order to keep the scoping rules a little easier in Step().
   absl::optional<absl::Status> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
     ScopedActivity scoped_activity(this);
-    EnterContexts<Contexts...> contexts(
-        static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
+    ScopedContext contexts(this);
     return StepLoop();
   }
 
-  // Similarly to RunStep, but additionally construct the promise from a promise
-  // factory before entering the main loop. Called once from the constructor.
+  // Similarly to RunStep, but additionally construct the promise from a
+  // promise factory before entering the main loop. Called once from the
+  // constructor.
   absl::optional<absl::Status> Start(Factory promise_factory)
       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
     ScopedActivity scoped_activity(this);
-    EnterContexts<Contexts...> contexts(
-        static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
+    ScopedContext contexts(this);
     Construct(&promise_holder_.promise, promise_factory.Once());
     return StepLoop();
   }
 
-  // Until there are no wakeups from within and the promise is incomplete: poll
-  // the promise.
+  // Until there are no wakeups from within and the promise is incomplete:
+  // poll the promise.
   absl::optional<absl::Status> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
     GPR_ASSERT(is_current());
     while (true) {
@@ -486,12 +515,12 @@
           typename... Contexts>
 ActivityPtr MakeActivity(Factory promise_factory,
                          WakeupScheduler wakeup_scheduler, OnDone on_done,
-                         Contexts... contexts) {
+                         Contexts&&... contexts) {
   return ActivityPtr(
       new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone,
                                           Contexts...>(
           std::move(promise_factory), std::move(wakeup_scheduler),
-          std::move(on_done), std::move(contexts)...));
+          std::move(on_done), std::forward<Contexts>(contexts)...));
 }
 
 }  // namespace grpc_core
diff --git a/src/core/lib/promise/pipe.h b/src/core/lib/promise/pipe.h
index fb53ff1..c2de5da 100644
--- a/src/core/lib/promise/pipe.h
+++ b/src/core/lib/promise/pipe.h
@@ -31,10 +31,13 @@
 #include "absl/types/optional.h"
 #include "absl/types/variant.h"
 
+#include <grpc/support/log.h>
+
 #include "src/core/lib/promise/activity.h"
 #include "src/core/lib/promise/detail/promise_factory.h"
 #include "src/core/lib/promise/intra_activity_waiter.h"
 #include "src/core/lib/promise/poll.h"
+#include "src/core/lib/resource_quota/arena.h"
 
 namespace grpc_core {
 
@@ -52,39 +55,114 @@
 template <typename T>
 class Next;
 
-template <class T>
-class Promise {
- public:
-  virtual Poll<bool> Step(T* output) = 0;
-  virtual void Stop() = 0;
-
- protected:
-  inline virtual ~Promise() = default;
-};
-
-struct alignas(alignof(void*)) Scratch {
-  uint8_t scratch[32];
-};
-
+// Center sits between a sender and a receiver to provide a one-deep buffer of
+// Ts
 template <typename T>
-class FilterInterface {
+class Center {
  public:
-  FilterInterface() = default;
-  FilterInterface(const FilterInterface&) = delete;
-  FilterInterface& operator=(const FilterInterface&) = delete;
-  virtual Promise<T>* Step(T* p, Scratch* scratch_space) = 0;
-  virtual void UpdateReceiver(PipeReceiver<T>* receiver) = 0;
+  // Initialize with one send ref (held by PipeSender) and one recv ref (held by
+  // PipeReceiver)
+  Center() {
+    send_refs_ = 1;
+    recv_refs_ = 1;
+    has_value_ = false;
+  }
 
- protected:
-  inline virtual ~FilterInterface() {}
-  static void SetReceiverIndex(PipeReceiver<T>* receiver, int idx,
-                               FilterInterface* p);
-  char AllocIndex(PipeReceiver<T>* receiver);
+  // Add one ref to the send side of this object, and return this.
+  Center* RefSend() {
+    send_refs_++;
+    return this;
+  }
+
+  // Add one ref to the recv side of this object, and return this.
+  Center* RefRecv() {
+    recv_refs_++;
+    return this;
+  }
+
+  // Drop a send side ref
+  // If no send refs remain, wake due to send closure
+  // If no refs remain, destroy this object
+  void UnrefSend() {
+    GPR_DEBUG_ASSERT(send_refs_ > 0);
+    send_refs_--;
+    if (0 == send_refs_) {
+      on_full_.Wake();
+      on_empty_.Wake();
+      if (0 == recv_refs_) {
+        this->~Center();
+      }
+    }
+  }
+
+  // Drop a recv side ref
+  // If no recv refs remain, wake due to recv closure
+  // If no refs remain, destroy this object
+  void UnrefRecv() {
+    GPR_DEBUG_ASSERT(recv_refs_ > 0);
+    recv_refs_--;
+    if (0 == recv_refs_) {
+      on_full_.Wake();
+      on_empty_.Wake();
+      if (0 == send_refs_) {
+        this->~Center();
+      } else if (has_value_) {
+        ResetValue();
+      }
+    }
+  }
+
+  // Try to push *value into the pipe.
+  // Return Pending if there is no space.
+  // Return true if the value was pushed.
+  // Return false if the recv end is closed.
+  Poll<bool> Push(T* value) {
+    GPR_DEBUG_ASSERT(send_refs_ != 0);
+    if (recv_refs_ == 0) return false;
+    if (has_value_) return on_empty_.pending();
+    has_value_ = true;
+    value_ = std::move(*value);
+    on_full_.Wake();
+    return true;
+  }
+
+  // Try to receive a value from the pipe.
+  // Return Pending if there is no value.
+  // Return the value if one was retrieved.
+  // Return nullopt if the send end is closed and no value had been pushed.
+  Poll<absl::optional<T>> Next() {
+    GPR_DEBUG_ASSERT(recv_refs_ != 0);
+    if (!has_value_) {
+      if (send_refs_ == 0) return absl::nullopt;
+      return on_full_.pending();
+    }
+    has_value_ = false;
+    on_empty_.Wake();
+    return std::move(value_);
+  }
+
+ private:
+  void ResetValue() {
+    // Fancy dance to move out of value in the off chance that we reclaim some
+    // memory earlier.
+    [](T) {}(std::move(value_));
+    has_value_ = false;
+  }
+  T value_;
+  // Number of sending objects.
+  // 0 => send is closed.
+  // 1 ref each for PipeSender and Push.
+  uint8_t send_refs_ : 2;
+  // Number of receiving objects.
+  // 0 => recv is closed.
+  // 1 ref each for PipeReceiver and Next.
+  uint8_t recv_refs_ : 2;
+  // True iff there is a value in the pipe.
+  bool has_value_ : 1;
+  IntraActivityWaiter on_empty_;
+  IntraActivityWaiter on_full_;
 };
 
-template <typename T, typename F>
-class Filter;
-
 }  // namespace pipe_detail
 
 // Send end of a Pipe.
@@ -94,43 +172,18 @@
   PipeSender(const PipeSender&) = delete;
   PipeSender& operator=(const PipeSender&) = delete;
 
-  PipeSender(PipeSender&& other) noexcept
-      : receiver_(other.receiver_), push_(other.push_) {
-    if (receiver_ != nullptr) {
-      receiver_->sender_ = this;
-      other.receiver_ = nullptr;
-    }
-    if (push_ != nullptr) {
-      push_->sender_ = this;
-      other.push_ = nullptr;
-    }
+  PipeSender(PipeSender&& other) noexcept : center_(other.center_) {
+    other.center_ = nullptr;
   }
   PipeSender& operator=(PipeSender&& other) noexcept {
-    if (receiver_ != nullptr) {
-      receiver_->sender_ = nullptr;
-    }
-    if (push_ != nullptr) {
-      push_->sender_ = nullptr;
-    }
-    receiver_ = other.receiver_;
-    if (receiver_ != nullptr) {
-      receiver_->sender_ = this;
-      other.receiver_ = nullptr;
-    }
-    if (push_ != nullptr) {
-      push_->sender_ = this;
-      other.push_ = nullptr;
-    }
+    if (center_ != nullptr) center_->UnrefSend();
+    center_ = other.center_;
+    other.center_ = nullptr;
     return *this;
   }
 
   ~PipeSender() {
-    if (receiver_ != nullptr) {
-      receiver_->MarkClosed();
-    }
-    if (push_ != nullptr) {
-      push_->sender_ = nullptr;
-    }
+    if (center_ != nullptr) center_->UnrefSend();
   }
 
   // Send a single message along the pipe.
@@ -139,21 +192,10 @@
   // receiver is either closed or able to receive another message.
   pipe_detail::Push<T> Push(T value);
 
-  // Attach a promise factory based filter to this pipe.
-  // The overall promise returned from this will be active until the pipe is
-  // closed. If this promise is cancelled before the pipe is closed, the pipe
-  // will close. The filter will be run _after_ any other registered filters.
-  template <typename F>
-  pipe_detail::Filter<T, F> Filter(F f);
-
  private:
   friend struct Pipe<T>;
-  friend class PipeReceiver<T>;
-  friend class pipe_detail::Next<T>;
-  friend class pipe_detail::Push<T>;
-  explicit PipeSender(PipeReceiver<T>* receiver) : receiver_(receiver) {}
-  PipeReceiver<T>* receiver_;
-  pipe_detail::Push<T>* push_ = nullptr;
+  explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
+  pipe_detail::Center<T>* center_;
 };
 
 // Receive end of a Pipe.
@@ -163,56 +205,17 @@
   PipeReceiver(const PipeReceiver&) = delete;
   PipeReceiver& operator=(const PipeReceiver&) = delete;
 
-  PipeReceiver(PipeReceiver&& other) noexcept
-      : sender_(other.sender_),
-        next_(other.next_),
-        filters_(std::move(other.filters_)),
-        pending_(std::move(other.pending_)),
-        waiting_to_send_(std::move(other.waiting_to_send_)),
-        waiting_to_receive_(other.waiting_to_receive_) {
-    if (sender_ != nullptr) {
-      sender_->receiver_ = this;
-      other.sender_ = nullptr;
-    }
-    if (next_ != nullptr) {
-      next_->receiver_ = this;
-      other.next_ = nullptr;
-    }
-    for (auto filter : filters_) {
-      filter->UpdateReceiver(this);
-    }
+  PipeReceiver(PipeReceiver&& other) noexcept : center_(other.center_) {
+    other.center_ = nullptr;
   }
   PipeReceiver& operator=(PipeReceiver&& other) noexcept {
-    if (sender_ != nullptr) {
-      sender_->receiver_ = nullptr;
-    }
-    if (next_ != nullptr) {
-      next_->receiver_ = nullptr;
-    }
-    sender_ = other.sender_;
-    next_ = other.next_;
-    filters_ = std::move(other.filters_);
-    for (auto filter : filters_) {
-      filter->UpdateReceiver(this);
-    }
-    pending_ = std::move(other.pending_);
-    waiting_to_send_ = std::move(other.waiting_to_send_);
-    waiting_to_receive_ = std::move(other.waiting_to_receive_);
-    if (sender_ != nullptr) {
-      sender_->receiver_ = this;
-      other.sender_ = nullptr;
-    }
-    if (next_ != nullptr) {
-      next_->receiver_ = this;
-      other.next_ = nullptr;
-    }
+    if (center_ != nullptr) center_->UnrefRecv();
+    center_ = other.center_;
+    other.center_ = nullptr;
     return *this;
   }
   ~PipeReceiver() {
-    MarkClosed();
-    if (next_ != nullptr) {
-      next_->receiver_ = nullptr;
-    }
+    if (center_ != nullptr) center_->UnrefRecv();
   }
 
   // Receive a single message from the pipe.
@@ -222,44 +225,10 @@
   // available.
   pipe_detail::Next<T> Next();
 
-  // Attach a promise factory based filter to this pipe.
-  // The overall promise returned from this will be active until the pipe is
-  // closed. If this promise is cancelled before the pipe is closed, the pipe
-  // will close. The filter will be run _after_ any other registered filters.
-  template <typename F>
-  pipe_detail::Filter<T, F> Filter(F f);
-
  private:
   friend struct Pipe<T>;
-  friend class PipeSender<T>;
-  friend class pipe_detail::Next<T>;
-  friend class pipe_detail::Push<T>;
-  friend class pipe_detail::FilterInterface<T>;
-  explicit PipeReceiver(PipeSender<T>* sender) : sender_(sender) {}
-  PipeSender<T>* sender_;
-  pipe_detail::Next<T>* next_ = nullptr;
-  absl::InlinedVector<pipe_detail::FilterInterface<T>*, 12> filters_;
-  absl::optional<T> pending_;
-  IntraActivityWaiter waiting_to_send_;
-  IntraActivityWaiter waiting_to_receive_;
-
-  void MarkClosed() {
-    if (sender_ == nullptr) {
-      return;
-    }
-
-    sender_->receiver_ = nullptr;
-
-    waiting_to_receive_.Wake();
-    waiting_to_send_.Wake();
-    sender_ = nullptr;
-
-    for (auto* filter : filters_) {
-      if (filter != nullptr) {
-        filter->UpdateReceiver(nullptr);
-      }
-    }
-  }
+  explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
+  pipe_detail::Center<T>* center_;
 };
 
 namespace pipe_detail {
@@ -271,55 +240,28 @@
   Push(const Push&) = delete;
   Push& operator=(const Push&) = delete;
   Push(Push&& other) noexcept
-      : sender_(other.sender_), push_(std::move(other.push_)) {
-    if (sender_ != nullptr) {
-      sender_->push_ = this;
-      other.sender_ = nullptr;
-    }
+      : center_(other.center_), push_(std::move(other.push_)) {
+    other.center_ = nullptr;
   }
   Push& operator=(Push&& other) noexcept {
-    if (sender_ != nullptr) {
-      sender_->push_ = nullptr;
-    }
-    sender_ = other.sender_;
+    if (center_ != nullptr) center_->UnrefSend();
+    center_ = other.center_;
+    other.center_ = nullptr;
     push_ = std::move(other.push_);
-    if (sender_ != nullptr) {
-      sender_->push_ = this;
-      other.sender_ = nullptr;
-    }
     return *this;
   }
 
   ~Push() {
-    if (sender_ != nullptr) {
-      assert(sender_->push_ == this);
-      sender_->push_ = nullptr;
-    }
+    if (center_ != nullptr) center_->UnrefSend();
   }
 
-  Poll<bool> operator()() {
-    auto* receiver = sender_->receiver_;
-    if (receiver == nullptr) {
-      return false;
-    }
-    if (receiver->pending_.has_value()) {
-      return receiver->waiting_to_send_.pending();
-    }
-    receiver->pending_ = std::move(push_);
-    receiver->waiting_to_receive_.Wake();
-    sender_->push_ = nullptr;
-    sender_ = nullptr;
-    return true;
-  }
+  Poll<bool> operator()() { return center_->Push(&push_); }
 
  private:
   friend class PipeSender<T>;
-  Push(PipeSender<T>* sender, T push)
-      : sender_(sender), push_(std::move(push)) {
-    assert(sender_->push_ == nullptr);
-    sender_->push_ = this;
-  }
-  PipeSender<T>* sender_;
+  explicit Push(pipe_detail::Center<T>* center, T push)
+      : center_(center), push_(std::move(push)) {}
+  Center<T>* center_;
   T push_;
 };
 
@@ -329,262 +271,56 @@
  public:
   Next(const Next&) = delete;
   Next& operator=(const Next&) = delete;
-  Next(Next&& other) noexcept
-      : receiver_(other.receiver_),
-        next_filter_(other.next_filter_),
-        current_promise_(nullptr) {
-    assert(other.current_promise_ == nullptr);
-    if (receiver_ != nullptr) {
-      receiver_->next_ = this;
-      other.receiver_ = nullptr;
-    }
+  Next(Next&& other) noexcept : center_(other.center_) {
+    other.center_ = nullptr;
   }
   Next& operator=(Next&& other) noexcept {
-    assert(current_promise_ == nullptr);
-    assert(other.current_promise_ == nullptr);
-    if (receiver_ != nullptr) {
-      receiver_->next_ = nullptr;
-    }
-    receiver_ = other.receiver_;
-    next_filter_ = other.next_filter_;
-    if (receiver_ != nullptr) {
-      receiver_->next_ = this;
-      other.receiver_ = nullptr;
-    }
+    if (center_ != nullptr) center_->UnrefRecv();
+    center_ = other.center_;
+    other.center_ = nullptr;
     return *this;
   }
 
   ~Next() {
-    if (receiver_ != nullptr) {
-      assert(receiver_->next_ == this);
-      receiver_->next_ = nullptr;
-    }
-    if (current_promise_ != nullptr) {
-      current_promise_->Stop();
-    }
+    if (center_ != nullptr) center_->UnrefRecv();
   }
 
-  Poll<absl::optional<T>> operator()() {
-    if (receiver_->pending_.has_value()) {
-      auto* pending = &*receiver_->pending_;
-      if (current_promise_ != nullptr) {
-        auto r = current_promise_->Step(pending);
-        if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
-          current_promise_->Stop();
-          current_promise_ = nullptr;
-          if (!*p) {
-            receiver_->MarkClosed();
-            return absl::optional<T>();
-          }
-        } else {
-          return Pending();
-        }
-      }
-      while (true) {
-        if (next_filter_ >= receiver_->filters_.size()) {
-          auto result = absl::optional<T>(std::move(*pending));
-          receiver_->pending_.reset();
-          receiver_->waiting_to_send_.Wake();
-          receiver_->next_ = nullptr;
-          receiver_ = nullptr;
-          return result;
-        }
-        auto* filter = receiver_->filters_[next_filter_];
-        current_promise_ = filter ? filter->Step(pending, &scratch_) : nullptr;
-        next_filter_++;
-        if (current_promise_ ==
-            reinterpret_cast<Promise<T>*>(uintptr_t(false))) {
-          current_promise_ = nullptr;
-          receiver_->MarkClosed();
-          return absl::optional<T>();
-        } else if (current_promise_ ==
-                   reinterpret_cast<Promise<T>*>(uintptr_t(true))) {
-          current_promise_ = nullptr;
-        } else {
-          return Pending();
-        }
-      }
-    }
-    if (receiver_->sender_ == nullptr) {
-      return absl::optional<T>();
-    }
-    return receiver_->waiting_to_receive_.pending();
-  }
+  Poll<absl::optional<T>> operator()() { return center_->Next(); }
 
  private:
   friend class PipeReceiver<T>;
-  explicit Next(PipeReceiver<T>* receiver) : receiver_(receiver) {
-    assert(receiver_->next_ == nullptr);
-    receiver_->next_ = this;
-  }
-  PipeReceiver<T>* receiver_;
-  size_t next_filter_ = 0;
-  Promise<T>* current_promise_ = nullptr;
-  Scratch scratch_;
+  explicit Next(pipe_detail::Center<T>* center) : center_(center) {}
+  Center<T>* center_;
 };
 
-template <typename T, typename F>
-class Filter final : private FilterInterface<T> {
- public:
-  Filter(PipeReceiver<T>* receiver, F f)
-      : active_{receiver, promise_detail::PromiseFactory<T, F>(std::move(f))},
-        index_(this->AllocIndex(receiver)){};
-  explicit Filter(absl::Status already_finished)
-      : done_(std::move(already_finished)) {}
-  ~Filter() {
-    if (index_ != kTombstoneIndex) {
-      this->SetReceiverIndex(active_.receiver, index_, nullptr);
-      active_.~Active();
-    } else {
-      done_.~Status();
-    }
-  }
-  Filter(Filter&& other) noexcept : index_(other.index_) {
-    if (index_ != kTombstoneIndex) {
-      new (&active_) Active(std::move(other.active_));
-      other.active_.~Active();
-      new (&other.done_) absl::Status(absl::OkStatus());
-      other.index_ = kTombstoneIndex;
-      this->SetReceiverIndex(active_.receiver, index_, this);
-    } else {
-      new (&done_) absl::Status(std::move(other.done_));
-    }
-  }
-
-  Filter(const Filter&) = delete;
-  Filter& operator=(const Filter&) = delete;
-
-  Poll<absl::Status> operator()() {
-    if (index_ == kTombstoneIndex) {
-      return std::move(done_);
-    }
-    return Pending();
-  }
-
- private:
-  static constexpr char kTombstoneIndex = -1;
-  struct Active {
-    GPR_NO_UNIQUE_ADDRESS PipeReceiver<T>* receiver;
-    GPR_NO_UNIQUE_ADDRESS promise_detail::PromiseFactory<T, F> factory;
-  };
-  union {
-    GPR_NO_UNIQUE_ADDRESS Active active_;
-    GPR_NO_UNIQUE_ADDRESS absl::Status done_;
-  };
-  GPR_NO_UNIQUE_ADDRESS char index_;
-
-  class PromiseImpl final : public ::grpc_core::pipe_detail::Promise<T> {
-    using PF = typename promise_detail::PromiseFactory<T, F>::Promise;
-
-   public:
-    PromiseImpl(PF f, Filter* filter) : f_(std::move(f)), filter_(filter) {}
-
-    Poll<bool> Step(T* output) final {
-      auto r = f_();
-      if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
-        if (p->ok()) {
-          *output = std::move(**p);
-          return true;
-        } else {
-          filter_->SetReceiverIndex(filter_->active_.receiver, filter_->index_,
-                                    nullptr);
-          filter_->active_.~Active();
-          filter_->index_ = kTombstoneIndex;
-          new (&filter_->done_) absl::Status(std::move(p->status()));
-          Activity::WakeupCurrent();
-          return false;
-        }
-      } else {
-        return Pending();
-      }
-    }
-
-    void Stop() final { this->~PromiseImpl(); }
-
-   private:
-    PF f_;
-    Filter* filter_;
-  };
-
-  Promise<T>* Step(T* p, Scratch* scratch) final {
-    if (index_ != kTombstoneIndex) {
-      PromiseImpl promise(active_.factory.Repeated(std::move(*p)), this);
-      auto r = promise.Step(p);
-      if (auto* result = absl::get_if<kPollReadyIdx>(&r)) {
-        return reinterpret_cast<Promise<T>*>(uintptr_t(*result));
-      }
-      static_assert(sizeof(promise) <= sizeof(Scratch),
-                    "scratch size too small");
-      static_assert(alignof(decltype(promise)) <= alignof(Scratch),
-                    "bad alignment");
-      return new (scratch) decltype(promise)(std::move(promise));
-    } else {
-      return nullptr;
-    }
-  }
-
-  void UpdateReceiver(PipeReceiver<T>* receiver) final {
-    if (index_ != kTombstoneIndex) {
-      if (receiver == nullptr) {
-        active_.~Active();
-        index_ = kTombstoneIndex;
-        new (&done_) absl::Status(absl::OkStatus());
-      } else {
-        active_.receiver = receiver;
-      }
-      Activity::WakeupCurrent();
-    }
-  }
-};
-
-template <typename T>
-void FilterInterface<T>::SetReceiverIndex(PipeReceiver<T>* receiver, int idx,
-                                          FilterInterface* p) {
-  receiver->filters_[idx] = p;
-}
-
-template <typename T>
-char FilterInterface<T>::AllocIndex(PipeReceiver<T>* receiver) {
-  auto r = receiver->filters_.size();
-  receiver->filters_.push_back(this);
-  return r;
-}
-
 }  // namespace pipe_detail
 
 template <typename T>
 pipe_detail::Push<T> PipeSender<T>::Push(T value) {
-  return pipe_detail::Push<T>(this, std::move(value));
+  return pipe_detail::Push<T>(center_->RefSend(), std::move(value));
 }
 
 template <typename T>
 pipe_detail::Next<T> PipeReceiver<T>::Next() {
-  return pipe_detail::Next<T>(this);
-}
-
-template <typename T>
-template <typename F>
-pipe_detail::Filter<T, F> PipeSender<T>::Filter(F f) {
-  if (receiver_) {
-    return pipe_detail::Filter<T, F>(receiver_, std::move(f));
-  } else {
-    return pipe_detail::Filter<T, F>(absl::OkStatus());
-  }
-}
-
-template <typename T>
-template <typename F>
-pipe_detail::Filter<T, F> PipeReceiver<T>::Filter(F f) {
-  return pipe_detail::Filter<T, F>(this, std::move(f));
+  return pipe_detail::Next<T>(center_->RefRecv());
 }
 
 // A Pipe is an intra-Activity communications channel that transmits T's from
 // one end to the other.
 // It is only safe to use a Pipe within the context of a single Activity.
 // No synchronization is performed internally.
+// The primary Pipe data structure is allocated from an arena, so the activity
+// must have an arena as part of its context.
+// By performing that allocation we can ensure stable pointer to shared data
+// allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
+// implementation.
+// This type has been optimized with the expectation that there are relatively
+// few pipes per activity. If this assumption does not hold then a design
+// allowing inline filtering of pipe contents (instead of connecting pipes with
+// polling code) would likely be more appropriate.
 template <typename T>
 struct Pipe {
-  Pipe() : sender(&receiver), receiver(&sender) {}
+  Pipe() : Pipe(GetContext<Arena>()->New<pipe_detail::Center<T>>()) {}
   Pipe(const Pipe&) = delete;
   Pipe& operator=(const Pipe&) = delete;
   Pipe(Pipe&&) noexcept = default;
@@ -592,6 +328,10 @@
 
   PipeSender<T> sender;
   PipeReceiver<T> receiver;
+
+ private:
+  explicit Pipe(pipe_detail::Center<T>* center)
+      : sender(center), receiver(center) {}
 };
 
 }  // namespace grpc_core
diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD
index 4106114..9f36385 100644
--- a/test/core/promise/BUILD
+++ b/test/core/promise/BUILD
@@ -260,6 +260,7 @@
         "//:map",
         "//:observable",
         "//:pipe",
+        "//:resource_quota",
         "//:seq",
         "//test/core/util:grpc_suppressions",
     ],
@@ -276,6 +277,7 @@
         "//:join",
         "//:pipe",
         "//:promise",
+        "//:resource_quota",
         "//:seq",
         "//test/core/util:grpc_suppressions",
     ],
diff --git a/test/core/promise/for_each_test.cc b/test/core/promise/for_each_test.cc
index d668c3d..ebf1477 100644
--- a/test/core/promise/for_each_test.cc
+++ b/test/core/promise/for_each_test.cc
@@ -22,6 +22,7 @@
 #include "src/core/lib/promise/observable.h"
 #include "src/core/lib/promise/pipe.h"
 #include "src/core/lib/promise/seq.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
 #include "test/core/promise/test_wakeup_schedulers.h"
 
 using testing::Mock;
@@ -30,22 +31,25 @@
 
 namespace grpc_core {
 
+static auto* g_memory_allocator = new MemoryAllocator(
+    ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
+
 TEST(ForEachTest, SendThriceWithPipe) {
-  Pipe<int> pipe;
   int num_received = 0;
   StrictMock<MockFunction<void(absl::Status)>> on_done;
   EXPECT_CALL(on_done, Call(absl::OkStatus()));
   MakeActivity(
-      [&pipe, &num_received] {
+      [&num_received] {
+        Pipe<int> pipe;
+        auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
+            absl::make_unique<PipeSender<int>>(std::move(pipe.sender)));
         return Map(
             Join(
                 // Push 3 things into a pipe -- 1, 2, then 3 -- then close.
-                Seq(
-                    pipe.sender.Push(1),
-                    [&pipe] { return pipe.sender.Push(2); },
-                    [&pipe] { return pipe.sender.Push(3); },
-                    [&pipe] {
-                      auto drop = std::move(pipe.sender);
+                Seq((*sender)->Push(1), [sender] { return (*sender)->Push(2); },
+                    [sender] { return (*sender)->Push(3); },
+                    [sender] {
+                      sender->reset();
                       return absl::OkStatus();
                     }),
                 // Use a ForEach loop to read them out and verify all values are
@@ -59,7 +63,8 @@
             JustElem<1>());
       },
       NoWakeupScheduler(),
-      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+      [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+      MakeScopedArena(1024, g_memory_allocator));
   Mock::VerifyAndClearExpectations(&on_done);
   EXPECT_EQ(num_received, 3);
 }
diff --git a/test/core/promise/pipe_test.cc b/test/core/promise/pipe_test.cc
index ba38fbe..a202580 100644
--- a/test/core/promise/pipe_test.cc
+++ b/test/core/promise/pipe_test.cc
@@ -22,6 +22,7 @@
 #include "src/core/lib/promise/join.h"
 #include "src/core/lib/promise/promise.h"
 #include "src/core/lib/promise/seq.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
 #include "test/core/promise/test_wakeup_schedulers.h"
 
 using testing::MockFunction;
@@ -29,12 +30,15 @@
 
 namespace grpc_core {
 
+static auto* g_memory_allocator = new MemoryAllocator(
+    ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
+
 TEST(PipeTest, CanSendAndReceive) {
-  Pipe<int> pipe;
   StrictMock<MockFunction<void(absl::Status)>> on_done;
   EXPECT_CALL(on_done, Call(absl::OkStatus()));
   MakeActivity(
-      [&pipe] {
+      [] {
+        Pipe<int> pipe;
         return Seq(
             // Concurrently: send 42 into the pipe, and receive from the pipe.
             Join(pipe.sender.Push(42), pipe.receiver.Next()),
@@ -46,15 +50,16 @@
             });
       },
       NoWakeupScheduler(),
-      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+      [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+      MakeScopedArena(1024, g_memory_allocator));
 }
 
 TEST(PipeTest, CanReceiveAndSend) {
-  Pipe<int> pipe;
   StrictMock<MockFunction<void(absl::Status)>> on_done;
   EXPECT_CALL(on_done, Call(absl::OkStatus()));
   MakeActivity(
-      [&pipe] {
+      [] {
+        Pipe<int> pipe;
         return Seq(
             // Concurrently: receive from the pipe, and send 42 into the pipe.
             Join(pipe.receiver.Next(), pipe.sender.Push(42)),
@@ -66,28 +71,29 @@
             });
       },
       NoWakeupScheduler(),
-      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+      [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+      MakeScopedArena(1024, g_memory_allocator));
 }
 
 TEST(PipeTest, CanSeeClosedOnSend) {
-  Pipe<int> pipe;
   StrictMock<MockFunction<void(absl::Status)>> on_done;
-  auto sender = std::move(pipe.sender);
-  auto receiver =
-      absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver));
   EXPECT_CALL(on_done, Call(absl::OkStatus()));
-  // Push 42 onto the pipe - this will the pipe's one-deep send buffer.
-  EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
   MakeActivity(
-      [&sender, &receiver] {
+      [] {
+        Pipe<int> pipe;
+        auto sender = std::move(pipe.sender);
+        // Push 42 onto the pipe - this will the pipe's one-deep send buffer.
+        EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
+        auto receiver = std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
+            absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver)));
         return Seq(
             // Concurrently:
             // - push 43 into the sender, which will stall because the buffer is
             //   full
             // - and close the receiver, which will fail the pending send.
             Join(sender.Push(43),
-                 [&receiver] {
-                   receiver.reset();
+                 [receiver] {
+                   receiver->reset();
                    return absl::OkStatus();
                  }),
             // Verify both that the send failed and that we executed the close.
@@ -97,17 +103,19 @@
             });
       },
       NoWakeupScheduler(),
-      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+      [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+      MakeScopedArena(1024, g_memory_allocator));
 }
 
 TEST(PipeTest, CanSeeClosedOnReceive) {
-  Pipe<int> pipe;
   StrictMock<MockFunction<void(absl::Status)>> on_done;
-  auto sender = absl::make_unique<PipeSender<int>>(std::move(pipe.sender));
-  auto receiver = std::move(pipe.receiver);
   EXPECT_CALL(on_done, Call(absl::OkStatus()));
   MakeActivity(
-      [&sender, &receiver] {
+      [] {
+        Pipe<int> pipe;
+        auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
+            absl::make_unique<PipeSender<int>>(std::move(pipe.sender)));
+        auto receiver = std::move(pipe.receiver);
         return Seq(
             // Concurrently:
             // - wait for a received value (will stall forever since we push
@@ -115,8 +123,8 @@
             // - close the sender, which will signal the receiver to return an
             //   end-of-stream.
             Join(receiver.Next(),
-                 [&sender] {
-                   sender.reset();
+                 [sender] {
+                   sender->reset();
                    return absl::OkStatus();
                  }),
             // Verify we received end-of-stream and closed the sender.
@@ -127,49 +135,8 @@
             });
       },
       NoWakeupScheduler(),
-      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
-}
-
-TEST(PipeTest, CanFilter) {
-  Pipe<int> pipe;
-  StrictMock<MockFunction<void(absl::Status)>> on_done;
-  EXPECT_CALL(on_done, Call(absl::OkStatus()));
-  MakeActivity(
-      [&pipe] {
-        // Setup some filters here, carefully getting ordering correct by doing
-        // so outside of the Join() since C++ does not define execution order
-        // between arguments.
-        // TODO(ctiller): A future change to Pipe will specify an ordering
-        // between filters added to sender and receiver, at which point these
-        // should move back.
-        auto doubler = pipe.receiver.Filter(
-            [](int p) { return absl::StatusOr<int>(p * 2); });
-        auto adder = pipe.sender.Filter(
-            [](int p) { return absl::StatusOr<int>(p + 1); });
-        return Seq(
-            // Concurrently:
-            // - push 42 into the pipe
-            // - wait for a value to be received, and filter it by doubling it
-            // - wait for a value to be received, and filter it by adding one to
-            //   it
-            // - wait for a value to be received and close the pipe.
-            Join(pipe.sender.Push(42), std::move(doubler), std::move(adder),
-                 Seq(pipe.receiver.Next(),
-                     [&pipe](absl::optional<int> i) {
-                       auto x = std::move(pipe.receiver);
-                       return i;
-                     })),
-            // Verify all of the above happened correctly.
-            [](std::tuple<bool, absl::Status, absl::Status, absl::optional<int>>
-                   result) {
-              EXPECT_EQ(result, std::make_tuple(true, absl::OkStatus(),
-                                                absl::OkStatus(),
-                                                absl::optional<int>(85)));
-              return absl::OkStatus();
-            });
-      },
-      NoWakeupScheduler(),
-      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+      [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
+      MakeScopedArena(1024, g_memory_allocator));
 }
 
 }  // namespace grpc_core