| /* |
| * Copyright 2019 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "os/queue.h" |
| |
| #include <sys/eventfd.h> |
| |
| #include <atomic> |
| #include <chrono> |
| #include <future> |
| #include <unordered_map> |
| |
| #include "common/bind.h" |
| #include "gtest/gtest.h" |
| #include "os/reactor.h" |
| |
| using namespace std::chrono_literals; |
| |
| namespace bluetooth { |
| namespace os { |
| namespace { |
| |
| constexpr int kQueueSize = 10; |
| constexpr int kHalfOfQueueSize = kQueueSize / 2; |
| constexpr int kDoubleOfQueueSize = kQueueSize * 2; |
| constexpr int kQueueSizeOne = 1; |
| |
| class QueueTest : public ::testing::Test { |
| protected: |
| void SetUp() override { |
| enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL); |
| enqueue_handler_ = new Handler(enqueue_thread_); |
| dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL); |
| dequeue_handler_ = new Handler(dequeue_thread_); |
| } |
| void TearDown() override { |
| enqueue_handler_->Clear(); |
| delete enqueue_handler_; |
| delete enqueue_thread_; |
| dequeue_handler_->Clear(); |
| delete dequeue_handler_; |
| delete dequeue_thread_; |
| enqueue_handler_ = nullptr; |
| enqueue_thread_ = nullptr; |
| dequeue_handler_ = nullptr; |
| dequeue_thread_ = nullptr; |
| } |
| |
| Thread* enqueue_thread_; |
| Handler* enqueue_handler_; |
| Thread* dequeue_thread_; |
| Handler* dequeue_handler_; |
| |
| void sync_enqueue_handler() { |
| ASSERT(enqueue_thread_ != nullptr); |
| ASSERT(enqueue_thread_->GetReactor()->WaitForIdle(2s)); |
| } |
| }; |
| |
| class TestEnqueueEnd { |
| public: |
| explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler) |
| : count(0), handler_(handler), queue_(queue), delay_(0) {} |
| |
| ~TestEnqueueEnd() {} |
| |
| void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) { |
| promise_map_ = promise_map; |
| handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this))); |
| } |
| |
| void UnregisterEnqueue() { |
| std::promise<void> promise; |
| auto future = promise.get_future(); |
| |
| handler_->Post( |
| common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue, common::Unretained(this), std::move(promise))); |
| future.wait(); |
| } |
| |
| std::unique_ptr<std::string> EnqueueCallbackForTest() { |
| if (delay_ != 0) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(delay_)); |
| } |
| |
| count++; |
| std::unique_ptr<std::string> data = std::move(buffer_.front()); |
| buffer_.pop(); |
| std::string copy = *data; |
| if (buffer_.empty()) { |
| queue_->UnregisterEnqueue(); |
| } |
| |
| auto key = buffer_.size(); |
| auto node = promise_map_->extract(key); |
| if (node) { |
| node.mapped().set_value(key); |
| } |
| |
| return data; |
| } |
| |
| void setDelay(int value) { |
| delay_ = value; |
| } |
| |
| std::queue<std::unique_ptr<std::string>> buffer_; |
| int count; |
| |
| private: |
| Handler* handler_; |
| Queue<std::string>* queue_; |
| std::unordered_map<int, std::promise<int>>* promise_map_; |
| int delay_; |
| |
| void handle_register_enqueue() { |
| queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest, common::Unretained(this))); |
| } |
| |
| void handle_unregister_enqueue(std::promise<void> promise) { |
| queue_->UnregisterEnqueue(); |
| promise.set_value(); |
| } |
| }; |
| |
| class TestDequeueEnd { |
| public: |
| explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity) |
| : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {} |
| |
| ~TestDequeueEnd() {} |
| |
| void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) { |
| promise_map_ = promise_map; |
| handler_->Post(common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this))); |
| } |
| |
| void UnregisterDequeue() { |
| std::promise<void> promise; |
| auto future = promise.get_future(); |
| |
| handler_->Post( |
| common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue, common::Unretained(this), std::move(promise))); |
| future.wait(); |
| } |
| |
| void DequeueCallbackForTest() { |
| if (delay_ != 0) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(delay_)); |
| } |
| |
| count++; |
| std::unique_ptr<std::string> data = queue_->TryDequeue(); |
| buffer_.push(std::move(data)); |
| |
| if (buffer_.size() == (size_t)capacity_) { |
| queue_->UnregisterDequeue(); |
| } |
| |
| auto key = buffer_.size(); |
| auto node = promise_map_->extract(key); |
| if (node) { |
| node.mapped().set_value(key); |
| } |
| } |
| |
| void setDelay(int value) { |
| delay_ = value; |
| } |
| |
| std::queue<std::unique_ptr<std::string>> buffer_; |
| int count; |
| |
| private: |
| Handler* handler_; |
| Queue<std::string>* queue_; |
| std::unordered_map<int, std::promise<int>>* promise_map_; |
| int capacity_; |
| int delay_; |
| |
| void handle_register_dequeue() { |
| queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest, common::Unretained(this))); |
| } |
| |
| void handle_unregister_dequeue(std::promise<void> promise) { |
| queue_->UnregisterDequeue(); |
| promise.set_value(); |
| } |
| }; |
| |
| // Enqueue end level : 0 -> queue is full, 1 - > queue isn't full |
| // Dequeue end level : 0 -> queue is empty, 1 - > queue isn't empty |
| |
| // Test 1 : Queue is empty |
| |
| // Enqueue end level : 1 |
| // Dequeue end level : 0 |
| // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full |
| TEST_F(QueueTest, register_enqueue_with_empty_queue) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| |
| // Push kQueueSize data to enqueue_end buffer |
| for (int i = 0; i < kQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize); |
| |
| // Register enqueue and expect data move to Queue |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| std::this_thread::sleep_for(std::chrono::milliseconds(20)); |
| } |
| |
| // Enqueue end level : 1 |
| // Dequeue end level : 0 |
| // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty |
| TEST_F(QueueTest, register_dequeue_with_empty_queue) { |
| Queue<std::string> queue(kQueueSize); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize); |
| |
| // Register dequeue, DequeueCallback shouldn't be invoked |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| std::this_thread::sleep_for(std::chrono::milliseconds(20)); |
| EXPECT_EQ(test_dequeue_end.count, 0); |
| |
| test_dequeue_end.UnregisterDequeue(); |
| } |
| |
| // Test 2 : Queue is full |
| |
| // Enqueue end level : 0 |
| // Dequeue end level : 1 |
| // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full |
| TEST_F(QueueTest, register_enqueue_with_full_queue) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| |
| // make Queue full |
| for (int i = 0; i < kQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| |
| // push some data to enqueue_end buffer and register enqueue; |
| for (int i = 0; i < kHalfOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| |
| // EnqueueCallback shouldn't be invoked |
| std::this_thread::sleep_for(std::chrono::milliseconds(20)); |
| EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize); |
| EXPECT_EQ(test_enqueue_end.count, kQueueSize); |
| |
| test_enqueue_end.UnregisterEnqueue(); |
| } |
| |
| // Enqueue end level : 0 |
| // Dequeue end level : 1 |
| // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty |
| TEST_F(QueueTest, register_dequeue_with_full_queue) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize); |
| |
| // make Queue full |
| for (int i = 0; i < kQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| |
| // Register dequeue and expect data move to dequeue end buffer |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); |
| auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| dequeue_future.wait(); |
| EXPECT_EQ(dequeue_future.get(), kQueueSize); |
| |
| test_dequeue_end.UnregisterDequeue(); |
| } |
| |
| // Test 3 : Queue is non-empty and non-full |
| |
| // Enqueue end level : 1 |
| // Dequeue end level : 1 |
| // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked |
| TEST_F(QueueTest, register_enqueue_with_half_empty_queue) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| |
| // make Queue half empty |
| for (int i = 0; i < kHalfOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| |
| // push some data to enqueue_end buffer and register enqueue; |
| for (int i = 0; i < kHalfOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| |
| // Register enqueue and expect data move to Queue |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| sync_enqueue_handler(); |
| } |
| |
| // Enqueue end level : 1 |
| // Dequeue end level : 1 |
| // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked |
| TEST_F(QueueTest, register_dequeue_with_half_empty_queue) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize); |
| |
| // make Queue half empty |
| for (int i = 0; i < kHalfOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| |
| // Register dequeue and expect data move to dequeue end buffer |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| dequeue_promise_map.emplace( |
| std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple()); |
| auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future(); |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| dequeue_future.wait(); |
| EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize); |
| |
| test_dequeue_end.UnregisterDequeue(); |
| } |
| |
| // Dynamic level test |
| |
| // Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked |
| |
| // Enqueue end level : 1 -> 0 |
| // Dequeue end level : 1 |
| // Test 4-1 Queue becomes full due to only register EnqueueCallback |
| TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| |
| // push double of kQueueSize to enqueue end buffer |
| for (int i = 0; i < kDoubleOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| |
| // Register enqueue and expect kQueueSize data move to Queue |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[kQueueSize].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), kQueueSize); |
| |
| // EnqueueCallback shouldn't be invoked and buffer size stay in kQueueSize |
| std::this_thread::sleep_for(std::chrono::milliseconds(20)); |
| EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize); |
| EXPECT_EQ(test_enqueue_end.count, kQueueSize); |
| |
| test_enqueue_end.UnregisterEnqueue(); |
| } |
| |
| // Enqueue end level : 1 -> 0 |
| // Dequeue end level : 1 |
| // Test 4-2 Queue becomes full due to DequeueCallback unregister during test |
| TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize); |
| |
| // push double of kQueueSize to enqueue end buffer |
| for (int i = 0; i < kDoubleOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| |
| // Register dequeue |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| dequeue_promise_map.emplace( |
| std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple()); |
| auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future(); |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| |
| // Register enqueue |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace( |
| std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[kHalfOfQueueSize].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| |
| // Dequeue end will unregister when buffer size is kHalfOfQueueSize |
| dequeue_future.wait(); |
| EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize); |
| |
| // EnqueueCallback shouldn't be invoked and buffer size stay in kHalfOfQueueSize |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), kHalfOfQueueSize); |
| std::this_thread::sleep_for(std::chrono::milliseconds(20)); |
| EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize); |
| EXPECT_EQ(test_enqueue_end.count, kQueueSize + kHalfOfQueueSize); |
| |
| test_enqueue_end.UnregisterEnqueue(); |
| } |
| |
| // Enqueue end level : 1 -> 0 |
| // Dequeue end level : 1 |
| // Test 4-3 Queue becomes full due to DequeueCallback is slower |
| TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize); |
| |
| // push double of kDoubleOfQueueSize to enqueue end buffer |
| for (int i = 0; i < kDoubleOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| |
| // Set 20 ms delay for callback and register dequeue |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| test_dequeue_end.setDelay(20); |
| auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future(); |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| |
| // Register enqueue |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| |
| // Wait for enqueue buffer empty and expect queue is full |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| EXPECT_GE(test_dequeue_end.buffer_.size(), (size_t)(kQueueSize - 1)); |
| |
| test_dequeue_end.UnregisterDequeue(); |
| } |
| |
| // Enqueue end level : 0 -> 1 |
| // Dequeue end level : 1 -> 0 |
| // Test 5 Queue becomes full and non empty at same time. |
| TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) { |
| Queue<std::string> queue(kQueueSizeOne); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize); |
| |
| // push double of kQueueSize to enqueue end buffer |
| for (int i = 0; i < kQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| |
| // Register dequeue |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); |
| auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| |
| // Register enqueue |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| |
| // Wait for all data move from enqueue end buffer to dequeue end buffer |
| dequeue_future.wait(); |
| EXPECT_EQ(dequeue_future.get(), kQueueSize); |
| |
| test_dequeue_end.UnregisterDequeue(); |
| } |
| |
| // Enqueue end level : 1 -> 0 |
| // Dequeue end level : 1 |
| // Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked |
| TEST_F(QueueTest, queue_becomes_non_full_during_test) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3); |
| |
| // make Queue full |
| for (int i = 0; i < kDoubleOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[kQueueSize].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), kQueueSize); |
| |
| // Expect kQueueSize data block in enqueue end buffer |
| std::this_thread::sleep_for(std::chrono::milliseconds(20)); |
| EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize); |
| |
| // Register dequeue |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| |
| // Expect enqueue end will empty |
| enqueue_future = enqueue_promise_map[0].get_future(); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| |
| test_dequeue_end.UnregisterDequeue(); |
| } |
| |
| // Enqueue end level : 0 -> 1 |
| // Dequeue end level : 1 -> 0 |
| // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5) |
| TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) { |
| Queue<std::string> queue(kQueueSizeOne); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize); |
| |
| // push double of kQueueSize to enqueue end buffer |
| for (int i = 0; i < kQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| |
| // Register dequeue |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); |
| auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| |
| // Register enqueue |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| |
| // Wait for all data move from enqueue end buffer to dequeue end buffer |
| dequeue_future.wait(); |
| EXPECT_EQ(dequeue_future.get(), kQueueSize); |
| |
| test_dequeue_end.UnregisterDequeue(); |
| } |
| |
| // Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked |
| |
| // Enqueue end level : 1 |
| // Dequeue end level : 1 -> 0 |
| // Test 8-1 Queue becomes empty due to only register DequeueCallback |
| TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize); |
| |
| // make Queue half empty |
| for (int i = 0; i < kHalfOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| |
| // Register dequeue, expect kHalfOfQueueSize data move to dequeue end buffer |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| dequeue_promise_map.emplace( |
| std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple()); |
| auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future(); |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| dequeue_future.wait(); |
| EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize); |
| |
| // Expect DequeueCallback should stop to be invoked |
| std::this_thread::sleep_for(std::chrono::milliseconds(20)); |
| EXPECT_EQ(test_dequeue_end.count, kHalfOfQueueSize); |
| } |
| |
| // Enqueue end level : 1 |
| // Dequeue end level : 1 -> 0 |
| // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test |
| TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize); |
| |
| // make Queue half empty |
| for (int i = 0; i < kHalfOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); |
| auto enqueue_future = enqueue_promise_map[0].get_future(); |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| enqueue_future.wait(); |
| EXPECT_EQ(enqueue_future.get(), 0); |
| |
| // push kHalfOfQueueSize to enqueue end buffer and register enqueue. |
| for (int i = 0; i < kHalfOfQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| |
| // Register dequeue, expect kQueueSize move to dequeue end buffer |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); |
| auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| dequeue_future.wait(); |
| EXPECT_EQ(dequeue_future.get(), kQueueSize); |
| |
| // Expect DequeueCallback should stop to be invoked |
| std::this_thread::sleep_for(std::chrono::milliseconds(20)); |
| EXPECT_EQ(test_dequeue_end.count, kQueueSize); |
| } |
| |
| // Enqueue end level : 1 |
| // Dequeue end level : 0 -> 1 |
| // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked |
| TEST_F(QueueTest, queue_becomes_non_empty_during_test) { |
| Queue<std::string> queue(kQueueSize); |
| TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); |
| TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize); |
| |
| // Register dequeue |
| std::unordered_map<int, std::promise<int>> dequeue_promise_map; |
| dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); |
| test_dequeue_end.RegisterDequeue(&dequeue_promise_map); |
| |
| // push kQueueSize data to enqueue end buffer and register enqueue |
| for (int i = 0; i < kQueueSize; i++) { |
| std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i)); |
| test_enqueue_end.buffer_.push(std::move(data)); |
| } |
| std::unordered_map<int, std::promise<int>> enqueue_promise_map; |
| test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); |
| |
| // Expect kQueueSize data move to dequeue end buffer |
| auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); |
| dequeue_future.wait(); |
| EXPECT_EQ(dequeue_future.get(), kQueueSize); |
| } |
| |
| TEST_F(QueueTest, pass_smart_pointer_and_unregister) { |
| Queue<std::string>* queue = new Queue<std::string>(kQueueSize); |
| |
| // Enqueue a string |
| std::string valid = "Valid String"; |
| std::shared_ptr<std::string> shared = std::make_shared<std::string>(valid); |
| queue->RegisterEnqueue( |
| enqueue_handler_, |
| common::Bind( |
| [](Queue<std::string>* queue, std::shared_ptr<std::string> shared) { |
| queue->UnregisterEnqueue(); |
| return std::make_unique<std::string>(*shared); |
| }, |
| common::Unretained(queue), |
| shared)); |
| |
| // Dequeue the string |
| queue->RegisterDequeue( |
| dequeue_handler_, |
| common::Bind( |
| [](Queue<std::string>* queue, std::string valid) { |
| queue->UnregisterDequeue(); |
| auto answer = *queue->TryDequeue(); |
| ASSERT_EQ(answer, valid); |
| }, |
| common::Unretained(queue), |
| valid)); |
| |
| // Wait for both handlers to finish and delete the Queue |
| std::promise<void> promise; |
| auto future = promise.get_future(); |
| |
| enqueue_handler_->Post(common::BindOnce( |
| [](os::Handler* dequeue_handler, Queue<std::string>* queue, std::promise<void>* promise) { |
| dequeue_handler->Post(common::BindOnce( |
| [](Queue<std::string>* queue, std::promise<void>* promise) { |
| delete queue; |
| promise->set_value(); |
| }, |
| common::Unretained(queue), |
| common::Unretained(promise))); |
| }, |
| common::Unretained(dequeue_handler_), |
| common::Unretained(queue), |
| common::Unretained(&promise))); |
| future.wait(); |
| } |
| |
| std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| (*to_increase)++; |
| return std::make_unique<std::string>("Hello"); |
| } |
| |
| TEST_F(QueueTest, unregister_enqueue_and_wait) { |
| Queue<std::string> queue(10); |
| int* indicator = new int(100); |
| queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator))); |
| std::this_thread::sleep_for(std::chrono::milliseconds(50)); |
| queue.UnregisterEnqueue(); |
| EXPECT_EQ(*indicator, 101); |
| delete indicator; |
| } |
| |
| std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister( |
| int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| (*to_increase)++; |
| if (is_registered->exchange(false)) { |
| queue->UnregisterEnqueue(); |
| } |
| return std::make_unique<std::string>("Hello"); |
| } |
| |
| TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) { |
| Queue<std::string> queue(10); |
| int* indicator = new int(100); |
| std::atomic_bool is_registered = true; |
| queue.RegisterEnqueue( |
| enqueue_handler_, |
| common::Bind( |
| &sleep_and_enqueue_callback_and_unregister, |
| common::Unretained(indicator), |
| common::Unretained(&queue), |
| common::Unretained(&is_registered))); |
| std::this_thread::sleep_for(std::chrono::milliseconds(50)); |
| if (is_registered.exchange(false)) { |
| queue.UnregisterEnqueue(); |
| } |
| EXPECT_EQ(*indicator, 101); |
| delete indicator; |
| } |
| |
| void sleep_and_dequeue_callback(int* to_increase) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| (*to_increase)++; |
| } |
| |
| TEST_F(QueueTest, unregister_dequeue_and_wait) { |
| int* indicator = new int(100); |
| Queue<std::string> queue(10); |
| queue.RegisterEnqueue( |
| enqueue_handler_, |
| common::Bind( |
| [](Queue<std::string>* queue) { |
| queue->UnregisterEnqueue(); |
| return std::make_unique<std::string>("Hello"); |
| }, |
| common::Unretained(&queue))); |
| queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator))); |
| std::this_thread::sleep_for(std::chrono::milliseconds(50)); |
| queue.UnregisterDequeue(); |
| EXPECT_EQ(*indicator, 101); |
| delete indicator; |
| } |
| |
| // Create all threads for death tests in the function that dies |
| class QueueDeathTest : public ::testing::Test { |
| public: |
| void RegisterEnqueueAndDelete() { |
| Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL); |
| Handler* enqueue_handler = new Handler(enqueue_thread); |
| Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne); |
| queue->RegisterEnqueue( |
| enqueue_handler, common::Bind([]() { return std::make_unique<std::string>("A string to fill the queue"); })); |
| delete queue; |
| } |
| |
| void RegisterDequeueAndDelete() { |
| Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL); |
| Handler* dequeue_handler = new Handler(dequeue_thread); |
| Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne); |
| queue->RegisterDequeue( |
| dequeue_handler, |
| common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); }, common::Unretained(queue))); |
| delete queue; |
| } |
| }; |
| |
| TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) { |
| EXPECT_DEATH(RegisterEnqueueAndDelete(), "nqueue"); |
| } |
| |
| TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) { |
| EXPECT_DEATH(RegisterDequeueAndDelete(), "equeue"); |
| } |
| |
| class MockIQueueEnqueue : public IQueueEnqueue<int> { |
| public: |
| void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override { |
| EXPECT_FALSE(registered_); |
| registered_ = true; |
| handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue, common::Unretained(this), callback)); |
| } |
| |
| void handle_register_enqueue(EnqueueCallback callback) { |
| if (dont_handle_register_enqueue_) { |
| return; |
| } |
| while (registered_) { |
| std::unique_ptr<int> front = callback.Run(); |
| queue_.push(*front); |
| } |
| } |
| |
| void UnregisterEnqueue() override { |
| EXPECT_TRUE(registered_); |
| registered_ = false; |
| } |
| |
| bool dont_handle_register_enqueue_ = false; |
| bool registered_ = false; |
| std::queue<int> queue_; |
| }; |
| |
| class EnqueueBufferTest : public ::testing::Test { |
| protected: |
| void SetUp() override { |
| thread_ = new Thread("test_thread", Thread::Priority::NORMAL); |
| handler_ = new Handler(thread_); |
| } |
| |
| void TearDown() override { |
| handler_->Clear(); |
| delete handler_; |
| delete thread_; |
| } |
| |
| void SynchronizeHandler() { |
| std::promise<void> promise; |
| auto future = promise.get_future(); |
| handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); }, std::move(promise))); |
| future.wait(); |
| } |
| |
| MockIQueueEnqueue enqueue_; |
| EnqueueBuffer<int> enqueue_buffer_{&enqueue_}; |
| Thread* thread_; |
| Handler* handler_; |
| }; |
| |
| TEST_F(EnqueueBufferTest, enqueue) { |
| int num_items = 10; |
| for (int i = 0; i < num_items; i++) { |
| enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_); |
| } |
| SynchronizeHandler(); |
| for (int i = 0; i < num_items; i++) { |
| ASSERT_EQ(enqueue_.queue_.front(), i); |
| enqueue_.queue_.pop(); |
| } |
| ASSERT_FALSE(enqueue_.registered_); |
| } |
| |
| TEST_F(EnqueueBufferTest, clear) { |
| enqueue_.dont_handle_register_enqueue_ = true; |
| int num_items = 10; |
| for (int i = 0; i < num_items; i++) { |
| enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_); |
| } |
| ASSERT_TRUE(enqueue_.registered_); |
| enqueue_buffer_.Clear(); |
| ASSERT_FALSE(enqueue_.registered_); |
| } |
| |
| TEST_F(EnqueueBufferTest, delete_when_in_callback) { |
| Queue<int>* queue = new Queue<int>(kQueueSize); |
| EnqueueBuffer<int>* enqueue_buffer = new EnqueueBuffer<int>(queue); |
| int num_items = 10; |
| for (int i = 0; i < num_items; i++) { |
| enqueue_buffer->Enqueue(std::make_unique<int>(i), handler_); |
| } |
| |
| delete enqueue_buffer; |
| delete queue; |
| } |
| |
| } // namespace |
| } // namespace os |
| } // namespace bluetooth |