Allow for reschedule-able messages
Re-scheduleable messages allows for delayed messages to be re-scheduled
to a later point in time to avoid unnecessary overhead of handling stale messages.
Test: atest AMessage_tests
Bug: 234833109
Change-Id: Id5e76060f1d021f5ea30690cca7dd108dcf8c51d
Merged-In: Id5e76060f1d021f5ea30690cca7dd108dcf8c51d
diff --git a/media/module/foundation/ALooper.cpp b/media/module/foundation/ALooper.cpp
index a276722..61bac02 100644
--- a/media/module/foundation/ALooper.cpp
+++ b/media/module/foundation/ALooper.cpp
@@ -69,6 +69,10 @@
return systemTime(SYSTEM_TIME_MONOTONIC) / 1000LL;
}
+int64_t ALooper::getNowUs() {
+ return GetNowUs();
+}
+
ALooper::ALooper()
: mRunningLocally(false) {
// clean up stale AHandlers. Doing it here instead of in the destructor avoids
@@ -170,11 +174,11 @@
int64_t whenUs;
if (delayUs > 0) {
- int64_t nowUs = GetNowUs();
+ int64_t nowUs = getNowUs();
whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);
} else {
- whenUs = GetNowUs();
+ whenUs = getNowUs();
}
List<Event>::iterator it = mEventQueue.begin();
@@ -185,6 +189,7 @@
Event event;
event.mWhenUs = whenUs;
event.mMessage = msg;
+ event.mToken = nullptr;
if (it == mEventQueue.begin()) {
mQueueChangedCondition.signal();
@@ -193,7 +198,57 @@
mEventQueue.insert(it, event);
}
+status_t ALooper::postUnique(const sp<AMessage> &msg, const sp<RefBase> &token, int64_t delayUs) {
+ if (token == nullptr) {
+ return -EINVAL;
+ }
+ Mutex::Autolock autoLock(mLock);
+
+ int64_t whenUs;
+ if (delayUs > 0) {
+ int64_t nowUs = getNowUs();
+ whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);
+ } else {
+ whenUs = getNowUs();
+ }
+
+ // We only need to wake the loop up if we're rescheduling to the earliest event in the queue.
+ // This needs to be checked now, before we reschedule the message, in case this message is
+ // already at the beginning of the queue.
+ bool shouldAwakeLoop = mEventQueue.empty() || whenUs < mEventQueue.begin()->mWhenUs;
+
+ // Erase any previously-posted event with this token.
+ for (auto i = mEventQueue.begin(); i != mEventQueue.end();) {
+ if (i->mToken == token) {
+ i = mEventQueue.erase(i);
+ } else {
+ ++i;
+ }
+ }
+
+ // Find the insertion point for the rescheduled message.
+ List<Event>::iterator i = mEventQueue.begin();
+ while (i != mEventQueue.end() && i->mWhenUs <= whenUs) {
+ ++i;
+ }
+
+ Event event;
+ event.mWhenUs = whenUs;
+ event.mMessage = msg;
+ event.mToken = token;
+ mEventQueue.insert(i, event);
+
+ // If we rescheduled the event to be earlier than the first event, then we need to wake up the
+ // looper earlier than it was previously scheduled to be woken up. Otherwise, it can sleep until
+ // the previous wake-up time and then go to sleep again if needed.
+ if (shouldAwakeLoop){
+ mQueueChangedCondition.signal();
+ }
+ return OK;
+}
+
bool ALooper::loop() {
+
Event event;
{
@@ -206,7 +261,7 @@
return true;
}
int64_t whenUs = (*mEventQueue.begin()).mWhenUs;
- int64_t nowUs = GetNowUs();
+ int64_t nowUs = getNowUs();
if (whenUs > nowUs) {
int64_t delayUs = whenUs - nowUs;
diff --git a/media/module/foundation/AMessage.cpp b/media/module/foundation/AMessage.cpp
index 5c99cc9..b61dc47 100644
--- a/media/module/foundation/AMessage.cpp
+++ b/media/module/foundation/AMessage.cpp
@@ -430,6 +430,17 @@
return OK;
}
+status_t AMessage::postUnique(const sp<RefBase> &token, int64_t delayUs) {
+ sp<ALooper> looper = mLooper.promote();
+ if (looper == NULL) {
+ ALOGW("failed to post message as target looper for handler %d is gone.",
+ mTarget);
+ return -ENOENT;
+ }
+
+ return looper->postUnique(this, token, delayUs);
+}
+
status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) {
sp<ALooper> looper = mLooper.promote();
if (looper == NULL) {
diff --git a/media/module/foundation/include/media/stagefright/foundation/ALooper.h b/media/module/foundation/include/media/stagefright/foundation/ALooper.h
index 09c469b..60bda1f 100644
--- a/media/module/foundation/include/media/stagefright/foundation/ALooper.h
+++ b/media/module/foundation/include/media/stagefright/foundation/ALooper.h
@@ -59,6 +59,9 @@
}
protected:
+ // overridable by test harness
+ virtual int64_t getNowUs();
+
virtual ~ALooper();
private:
@@ -67,6 +70,7 @@
struct Event {
int64_t mWhenUs;
sp<AMessage> mMessage;
+ sp<RefBase> mToken;
};
Mutex mLock;
@@ -87,9 +91,14 @@
// START --- methods used only by AMessage
- // posts a message on this looper with the given timeout
+ // Posts a message on this looper with the given timeout.
void post(const sp<AMessage> &msg, int64_t delayUs);
+ // Post a message uniquely on this looper with the given timeout.
+ // This method ensures that there is exactly one message with the same token pending posted on
+ // this looper after the call returns. A null token will result in an EINVAL error status.
+ status_t postUnique(const sp<AMessage> &msg, const sp<RefBase> &token, int64_t delayUs);
+
// creates a reply token to be used with this looper
sp<AReplyToken> createReplyToken();
// waits for a response for the reply token. If status is OK, the response
diff --git a/media/module/foundation/include/media/stagefright/foundation/AMessage.h b/media/module/foundation/include/media/stagefright/foundation/AMessage.h
index 960212a..6f73597 100644
--- a/media/module/foundation/include/media/stagefright/foundation/AMessage.h
+++ b/media/module/foundation/include/media/stagefright/foundation/AMessage.h
@@ -141,6 +141,11 @@
status_t post(int64_t delayUs = 0);
+ // Post a message uniquely to its target with the given timeout.
+ // This method ensures that there is exactly one message with the same token posted to its
+ // target after the call returns. A null token will result in an EINVAL error status.
+ status_t postUnique(const sp<RefBase> &token, int64_t delayUs = 0);
+
// Posts the message to its target and waits for a response (or error)
// before returning.
status_t postAndAwaitResponse(sp<AMessage> *response);
diff --git a/media/module/foundation/tests/AMessage_test.cpp b/media/module/foundation/tests/AMessage_test.cpp
index 2b11326..e08ed77 100644
--- a/media/module/foundation/tests/AMessage_test.cpp
+++ b/media/module/foundation/tests/AMessage_test.cpp
@@ -17,18 +17,43 @@
//#define LOG_NDEBUG 0
#define LOG_TAG "AData_test"
+#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <utils/RefBase.h>
#include <media/stagefright/foundation/AMessage.h>
+#include <media/stagefright/foundation/AHandler.h>
+#include <media/stagefright/foundation/ALooper.h>
using namespace android;
-class AMessageTest : public ::testing::Test {
+using ::testing::InSequence;
+using ::testing::NiceMock;
+
+class LooperWithSettableClock : public ALooper {
+public:
+ LooperWithSettableClock() : mClockUs(0) {}
+
+ void setClockUs(int64_t nowUs) {
+ mClockUs = nowUs;
+ }
+
+ int64_t getNowUs() override {
+ return mClockUs;
+ }
+
+private:
+ int64_t mClockUs;
};
+timespec millis100 = {0, 100L*1000*1000};
-TEST(AMessage_tests, item_manipulation) {
+class MockHandler : public AHandler {
+public:
+ MOCK_METHOD(void, onMessageReceived, (const sp<AMessage>&), (override));
+};
+
+TEST(AMessage_tests, settersAndGetters) {
sp<AMessage> m1 = new AMessage();
m1->setInt32("value", 2);
@@ -120,6 +145,171 @@
EXPECT_TRUE(m1->findInt32("alittlelonger", &i32));
EXPECT_NE(OK, m1->removeEntryByName("notpresent"));
-
}
+TEST(AMessage_tests, deliversMultipleMessagesInOrderImmediately) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msgNow1 = new AMessage(0, mockHandler);
+ msgNow1->post();
+ sp<AMessage> msgNow2 = new AMessage(0, mockHandler);
+ msgNow2->post();
+
+ {
+ InSequence inSequence;
+ EXPECT_CALL(*mockHandler, onMessageReceived(msgNow1)).Times(1);
+ EXPECT_CALL(*mockHandler, onMessageReceived(msgNow2)).Times(1);
+ }
+ looper->start();
+ nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
+}
+
+TEST(AMessage_tests, doesNotDeliverDelayedMessageImmediately) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msgNow = new AMessage(0, mockHandler);
+ msgNow->post();
+ sp<AMessage> msgDelayed = new AMessage(0, mockHandler);
+ msgDelayed->post(100);
+
+ EXPECT_CALL(*mockHandler, onMessageReceived(msgNow)).Times(1);
+ // note: never called
+ EXPECT_CALL(*mockHandler, onMessageReceived(msgDelayed)).Times(0);
+ looper->start();
+ nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
+}
+
+TEST(AMessage_tests, deliversDelayedMessagesInSequence) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msgIn500 = new AMessage(0, mockHandler);
+ msgIn500->post(500);
+ sp<AMessage> msgNow = new AMessage(0, mockHandler);
+ msgNow->post();
+ sp<AMessage> msgIn100 = new AMessage(0, mockHandler);
+ msgIn100->post(100);
+ // not expected to be received
+ sp<AMessage> msgIn1000 = new AMessage(0, mockHandler);
+ msgIn1000->post(1000);
+
+ looper->setClockUs(500);
+ {
+ InSequence inSequence;
+
+ EXPECT_CALL(*mockHandler, onMessageReceived(msgNow)).Times(1);
+ EXPECT_CALL(*mockHandler, onMessageReceived(msgIn100)).Times(1);
+ EXPECT_CALL(*mockHandler, onMessageReceived(msgIn500)).Times(1);
+ }
+ // note: never called
+ EXPECT_CALL(*mockHandler, onMessageReceived(msgIn1000)).Times(0);
+ looper->start();
+ nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
+}
+
+TEST(AMessage_tests, deliversDelayedUniqueMessage) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msg = new AMessage(0, mockHandler);
+ msg->postUnique(msg, 50);
+
+ looper->setClockUs(50);
+ EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(1);
+ looper->start();
+ nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
+}
+
+TEST(AMessage_tests, deliversImmediateUniqueMessage) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ // note: we don't need to set the clock, but we do want a stable clock that doesn't advance
+ sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msg = new AMessage(0, mockHandler);
+ msg->postUnique(msg, 0);
+
+ EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(1);
+ looper->start();
+ nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
+}
+
+TEST(AMessage_tests, doesNotDeliverUniqueMessageAfterRescheduleLater) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msg = new AMessage(0, mockHandler);
+ msg->postUnique(msg, 50);
+ msg->postUnique(msg, 100); // reschedule for later
+
+ looper->setClockUs(50); // if the message is correctly rescheduled, it should not be delivered
+ // Never called because the message was rescheduled to a later point in time
+ EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(0);
+ looper->start();
+ nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
+}
+
+TEST(AMessage_tests, deliversUniqueMessageAfterRescheduleEarlier) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msg = new AMessage(0, mockHandler);
+ msg->postUnique(msg, 100);
+ msg->postUnique(msg, 50); // reschedule to fire earlier
+
+ looper->setClockUs(50); // if the message is rescheduled correctly, it should be delivered
+ EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(1);
+ looper->start();
+ nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
+}
+
+TEST(AMessage_tests, deliversSameMessageTwice) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msg = new AMessage(0, mockHandler);
+ msg->post(50);
+ msg->post(100);
+
+ looper->setClockUs(100);
+ EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(2);
+ looper->start();
+ nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
+}
+
+// When messages are posted twice with the same token, it will only be delivered once after being
+// rescheduled.
+TEST(AMessage_tests, deliversUniqueMessageOnce) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msg1 = new AMessage(0, mockHandler);
+ msg1->postUnique(msg1, 50);
+ sp<AMessage> msg2 = new AMessage(0, mockHandler);
+ msg2->postUnique(msg1, 75); // note, using the same token as msg1
+
+ looper->setClockUs(100);
+ EXPECT_CALL(*mockHandler, onMessageReceived(msg1)).Times(0);
+ EXPECT_CALL(*mockHandler, onMessageReceived(msg2)).Times(1);
+ looper->start();
+ nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
+}
+
+TEST(AMessage_tests, postUnique_withNullToken_returnsInvalidArgument) {
+ sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
+ sp<ALooper> looper = new ALooper();
+ looper->registerHandler(mockHandler);
+
+ sp<AMessage> msg = new AMessage(0, mockHandler);
+ EXPECT_EQ(msg->postUnique(nullptr, 0), -EINVAL);
+}
diff --git a/media/module/foundation/tests/Android.bp b/media/module/foundation/tests/Android.bp
index e72ce43..c409dd2 100644
--- a/media/module/foundation/tests/Android.bp
+++ b/media/module/foundation/tests/Android.bp
@@ -20,10 +20,14 @@
shared_libs: [
"liblog",
- "libstagefright_foundation",
"libutils",
],
+ static_libs: [
+ "libstagefright_foundation",
+ "libgmock",
+ ],
+
srcs: [
"AData_test.cpp",
"AMessage_test.cpp",