blob: d60cb45924df33c88dff54a70bd8708e01024dbd [file] [log] [blame]
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/system/wait_set.h"
#include <set>
#include <vector>
#include "base/bind.h"
#include "base/callback.h"
#include "base/memory/ptr_util.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "mojo/public/cpp/system/message_pipe.h"
#include "mojo/public/cpp/system/wait.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace {
using WaitSetTest = testing::Test;
void WriteMessage(const ScopedMessagePipeHandle& handle,
const base::StringPiece& message) {
MojoResult rv = WriteMessageRaw(handle.get(), message.data(),
static_cast<uint32_t>(message.size()),
nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
CHECK_EQ(MOJO_RESULT_OK, rv);
}
std::string ReadMessage(const ScopedMessagePipeHandle& handle) {
uint32_t num_bytes = 0;
uint32_t num_handles = 0;
MojoResult rv = ReadMessageRaw(handle.get(), nullptr, &num_bytes, nullptr,
&num_handles, MOJO_READ_MESSAGE_FLAG_NONE);
CHECK_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, rv);
CHECK_EQ(0u, num_handles);
std::vector<char> buffer(num_bytes);
rv = ReadMessageRaw(handle.get(), buffer.data(), &num_bytes, nullptr,
&num_handles, MOJO_READ_MESSAGE_FLAG_NONE);
CHECK_EQ(MOJO_RESULT_OK, rv);
return std::string(buffer.data(), buffer.size());
}
class ThreadedRunner : public base::SimpleThread {
public:
explicit ThreadedRunner(const base::Closure& callback)
: SimpleThread("ThreadedRunner"), callback_(callback) {}
~ThreadedRunner() override { Join(); }
void Run() override { callback_.Run(); }
private:
const base::Closure callback_;
DISALLOW_COPY_AND_ASSIGN(ThreadedRunner);
};
TEST_F(WaitSetTest, Satisfied) {
WaitSet wait_set;
MessagePipe p;
const char kTestMessage1[] = "hello wake up";
// Watch only one handle and write to the other.
wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
WriteMessage(p.handle0, kTestMessage1);
size_t num_ready_handles = 2;
Handle ready_handles[2];
MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
HandleSignalsState hss[2];
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_EQ(p.handle1.get(), ready_handles[0]);
EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());
wait_set.RemoveHandle(p.handle1.get());
// Now watch only the other handle and write to the first one.
wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
WriteMessage(p.handle1, kTestMessage1);
num_ready_handles = 2;
ready_results[0] = MOJO_RESULT_UNKNOWN;
ready_results[1] = MOJO_RESULT_UNKNOWN;
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_EQ(p.handle0.get(), ready_handles[0]);
EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());
// Now wait on both of them.
wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
num_ready_handles = 2;
ready_results[0] = MOJO_RESULT_UNKNOWN;
ready_results[1] = MOJO_RESULT_UNKNOWN;
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
EXPECT_EQ(2u, num_ready_handles);
EXPECT_TRUE((ready_handles[0] == p.handle0.get() &&
ready_handles[1] == p.handle1.get()) ||
(ready_handles[0] == p.handle1.get() &&
ready_handles[1] == p.handle0.get()));
EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
EXPECT_EQ(MOJO_RESULT_OK, ready_results[1]);
EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());
EXPECT_TRUE(hss[1].readable() && hss[1].writable() && !hss[1].peer_closed());
// Wait on both again, but with only enough output space for one result.
num_ready_handles = 1;
ready_results[0] = MOJO_RESULT_UNKNOWN;
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_TRUE(ready_handles[0] == p.handle0.get() ||
ready_handles[0] == p.handle1.get());
EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
// Remove the ready handle from the set and wait one more time.
EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0]));
num_ready_handles = 1;
ready_results[0] = MOJO_RESULT_UNKNOWN;
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_TRUE(ready_handles[0] == p.handle0.get() ||
ready_handles[0] == p.handle1.get());
EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0]));
// The wait set should be empty now. Nothing to wait on.
num_ready_handles = 2;
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
EXPECT_EQ(0u, num_ready_handles);
}
TEST_F(WaitSetTest, Unsatisfiable) {
MessagePipe p, q;
WaitSet wait_set;
wait_set.AddHandle(q.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
wait_set.AddHandle(q.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
size_t num_ready_handles = 2;
Handle ready_handles[2];
MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
p.handle1.reset();
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_EQ(p.handle0.get(), ready_handles[0]);
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ready_results[0]);
}
TEST_F(WaitSetTest, CloseWhileWaiting) {
MessagePipe p;
WaitSet wait_set;
wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
const Handle handle0_value = p.handle0.get();
ThreadedRunner close_after_delay(base::Bind(
[](ScopedMessagePipeHandle* handle) {
// Wait a little while, then close the handle.
base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
handle->reset();
},
&p.handle0));
close_after_delay.Start();
size_t num_ready_handles = 2;
Handle ready_handles[2];
MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_EQ(handle0_value, ready_handles[0]);
EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_results[0]);
EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value));
}
TEST_F(WaitSetTest, CloseBeforeWaiting) {
MessagePipe p;
WaitSet wait_set;
wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
Handle handle0_value = p.handle0.get();
Handle handle1_value = p.handle1.get();
p.handle0.reset();
p.handle1.reset();
// Ensure that the WaitSet user is always made aware of all cancellations even
// if they happen while not waiting, or they have to be returned over the span
// of multiple Wait() calls due to insufficient output storage.
size_t num_ready_handles = 1;
Handle ready_handle;
MojoResult ready_result = MOJO_RESULT_UNKNOWN;
wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value);
EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result);
EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value));
wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value);
EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result);
EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value));
// Nothing more to wait on.
wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
EXPECT_EQ(0u, num_ready_handles);
}
TEST_F(WaitSetTest, SatisfiedThenUnsatisfied) {
MessagePipe p;
WaitSet wait_set;
wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
const char kTestMessage1[] = "testing testing testing";
WriteMessage(p.handle0, kTestMessage1);
size_t num_ready_handles = 2;
Handle ready_handles[2];
MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_EQ(p.handle1.get(), ready_handles[0]);
EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
EXPECT_EQ(kTestMessage1, ReadMessage(p.handle1));
ThreadedRunner write_after_delay(base::Bind(
[](ScopedMessagePipeHandle* handle) {
// Wait a little while, then write a message.
base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
WriteMessage(*handle, "wakey wakey");
},
&p.handle1));
write_after_delay.Start();
num_ready_handles = 2;
wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_EQ(p.handle0.get(), ready_handles[0]);
EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
}
TEST_F(WaitSetTest, EventOnly) {
base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::SIGNALED);
WaitSet wait_set;
wait_set.AddEvent(&event);
base::WaitableEvent* ready_event = nullptr;
size_t num_ready_handles = 1;
Handle ready_handle;
MojoResult ready_result = MOJO_RESULT_UNKNOWN;
wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result);
EXPECT_EQ(0u, num_ready_handles);
EXPECT_EQ(&event, ready_event);
}
TEST_F(WaitSetTest, EventAndHandle) {
const char kTestMessage[] = "hello hello";
MessagePipe p;
WriteMessage(p.handle0, kTestMessage);
base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
WaitSet wait_set;
wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
wait_set.AddEvent(&event);
base::WaitableEvent* ready_event = nullptr;
size_t num_ready_handles = 1;
Handle ready_handle;
MojoResult ready_result = MOJO_RESULT_UNKNOWN;
wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result);
EXPECT_EQ(1u, num_ready_handles);
EXPECT_EQ(nullptr, ready_event);
EXPECT_EQ(p.handle1.get(), ready_handle);
EXPECT_EQ(MOJO_RESULT_OK, ready_result);
EXPECT_EQ(kTestMessage, ReadMessage(p.handle1));
ThreadedRunner signal_after_delay(base::Bind(
[](base::WaitableEvent* event) {
// Wait a little while, then close the handle.
base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
event->Signal();
},
&event));
signal_after_delay.Start();
wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result);
EXPECT_EQ(0u, num_ready_handles);
EXPECT_EQ(&event, ready_event);
}
TEST_F(WaitSetTest, NoStarvation) {
const char kTestMessage[] = "wait for it";
const size_t kNumTestPipes = 50;
const size_t kNumTestEvents = 10;
// Create a bunch of handles and events which are always ready and add them
// to a shared WaitSet.
WaitSet wait_set;
MessagePipe pipes[kNumTestPipes];
for (size_t i = 0; i < kNumTestPipes; ++i) {
WriteMessage(pipes[i].handle0, kTestMessage);
Wait(pipes[i].handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
WriteMessage(pipes[i].handle1, kTestMessage);
Wait(pipes[i].handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
wait_set.AddHandle(pipes[i].handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
wait_set.AddHandle(pipes[i].handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
}
std::vector<std::unique_ptr<base::WaitableEvent>> events(kNumTestEvents);
for (auto& event_ptr : events) {
event_ptr = base::MakeUnique<base::WaitableEvent>(
base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
event_ptr->Signal();
wait_set.AddEvent(event_ptr.get());
}
// Now verify that all handle and event signals are deteceted within a finite
// number of consecutive Wait() calls. Do it a few times for good measure.
for (size_t i = 0; i < 3; ++i) {
std::set<base::WaitableEvent*> ready_events;
std::set<Handle> ready_handles;
while (ready_events.size() < kNumTestEvents ||
ready_handles.size() < kNumTestPipes * 2) {
base::WaitableEvent* ready_event = nullptr;
size_t num_ready_handles = 1;
Handle ready_handle;
MojoResult ready_result = MOJO_RESULT_UNKNOWN;
wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle,
&ready_result);
if (ready_event)
ready_events.insert(ready_event);
if (num_ready_handles) {
EXPECT_EQ(1u, num_ready_handles);
EXPECT_EQ(MOJO_RESULT_OK, ready_result);
ready_handles.insert(ready_handle);
}
}
}
}
} // namespace
} // namespace mojo