Initial HLS seamless switch implementation.

Bug: 11854054
Change-Id: I75fc2a258111295039ac13cc37e407df25891dd2
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp
index 79ad100..9e30ebd 100644
--- a/media/libstagefright/httplive/LiveSession.cpp
+++ b/media/libstagefright/httplive/LiveSession.cpp
@@ -37,6 +37,8 @@
 #include <media/stagefright/MetaData.h>
 #include <media/stagefright/Utils.h>
 
+#include <utils/Mutex.h>
+
 #include <ctype.h>
 #include <openssl/aes.h>
 #include <openssl/md5.h>
@@ -57,10 +59,14 @@
                     : 0)),
       mPrevBandwidthIndex(-1),
       mStreamMask(0),
+      mNewStreamMask(0),
+      mSwapMask(0),
       mCheckBandwidthGeneration(0),
+      mSwitchGeneration(0),
       mLastDequeuedTimeUs(0ll),
       mRealTimeBaseUs(0ll),
       mReconfigurationInProgress(false),
+      mSwitchInProgress(false),
       mDisconnectReplyID(0) {
     if (mUIDValid) {
         mHTTPDataSource->setUID(mUID);
@@ -72,16 +78,37 @@
 
     for (size_t i = 0; i < kMaxStreams; ++i) {
         mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
+        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
     }
 }
 
 LiveSession::~LiveSession() {
 }
 
+sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
+    ABuffer *discontinuity = new ABuffer(0);
+    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
+    discontinuity->meta()->setInt32("swapPacketSource", swap);
+    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
+    discontinuity->meta()->setInt64("timeUs", -1);
+    return discontinuity;
+}
+
+void LiveSession::swapPacketSource(StreamType stream) {
+    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
+    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
+    sp<AnotherPacketSource> tmp = aps;
+    aps = aps2;
+    aps2 = tmp;
+    aps2->clear();
+}
+
 status_t LiveSession::dequeueAccessUnit(
         StreamType stream, sp<ABuffer> *accessUnit) {
     if (!(mStreamMask & stream)) {
-        return UNKNOWN_ERROR;
+        // return -EWOULDBLOCK to avoid halting the decoder
+        // when switching between audio/video and audio only.
+        return -EWOULDBLOCK;
     }
 
     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
@@ -121,6 +148,25 @@
               streamStr,
               type,
               extra == NULL ? "NULL" : extra->debugString().c_str());
+
+        int32_t swap;
+        if (type == ATSParser::DISCONTINUITY_FORMATCHANGE
+                && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap)
+                && swap) {
+
+            int32_t switchGeneration;
+            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
+            {
+                Mutex::Autolock lock(mSwapMutex);
+                if (switchGeneration == mSwitchGeneration) {
+                    swapPacketSource(stream);
+                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
+                    msg->setInt32("stream", stream);
+                    msg->setInt32("switchGeneration", switchGeneration);
+                    msg->post();
+                }
+            }
+        }
     } else if (err == OK) {
         if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
             int64_t timeUs;
@@ -142,6 +188,7 @@
 }
 
 status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
+    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
     if (!(mStreamMask & stream)) {
         return UNKNOWN_ERROR;
     }
@@ -238,7 +285,12 @@
                     if (what == PlaylistFetcher::kWhatStopped) {
                         AString uri;
                         CHECK(msg->findString("uri", &uri));
-                        mFetcherInfos.removeItem(uri);
+                        if (mFetcherInfos.removeItem(uri) < 0) {
+                            // ignore duplicated kWhatStopped messages.
+                            break;
+                        }
+
+                        tryToFinishBandwidthSwitch();
                     }
 
                     if (mContinuation != NULL) {
@@ -274,6 +326,8 @@
                         postPrepared(err);
                     }
 
+                    cancelBandwidthSwitch();
+
                     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
 
                     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
@@ -312,6 +366,27 @@
                     break;
                 }
 
+                case PlaylistFetcher::kWhatStartedAt:
+                {
+                    int32_t switchGeneration;
+                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
+
+                    if (switchGeneration != mSwitchGeneration) {
+                        break;
+                    }
+
+                    // Resume fetcher for the original variant; the resumed fetcher should
+                    // continue until the timestamps found in msg, which is stored by the
+                    // new fetcher to indicate where the new variant has started buffering.
+                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
+                        const FetcherInfo info = mFetcherInfos.valueAt(i);
+                        if (info.mToBeRemoved) {
+                            info.mFetcher->resumeUntilAsync(msg);
+                        }
+                    }
+                    break;
+                }
+
                 default:
                     TRESPASS();
             }
@@ -356,6 +431,11 @@
             break;
         }
 
+        case kWhatSwapped:
+        {
+            onSwapped(msg);
+            break;
+        }
         default:
             TRESPASS();
             break;
@@ -466,6 +546,10 @@
     // during disconnection either.
     cancelCheckBandwidthEvent();
 
+    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
+    // (finishDisconnect, onFinishDisconnect2)
+    cancelBandwidthSwitch();
+
     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
         mFetcherInfos.valueAt(i).mFetcher->stopAsync();
     }
@@ -505,11 +589,13 @@
 
     sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
     notify->setString("uri", uri);
+    notify->setInt32("switchGeneration", mSwitchGeneration);
 
     FetcherInfo info;
     info.mFetcher = new PlaylistFetcher(notify, this, uri);
     info.mDurationUs = -1ll;
     info.mIsPrepared = false;
+    info.mToBeRemoved = false;
     looper()->registerHandler(info.mFetcher);
 
     mFetcherInfos.add(uri, info);
@@ -844,8 +930,25 @@
     return err;
 }
 
+bool LiveSession::canSwitchUp() {
+    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
+    status_t err = OK;
+    for (size_t i = 0; i < mPacketSources.size(); ++i) {
+        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
+        int64_t dur = source->getBufferedDurationUs(&err);
+        if (err == OK && dur > 10000000) {
+            return true;
+        }
+    }
+    return false;
+}
+
 void LiveSession::changeConfiguration(
         int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
+    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
+    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
+    cancelBandwidthSwitch();
+
     CHECK(!mReconfigurationInProgress);
     mReconfigurationInProgress = true;
 
@@ -861,7 +964,8 @@
     CHECK_LT(bandwidthIndex, mBandwidthItems.size());
     const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
 
-    uint32_t streamMask = 0;
+    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
+    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
 
     AString URIs[kMaxStreams];
     for (size_t i = 0; i < kMaxStreams; ++i) {
@@ -879,9 +983,14 @@
 
         // If we're seeking all current fetchers are discarded.
         if (timeUs < 0ll) {
+            // delay fetcher removal
+            discardFetcher = false;
+
             for (size_t j = 0; j < kMaxStreams; ++j) {
-                if ((streamMask & indexToType(j)) && uri == URIs[j]) {
-                    discardFetcher = false;
+                StreamType type = indexToType(j);
+                if ((streamMask & type) && uri == URIs[j]) {
+                    resumeMask |= type;
+                    streamMask &= ~type;
                 }
             }
         }
@@ -893,8 +1002,15 @@
         }
     }
 
-    sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
+    sp<AMessage> msg;
+    if (timeUs < 0ll) {
+        // skip onChangeConfiguration2 (decoder destruction) if switching.
+        msg = new AMessage(kWhatChangeConfiguration3, id());
+    } else {
+        msg = new AMessage(kWhatChangeConfiguration2, id());
+    }
     msg->setInt32("streamMask", streamMask);
+    msg->setInt32("resumeMask", resumeMask);
     msg->setInt64("timeUs", timeUs);
     for (size_t i = 0; i < kMaxStreams; ++i) {
         if (streamMask & indexToType(i)) {
@@ -977,11 +1093,13 @@
 }
 
 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
+    mContinuation.clear();
     // All remaining fetchers are still suspended, the player has shutdown
     // any decoders that needed it.
 
-    uint32_t streamMask;
+    uint32_t streamMask, resumeMask;
     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
+    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
 
     for (size_t i = 0; i < kMaxStreams; ++i) {
         if (streamMask & indexToType(i)) {
@@ -990,37 +1108,39 @@
     }
 
     int64_t timeUs;
+    bool switching = false;
     CHECK(msg->findInt64("timeUs", &timeUs));
 
     if (timeUs < 0ll) {
         timeUs = mLastDequeuedTimeUs;
+        switching = true;
     }
     mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
 
-    mStreamMask = streamMask;
+    mNewStreamMask = streamMask;
 
-    // Resume all existing fetchers and assign them packet sources.
+    // Of all existing fetchers:
+    // * Resume fetchers that are still needed and assign them original packet sources.
+    // * Mark otherwise unneeded fetchers for removal.
+    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
         const AString &uri = mFetcherInfos.keyAt(i);
 
-        uint32_t resumeMask = 0;
-
         sp<AnotherPacketSource> sources[kMaxStreams];
         for (size_t j = 0; j < kMaxStreams; ++j) {
-            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
+            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                 sources[j] = mPacketSources.valueFor(indexToType(j));
-                resumeMask |= indexToType(j);
             }
         }
 
-        CHECK_NE(resumeMask, 0u);
-
-        ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
-
-        streamMask &= ~resumeMask;
-
-        mFetcherInfos.valueAt(i).mFetcher->startAsync(
-                sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
+        FetcherInfo &info = mFetcherInfos.editValueAt(i);
+        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
+                || sources[kSubtitleIndex] != NULL) {
+            info.mFetcher->startAsync(
+                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
+        } else {
+            info.mToBeRemoved = true;
+        }
     }
 
     // streamMask now only contains the types that need a new fetcher created.
@@ -1029,6 +1149,8 @@
         ALOGV("creating new fetchers for mask 0x%08x", streamMask);
     }
 
+    // Find out when the original fetchers have buffered up to and start the new fetchers
+    // at a later timestamp.
     for (size_t i = 0; i < kMaxStreams; i++) {
         if (!(indexToType(i) & streamMask)) {
             continue;
@@ -1040,12 +1162,40 @@
         sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
         CHECK(fetcher != NULL);
 
+        int32_t latestSeq = -1;
+        int64_t latestTimeUs = 0ll;
         sp<AnotherPacketSource> sources[kMaxStreams];
+
         // TRICKY: looping from i as earlier streams are already removed from streamMask
         for (size_t j = i; j < kMaxStreams; ++j) {
             if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
                 sources[j] = mPacketSources.valueFor(indexToType(j));
-                sources[j]->clear();
+
+                if (!switching) {
+                    sources[j]->clear();
+                } else {
+                    int32_t type, seq;
+                    int64_t srcTimeUs;
+                    sp<AMessage> meta = sources[j]->getLatestMeta();
+
+                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
+                        CHECK(meta->findInt32("seq", &seq));
+                        if (seq > latestSeq) {
+                            latestSeq = seq;
+                        }
+                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
+                        if (srcTimeUs > latestTimeUs) {
+                            latestTimeUs = srcTimeUs;
+                        }
+                    }
+
+                    sources[j] = mPacketSources2.valueFor(indexToType(j));
+                    sources[j]->clear();
+                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
+                    if (extraStreams & indexToType(j)) {
+                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
+                    }
+                }
 
                 streamMask &= ~indexToType(j);
             }
@@ -1055,7 +1205,9 @@
                 sources[kAudioIndex],
                 sources[kVideoIndex],
                 sources[kSubtitleIndex],
-                timeUs);
+                timeUs,
+                latestTimeUs /* min start time(us) */,
+                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
     }
 
     // All fetchers have now been started, the configuration change
@@ -1064,14 +1216,61 @@
     scheduleCheckBandwidthEvent();
 
     ALOGV("XXX configuration change completed.");
-
     mReconfigurationInProgress = false;
+    if (switching) {
+        mSwitchInProgress = true;
+    } else {
+        mStreamMask = mNewStreamMask;
+    }
 
     if (mDisconnectReplyID != 0) {
         finishDisconnect();
     }
 }
 
+void LiveSession::onSwapped(const sp<AMessage> &msg) {
+    int32_t switchGeneration;
+    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
+    if (switchGeneration != mSwitchGeneration) {
+        return;
+    }
+
+    int32_t stream;
+    CHECK(msg->findInt32("stream", &stream));
+    mSwapMask |= stream;
+    if (mSwapMask != mStreamMask) {
+        return;
+    }
+
+    // Check if new variant contains extra streams.
+    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
+    while (extraStreams) {
+        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
+        swapPacketSource(extraStream);
+        extraStreams &= ~extraStream;
+    }
+
+    tryToFinishBandwidthSwitch();
+}
+
+// Mark switch done when:
+//   1. all old buffers are swapped out, AND
+//   2. all old fetchers are removed.
+void LiveSession::tryToFinishBandwidthSwitch() {
+    bool needToRemoveFetchers = false;
+    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
+        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
+            needToRemoveFetchers = true;
+            break;
+        }
+    }
+    if (!needToRemoveFetchers && mSwapMask == mStreamMask) {
+        mStreamMask = mNewStreamMask;
+        mSwitchInProgress = false;
+        mSwapMask = 0;
+    }
+}
+
 void LiveSession::scheduleCheckBandwidthEvent() {
     sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
     msg->setInt32("generation", mCheckBandwidthGeneration);
@@ -1082,16 +1281,37 @@
     ++mCheckBandwidthGeneration;
 }
 
-void LiveSession::onCheckBandwidth() {
-    if (mReconfigurationInProgress) {
-        scheduleCheckBandwidthEvent();
-        return;
+void LiveSession::cancelBandwidthSwitch() {
+    Mutex::Autolock lock(mSwapMutex);
+    mSwitchGeneration++;
+    mSwitchInProgress = false;
+    mSwapMask = 0;
+}
+
+bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
+    if (mReconfigurationInProgress || mSwitchInProgress) {
+        return false;
     }
 
+    if (mPrevBandwidthIndex < 0) {
+        return true;
+    }
+
+    if (bandwidthIndex == (size_t)mPrevBandwidthIndex) {
+        return false;
+    } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) {
+        return canSwitchUp();
+    } else {
+        return true;
+    }
+}
+
+void LiveSession::onCheckBandwidth() {
     size_t bandwidthIndex = getBandwidthIndex();
-    if (mPrevBandwidthIndex < 0
-            || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
+    if (canSwitchBandwidthTo(bandwidthIndex)) {
         changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
+    } else {
+        scheduleCheckBandwidthEvent();
     }
 
     // Handling the kWhatCheckBandwidth even here does _not_ automatically
diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h
index b38c518..e26d024 100644
--- a/media/libstagefright/httplive/LiveSession.h
+++ b/media/libstagefright/httplive/LiveSession.h
@@ -81,6 +81,11 @@
         kWhatPreparationFailed,
     };
 
+    // create a format-change discontinuity
+    //
+    // swap:
+    //   whether is format-change discontinuity should trigger a buffer swap
+    sp<ABuffer> createFormatChangeBuffer(bool swap = true);
 protected:
     virtual ~LiveSession();
 
@@ -99,6 +104,7 @@
         kWhatChangeConfiguration2       = 'chC2',
         kWhatChangeConfiguration3       = 'chC3',
         kWhatFinishDisconnect2          = 'fin2',
+        kWhatSwapped                    = 'swap',
     };
 
     struct BandwidthItem {
@@ -110,6 +116,7 @@
         sp<PlaylistFetcher> mFetcher;
         int64_t mDurationUs;
         bool mIsPrepared;
+        bool mToBeRemoved;
     };
 
     struct StreamItem {
@@ -145,9 +152,26 @@
     KeyedVector<AString, FetcherInfo> mFetcherInfos;
     uint32_t mStreamMask;
 
+    // Masks used during reconfiguration:
+    // mNewStreamMask: streams in the variant playlist we're switching to;
+    // we don't want to immediately overwrite the original value.
+    uint32_t mNewStreamMask;
+
+    // mSwapMask: streams that have started to playback content in the new variant playlist;
+    // we use this to track reconfiguration progress.
+    uint32_t mSwapMask;
+
     KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources;
+    // A second set of packet sources that buffer content for the variant we're switching to.
+    KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2;
+
+    // A mutex used to serialize two sets of events:
+    // * the swapping of packet sources in dequeueAccessUnit on the player thread, AND
+    // * a forced bandwidth switch termination in cancelSwitch on the live looper.
+    Mutex mSwapMutex;
 
     int32_t mCheckBandwidthGeneration;
+    int32_t mSwitchGeneration;
 
     size_t mContinuationCounter;
     sp<AMessage> mContinuation;
@@ -156,6 +180,7 @@
     int64_t mRealTimeBaseUs;
 
     bool mReconfigurationInProgress;
+    bool mSwitchInProgress;
     uint32_t mDisconnectReplyID;
 
     sp<PlaylistFetcher> addFetcher(const char *uri);
@@ -197,16 +222,27 @@
     void onChangeConfiguration(const sp<AMessage> &msg);
     void onChangeConfiguration2(const sp<AMessage> &msg);
     void onChangeConfiguration3(const sp<AMessage> &msg);
+    void onSwapped(const sp<AMessage> &msg);
+    void tryToFinishBandwidthSwitch();
 
     void scheduleCheckBandwidthEvent();
     void cancelCheckBandwidthEvent();
 
+    // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources
+    // from being swapped out on stale discontinuities while manipulating
+    // mPacketSources/mPacketSources2.
+    void cancelBandwidthSwitch();
+
+    bool canSwitchBandwidthTo(size_t bandwidthIndex);
     void onCheckBandwidth();
 
     void finishDisconnect();
 
     void postPrepared(status_t err);
 
+    void swapPacketSource(StreamType stream);
+    bool canSwitchUp();
+
     DISALLOW_EVIL_CONSTRUCTORS(LiveSession);
 };
 
diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp
index 9cb16fe..0eac8b3 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.cpp
+++ b/media/libstagefright/httplive/PlaylistFetcher.cpp
@@ -48,16 +48,20 @@
 // static
 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
+const int32_t PlaylistFetcher::kNumSkipFrames = 10;
 
 PlaylistFetcher::PlaylistFetcher(
         const sp<AMessage> &notify,
         const sp<LiveSession> &session,
         const char *uri)
     : mNotify(notify),
+      mStartTimeUsNotify(notify->dup()),
       mSession(session),
       mURI(uri),
       mStreamTypeMask(0),
       mStartTimeUs(-1ll),
+      mMinStartTimeUs(0ll),
+      mStopParams(NULL),
       mLastPlaylistFetchTimeUs(-1ll),
       mSeqNumber(-1),
       mNumRetries(0),
@@ -69,6 +73,8 @@
       mFirstPTSValid(false),
       mAbsoluteTimeAnchorUs(0ll) {
     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
+    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
+    mStartTimeUsNotify->setInt32("streamMask", 0);
 }
 
 PlaylistFetcher::~PlaylistFetcher() {
@@ -325,7 +331,9 @@
         const sp<AnotherPacketSource> &audioSource,
         const sp<AnotherPacketSource> &videoSource,
         const sp<AnotherPacketSource> &subtitleSource,
-        int64_t startTimeUs) {
+        int64_t startTimeUs,
+        int64_t minStartTimeUs,
+        int32_t startSeqNumberHint) {
     sp<AMessage> msg = new AMessage(kWhatStart, id());
 
     uint32_t streamTypeMask = 0ul;
@@ -347,6 +355,8 @@
 
     msg->setInt32("streamTypeMask", streamTypeMask);
     msg->setInt64("startTimeUs", startTimeUs);
+    msg->setInt64("minStartTimeUs", minStartTimeUs);
+    msg->setInt32("startSeqNumberHint", startSeqNumberHint);
     msg->post();
 }
 
@@ -358,6 +368,12 @@
     (new AMessage(kWhatStop, id()))->post();
 }
 
+void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
+    AMessage* msg = new AMessage(kWhatResumeUntil, id());
+    msg->setMessage("params", params);
+    msg->post();
+}
+
 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
     switch (msg->what()) {
         case kWhatStart:
@@ -392,6 +408,7 @@
         }
 
         case kWhatMonitorQueue:
+        case kWhatDownloadNext:
         {
             int32_t generation;
             CHECK(msg->findInt32("generation", &generation));
@@ -401,7 +418,17 @@
                 break;
             }
 
-            onMonitorQueue();
+            if (msg->what() == kWhatMonitorQueue) {
+                onMonitorQueue();
+            } else {
+                onDownloadNext();
+            }
+            break;
+        }
+
+        case kWhatResumeUntil:
+        {
+            onResumeUntil(msg);
             break;
         }
 
@@ -417,7 +444,10 @@
     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
 
     int64_t startTimeUs;
+    int32_t startSeqNumberHint;
     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
+    CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs));
+    CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint));
 
     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
         void *ptr;
@@ -455,6 +485,10 @@
         mPrepared = false;
     }
 
+    if (startSeqNumberHint >= 0) {
+        mSeqNumber = startSeqNumberHint;
+    }
+
     postMonitorQueue();
 
     return OK;
@@ -462,22 +496,72 @@
 
 void PlaylistFetcher::onPause() {
     cancelMonitorQueue();
-
-    mPacketSources.clear();
-    mStreamTypeMask = 0;
 }
 
 void PlaylistFetcher::onStop() {
     cancelMonitorQueue();
 
-    for (size_t i = 0; i < mPacketSources.size(); ++i) {
-        mPacketSources.valueAt(i)->clear();
-    }
-
     mPacketSources.clear();
     mStreamTypeMask = 0;
 }
 
+// Resume until we have reached the boundary timestamps listed in `msg`; when
+// the remaining time is too short (within a resume threshold) stop immediately
+// instead.
+status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
+    sp<AMessage> params;
+    CHECK(msg->findMessage("params", &params));
+
+    bool stop = false;
+    for (size_t i = 0; i < mPacketSources.size(); i++) {
+        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+
+        const char *stopKey;
+        int streamType = mPacketSources.keyAt(i);
+        switch (streamType) {
+        case LiveSession::STREAMTYPE_VIDEO:
+            stopKey = "timeUsVideo";
+            break;
+
+        case LiveSession::STREAMTYPE_AUDIO:
+            stopKey = "timeUsAudio";
+            break;
+
+        case LiveSession::STREAMTYPE_SUBTITLES:
+            stopKey = "timeUsSubtitle";
+            break;
+
+        default:
+            TRESPASS();
+        }
+
+        // Don't resume if we would stop within a resume threshold.
+        int64_t latestTimeUs = 0, stopTimeUs = 0;
+        sp<AMessage> latestMeta = packetSource->getLatestMeta();
+        if (latestMeta != NULL
+                && (latestMeta->findInt64("timeUs", &latestTimeUs)
+                && params->findInt64(stopKey, &stopTimeUs))) {
+            int64_t diffUs = stopTimeUs - latestTimeUs;
+            if (diffUs < resumeThreshold(latestMeta)) {
+                stop = true;
+            }
+        }
+    }
+
+    if (stop) {
+        for (size_t i = 0; i < mPacketSources.size(); i++) {
+            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
+        }
+        stopAsync();
+        return OK;
+    }
+
+    mStopParams = params;
+    postMonitorQueue();
+
+    return OK;
+}
+
 void PlaylistFetcher::notifyError(status_t err) {
     sp<AMessage> notify = mNotify->dup();
     notify->setInt32("what", kWhatError);
@@ -519,8 +603,9 @@
                 packetSource->getBufferedDurationUs(&finalResult);
         finalResult = OK;
     } else {
-        bool first = true;
-
+        // Use max stream duration to prevent us from waiting on a non-existent stream;
+        // when we cannot make out from the manifest what streams are included in a playlist
+        // we might assume extra streams.
         for (size_t i = 0; i < mPacketSources.size(); ++i) {
             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
                 continue;
@@ -528,9 +613,10 @@
 
             int64_t bufferedStreamDurationUs =
                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
-            if (first || bufferedStreamDurationUs < bufferedDurationUs) {
+            ALOGV("buffered %lld for stream %d",
+                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
+            if (bufferedStreamDurationUs > bufferedDurationUs) {
                 bufferedDurationUs = bufferedStreamDurationUs;
-                first = false;
             }
         }
     }
@@ -550,7 +636,12 @@
     if (finalResult == OK && downloadMore) {
         ALOGV("monitoring, buffered=%lld < %lld",
                 bufferedDurationUs, durationToBufferUs);
-        onDownloadNext();
+        // delay the next download slightly; hopefully this gives other concurrent fetchers
+        // a better chance to run.
+        // onDownloadNext();
+        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
+        msg->setInt32("generation", mMonitorQueueGeneration);
+        msg->post(1000l);
     } else {
         // Nothing to do yet, try again in a second.
 
@@ -617,6 +708,12 @@
     const int32_t lastSeqNumberInPlaylist =
         firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
 
+    if (mStartup && mSeqNumber >= 0
+            && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) {
+        // in case we guessed wrong during reconfiguration, try fetching the latest content.
+        mSeqNumber = lastSeqNumberInPlaylist;
+    }
+
     if (mSeqNumber < 0) {
         CHECK_GE(mStartTimeUs, 0ll);
 
@@ -761,6 +858,18 @@
 
     err = extractAndQueueAccessUnits(buffer, itemMeta);
 
+    if (err == -EAGAIN) {
+        // bad starting sequence number hint
+        postMonitorQueue();
+        return;
+    }
+
+    if (err == ERROR_OUT_OF_RANGE) {
+        // reached stopping point
+        stopAsync();
+        return;
+    }
+
     if (err != OK) {
         notifyError(err);
         return;
@@ -817,12 +926,15 @@
         }
 
         if (mTSParser == NULL) {
-            mTSParser = new ATSParser;
+            // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
+            mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
         }
 
         if (mNextPTSTimeUs >= 0ll) {
             sp<AMessage> extra = new AMessage;
-            extra->setInt64(IStreamListener::kKeyMediaTimeUs, mNextPTSTimeUs);
+            // Since we are using absolute timestamps, signal an offset of 0 to prevent
+            // ATSParser from skewing the timestamps of access units.
+            extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
 
             mTSParser->signalDiscontinuity(
                     ATSParser::DISCONTINUITY_SEEK, extra);
@@ -841,17 +953,23 @@
             offset += 188;
         }
 
+        status_t err = OK;
         for (size_t i = mPacketSources.size(); i-- > 0;) {
             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
 
+            const char *key;
             ATSParser::SourceType type;
-            switch (mPacketSources.keyAt(i)) {
+            const LiveSession::StreamType stream = mPacketSources.keyAt(i);
+            switch (stream) {
+
                 case LiveSession::STREAMTYPE_VIDEO:
                     type = ATSParser::VIDEO;
+                    key = "timeUsVideo";
                     break;
 
                 case LiveSession::STREAMTYPE_AUDIO:
                     type = ATSParser::AUDIO;
+                    key = "timeUsAudio";
                     break;
 
                 case LiveSession::STREAMTYPE_SUBTITLES:
@@ -878,19 +996,87 @@
                 continue;
             }
 
+            int64_t timeUs;
             sp<ABuffer> accessUnit;
             status_t finalResult;
             while (source->hasBufferAvailable(&finalResult)
                     && source->dequeueAccessUnit(&accessUnit) == OK) {
-                // Note that we do NOT dequeue any discontinuities.
+
+                CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
+                if (mMinStartTimeUs > 0) {
+                    if (timeUs < mMinStartTimeUs) {
+                        // TODO untested path
+                        // try a later ts
+                        int32_t targetDuration;
+                        mPlaylist->meta()->findInt32("target-duration", &targetDuration);
+                        int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration;
+                        if (incr == 0) {
+                            // increment mSeqNumber by at least one
+                            incr = 1;
+                        }
+                        mSeqNumber += incr;
+                        err = -EAGAIN;
+                        break;
+                    } else {
+                        int64_t startTimeUs;
+                        if (mStartTimeUsNotify != NULL
+                                && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
+                            mStartTimeUsNotify->setInt64(key, timeUs);
+
+                            uint32_t streamMask = 0;
+                            mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
+                            streamMask |= mPacketSources.keyAt(i);
+                            mStartTimeUsNotify->setInt32("streamMask", streamMask);
+
+                            if (streamMask == mStreamTypeMask) {
+                                mStartTimeUsNotify->post();
+                                mStartTimeUsNotify.clear();
+                            }
+                        }
+                    }
+                }
+
+                if (mStopParams != NULL) {
+                    // Queue discontinuity in original stream.
+                    int64_t stopTimeUs;
+                    if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) {
+                        packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
+                        mStreamTypeMask &= ~stream;
+                        mPacketSources.removeItemsAt(i);
+                        break;
+                    }
+                }
+
+                // Note that we do NOT dequeue any discontinuities except for format change.
 
                 // for simplicity, store a reference to the format in each unit
                 sp<MetaData> format = source->getFormat();
                 if (format != NULL) {
                     accessUnit->meta()->setObject("format", format);
                 }
+
+                // Stash the sequence number so we can hint future fetchers where to start at.
+                accessUnit->meta()->setInt32("seq", mSeqNumber);
                 packetSource->queueAccessUnit(accessUnit);
             }
+
+            if (err != OK) {
+                break;
+            }
+        }
+
+        if (err != OK) {
+            for (size_t i = mPacketSources.size(); i-- > 0;) {
+                sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+                packetSource->clear();
+            }
+            return err;
+        }
+
+        if (!mStreamTypeMask) {
+            // Signal gap is filled between original and new stream.
+            ALOGV("ERROR OUT OF RANGE");
+            return ERROR_OUT_OF_RANGE;
         }
 
         return OK;
@@ -907,6 +1093,7 @@
         CHECK(itemMeta->findInt64("durationUs", &durationUs));
         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
         buffer->meta()->setInt64("durationUs", durationUs);
+        buffer->meta()->setInt32("seq", mSeqNumber);
 
         packetSource->queueAccessUnit(buffer);
         return OK;
@@ -1043,6 +1230,7 @@
         // Each AAC frame encodes 1024 samples.
         numSamples += 1024;
 
+        unit->meta()->setInt32("seq", mSeqNumber);
         packetSource->queueAccessUnit(unit);
 
         offset += aac_frame_length;
@@ -1070,4 +1258,33 @@
     msg->post();
 }
 
+int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
+    int64_t durationUs, threshold;
+    if (msg->findInt64("durationUs", &durationUs)) {
+        return kNumSkipFrames * durationUs;
+    }
+
+    sp<RefBase> obj;
+    msg->findObject("format", &obj);
+    MetaData *format = static_cast<MetaData *>(obj.get());
+
+    const char *mime;
+    CHECK(format->findCString(kKeyMIMEType, &mime));
+    bool audio = !strncasecmp(mime, "audio/", 6);
+    if (audio) {
+        // Assumes 1000 samples per frame.
+        int32_t sampleRate;
+        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
+        return kNumSkipFrames  /* frames */ * 1000 /* samples */
+                * (1000000 / sampleRate) /* sample duration (us) */;
+    } else {
+        int32_t frameRate;
+        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
+            return kNumSkipFrames * (1000000 / frameRate);
+        }
+    }
+
+    return 500000ll;
+}
+
 }  // namespace android
diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h
index ac04a77..2e0349f 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.h
+++ b/media/libstagefright/httplive/PlaylistFetcher.h
@@ -43,6 +43,7 @@
         kWhatTemporarilyDoneFetching,
         kWhatPrepared,
         kWhatPreparationFailed,
+        kWhatStartedAt,
     };
 
     PlaylistFetcher(
@@ -56,12 +57,16 @@
             const sp<AnotherPacketSource> &audioSource,
             const sp<AnotherPacketSource> &videoSource,
             const sp<AnotherPacketSource> &subtitleSource,
-            int64_t startTimeUs = -1ll);
+            int64_t startTimeUs = -1ll,
+            int64_t minStartTimeUs = 0ll /* start after this timestamp */,
+            int32_t startSeqNumberHint = -1 /* try starting at this sequence number */);
 
     void pauseAsync();
 
     void stopAsync();
 
+    void resumeUntilAsync(const sp<AMessage> &params);
+
 protected:
     virtual ~PlaylistFetcher();
     virtual void onMessageReceived(const sp<AMessage> &msg);
@@ -76,17 +81,25 @@
         kWhatPause          = 'paus',
         kWhatStop           = 'stop',
         kWhatMonitorQueue   = 'moni',
+        kWhatResumeUntil    = 'rsme',
+        kWhatDownloadNext   = 'dlnx',
     };
 
     static const int64_t kMinBufferedDurationUs;
     static const int64_t kMaxMonitorDelayUs;
+    static const int32_t kNumSkipFrames;
 
+    // notifications to mSession
     sp<AMessage> mNotify;
+    sp<AMessage> mStartTimeUsNotify;
+
     sp<LiveSession> mSession;
     AString mURI;
 
     uint32_t mStreamTypeMask;
     int64_t mStartTimeUs;
+    int64_t mMinStartTimeUs; // start fetching no earlier than this value
+    sp<AMessage> mStopParams; // message containing the latest timestamps we should fetch.
 
     KeyedVector<LiveSession::StreamType, sp<AnotherPacketSource> >
         mPacketSources;
@@ -153,6 +166,9 @@
     void onMonitorQueue();
     void onDownloadNext();
 
+    // Resume a fetcher to continue until the stopping point stored in msg.
+    status_t onResumeUntil(const sp<AMessage> &msg);
+
     status_t extractAndQueueAccessUnits(
             const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta);
 
@@ -165,6 +181,10 @@
 
     void updateDuration();
 
+    // Before resuming a fetcher in onResume, check the remaining duration is longer than that
+    // returned by resumeThreshold.
+    int64_t resumeThreshold(const sp<AMessage> &msg);
+
     DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher);
 };