Support for RTP packets arriving interleaved with RTSP responses.

Change-Id: Ib32fba257da32a199134cf8943117cf3eaa07a25
diff --git a/media/libstagefright/rtsp/ARTPConnection.cpp b/media/libstagefright/rtsp/ARTPConnection.cpp
index 6816c45..42a22b7 100644
--- a/media/libstagefright/rtsp/ARTPConnection.cpp
+++ b/media/libstagefright/rtsp/ARTPConnection.cpp
@@ -57,6 +57,8 @@
 
     int32_t mNumRTCPPacketsReceived;
     struct sockaddr_in mRemoteRTCPAddr;
+
+    bool mIsInjected;
 };
 
 ARTPConnection::ARTPConnection(uint32_t flags)
@@ -72,13 +74,15 @@
         int rtpSocket, int rtcpSocket,
         const sp<ASessionDescription> &sessionDesc,
         size_t index,
-        const sp<AMessage> &notify) {
+        const sp<AMessage> &notify,
+        bool injected) {
     sp<AMessage> msg = new AMessage(kWhatAddStream, id());
     msg->setInt32("rtp-socket", rtpSocket);
     msg->setInt32("rtcp-socket", rtcpSocket);
     msg->setObject("session-desc", sessionDesc);
     msg->setSize("index", index);
     msg->setMessage("notify", notify);
+    msg->setInt32("injected", injected);
     msg->post();
 }
 
@@ -154,6 +158,12 @@
             break;
         }
 
+        case kWhatInjectPacket:
+        {
+            onInjectPacket(msg);
+            break;
+        }
+
         default:
         {
             TRESPASS();
@@ -172,6 +182,11 @@
     CHECK(msg->findInt32("rtcp-socket", &s));
     info->mRTCPSocket = s;
 
+    int32_t injected;
+    CHECK(msg->findInt32("injected", &injected));
+
+    info->mIsInjected = injected;
+
     sp<RefBase> obj;
     CHECK(msg->findObject("session-desc", &obj));
     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
@@ -182,7 +197,9 @@
     info->mNumRTCPPacketsReceived = 0;
     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
 
-    postPollEvent();
+    if (!injected) {
+        postPollEvent();
+    }
 }
 
 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
@@ -231,6 +248,10 @@
     int maxSocket = -1;
     for (List<StreamInfo>::iterator it = mStreams.begin();
          it != mStreams.end(); ++it) {
+        if ((*it).mIsInjected) {
+            continue;
+        }
+
         FD_SET(it->mRTPSocket, &rs);
         FD_SET(it->mRTCPSocket, &rs);
 
@@ -248,6 +269,10 @@
     if (res > 0) {
         for (List<StreamInfo>::iterator it = mStreams.begin();
              it != mStreams.end(); ++it) {
+            if ((*it).mIsInjected) {
+                continue;
+            }
+
             if (FD_ISSET(it->mRTPSocket, &rs)) {
                 receive(&*it, true);
             }
@@ -301,6 +326,8 @@
 }
 
 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
+    CHECK(!s->mIsInjected);
+
     sp<ABuffer> buffer = new ABuffer(65536);
 
     socklen_t remoteAddrLen =
@@ -559,5 +586,42 @@
     return source;
 }
 
+void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
+    sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
+    msg->setInt32("index", index);
+    msg->setObject("buffer", buffer);
+    msg->post();
+}
+
+void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
+    int32_t index;
+    CHECK(msg->findInt32("index", &index));
+
+    sp<RefBase> obj;
+    CHECK(msg->findObject("buffer", &obj));
+
+    sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
+
+    List<StreamInfo>::iterator it = mStreams.begin();
+    while (it != mStreams.end()
+           && it->mRTPSocket != index && it->mRTCPSocket != index) {
+        ++it;
+    }
+
+    if (it == mStreams.end()) {
+        TRESPASS();
+    }
+
+    StreamInfo *s = &*it;
+
+    status_t err;
+    if (it->mRTPSocket == index) {
+        err = parseRTP(s, buffer);
+    } else {
+        ++s->mNumRTCPPacketsReceived;
+        err = parseRTCP(s, buffer);
+    }
+}
+
 }  // namespace android
 
diff --git a/media/libstagefright/rtsp/ARTPConnection.h b/media/libstagefright/rtsp/ARTPConnection.h
index c535199..77f81fa 100644
--- a/media/libstagefright/rtsp/ARTPConnection.h
+++ b/media/libstagefright/rtsp/ARTPConnection.h
@@ -38,10 +38,13 @@
     void addStream(
             int rtpSocket, int rtcpSocket,
             const sp<ASessionDescription> &sessionDesc, size_t index,
-            const sp<AMessage> &notify);
+            const sp<AMessage> &notify,
+            bool injected);
 
     void removeStream(int rtpSocket, int rtcpSocket);
 
+    void injectPacket(int index, const sp<ABuffer> &buffer);
+
     // Creates a pair of UDP datagram sockets bound to adjacent ports
     // (the rtpSocket is bound to an even port, the rtcpSocket to the
     // next higher port).
@@ -57,6 +60,7 @@
         kWhatAddStream,
         kWhatRemoveStream,
         kWhatPollStreams,
+        kWhatInjectPacket,
     };
 
     static const int64_t kSelectTimeoutUs;
@@ -72,6 +76,7 @@
     void onAddStream(const sp<AMessage> &msg);
     void onRemoveStream(const sp<AMessage> &msg);
     void onPollStreams();
+    void onInjectPacket(const sp<AMessage> &msg);
     void onSendReceiverReports();
 
     status_t receive(StreamInfo *info, bool receiveRTP);
diff --git a/media/libstagefright/rtsp/ARTPSession.cpp b/media/libstagefright/rtsp/ARTPSession.cpp
index e082078..d2c56f7 100644
--- a/media/libstagefright/rtsp/ARTPSession.cpp
+++ b/media/libstagefright/rtsp/ARTPSession.cpp
@@ -83,7 +83,8 @@
         sp<AMessage> notify = new AMessage(kWhatAccessUnitComplete, id());
         notify->setSize("track-index", mTracks.size() - 1);
 
-        mRTPConn->addStream(rtpSocket, rtcpSocket, mDesc, i, notify);
+        mRTPConn->addStream(
+                rtpSocket, rtcpSocket, mDesc, i, notify, false /* injected */);
 
         info->mPacketSource = source;
     }
diff --git a/media/libstagefright/rtsp/ARTSPConnection.cpp b/media/libstagefright/rtsp/ARTSPConnection.cpp
index 5f8f5fd..cbd4836 100644
--- a/media/libstagefright/rtsp/ARTSPConnection.cpp
+++ b/media/libstagefright/rtsp/ARTSPConnection.cpp
@@ -19,6 +19,7 @@
 #include <media/stagefright/foundation/ABuffer.h>
 #include <media/stagefright/foundation/ADebug.h>
 #include <media/stagefright/foundation/AMessage.h>
+#include <media/stagefright/MediaErrors.h>
 
 #include <arpa/inet.h>
 #include <fcntl.h>
@@ -67,6 +68,12 @@
     msg->post();
 }
 
+void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) {
+    sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id());
+    msg->setMessage("reply", reply);
+    msg->post();
+}
+
 void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
     switch (msg->what()) {
         case kWhatConnect:
@@ -89,6 +96,12 @@
             onReceiveResponse();
             break;
 
+        case kWhatObserveBinaryData:
+        {
+            CHECK(msg->findMessage("reply", &mObserveBinaryMessage));
+            break;
+        }
+
         default:
             TRESPASS();
             break;
@@ -396,16 +409,13 @@
     mReceiveResponseEventPending = true;
 }
 
-bool ARTSPConnection::receiveLine(AString *line) {
-    line->clear();
-
-    bool sawCR = false;
-    for (;;) {
-        char c;
-        ssize_t n = recv(mSocket, &c, 1, 0);
+status_t ARTSPConnection::receive(void *data, size_t size) {
+    size_t offset = 0;
+    while (offset < size) {
+        ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0);
         if (n == 0) {
             // Server closed the connection.
-            return false;
+            return ERROR_IO;
         } else if (n < 0) {
             if (errno == EINTR) {
                 continue;
@@ -414,6 +424,22 @@
             TRESPASS();
         }
 
+        offset += (size_t)n;
+    }
+
+    return OK;
+}
+
+bool ARTSPConnection::receiveLine(AString *line) {
+    line->clear();
+
+    bool sawCR = false;
+    for (;;) {
+        char c;
+        if (receive(&c, 1) != OK) {
+            return false;
+        }
+
         if (sawCR && c == '\n') {
             line->erase(line->size() - 1, 1);
             return true;
@@ -421,17 +447,59 @@
 
         line->append(&c, 1);
 
+        if (c == '$' && line->size() == 1) {
+            // Special-case for interleaved binary data.
+            return true;
+        }
+
         sawCR = (c == '\r');
     }
 }
 
-bool ARTSPConnection::receiveRTSPReponse() {
-    sp<ARTSPResponse> response = new ARTSPResponse;
+sp<ABuffer> ARTSPConnection::receiveBinaryData() {
+    uint8_t x[3];
+    if (receive(x, 3) != OK) {
+        return NULL;
+    }
 
-    if (!receiveLine(&response->mStatusLine)) {
+    sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]);
+    if (receive(buffer->data(), buffer->size()) != OK) {
+        return NULL;
+    }
+
+    buffer->meta()->setInt32("index", (int32_t)x[0]);
+
+    return buffer;
+}
+
+bool ARTSPConnection::receiveRTSPReponse() {
+    AString statusLine;
+
+    if (!receiveLine(&statusLine)) {
         return false;
     }
 
+    if (statusLine == "$") {
+        sp<ABuffer> buffer = receiveBinaryData();
+
+        if (buffer == NULL) {
+            return false;
+        }
+
+        if (mObserveBinaryMessage != NULL) {
+            sp<AMessage> notify = mObserveBinaryMessage->dup();
+            notify->setObject("buffer", buffer);
+            notify->post();
+        } else {
+            LOG(WARNING) << "received binary data, but no one cares.";
+        }
+
+        return true;
+    }
+
+    sp<ARTSPResponse> response = new ARTSPResponse;
+    response->mStatusLine = statusLine;
+
     LOG(INFO) << "status: " << response->mStatusLine;
 
     ssize_t space1 = response->mStatusLine.find(" ");
diff --git a/media/libstagefright/rtsp/ARTSPConnection.h b/media/libstagefright/rtsp/ARTSPConnection.h
index 3577a2f..96e0d5b 100644
--- a/media/libstagefright/rtsp/ARTSPConnection.h
+++ b/media/libstagefright/rtsp/ARTSPConnection.h
@@ -40,6 +40,8 @@
 
     void sendRequest(const char *request, const sp<AMessage> &reply);
 
+    void observeBinaryData(const sp<AMessage> &reply);
+
 protected:
     virtual ~ARTSPConnection();
     virtual void onMessageReceived(const sp<AMessage> &msg);
@@ -57,6 +59,7 @@
         kWhatCompleteConnection = 'comc',
         kWhatSendRequest        = 'sreq',
         kWhatReceiveResponse    = 'rres',
+        kWhatObserveBinaryData  = 'obin',
     };
 
     static const int64_t kSelectTimeoutUs;
@@ -69,6 +72,8 @@
 
     KeyedVector<int32_t, sp<AMessage> > mPendingRequests;
 
+    sp<AMessage> mObserveBinaryMessage;
+
     void onConnect(const sp<AMessage> &msg);
     void onDisconnect(const sp<AMessage> &msg);
     void onCompleteConnection(const sp<AMessage> &msg);
@@ -80,7 +85,9 @@
 
     // Return false iff something went unrecoverably wrong.
     bool receiveRTSPReponse();
+    status_t receive(void *data, size_t size);
     bool receiveLine(AString *line);
+    sp<ABuffer> receiveBinaryData();
     bool notifyResponseListener(const sp<ARTSPResponse> &response);
 
     static bool ParseURL(
diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h
index e248463..4c6f058 100644
--- a/media/libstagefright/rtsp/MyHandler.h
+++ b/media/libstagefright/rtsp/MyHandler.h
@@ -29,6 +29,8 @@
 #include <media/stagefright/foundation/AMessage.h>
 #include <media/stagefright/MediaErrors.h>
 
+#define USE_TCP_INTERLEAVED     0
+
 namespace android {
 
 struct MyHandler : public AHandler {
@@ -55,6 +57,9 @@
         mLooper->registerHandler(mConn);
         (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
 
+        sp<AMessage> notify = new AMessage('biny', id());
+        mConn->observeBinaryData(notify);
+
         sp<AMessage> reply = new AMessage('conn', id());
         mConn->connect(mSessionURL.c_str(), reply);
     }
@@ -91,6 +96,8 @@
 
                     sp<AMessage> reply = new AMessage('desc', id());
                     mConn->sendRequest(request.c_str(), reply);
+                } else {
+                    (new AMessage('disc', id()))->post();
                 }
                 break;
             }
@@ -183,8 +190,10 @@
 
                 if (result != OK) {
                     if (track) {
-                        close(track->mRTPSocket);
-                        close(track->mRTCPSocket);
+                        if (!track->mUsingInterleavedTCP) {
+                            close(track->mRTPSocket);
+                            close(track->mRTCPSocket);
+                        }
 
                         mTracks.removeItemsAt(trackIndex);
                     }
@@ -216,7 +225,7 @@
                     mRTPConn->addStream(
                             track->mRTPSocket, track->mRTCPSocket,
                             mSessionDesc, index,
-                            notify);
+                            notify, track->mUsingInterleavedTCP);
 
                     mSetupTracksSuccessful = true;
                 }
@@ -263,6 +272,9 @@
                     mDoneMsg->setInt32("result", OK);
                     mDoneMsg->post();
                     mDoneMsg = NULL;
+
+                    sp<AMessage> timeout = new AMessage('tiou', id());
+                    timeout->post(10000000ll);
                 } else {
                     sp<AMessage> reply = new AMessage('disc', id());
                     mConn->disconnect(reply);
@@ -451,6 +463,29 @@
                 break;
             }
 
+            case 'biny':
+            {
+                sp<RefBase> obj;
+                CHECK(msg->findObject("buffer", &obj));
+                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
+
+                int32_t index;
+                CHECK(buffer->meta()->findInt32("index", &index));
+
+                mRTPConn->injectPacket(index, buffer);
+                break;
+            }
+
+            case 'tiou':
+            {
+                if (mFirstAccessUnit) {
+                    LOG(WARNING) << "Never received any data, disconnecting.";
+
+                }
+                (new AMessage('abor', id()))->post();
+                break;
+            }
+
             default:
                 TRESPASS();
                 break;
@@ -485,6 +520,7 @@
     struct TrackInfo {
         int mRTPSocket;
         int mRTCPSocket;
+        bool mUsingInterleavedTCP;
 
         sp<APacketSource> mPacketSource;
     };
@@ -515,19 +551,33 @@
         mTracks.push(TrackInfo());
         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
         info->mPacketSource = source;
-
-        unsigned rtpPort;
-        ARTPConnection::MakePortPair(
-                &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
+        info->mUsingInterleavedTCP = false;
 
         AString request = "SETUP ";
         request.append(trackURL);
         request.append(" RTSP/1.0\r\n");
 
+#if USE_TCP_INTERLEAVED
+        size_t interleaveIndex = 2 * (mTracks.size() - 1);
+        info->mUsingInterleavedTCP = true;
+        info->mRTPSocket = interleaveIndex;
+        info->mRTCPSocket = interleaveIndex + 1;
+
+        request.append("Transport: RTP/AVP/TCP;interleaved=");
+        request.append(interleaveIndex);
+        request.append("-");
+        request.append(interleaveIndex + 1);
+#else
+        unsigned rtpPort;
+        ARTPConnection::MakePortPair(
+                &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
+
         request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
         request.append(rtpPort);
         request.append("-");
         request.append(rtpPort + 1);
+#endif
+
         request.append("\r\n");
 
         if (index > 1) {