| // Copyright (C) 2018 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "android/emulation/AndroidAsyncMessagePipe.h" |
| #include "android/base/ArraySize.h" |
| #include "android/base/async/ThreadLooper.h" |
| #include "android/base/files/MemStream.h" |
| #include "android/base/system/System.h" |
| #include "android/base/testing/ResultMatchers.h" |
| #include "android/base/testing/TestEvent.h" |
| #include "android/base/testing/TestLooper.h" |
| #include "android/emulation/VmLock.h" |
| #include "android/emulation/android_pipe_device.h" |
| #include "android/emulation/hostpipe/HostGoldfishPipe.h" |
| #include "android/emulation/testing/TestVmLock.h" |
| |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include <atomic> |
| #include <chrono> |
| #include <condition_variable> |
| #include <deque> |
| #include <mutex> |
| #include <thread> |
| |
| using android::base::arraySize; |
| using android::base::IsErr; |
| using android::base::IsOk; |
| using testing::ElementsAreArray; |
| using testing::ElementsAre; |
| using testing::StrEq; |
| |
| namespace android { |
| |
| class AndroidAsyncMessagePipeTest : public ::testing::Test { |
| protected: |
| void SetUp() override { |
| AndroidPipe::Service::resetAll(); |
| mDevice = HostGoldfishPipeDevice::get(); |
| mLooper = std::unique_ptr<base::TestLooper>(new base::TestLooper()); |
| AndroidPipe::initThreadingForTest(TestVmLock::getInstance(), |
| mLooper.get()); |
| } |
| |
| void TearDown() override { |
| AndroidPipe::Service::resetAll(); |
| // mDevice is singleton, no need to tear down |
| // Reset initThreading to avoid using our looper. |
| AndroidPipe::initThreading(TestVmLock::getInstance()); |
| mLooper.reset(); |
| } |
| |
| void writePacket(void* pipe, const std::vector<uint8_t>& data) { |
| const uint32_t payloadSize = static_cast<uint32_t>(data.size()); |
| EXPECT_EQ(sizeof(uint32_t), |
| mDevice->write(pipe, &payloadSize, sizeof(uint32_t))); |
| |
| EXPECT_THAT(mDevice->write(pipe, data), IsOk()); |
| } |
| |
| std::vector<uint8_t> readPacket(void* pipe) { |
| uint32_t responseSize = 0; |
| EXPECT_EQ(sizeof(uint32_t), |
| mDevice->read(pipe, &responseSize, sizeof(uint32_t))); |
| |
| auto result = mDevice->read(pipe, responseSize); |
| EXPECT_TRUE(result.ok()); |
| if (result.ok()) { |
| std::vector<uint8_t> data = result.unwrap(); |
| EXPECT_EQ(data.size(), responseSize); |
| return data; |
| } else { |
| return std::vector<uint8_t>(); |
| } |
| } |
| |
| void pumpLooperUntilEvent(TestEvent& event) { |
| constexpr base::Looper::Duration kTimeoutMs = 15000; // 15 seconds. |
| constexpr base::Looper::Duration kStep = 50; // 50 ms. |
| |
| base::Looper::Duration current = mLooper->nowMs(); |
| const base::Looper::Duration deadline = mLooper->nowMs() + kTimeoutMs; |
| |
| while (!event.isSignaled() && current + kStep < deadline) { |
| mLooper->runOneIterationWithDeadlineMs(current + kStep); |
| current = mLooper->nowMs(); |
| } |
| |
| event.wait(); |
| } |
| |
| void snapshotSave(void* pipe, base::Stream* stream) { |
| RecursiveScopedVmLock lock; |
| auto cStream = reinterpret_cast<Stream*>(stream); |
| android_pipe_guest_pre_save(cStream); |
| android_pipe_guest_save(pipe, cStream); |
| android_pipe_guest_post_save(cStream); |
| } |
| |
| void* snapshotLoad(base::Stream* stream) { |
| RecursiveScopedVmLock lock; |
| auto cStream = reinterpret_cast<Stream*>(stream); |
| android_pipe_guest_pre_load(cStream); |
| void* pipe = mDevice->load(stream); |
| EXPECT_NE(pipe, nullptr); |
| android_pipe_guest_post_load(cStream); |
| return pipe; |
| } |
| |
| HostGoldfishPipeDevice* mDevice = nullptr; |
| std::unique_ptr<base::TestLooper> mLooper; |
| }; |
| |
| constexpr uint8_t kSimplePayload[] = "Hello World"; |
| constexpr uint8_t kSimpleResponse[] = "response"; |
| |
| class SimpleMessagePipe : public AndroidAsyncMessagePipe { |
| public: |
| SimpleMessagePipe(AndroidPipe::Service* service, PipeArgs&& pipeArgs) |
| : AndroidAsyncMessagePipe(service, std::move(pipeArgs)) {} |
| |
| void onMessage(const std::vector<uint8_t>& data) override { |
| EXPECT_THAT(data, ElementsAreArray(kSimplePayload)); |
| |
| const size_t bytes = arraySize(kSimpleResponse); |
| std::vector<uint8_t> output(kSimpleResponse, kSimpleResponse + bytes); |
| send(std::move(output)); |
| } |
| }; |
| |
| TEST_F(AndroidAsyncMessagePipeTest, Basic) { |
| registerAsyncMessagePipeService( |
| new AndroidAsyncMessagePipe::Service<SimpleMessagePipe>( |
| "TestPipe")); |
| |
| auto pipe = mDevice->connect("TestPipe"); |
| |
| const uint32_t payloadSize = arraySize(kSimplePayload); |
| EXPECT_EQ(sizeof(uint32_t), |
| mDevice->write(pipe, &payloadSize, sizeof(uint32_t))); |
| |
| std::vector<uint8_t> toSend(std::begin(kSimplePayload), |
| std::end(kSimplePayload)); |
| EXPECT_THAT(mDevice->write(pipe, toSend), IsOk()); |
| |
| uint32_t responseSize = 0; |
| EXPECT_EQ(sizeof(uint32_t), |
| mDevice->read(pipe, (void*)&responseSize, sizeof(uint32_t))); |
| EXPECT_EQ(responseSize, arraySize(kSimpleResponse)); |
| |
| EXPECT_THAT(mDevice->read(pipe, responseSize), |
| IsOk(ElementsAreArray(kSimpleResponse))); |
| mDevice->close(pipe); |
| } |
| |
| TEST_F(AndroidAsyncMessagePipeTest, Lambda) { |
| auto onMessage = [&](const std::vector<uint8_t>& data, |
| PipeSendFunction sendCallback) { |
| EXPECT_THAT(data, ElementsAreArray(kSimplePayload)); |
| |
| const size_t bytes = arraySize(kSimpleResponse); |
| std::vector<uint8_t> output(kSimpleResponse, kSimpleResponse + bytes); |
| sendCallback(std::move(output)); |
| }; |
| |
| registerAsyncMessagePipeService("TestPipeLambda", onMessage); |
| auto pipe = mDevice->connect("TestPipeLambda"); |
| |
| const uint32_t payloadSize = arraySize(kSimplePayload); |
| EXPECT_EQ(sizeof(uint32_t), |
| mDevice->write(pipe, &payloadSize, sizeof(uint32_t))); |
| |
| std::vector<uint8_t> toSend(std::begin(kSimplePayload), |
| std::end(kSimplePayload)); |
| EXPECT_THAT(mDevice->write(pipe, toSend), IsOk()); |
| |
| uint32_t responseSize = 0; |
| EXPECT_EQ(sizeof(uint32_t), |
| mDevice->read(pipe, (void*)&responseSize, sizeof(uint32_t))); |
| EXPECT_EQ(responseSize, arraySize(kSimpleResponse)); |
| |
| EXPECT_THAT(mDevice->read(pipe, responseSize), |
| IsOk(ElementsAreArray(kSimpleResponse))); |
| mDevice->close(pipe); |
| } |
| |
| TEST_F(AndroidAsyncMessagePipeTest, OutOfBand) { |
| std::vector<uint8_t> lastMessage; |
| PipeSendFunction performSend; |
| |
| auto onMessage = [&](const std::vector<uint8_t>& data, |
| PipeSendFunction sendCallback) { |
| lastMessage = data; |
| performSend = sendCallback; |
| }; |
| |
| registerAsyncMessagePipeService("OutOfBand", onMessage); |
| auto pipe = mDevice->connect("OutOfBand"); |
| |
| writePacket(pipe, {1, 2, 3}); |
| EXPECT_THAT(lastMessage, ElementsAre(1, 2, 3)); |
| EXPECT_EQ(mDevice->poll(pipe), PIPE_POLL_OUT); |
| |
| const std::vector<uint8_t> kResponse = {5, 6, 7}; |
| performSend(std::vector<uint8_t>(kResponse)); |
| EXPECT_EQ(mDevice->poll(pipe), PIPE_POLL_IN); |
| EXPECT_THAT(readPacket(pipe), ElementsAreArray(kResponse)); |
| |
| // Now try sending a second packet. |
| EXPECT_EQ(mDevice->poll(pipe), PIPE_POLL_OUT); |
| |
| const std::vector<uint8_t> kResponse2 = {8, 9, 10, 11, 12}; |
| performSend(std::vector<uint8_t>(kResponse2)); |
| EXPECT_EQ(mDevice->poll(pipe), PIPE_POLL_IN); |
| EXPECT_THAT(readPacket(pipe), ElementsAreArray(kResponse2)); |
| |
| mDevice->close(pipe); |
| } |
| |
| class CloseOnMessagePipe : public AndroidAsyncMessagePipe { |
| public: |
| static constexpr uint8_t kCloseNow = 0; |
| static constexpr uint8_t kQueueClose = 1; |
| |
| CloseOnMessagePipe(AndroidPipe::Service* service, PipeArgs&& pipeArgs) |
| : AndroidAsyncMessagePipe(service, std::move(pipeArgs)) {} |
| |
| void onMessage(const std::vector<uint8_t>& data) override { |
| EXPECT_EQ(data.size(), 1); |
| if (data[0] == kCloseNow) { |
| send({4, 5, 6}); // Should never be received. |
| closeFromHost(); |
| } else if (data[0] == kQueueClose) { |
| send({1, 2, 3}); |
| queueCloseFromHost(); |
| } else { |
| FAIL() << "Unexpected message: " << data[0]; |
| } |
| } |
| }; |
| |
| // Attempt to close the pipe in the onMessage callback. |
| TEST_F(AndroidAsyncMessagePipeTest, CloseOnMessage) { |
| auto pipeService = new AndroidAsyncMessagePipe::Service<CloseOnMessagePipe>( |
| "ClosePipe"); |
| registerAsyncMessagePipeService(pipeService); |
| |
| auto pipe = mDevice->connect("ClosePipe"); |
| |
| TestEvent receivedClose; |
| mDevice->setWakeCallback(pipe, [&](int wakes) { |
| if (wakes & PIPE_WAKE_CLOSED) { |
| receivedClose.signal(); |
| } |
| }); |
| |
| writePacket(pipe, {CloseOnMessagePipe::kCloseNow}); |
| // Check that the pipe was closed by the host. |
| pumpLooperUntilEvent(receivedClose); |
| |
| EXPECT_THAT(mDevice->read(pipe, 3), IsErr(EINVAL)); |
| |
| mDevice->close(pipe); |
| } |
| |
| // Verify closing the pipe skips processing future messages, with multiple |
| // simultaneous messages. |
| TEST_F(AndroidAsyncMessagePipeTest, CloseWithQueuedMessages) { |
| auto pipeService = new AndroidAsyncMessagePipe::Service<CloseOnMessagePipe>( |
| "ClosePipe"); |
| registerAsyncMessagePipeService(pipeService); |
| |
| auto pipe = mDevice->connect("ClosePipe"); |
| |
| TestEvent receivedClose; |
| mDevice->setWakeCallback(pipe, [&](int wakes) { |
| if (wakes & PIPE_WAKE_CLOSED) { |
| receivedClose.signal(); |
| } |
| }); |
| |
| // We need to manually craft the message, since we want all messages to |
| // arrive in the same readBuffers call. |
| std::vector<uint8_t> message((sizeof(uint32_t) + 1) * 2); |
| // Packet 1. |
| *reinterpret_cast<uint32_t*>(&message[0]) = 1; |
| message[4] = CloseOnMessagePipe::kCloseNow; |
| // Packet 2, should never process. If it does CloseOnMessagePipe will hit |
| // a FAIL() for unexpected message type. |
| *reinterpret_cast<uint32_t*>(&message[5]) = 1; |
| message[9] = 123; |
| |
| EXPECT_THAT(mDevice->write(pipe, message), IsOk(5)); |
| |
| // Check that the pipe was closed by the host. |
| pumpLooperUntilEvent(receivedClose); |
| |
| mDevice->close(pipe); |
| } |
| |
| TEST_F(AndroidAsyncMessagePipeTest, QueueCloseOnMessage) { |
| auto pipeService = new AndroidAsyncMessagePipe::Service<CloseOnMessagePipe>( |
| "ClosePipe"); |
| registerAsyncMessagePipeService(pipeService); |
| |
| auto pipe = mDevice->connect("ClosePipe"); |
| |
| TestEvent receivedClose; |
| mDevice->setWakeCallback(pipe, [&](int wakes) { |
| if (wakes & PIPE_WAKE_CLOSED) { |
| receivedClose.signal(); |
| } |
| }); |
| |
| writePacket(pipe, {CloseOnMessagePipe::kQueueClose}); |
| |
| EXPECT_THAT(readPacket(pipe), ElementsAre(1, 2, 3)); |
| |
| // Check that the pipe was closed by the host. |
| pumpLooperUntilEvent(receivedClose); |
| |
| mDevice->close(pipe); |
| } |
| |
| class MultithreadedEchoMessagePipe : public AndroidAsyncMessagePipe { |
| public: |
| MultithreadedEchoMessagePipe(AndroidPipe::Service* service, |
| PipeArgs&& pipeArgs) |
| : AndroidAsyncMessagePipe(service, std::move(pipeArgs)), |
| mWorker(&MultithreadedEchoMessagePipe::threadMain, this) {} |
| ~MultithreadedEchoMessagePipe() { |
| mQuit = true; |
| mWorker.join(); |
| } |
| |
| void onMessage(const std::vector<uint8_t>& data) override { |
| base::AutoLock lock(mLock); |
| mMessages.push_back(data); |
| } |
| |
| private: |
| void threadMain() { |
| while (!mQuit) { |
| base::System::get()->sleepMs(100); |
| { |
| base::AutoLock lock(mLock); |
| if (!mMessages.empty()) { |
| send(std::move(mMessages.front())); |
| mMessages.pop_front(); |
| } |
| } |
| } |
| } |
| |
| base::Lock mLock; |
| std::deque<std::vector<uint8_t>> mMessages; |
| std::atomic<bool> mQuit{false}; |
| std::thread mWorker; |
| }; |
| |
| // bug: 118512307 |
| TEST_F(AndroidAsyncMessagePipeTest, DISABLED_Multithreaded) { |
| registerAsyncMessagePipeService( |
| new AndroidAsyncMessagePipe::Service<MultithreadedEchoMessagePipe>( |
| "Multithreaded")); |
| auto pipe = mDevice->connect("Multithreaded"); |
| |
| std::deque<std::vector<uint8_t>> packets; |
| TestEvent event; |
| |
| mDevice->setWakeCallback(pipe, [&](int wakes) { |
| EXPECT_EQ(wakes, PIPE_WAKE_READ); |
| EXPECT_THAT(readPacket(pipe), ElementsAreArray(packets.front())); |
| packets.pop_front(); |
| event.signal(); |
| }); |
| |
| auto sendData = [&](std::vector<uint8_t> data) { |
| { |
| // Wake callback is called under the VM lock, protect packets with |
| // it so we can safely access it inside the callback. |
| RecursiveScopedVmLock lock; |
| packets.push_back(data); |
| } |
| writePacket(pipe, std::move(data)); |
| }; |
| |
| sendData({1, 2, 3}); |
| pumpLooperUntilEvent(event); |
| |
| sendData({2, 3, 4}); |
| sendData({5, 6, 7}); |
| pumpLooperUntilEvent(event); |
| pumpLooperUntilEvent(event); |
| |
| mDevice->close(pipe); |
| } |
| |
| TEST_F(AndroidAsyncMessagePipeTest, SendAfterDestroy) { |
| std::vector<uint8_t> lastMessage; |
| PipeSendFunction performSend; |
| |
| auto onMessage = [&](const std::vector<uint8_t>& data, |
| PipeSendFunction sendCallback) { |
| lastMessage = data; |
| performSend = sendCallback; |
| }; |
| |
| registerAsyncMessagePipeService("AfterDestroy", onMessage); |
| auto pipe = mDevice->connect("AfterDestroy"); |
| |
| writePacket(pipe, {1, 2, 3}); |
| EXPECT_THAT(lastMessage, ElementsAre(1, 2, 3)); |
| mDevice->close(pipe); |
| |
| const std::vector<uint8_t> kResponse = {5, 6, 7}; |
| performSend(std::vector<uint8_t>(kResponse)); |
| } |
| |
| TEST_F(AndroidAsyncMessagePipeTest, Snapshot) { |
| std::vector<uint8_t> lastMessage; |
| PipeSendFunction performSend; |
| |
| auto onMessage = [&](const std::vector<uint8_t>& data, |
| PipeSendFunction sendCallback) { |
| lastMessage = data; |
| performSend = sendCallback; |
| }; |
| |
| registerAsyncMessagePipeService("Snapshot", onMessage); |
| auto pipe = mDevice->connect("Snapshot"); |
| |
| writePacket(pipe, {1, 2, 3}); |
| EXPECT_THAT(lastMessage, ElementsAre(1, 2, 3)); |
| |
| const std::vector<uint8_t> kResponse = {5, 6, 7}; |
| performSend(std::vector<uint8_t>(kResponse)); |
| |
| base::MemStream snapshotStream; |
| snapshotSave(pipe, &snapshotStream); |
| mDevice->close(pipe); |
| |
| auto restoredPipe = snapshotLoad(&snapshotStream); |
| EXPECT_EQ(mDevice->poll(restoredPipe), PIPE_POLL_IN); |
| EXPECT_THAT(readPacket(restoredPipe), ElementsAreArray(kResponse)); |
| } |
| |
| // Verifies that getPipe can restore the pipe after snapshot load. |
| TEST_F(AndroidAsyncMessagePipeTest, SnapshotGetPipe) { |
| auto pipeService = |
| new AndroidAsyncMessagePipe::Service<SimpleMessagePipe>("TestPipe"); |
| registerAsyncMessagePipeService(pipeService); |
| |
| auto pipe = mDevice->connect("TestPipe"); |
| |
| AsyncMessagePipeHandle handle = static_cast<AndroidAsyncMessagePipe*>( |
| static_cast<AndroidPipe*>(pipe)) |
| ->getHandle(); |
| |
| base::MemStream snapshotStream; |
| snapshotSave(pipe, &snapshotStream); |
| mDevice->close(pipe); |
| |
| auto restoredPipe = snapshotLoad(&snapshotStream); |
| SimpleMessagePipe* derivedRestoredPipe = static_cast<SimpleMessagePipe*>( |
| static_cast<AndroidPipe*>(restoredPipe)); |
| |
| auto pipeRefOpt = pipeService->getPipe(handle); |
| ASSERT_TRUE(pipeRefOpt.hasValue()); |
| EXPECT_EQ(pipeRefOpt.value().pipe, derivedRestoredPipe); |
| } |
| |
| // Verifies that sendCallback remains valid and works after snapshot load. |
| TEST_F(AndroidAsyncMessagePipeTest, SnapshotSendCallback) { |
| std::vector<uint8_t> lastMessage; |
| PipeSendFunction performSend; |
| |
| auto onMessage = [&](const std::vector<uint8_t>& data, |
| PipeSendFunction sendCallback) { |
| lastMessage = data; |
| performSend = sendCallback; |
| }; |
| |
| registerAsyncMessagePipeService("SnapshotSend", onMessage); |
| auto pipe = mDevice->connect("SnapshotSend"); |
| |
| writePacket(pipe, {1, 2, 3}); |
| EXPECT_THAT(lastMessage, ElementsAre(1, 2, 3)); |
| |
| base::MemStream snapshotStream; |
| snapshotSave(pipe, &snapshotStream); |
| mDevice->close(pipe); |
| |
| auto restoredPipe = snapshotLoad(&snapshotStream); |
| const std::vector<uint8_t> kResponse = {5, 6, 7}; |
| performSend(std::vector<uint8_t>(kResponse)); |
| EXPECT_THAT(readPacket(restoredPipe), ElementsAreArray(kResponse)); |
| } |
| |
| } // namespace android |