L2CAP: Use DataPipelineManager to organize components

Currently there is coupling between sender, scheduler, and receiver.
Instead, use DataPipelineManager to orgranize them.

Bug: 144773997
Test: bluetooth_test_gd
Change-Id: Ifeef5255e1cdcc75b78211c648dbf7463f18095f
diff --git a/gd/l2cap/Android.bp b/gd/l2cap/Android.bp
index 60175e2..51258b4 100644
--- a/gd/l2cap/Android.bp
+++ b/gd/l2cap/Android.bp
@@ -18,6 +18,7 @@
         "classic/internal/signalling_manager.cc",
         "classic/l2cap_classic_module.cc",
         "internal/basic_mode_channel_data_controller.cc",
+        "internal/data_pipeline_manager.cc",
         "internal/enhanced_retransmission_mode_channel_data_controller.cc",
         "internal/le_credit_based_channel_data_controller.cc",
         "internal/receiver.cc",
diff --git a/gd/l2cap/classic/internal/dynamic_channel_allocator_fuzz_test.cc b/gd/l2cap/classic/internal/dynamic_channel_allocator_fuzz_test.cc
index 80d1518..48d9121 100644
--- a/gd/l2cap/classic/internal/dynamic_channel_allocator_fuzz_test.cc
+++ b/gd/l2cap/classic/internal/dynamic_channel_allocator_fuzz_test.cc
@@ -48,8 +48,7 @@
     handler_ = new os::Handler(thread_);
     mock_parameter_provider_ = new NiceMock<MockParameterProvider>();
     mock_classic_link_ =
-        new NiceMock<MockLink>(handler_, mock_parameter_provider_, std::make_unique<NiceMock<MockAclConnection>>(),
-                               std::make_unique<NiceMock<MockScheduler>>());
+        new NiceMock<MockLink>(handler_, mock_parameter_provider_, std::make_unique<NiceMock<MockAclConnection>>());
     EXPECT_CALL(*mock_classic_link_, GetDevice()).WillRepeatedly(Return(device));
     channel_allocator_ = std::make_unique<DynamicChannelAllocator>(mock_classic_link_, handler_);
   }
diff --git a/gd/l2cap/classic/internal/link.cc b/gd/l2cap/classic/internal/link.cc
index 7b5178f..63f5f71 100644
--- a/gd/l2cap/classic/internal/link.cc
+++ b/gd/l2cap/classic/internal/link.cc
@@ -22,7 +22,6 @@
 #include "l2cap/classic/internal/fixed_channel_impl.h"
 #include "l2cap/classic/internal/link.h"
 #include "l2cap/internal/parameter_provider.h"
-#include "l2cap/internal/scheduler.h"
 #include "os/alarm.h"
 
 namespace bluetooth {
@@ -31,19 +30,16 @@
 namespace internal {
 
 Link::Link(os::Handler* l2cap_handler, std::unique_ptr<hci::AclConnection> acl_connection,
-           std::unique_ptr<l2cap::internal::Scheduler> scheduler,
            l2cap::internal::ParameterProvider* parameter_provider,
            DynamicChannelServiceManagerImpl* dynamic_service_manager,
            FixedChannelServiceManagerImpl* fixed_service_manager)
-    : l2cap_handler_(l2cap_handler), acl_connection_(std::move(acl_connection)), scheduler_(std::move(scheduler)),
-      receiver_(acl_connection_->GetAclQueueEnd(), l2cap_handler_, scheduler_.get()),
-      parameter_provider_(parameter_provider), dynamic_service_manager_(dynamic_service_manager),
-      fixed_service_manager_(fixed_service_manager),
+    : l2cap_handler_(l2cap_handler), acl_connection_(std::move(acl_connection)),
+      data_pipeline_manager_(l2cap_handler, acl_connection_->GetAclQueueEnd()), parameter_provider_(parameter_provider),
+      dynamic_service_manager_(dynamic_service_manager), fixed_service_manager_(fixed_service_manager),
       signalling_manager_(l2cap_handler_, this, dynamic_service_manager_, &dynamic_channel_allocator_,
                           fixed_service_manager_) {
   ASSERT(l2cap_handler_ != nullptr);
   ASSERT(acl_connection_ != nullptr);
-  ASSERT(scheduler_ != nullptr);
   ASSERT(parameter_provider_ != nullptr);
   link_idle_disconnect_alarm_.Schedule(common::BindOnce(&Link::Disconnect, common::Unretained(this)),
                                        parameter_provider_->GetClassicLinkIdleDisconnectTimeout());
@@ -60,7 +56,7 @@
 
 std::shared_ptr<FixedChannelImpl> Link::AllocateFixedChannel(Cid cid, SecurityPolicy security_policy) {
   auto channel = fixed_channel_allocator_.AllocateChannel(cid, security_policy);
-  scheduler_->AttachChannel(cid, channel);
+  data_pipeline_manager_.AttachChannel(cid, channel);
   return channel;
 }
 
@@ -94,7 +90,7 @@
                                                                  SecurityPolicy security_policy) {
   auto channel = dynamic_channel_allocator_.AllocateChannel(psm, remote_cid, security_policy);
   if (channel != nullptr) {
-    scheduler_->AttachChannel(channel->GetCid(), channel);
+    data_pipeline_manager_.AttachChannel(channel->GetCid(), channel);
   }
   channel->local_initiated_ = false;
   return channel;
@@ -104,7 +100,7 @@
                                                                          SecurityPolicy security_policy) {
   auto channel = dynamic_channel_allocator_.AllocateReservedChannel(reserved_cid, psm, remote_cid, security_policy);
   if (channel != nullptr) {
-    scheduler_->AttachChannel(channel->GetCid(), channel);
+    data_pipeline_manager_.AttachChannel(channel->GetCid(), channel);
   }
   channel->local_initiated_ = true;
   return channel;
@@ -120,7 +116,7 @@
   if (dynamic_channel_allocator_.FindChannelByCid(cid) == nullptr) {
     return;
   }
-  scheduler_->DetachChannel(cid);
+  data_pipeline_manager_.DetachChannel(cid);
   dynamic_channel_allocator_.FreeChannel(cid);
 }
 
diff --git a/gd/l2cap/classic/internal/link.h b/gd/l2cap/classic/internal/link.h
index 09d7eb9..44a5b25 100644
--- a/gd/l2cap/classic/internal/link.h
+++ b/gd/l2cap/classic/internal/link.h
@@ -26,10 +26,9 @@
 #include "l2cap/classic/internal/dynamic_channel_service_manager_impl.h"
 #include "l2cap/classic/internal/fixed_channel_impl.h"
 #include "l2cap/classic/internal/fixed_channel_service_manager_impl.h"
+#include "l2cap/internal/data_pipeline_manager.h"
 #include "l2cap/internal/fixed_channel_allocator.h"
 #include "l2cap/internal/parameter_provider.h"
-#include "l2cap/internal/receiver.h"
-#include "l2cap/internal/scheduler.h"
 #include "os/alarm.h"
 #include "os/handler.h"
 #include "signalling_manager.h"
@@ -42,7 +41,7 @@
 class Link {
  public:
   Link(os::Handler* l2cap_handler, std::unique_ptr<hci::AclConnection> acl_connection,
-       std::unique_ptr<l2cap::internal::Scheduler> scheduler, l2cap::internal::ParameterProvider* parameter_provider,
+       l2cap::internal::ParameterProvider* parameter_provider,
        DynamicChannelServiceManagerImpl* dynamic_service_manager,
        FixedChannelServiceManagerImpl* fixed_service_manager);
 
@@ -116,8 +115,7 @@
   l2cap::internal::FixedChannelAllocator<FixedChannelImpl, Link> fixed_channel_allocator_{this, l2cap_handler_};
   DynamicChannelAllocator dynamic_channel_allocator_{this, l2cap_handler_};
   std::unique_ptr<hci::AclConnection> acl_connection_;
-  std::unique_ptr<l2cap::internal::Scheduler> scheduler_;
-  l2cap::internal::Receiver receiver_;
+  l2cap::internal::DataPipelineManager data_pipeline_manager_;
   l2cap::internal::ParameterProvider* parameter_provider_;
   DynamicChannelServiceManagerImpl* dynamic_service_manager_;
   FixedChannelServiceManagerImpl* fixed_service_manager_;
diff --git a/gd/l2cap/classic/internal/link_manager.cc b/gd/l2cap/classic/internal/link_manager.cc
index ee93fd7..812b180 100644
--- a/gd/l2cap/classic/internal/link_manager.cc
+++ b/gd/l2cap/classic/internal/link_manager.cc
@@ -114,9 +114,7 @@
   // Register ACL disconnection callback in LinkManager so that we can clean up link resource properly
   acl_connection->RegisterDisconnectCallback(
       common::BindOnce(&LinkManager::OnDisconnect, common::Unretained(this), device), l2cap_handler_);
-  auto* link_queue_up_end = acl_connection->GetAclQueueEnd();
-  links_.try_emplace(device, l2cap_handler_, std::move(acl_connection),
-                     std::make_unique<l2cap::internal::Fifo>(link_queue_up_end, l2cap_handler_), parameter_provider_,
+  links_.try_emplace(device, l2cap_handler_, std::move(acl_connection), parameter_provider_,
                      dynamic_channel_service_manager_, fixed_channel_service_manager_);
   auto* link = GetLink(device);
   ASSERT(link != nullptr);
diff --git a/gd/l2cap/classic/internal/link_mock.h b/gd/l2cap/classic/internal/link_mock.h
index 3e0e2a6..cbbad15 100644
--- a/gd/l2cap/classic/internal/link_mock.h
+++ b/gd/l2cap/classic/internal/link_mock.h
@@ -34,12 +34,10 @@
 class MockLink : public Link {
  public:
   explicit MockLink(os::Handler* handler, l2cap::internal::ParameterProvider* parameter_provider)
-      : Link(handler, std::make_unique<MockAclConnection>(),
-             std::make_unique<l2cap::internal::testing::MockScheduler>(), parameter_provider, nullptr, nullptr){};
+      : Link(handler, std::make_unique<MockAclConnection>(), parameter_provider, nullptr, nullptr){};
   explicit MockLink(os::Handler* handler, l2cap::internal::ParameterProvider* parameter_provider,
-                    std::unique_ptr<hci::AclConnection> acl_connection,
-                    std::unique_ptr<l2cap::internal::Scheduler> scheduler)
-      : Link(handler, std::move(acl_connection), std::move(scheduler), parameter_provider, nullptr, nullptr){};
+                    std::unique_ptr<hci::AclConnection> acl_connection)
+      : Link(handler, std::move(acl_connection), parameter_provider, nullptr, nullptr){};
   MOCK_METHOD(hci::Address, GetDevice, (), (override));
   MOCK_METHOD(void, OnAclDisconnected, (hci::ErrorCode status), (override));
   MOCK_METHOD(void, Disconnect, (), (override));
diff --git a/gd/l2cap/internal/data_controller_mock.h b/gd/l2cap/internal/data_controller_mock.h
new file mode 100644
index 0000000..d071c68
--- /dev/null
+++ b/gd/l2cap/internal/data_controller_mock.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "l2cap/internal/data_controller.h"
+
+#include <gmock/gmock.h>
+
+// Unit test interfaces
+namespace bluetooth {
+namespace l2cap {
+namespace internal {
+namespace testing {
+
+class MockDataController : public DataController {
+ public:
+  MOCK_METHOD(void, OnSdu, (std::unique_ptr<packet::BasePacketBuilder>), (override));
+  MOCK_METHOD(void, OnPdu, (packet::PacketView<true>), (override));
+  MOCK_METHOD(std::unique_ptr<packet::BasePacketBuilder>, GetNextPacket, (), (override));
+  MOCK_METHOD(void, EnableFcs, (bool), (override));
+  MOCK_METHOD(void, SetRetransmissionAndFlowControlOptions, (const RetransmissionAndFlowControlConfigurationOption&),
+              (override));
+};
+
+}  // namespace testing
+}  // namespace internal
+}  // namespace l2cap
+}  // namespace bluetooth
diff --git a/gd/l2cap/internal/data_pipeline_manager.cc b/gd/l2cap/internal/data_pipeline_manager.cc
new file mode 100644
index 0000000..d0023c0
--- /dev/null
+++ b/gd/l2cap/internal/data_pipeline_manager.cc
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unordered_map>
+
+#include "l2cap/cid.h"
+#include "l2cap/internal/channel_impl.h"
+#include "l2cap/internal/data_controller.h"
+#include "l2cap/internal/data_pipeline_manager.h"
+#include "os/log.h"
+
+namespace bluetooth {
+namespace l2cap {
+namespace internal {
+
+void DataPipelineManager::AttachChannel(Cid cid, std::shared_ptr<ChannelImpl> channel) {
+  ASSERT(sender_map_.find(cid) == sender_map_.end());
+  sender_map_.emplace(std::piecewise_construct, std::forward_as_tuple(cid),
+                      std::forward_as_tuple(handler_, scheduler_.get(), channel));
+  if (channel->GetCid() >= kFirstDynamicChannel) {
+    channel->SetSender(&sender_map_.find(cid)->second);
+  }
+}
+
+void DataPipelineManager::DetachChannel(Cid cid) {
+  ASSERT(sender_map_.find(cid) != sender_map_.end());
+  sender_map_.erase(cid);
+}
+
+DataController* DataPipelineManager::GetDataController(Cid cid) {
+  ASSERT(sender_map_.find(cid) != sender_map_.end());
+  return sender_map_.find(cid)->second.GetDataController();
+}
+
+void DataPipelineManager::OnPacketSent(Cid cid) {
+  ASSERT(sender_map_.find(cid) != sender_map_.end());
+  sender_map_.find(cid)->second.OnPacketSent();
+}
+
+}  // namespace internal
+}  // namespace l2cap
+}  // namespace bluetooth
diff --git a/gd/l2cap/internal/data_pipeline_manager.h b/gd/l2cap/internal/data_pipeline_manager.h
new file mode 100644
index 0000000..f801297
--- /dev/null
+++ b/gd/l2cap/internal/data_pipeline_manager.h
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+
+#include "common/bidi_queue.h"
+#include "common/bind.h"
+#include "data_controller.h"
+#include "l2cap/cid.h"
+#include "l2cap/internal/channel_impl.h"
+#include "l2cap/internal/receiver.h"
+#include "l2cap/internal/scheduler.h"
+#include "l2cap/internal/scheduler_fifo.h"
+#include "l2cap/l2cap_packets.h"
+#include "l2cap/mtu.h"
+#include "os/handler.h"
+#include "os/log.h"
+#include "os/queue.h"
+#include "packet/base_packet_builder.h"
+#include "packet/packet_view.h"
+
+namespace bluetooth {
+namespace l2cap {
+namespace internal {
+
+/**
+ * Manages data pipeline from channel queue end to link queue end, per link.
+ * Contains a Scheduler and Receiver per link.
+ * Contains a Sender and its corrsponding DataController per attached channel.
+ */
+class DataPipelineManager {
+ public:
+  using UpperEnqueue = packet::PacketView<packet::kLittleEndian>;
+  using UpperDequeue = packet::BasePacketBuilder;
+  using UpperQueueDownEnd = common::BidiQueueEnd<UpperEnqueue, UpperDequeue>;
+  using LowerEnqueue = UpperDequeue;
+  using LowerDequeue = UpperEnqueue;
+  using LowerQueueUpEnd = common::BidiQueueEnd<LowerEnqueue, LowerDequeue>;
+
+  DataPipelineManager(os::Handler* handler, LowerQueueUpEnd* link_queue_up_end)
+      : handler_(handler), scheduler_(std::make_unique<Fifo>(this, link_queue_up_end, handler)),
+        receiver_(link_queue_up_end, handler, this) {}
+
+  virtual void AttachChannel(Cid cid, std::shared_ptr<ChannelImpl> channel);
+  virtual void DetachChannel(Cid cid);
+  virtual DataController* GetDataController(Cid cid);
+  virtual void OnPacketSent(Cid cid);
+  virtual ~DataPipelineManager() = default;
+
+ private:
+  os::Handler* handler_;
+  std::unique_ptr<Scheduler> scheduler_;
+  Receiver receiver_;
+  std::unordered_map<Cid, Sender> sender_map_;
+};
+}  // namespace internal
+}  // namespace l2cap
+}  // namespace bluetooth
diff --git a/gd/l2cap/internal/data_pipeline_manager_mock.h b/gd/l2cap/internal/data_pipeline_manager_mock.h
new file mode 100644
index 0000000..f5a24f4
--- /dev/null
+++ b/gd/l2cap/internal/data_pipeline_manager_mock.h
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "l2cap/internal/data_pipeline_manager.h"
+
+#include "l2cap/internal/channel_impl.h"
+#include "l2cap/internal/data_controller.h"
+
+#include <gmock/gmock.h>
+
+// Unit test interfaces
+namespace bluetooth {
+namespace l2cap {
+namespace internal {
+namespace testing {
+
+class MockDataPipelineManager : public DataPipelineManager {
+ public:
+  MockDataPipelineManager(os::Handler* handler, LowerQueueUpEnd* link_queue_up_end)
+      : DataPipelineManager(handler, link_queue_up_end) {}
+  MOCK_METHOD(void, AttachChannel, (Cid, std::shared_ptr<ChannelImpl>), (override));
+  MOCK_METHOD(void, DetachChannel, (Cid), (override));
+  MOCK_METHOD(DataController*, GetDataController, (Cid), (override));
+  MOCK_METHOD(void, OnPacketSent, (Cid), (override));
+};
+
+}  // namespace testing
+}  // namespace internal
+}  // namespace l2cap
+}  // namespace bluetooth
diff --git a/gd/l2cap/internal/receiver.cc b/gd/l2cap/internal/receiver.cc
index e92de0b..86f43ae 100644
--- a/gd/l2cap/internal/receiver.cc
+++ b/gd/l2cap/internal/receiver.cc
@@ -18,14 +18,16 @@
 
 #include "common/bidi_queue.h"
 #include "l2cap/cid.h"
+#include "l2cap/internal/data_pipeline_manager.h"
 #include "l2cap/l2cap_packets.h"
 #include "packet/packet_view.h"
 
 namespace bluetooth {
 namespace l2cap {
 namespace internal {
-Receiver::Receiver(LowerQueueUpEnd* link_queue_up_end, os::Handler* handler, Scheduler* scheduler)
-    : link_queue_up_end_(link_queue_up_end), handler_(handler), scheduler_(scheduler) {
+Receiver::Receiver(LowerQueueUpEnd* link_queue_up_end, os::Handler* handler,
+                   DataPipelineManager* data_pipeline_manager_)
+    : link_queue_up_end_(link_queue_up_end), handler_(handler), data_pipeline_manager_(data_pipeline_manager_) {
   ASSERT(link_queue_up_end_ != nullptr && handler_ != nullptr);
   link_queue_up_end_->RegisterDequeue(handler_,
                                       common::Bind(&Receiver::link_queue_dequeue_callback, common::Unretained(this)));
@@ -43,7 +45,7 @@
     return;
   }
   Cid cid = static_cast<Cid>(basic_frame_view.GetChannelId());
-  auto* data_controller = scheduler_->GetDataController(cid);
+  auto* data_controller = data_pipeline_manager_->GetDataController(cid);
   if (data_controller == nullptr) {
     LOG_WARN("Received a packet with invalid cid: %d", cid);
     return;
diff --git a/gd/l2cap/internal/receiver.h b/gd/l2cap/internal/receiver.h
index e3c035e..f757319 100644
--- a/gd/l2cap/internal/receiver.h
+++ b/gd/l2cap/internal/receiver.h
@@ -34,6 +34,8 @@
 namespace l2cap {
 namespace internal {
 
+class DataPipelineManager;
+
 /**
  * Handle receiving L2CAP PDUs from link queue and distribute them into into channel data controllers.
  * Dequeue incoming packets from LinkQueueUpEnd, and enqueue it to ChannelQueueDownEnd. Note: If a channel
@@ -51,13 +53,13 @@
   using LowerDequeue = UpperEnqueue;
   using LowerQueueUpEnd = common::BidiQueueEnd<LowerEnqueue, LowerDequeue>;
 
-  Receiver(LowerQueueUpEnd* link_queue_up_end, os::Handler* handler, Scheduler* scheduler);
+  Receiver(LowerQueueUpEnd* link_queue_up_end, os::Handler* handler, DataPipelineManager* data_pipeline_manager);
   ~Receiver();
 
  private:
   LowerQueueUpEnd* link_queue_up_end_;
   os::Handler* handler_;
-  Scheduler* scheduler_;
+  DataPipelineManager* data_pipeline_manager_;
 
   void link_queue_dequeue_callback();
 };
diff --git a/gd/l2cap/internal/scheduler.h b/gd/l2cap/internal/scheduler.h
index 59f2bfd..416c972 100644
--- a/gd/l2cap/internal/scheduler.h
+++ b/gd/l2cap/internal/scheduler.h
@@ -50,34 +50,10 @@
   using LowerQueueUpEnd = common::BidiQueueEnd<LowerEnqueue, LowerDequeue>;
 
   /**
-   * Attach the channel with the specified ChannelQueueDownEnd into the scheduler.
-   * Scheduler needs to notify the channel its Sender through SetSender().
-   *
-   * @param cid The channel to attach to the scheduler.
-   * @param channel The reference to a DynamicChannelImpl object. Use nullptr for fixed channel.
-   * TODO (b/144503952): Rethink about channel abstraction. Currently channel contains duplicated info as remote_cid
-   */
-  virtual void AttachChannel(Cid cid, std::shared_ptr<ChannelImpl> channel) {}
-
-  /**
-   * Detach the channel from the scheduler.
-   *
-   * @param cid The channel to detach to the scheduler.
-   */
-  virtual void DetachChannel(Cid cid) {}
-
-  /**
    * Callback from the sender to indicate that the scheduler could dequeue number_packets from it
    */
   virtual void OnPacketsReady(Cid cid, int number_packets) {}
 
-  /**
-   * Get the data controller for Reassembler
-   */
-  virtual DataController* GetDataController(Cid cid) {
-    return nullptr;
-  }
-
   virtual ~Scheduler() = default;
 };
 
diff --git a/gd/l2cap/internal/scheduler_fifo.cc b/gd/l2cap/internal/scheduler_fifo.cc
index 7454cbe..5e89c01 100644
--- a/gd/l2cap/internal/scheduler_fifo.cc
+++ b/gd/l2cap/internal/scheduler_fifo.cc
@@ -17,6 +17,7 @@
 #include "l2cap/internal/scheduler_fifo.h"
 
 #include "l2cap/classic/internal/dynamic_channel_impl.h"
+#include "l2cap/internal/data_pipeline_manager.h"
 #include "l2cap/l2cap_packets.h"
 #include "os/log.h"
 
@@ -24,32 +25,17 @@
 namespace l2cap {
 namespace internal {
 
-Fifo::Fifo(LowerQueueUpEnd* link_queue_up_end, os::Handler* handler)
-    : link_queue_up_end_(link_queue_up_end), handler_(handler) {
+Fifo::Fifo(DataPipelineManager* data_pipeline_manager, LowerQueueUpEnd* link_queue_up_end, os::Handler* handler)
+    : data_pipeline_manager_(data_pipeline_manager), link_queue_up_end_(link_queue_up_end), handler_(handler) {
   ASSERT(link_queue_up_end_ != nullptr && handler_ != nullptr);
 }
 
 Fifo::~Fifo() {
-  sender_map_.clear();
   if (link_queue_enqueue_registered_) {
     link_queue_up_end_->UnregisterEnqueue();
   }
 }
 
-void Fifo::AttachChannel(Cid cid, std::shared_ptr<ChannelImpl> channel) {
-  ASSERT(sender_map_.find(cid) == sender_map_.end());
-  sender_map_.emplace(std::piecewise_construct, std::forward_as_tuple(cid),
-                      std::forward_as_tuple(handler_, this, channel));
-  if (channel->GetCid() >= kFirstDynamicChannel) {
-    channel->SetSender(&sender_map_.find(cid)->second);
-  }
-}
-
-void Fifo::DetachChannel(Cid cid) {
-  ASSERT(sender_map_.find(cid) != sender_map_.end());
-  sender_map_.erase(cid);
-}
-
 void Fifo::OnPacketsReady(Cid cid, int number_packets) {
   next_to_dequeue_and_num_packets.push(std::make_pair(cid, number_packets));
   try_register_link_queue_enqueue();
@@ -63,9 +49,9 @@
   if (channel_id_and_number_packets.second == 0) {
     next_to_dequeue_and_num_packets.pop();
   }
-  auto packet = sender_map_.find(channel_id)->second.GetNextPacket();
+  auto packet = data_pipeline_manager_->GetDataController(channel_id)->GetNextPacket();
 
-  sender_map_.find(channel_id)->second.OnPacketSent();
+  data_pipeline_manager_->OnPacketSent(channel_id);
   if (next_to_dequeue_and_num_packets.empty()) {
     link_queue_up_end_->UnregisterEnqueue();
     link_queue_enqueue_registered_ = false;
@@ -82,13 +68,6 @@
   link_queue_enqueue_registered_ = true;
 }
 
-DataController* Fifo::GetDataController(Cid cid) {
-  if (sender_map_.find(cid) == sender_map_.end()) {
-    return nullptr;
-  }
-  return sender_map_.find(cid)->second.GetDataController();
-}
-
 }  // namespace internal
 }  // namespace l2cap
 }  // namespace bluetooth
diff --git a/gd/l2cap/internal/scheduler_fifo.h b/gd/l2cap/internal/scheduler_fifo.h
index b20ea4f..f6fcf7f 100644
--- a/gd/l2cap/internal/scheduler_fifo.h
+++ b/gd/l2cap/internal/scheduler_fifo.h
@@ -31,23 +31,21 @@
 namespace bluetooth {
 namespace l2cap {
 namespace internal {
+class DataPipelineManager;
 
 class Fifo : public Scheduler {
  public:
-  Fifo(LowerQueueUpEnd* link_queue_up_end, os::Handler* handler);
+  Fifo(DataPipelineManager* data_pipeline_manager, LowerQueueUpEnd* link_queue_up_end, os::Handler* handler);
   ~Fifo() override;
-  void AttachChannel(Cid cid, std::shared_ptr<ChannelImpl> channel) override;
-  void DetachChannel(Cid cid) override;
   void OnPacketsReady(Cid cid, int number_packets) override;
-  DataController* GetDataController(Cid cid) override;
 
  private:
+  DataPipelineManager* data_pipeline_manager_;
   LowerQueueUpEnd* link_queue_up_end_;
   os::Handler* handler_;
-  std::unordered_map<Cid, Sender> sender_map_;
   std::queue<std::pair<Cid, int>> next_to_dequeue_and_num_packets;
-
   bool link_queue_enqueue_registered_ = false;
+
   void try_register_link_queue_enqueue();
   std::unique_ptr<LowerEnqueue> link_queue_enqueue_callback();
 };
diff --git a/gd/l2cap/internal/scheduler_fifo_test.cc b/gd/l2cap/internal/scheduler_fifo_test.cc
index ff78838..cafd3b6 100644
--- a/gd/l2cap/internal/scheduler_fifo_test.cc
+++ b/gd/l2cap/internal/scheduler_fifo_test.cc
@@ -21,6 +21,8 @@
 #include <future>
 
 #include "l2cap/internal/channel_impl_mock.h"
+#include "l2cap/internal/data_controller_mock.h"
+#include "l2cap/internal/data_pipeline_manager_mock.h"
 #include "os/handler.h"
 #include "os/queue.h"
 #include "os/thread.h"
@@ -31,8 +33,23 @@
 namespace internal {
 namespace {
 
+using ::testing::_;
 using ::testing::Return;
 
+std::unique_ptr<packet::BasePacketBuilder> CreateSdu(std::vector<uint8_t> payload) {
+  auto raw_builder = std::make_unique<packet::RawBuilder>();
+  raw_builder->AddOctets(payload);
+  return raw_builder;
+}
+
+PacketView<kLittleEndian> GetPacketView(std::unique_ptr<packet::BasePacketBuilder> packet) {
+  auto bytes = std::make_shared<std::vector<uint8_t>>();
+  BitInserter i(*bytes);
+  bytes->reserve(packet->size());
+  packet->Serialize(i);
+  return packet::PacketView<packet::kLittleEndian>(bytes);
+}
+
 void sync_handler(os::Handler* handler) {
   std::promise<void> promise;
   auto future = promise.get_future();
@@ -41,17 +58,28 @@
   EXPECT_EQ(status, std::future_status::ready);
 }
 
+class MyDataController : public testing::MockDataController {
+ public:
+  std::unique_ptr<BasePacketBuilder> GetNextPacket() override {
+    return std::move(next_packet);
+  }
+
+  std::unique_ptr<BasePacketBuilder> next_packet;
+};
+
 class L2capSchedulerFifoTest : public ::testing::Test {
  protected:
   void SetUp() override {
     thread_ = new os::Thread("test_thread", os::Thread::Priority::NORMAL);
     user_handler_ = new os::Handler(thread_);
     queue_handler_ = new os::Handler(thread_);
-    fifo_ = new Fifo(link_queue_.GetUpEnd(), queue_handler_);
+    mock_data_pipeline_manager_ = new testing::MockDataPipelineManager(queue_handler_, link_queue_.GetUpEnd());
+    fifo_ = new Fifo(mock_data_pipeline_manager_, link_queue_.GetUpEnd(), queue_handler_);
   }
 
   void TearDown() override {
     delete fifo_;
+    delete mock_data_pipeline_manager_;
     queue_handler_->Clear();
     user_handler_->Clear();
     delete queue_handler_;
@@ -63,42 +91,26 @@
   os::Handler* user_handler_ = nullptr;
   os::Handler* queue_handler_ = nullptr;
   common::BidiQueue<Scheduler::LowerDequeue, Scheduler::LowerEnqueue> link_queue_{10};
+  testing::MockDataPipelineManager* mock_data_pipeline_manager_ = nullptr;
+  MyDataController data_controller_;
   Fifo* fifo_ = nullptr;
 };
 
 TEST_F(L2capSchedulerFifoTest, send_packet) {
-  common::BidiQueue<Scheduler::UpperEnqueue, Scheduler::UpperDequeue> channel_one_queue_{10};
-  common::BidiQueue<Scheduler::UpperEnqueue, Scheduler::UpperDequeue> channel_two_queue_{10};
-
-  auto mock_channel_1 = std::make_shared<testing::MockChannelImpl>();
-  EXPECT_CALL(*mock_channel_1, GetQueueDownEnd()).WillRepeatedly(Return(channel_one_queue_.GetDownEnd()));
-  EXPECT_CALL(*mock_channel_1, GetCid()).WillRepeatedly(Return(1));
-  EXPECT_CALL(*mock_channel_1, GetRemoteCid()).WillRepeatedly(Return(1));
-  auto mock_channel_2 = std::make_shared<testing::MockChannelImpl>();
-  EXPECT_CALL(*mock_channel_2, GetQueueDownEnd()).WillRepeatedly(Return(channel_two_queue_.GetDownEnd()));
-  EXPECT_CALL(*mock_channel_2, GetCid()).WillRepeatedly(Return(2));
-  EXPECT_CALL(*mock_channel_2, GetRemoteCid()).WillRepeatedly(Return(2));
-  fifo_->AttachChannel(1, mock_channel_1);
-  fifo_->AttachChannel(2, mock_channel_2);
-  os::EnqueueBuffer<Scheduler::UpperDequeue> channel_one_enqueue_buffer{channel_one_queue_.GetUpEnd()};
-  os::EnqueueBuffer<Scheduler::UpperDequeue> channel_two_enqueue_buffer{channel_two_queue_.GetUpEnd()};
-  auto packet_one = std::make_unique<packet::RawBuilder>();
-  packet_one->AddOctets({1, 2, 3});
-  auto packet_two = std::make_unique<packet::RawBuilder>();
-  packet_two->AddOctets({4, 5, 6, 7});
-  channel_one_enqueue_buffer.Enqueue(std::move(packet_one), user_handler_);
-  channel_two_enqueue_buffer.Enqueue(std::move(packet_two), user_handler_);
-  sync_handler(user_handler_);
+  auto frame = BasicFrameBuilder::Create(1, CreateSdu({'a', 'b', 'c'}));
+  data_controller_.next_packet = std::move(frame);
+  EXPECT_CALL(*mock_data_pipeline_manager_, GetDataController(_)).WillOnce(Return(&data_controller_));
+  EXPECT_CALL(*mock_data_pipeline_manager_, OnPacketSent(1));
+  fifo_->OnPacketsReady(1, 1);
   sync_handler(queue_handler_);
   sync_handler(user_handler_);
   auto packet = link_queue_.GetDownEnd()->TryDequeue();
-  EXPECT_NE(packet, nullptr);
-  EXPECT_EQ(packet->size(), 7);
-  packet = link_queue_.GetDownEnd()->TryDequeue();
-  EXPECT_NE(packet, nullptr);
-  EXPECT_EQ(packet->size(), 8);
-  fifo_->DetachChannel(1);
-  fifo_->DetachChannel(2);
+  auto packet_view = GetPacketView(std::move(packet));
+  auto basic_frame_view = BasicFrameView::Create(packet_view);
+  EXPECT_TRUE(basic_frame_view.IsValid());
+  EXPECT_EQ(basic_frame_view.GetChannelId(), 1);
+  auto payload = basic_frame_view.GetPayload();
+  EXPECT_EQ(std::string(payload.begin(), payload.end()), "abc");
 }
 
 }  // namespace
diff --git a/gd/l2cap/internal/scheduler_mock.h b/gd/l2cap/internal/scheduler_mock.h
index 5fdffbe..c0ac4d7 100644
--- a/gd/l2cap/internal/scheduler_mock.h
+++ b/gd/l2cap/internal/scheduler_mock.h
@@ -28,8 +28,6 @@
 
 class MockScheduler : public Scheduler {
  public:
-  MOCK_METHOD(void, AttachChannel, (Cid cid, std::shared_ptr<l2cap::internal::ChannelImpl> channel), (override));
-  MOCK_METHOD(void, DetachChannel, (Cid cid), (override));
   MOCK_METHOD(void, OnPacketsReady, (Cid cid, int number_packet), (override));
 };
 
diff --git a/gd/l2cap/internal/sender_test.cc b/gd/l2cap/internal/sender_test.cc
index cb16245..cecdfa4 100644
--- a/gd/l2cap/internal/sender_test.cc
+++ b/gd/l2cap/internal/sender_test.cc
@@ -40,6 +40,14 @@
   return raw_builder;
 }
 
+PacketView<kLittleEndian> GetPacketView(std::unique_ptr<packet::BasePacketBuilder> packet) {
+  auto bytes = std::make_shared<std::vector<uint8_t>>();
+  BitInserter i(*bytes);
+  bytes->reserve(packet->size());
+  packet->Serialize(i);
+  return packet::PacketView<packet::kLittleEndian>(bytes);
+}
+
 class FakeScheduler : public Scheduler {
  public:
   void OnPacketsReady(Cid cid, int number_packets) override {
@@ -52,10 +60,10 @@
   std::function<void(Cid cid, int number_packets)> on_packets_ready_;
 };
 
-class L2capSegmenterTest : public ::testing::Test {
+class L2capSenderTest : public ::testing::Test {
  public:
   std::unique_ptr<Sender::UpperDequeue> enqueue_callback() {
-    auto packet_one = CreateSdu({1, 2, 3});
+    auto packet_one = CreateSdu({'a', 'b', 'c'});
     channel_queue_.GetUpEnd()->UnregisterEnqueue();
     return packet_one;
   }
@@ -67,15 +75,15 @@
     queue_handler_ = new os::Handler(thread_);
     mock_channel_ = std::make_shared<testing::MockChannelImpl>();
     EXPECT_CALL(*mock_channel_, GetQueueDownEnd()).WillRepeatedly(Return(channel_queue_.GetDownEnd()));
-    EXPECT_CALL(*mock_channel_, GetCid()).WillRepeatedly(Return(0x41));
-    EXPECT_CALL(*mock_channel_, GetRemoteCid()).WillRepeatedly(Return(0x41));
+    EXPECT_CALL(*mock_channel_, GetCid()).WillRepeatedly(Return(cid_));
+    EXPECT_CALL(*mock_channel_, GetRemoteCid()).WillRepeatedly(Return(cid_));
     sender_ = new Sender(queue_handler_, &scheduler_, mock_channel_);
   }
 
   void TearDown() override {
-    delete sender_;
     queue_handler_->Clear();
     user_handler_->Clear();
+    delete sender_;
     delete queue_handler_;
     delete user_handler_;
     delete thread_;
@@ -87,20 +95,27 @@
   common::BidiQueue<Sender::UpperEnqueue, Sender::UpperDequeue> channel_queue_{10};
   std::shared_ptr<testing::MockChannelImpl> mock_channel_;
   Sender* sender_ = nullptr;
+  Cid cid_ = 0x41;
   FakeScheduler scheduler_;
 };
 
-TEST_F(L2capSegmenterTest, send_packet) {
-  auto packet_one = CreateSdu({1, 2, 3});
+TEST_F(L2capSenderTest, send_packet) {
   std::promise<void> promise;
   auto future = promise.get_future();
   scheduler_.SetOnPacketsReady([&promise](Cid cid, int number_packets) { promise.set_value(); });
   channel_queue_.GetUpEnd()->RegisterEnqueue(
-      queue_handler_, common::Bind(&L2capSegmenterTest::enqueue_callback, common::Unretained(this)));
+      queue_handler_, common::Bind(&L2capSenderTest::enqueue_callback, common::Unretained(this)));
   auto status = future.wait_for(std::chrono::milliseconds(3));
   EXPECT_EQ(status, std::future_status::ready);
   auto packet = sender_->GetNextPacket();
   EXPECT_NE(packet, nullptr);
+  auto packet_view = GetPacketView(std::move(packet));
+  auto basic_frame_view = BasicFrameView::Create(packet_view);
+  EXPECT_TRUE(basic_frame_view.IsValid());
+  EXPECT_EQ(basic_frame_view.GetChannelId(), cid_);
+  auto payload = basic_frame_view.GetPayload();
+  std::string payload_string(payload.begin(), payload.end());
+  EXPECT_EQ(payload_string, "abc");
 }
 
 }  // namespace
diff --git a/gd/l2cap/le/internal/link.h b/gd/l2cap/le/internal/link.h
index 783e03b..3e75da6 100644
--- a/gd/l2cap/le/internal/link.h
+++ b/gd/l2cap/le/internal/link.h
@@ -20,9 +20,9 @@
 #include <memory>
 
 #include "hci/acl_manager.h"
+#include "l2cap/internal/data_pipeline_manager.h"
 #include "l2cap/internal/fixed_channel_allocator.h"
 #include "l2cap/internal/parameter_provider.h"
-#include "l2cap/internal/scheduler.h"
 #include "l2cap/le/internal/fixed_channel_impl.h"
 #include "os/alarm.h"
 
@@ -34,12 +34,12 @@
 class Link {
  public:
   Link(os::Handler* l2cap_handler, std::unique_ptr<hci::AclConnection> acl_connection,
-       std::unique_ptr<l2cap::internal::Scheduler> scheduler, l2cap::internal::ParameterProvider* parameter_provider)
-      : l2cap_handler_(l2cap_handler), acl_connection_(std::move(acl_connection)), scheduler_(std::move(scheduler)),
+       l2cap::internal::ParameterProvider* parameter_provider)
+      : l2cap_handler_(l2cap_handler), acl_connection_(std::move(acl_connection)),
+        data_pipeline_manager_(l2cap_handler, acl_connection_->GetAclQueueEnd()),
         parameter_provider_(parameter_provider) {
     ASSERT(l2cap_handler_ != nullptr);
     ASSERT(acl_connection_ != nullptr);
-    ASSERT(scheduler_ != nullptr);
     ASSERT(parameter_provider_ != nullptr);
     link_idle_disconnect_alarm_.Schedule(common::BindOnce(&Link::Disconnect, common::Unretained(this)),
                                          parameter_provider_->GetLeLinkIdleDisconnectTimeout());
@@ -69,7 +69,7 @@
 
   virtual std::shared_ptr<FixedChannelImpl> AllocateFixedChannel(Cid cid, SecurityPolicy security_policy) {
     auto channel = fixed_channel_allocator_.AllocateChannel(cid, security_policy);
-    scheduler_->AttachChannel(cid, channel);
+    data_pipeline_manager_.AttachChannel(cid, channel);
     return channel;
   }
 
@@ -98,7 +98,7 @@
   os::Handler* l2cap_handler_;
   l2cap::internal::FixedChannelAllocator<FixedChannelImpl, Link> fixed_channel_allocator_{this, l2cap_handler_};
   std::unique_ptr<hci::AclConnection> acl_connection_;
-  std::unique_ptr<l2cap::internal::Scheduler> scheduler_;
+  l2cap::internal::DataPipelineManager data_pipeline_manager_;
   l2cap::internal::ParameterProvider* parameter_provider_;
   os::Alarm link_idle_disconnect_alarm_{l2cap_handler_};
   DISALLOW_COPY_AND_ASSIGN(Link);
diff --git a/gd/l2cap/le/internal/link_manager.cc b/gd/l2cap/le/internal/link_manager.cc
index 989cf3d..fac8cbc 100644
--- a/gd/l2cap/le/internal/link_manager.cc
+++ b/gd/l2cap/le/internal/link_manager.cc
@@ -95,13 +95,11 @@
   hci::AddressWithType connected_address_with_type(acl_connection->GetAddress(), acl_connection->GetAddressType());
   ASSERT_LOG(GetLink(connected_address_with_type) == nullptr, "%s is connected twice without disconnection",
              acl_connection->GetAddress().ToString().c_str());
-  auto* link_queue_up_end = acl_connection->GetAclQueueEnd();
   // Register ACL disconnection callback in LinkManager so that we can clean up link resource properly
   acl_connection->RegisterDisconnectCallback(
       common::BindOnce(&LinkManager::OnDisconnect, common::Unretained(this), connected_address_with_type),
       l2cap_handler_);
-  links_.try_emplace(connected_address_with_type, l2cap_handler_, std::move(acl_connection),
-                     std::make_unique<l2cap::internal::Fifo>(link_queue_up_end, l2cap_handler_), parameter_provider_);
+  links_.try_emplace(connected_address_with_type, l2cap_handler_, std::move(acl_connection), parameter_provider_);
   auto* link = GetLink(connected_address_with_type);
   // Allocate and distribute channels for all registered fixed channel services
   auto fixed_channel_services = service_manager_->GetRegisteredServices();
diff --git a/gd/l2cap/le/internal/link_mock.h b/gd/l2cap/le/internal/link_mock.h
index 08ea629..5d537b3 100644
--- a/gd/l2cap/le/internal/link_mock.h
+++ b/gd/l2cap/le/internal/link_mock.h
@@ -34,8 +34,7 @@
 class MockLink : public Link {
  public:
   explicit MockLink(os::Handler* handler, l2cap::internal::ParameterProvider* parameter_provider)
-      : Link(handler, std::make_unique<MockAclConnection>(),
-             std::make_unique<l2cap::internal::testing::MockScheduler>(), parameter_provider){};
+      : Link(handler, std::make_unique<MockAclConnection>(), parameter_provider){};
   MOCK_METHOD(hci::AddressWithType, GetDevice, (), (override));
   MOCK_METHOD(hci::Role, GetRole, (), (override));
   MOCK_METHOD(void, OnAclDisconnected, (hci::ErrorCode status), (override));