blob: 910cfedb70cc4977a864088cbfa43ca8e89ce451 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "platform/impl/task_runner.h"
#include <unistd.h>
#include <atomic>
#include <chrono>
#include <string>
#include <thread>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "platform/api/time.h"
#include "platform/test/fake_clock.h"
#include "util/chrono_helpers.h"
namespace openscreen {
namespace {
using ::testing::_;
const auto kTaskRunnerSleepTime = milliseconds(1);
constexpr Clock::duration kWaitTimeout = milliseconds(1000);
void WaitUntilCondition(std::function<bool()> predicate) {
while (!predicate()) {
std::this_thread::sleep_for(kTaskRunnerSleepTime);
}
}
class FakeTaskWaiter final : public TaskRunnerImpl::TaskWaiter {
public:
explicit FakeTaskWaiter(ClockNowFunctionPtr now_function)
: now_function_(now_function) {}
~FakeTaskWaiter() override = default;
Error WaitForTaskToBePosted(Clock::duration timeout) override {
Clock::time_point start = now_function_();
waiting_.store(true);
while (!has_event_.load() && (now_function_() - start) < timeout) {
EXPECT_EQ(usleep(100 /* microseconds */), 0);
}
waiting_.store(false);
has_event_.store(false);
return Error::None();
}
void OnTaskPosted() override { has_event_.store(true); }
void WakeUpAndStop() {
OnTaskPosted();
task_runner_->RequestStopSoon();
}
bool IsWaiting() const { return waiting_.load(); }
void SetTaskRunner(TaskRunnerImpl* task_runner) {
task_runner_ = task_runner;
}
private:
const ClockNowFunctionPtr now_function_;
TaskRunnerImpl* task_runner_;
std::atomic<bool> has_event_{false};
std::atomic<bool> waiting_{false};
};
class TaskRunnerWithWaiterFactory {
public:
static std::unique_ptr<TaskRunnerImpl> Create(
ClockNowFunctionPtr now_function) {
fake_waiter = std::make_unique<FakeTaskWaiter>(now_function);
auto runner = std::make_unique<TaskRunnerImpl>(
now_function, fake_waiter.get(), std::chrono::hours(1));
fake_waiter->SetTaskRunner(runner.get());
return runner;
}
static std::unique_ptr<FakeTaskWaiter> fake_waiter;
};
// static
std::unique_ptr<FakeTaskWaiter> TaskRunnerWithWaiterFactory::fake_waiter;
} // anonymous namespace
TEST(TaskRunnerImplTest, TaskRunnerExecutesTaskAndStops) {
FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
TaskRunnerImpl runner(&fake_clock.now);
std::string ran_tasks = "";
runner.PostTask([&ran_tasks] { ran_tasks += "1"; });
runner.RequestStopSoon();
runner.RunUntilStopped();
EXPECT_EQ(ran_tasks, "1");
}
TEST(TaskRunnerImplTest, TaskRunnerRunsDelayedTasksInOrder) {
FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
TaskRunnerImpl runner(&fake_clock.now);
std::thread t([&runner] { runner.RunUntilStopped(); });
std::string ran_tasks = "";
const auto kDelayTime = milliseconds(5);
const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
runner.PostTaskWithDelay(task_one, kDelayTime);
const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
runner.PostTaskWithDelay(task_two, kDelayTime * 2);
EXPECT_EQ(ran_tasks, "");
fake_clock.Advance(kDelayTime);
WaitUntilCondition([&ran_tasks] { return ran_tasks == "1"; });
EXPECT_EQ(ran_tasks, "1");
fake_clock.Advance(kDelayTime);
WaitUntilCondition([&ran_tasks] { return ran_tasks == "12"; });
EXPECT_EQ(ran_tasks, "12");
runner.RequestStopSoon();
t.join();
}
TEST(TaskRunnerImplTest, SingleThreadedTaskRunnerRunsSequentially) {
FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
TaskRunnerImpl runner(&fake_clock.now);
std::string ran_tasks;
const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
const auto task_three = [&ran_tasks] { ran_tasks += "3"; };
const auto task_four = [&ran_tasks] { ran_tasks += "4"; };
const auto task_five = [&ran_tasks] { ran_tasks += "5"; };
runner.PostTask(task_one);
runner.PostTask(task_two);
runner.PostTask(task_three);
runner.PostTask(task_four);
runner.PostTask(task_five);
runner.RequestStopSoon();
EXPECT_EQ(ran_tasks, "");
runner.RunUntilStopped();
EXPECT_EQ(ran_tasks, "12345");
}
TEST(TaskRunnerImplTest, RunsAllImmediateTasksBeforeStopping) {
FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
TaskRunnerImpl runner(&fake_clock.now);
std::string result;
runner.PostTask([&] {
result += "Alice";
// Post a task that runs just before the quit task.
runner.PostTask([&] {
result += " says goodbye";
// These tasks will enter the queue after the quit task *and* after the
// main loop breaks. They will be executed by the flushing phase.
runner.PostTask([&] {
result += " and is not";
runner.PostTask([&] { result += " forgotten."; });
});
});
// Post the quit task.
runner.RequestStopSoon();
});
EXPECT_EQ(result, "");
runner.RunUntilStopped();
// All posted tasks will execute because RequestStopSoon() guarantees all
// immediately-runnable tasks will run before exiting, even if new
// immediately-runnable tasks are posted in the meantime.
EXPECT_EQ(result, "Alice says goodbye and is not forgotten.");
}
TEST(TaskRunnerImplTest, TaskRunnerIsStableWithLotsOfTasks) {
FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
TaskRunnerImpl runner(&fake_clock.now);
const int kNumberOfTasks = 500;
std::string expected_ran_tasks;
expected_ran_tasks.append(kNumberOfTasks, '1');
std::string ran_tasks;
for (int i = 0; i < kNumberOfTasks; ++i) {
const auto task = [&ran_tasks] { ran_tasks += "1"; };
runner.PostTask(task);
}
runner.RequestStopSoon();
runner.RunUntilStopped();
EXPECT_EQ(ran_tasks, expected_ran_tasks);
}
TEST(TaskRunnerImplTest, TaskRunnerDelayedTasksDontBlockImmediateTasks) {
TaskRunnerImpl runner(Clock::now);
std::string ran_tasks;
const auto task = [&ran_tasks] { ran_tasks += "1"; };
const auto delayed_task = [&ran_tasks] { ran_tasks += "A"; };
runner.PostTaskWithDelay(delayed_task, milliseconds(10000));
runner.PostTask(task);
runner.RequestStopSoon();
runner.RunUntilStopped();
// The immediate task should have run, even though the delayed task
// was added first.
EXPECT_EQ(ran_tasks, "1");
}
TEST(TaskRunnerImplTest, TaskRunnerUsesEventWaiter) {
std::unique_ptr<TaskRunnerImpl> runner =
TaskRunnerWithWaiterFactory::Create(Clock::now);
std::atomic<int> x{0};
std::thread t([&runner, &x] {
runner.get()->RunUntilStopped();
x = 1;
});
const Clock::time_point start1 = Clock::now();
FakeTaskWaiter* fake_waiter = TaskRunnerWithWaiterFactory::fake_waiter.get();
while ((Clock::now() - start1) < kWaitTimeout && !fake_waiter->IsWaiting()) {
std::this_thread::sleep_for(kTaskRunnerSleepTime);
}
ASSERT_TRUE(fake_waiter->IsWaiting());
fake_waiter->WakeUpAndStop();
const Clock::time_point start2 = Clock::now();
while ((Clock::now() - start2) < kWaitTimeout && x == 0) {
std::this_thread::sleep_for(kTaskRunnerSleepTime);
}
ASSERT_EQ(x, 1);
ASSERT_FALSE(fake_waiter->IsWaiting());
t.join();
}
TEST(TaskRunnerImplTest, WakesEventWaiterOnPostTask) {
std::unique_ptr<TaskRunnerImpl> runner =
TaskRunnerWithWaiterFactory::Create(Clock::now);
std::atomic<int> x{0};
std::thread t([&runner] { runner.get()->RunUntilStopped(); });
const Clock::time_point start1 = Clock::now();
FakeTaskWaiter* fake_waiter = TaskRunnerWithWaiterFactory::fake_waiter.get();
while ((Clock::now() - start1) < kWaitTimeout && !fake_waiter->IsWaiting()) {
std::this_thread::sleep_for(kTaskRunnerSleepTime);
}
ASSERT_TRUE(fake_waiter->IsWaiting());
runner->PostTask([&x]() { x = 1; });
const Clock::time_point start2 = Clock::now();
while ((Clock::now() - start2) < kWaitTimeout && x == 0) {
std::this_thread::sleep_for(kTaskRunnerSleepTime);
}
ASSERT_EQ(x, 1);
fake_waiter->WakeUpAndStop();
t.join();
}
class RepeatedClass {
public:
MOCK_METHOD0(Repeat, absl::optional<Clock::duration>());
absl::optional<Clock::duration> DoCall() {
auto result = Repeat();
execution_count++;
return result;
}
std::atomic<int> execution_count{0};
};
} // namespace openscreen