blob: 09ca1c9115c298e336834e690ab5aaa2f1b1654a [file] [log] [blame] [edit]
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#define LOG_NDEBUG 0
#define LOG_TAG "LiveSession"
#include <utils/Log.h>
#include "LiveSession.h"
#include "HTTPDownloader.h"
#include "M3UParser.h"
#include "PlaylistFetcher.h"
#include <mpeg2ts/AnotherPacketSource.h>
#include <cutils/properties.h>
#include <media/MediaHTTPService.h>
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/foundation/AUtils.h>
#include <media/stagefright/MediaDefs.h>
#include <media/stagefright/MetaData.h>
#include <media/stagefright/Utils.h>
#include <media/stagefright/FoundationUtils.h>
#include <utils/Mutex.h>
#include <ctype.h>
#include <inttypes.h>
namespace android {
// static
// Bandwidth Switch Mark Defaults
const int64_t LiveSession::kUpSwitchMarkUs = 15000000LL;
const int64_t LiveSession::kDownSwitchMarkUs = 20000000LL;
const int64_t LiveSession::kUpSwitchMarginUs = 5000000LL;
const int64_t LiveSession::kResumeThresholdUs = 100000LL;
//TODO: redefine this mark to a fair value
// default buffer underflow mark
static const int kUnderflowMarkMs = 1000; // 1 second
struct LiveSession::BandwidthEstimator : public RefBase {
BandwidthEstimator();
void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
bool estimateBandwidth(
int32_t *bandwidth,
bool *isStable = NULL,
int32_t *shortTermBps = NULL);
private:
// Bandwidth estimation parameters
static const int32_t kShortTermBandwidthItems = 3;
static const int32_t kMinBandwidthHistoryItems = 20;
static const int64_t kMinBandwidthHistoryWindowUs = 5000000LL; // 5 sec
static const int64_t kMaxBandwidthHistoryWindowUs = 30000000LL; // 30 sec
static const int64_t kMaxBandwidthHistoryAgeUs = 60000000LL; // 60 sec
struct BandwidthEntry {
int64_t mTimestampUs;
int64_t mDelayUs;
size_t mNumBytes;
};
Mutex mLock;
List<BandwidthEntry> mBandwidthHistory;
List<int32_t> mPrevEstimates;
int32_t mShortTermEstimate;
bool mHasNewSample;
bool mIsStable;
int64_t mTotalTransferTimeUs;
size_t mTotalTransferBytes;
DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
};
LiveSession::BandwidthEstimator::BandwidthEstimator() :
mShortTermEstimate(0),
mHasNewSample(false),
mIsStable(true),
mTotalTransferTimeUs(0),
mTotalTransferBytes(0) {
}
void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
size_t numBytes, int64_t delayUs) {
AutoMutex autoLock(mLock);
int64_t nowUs = ALooper::GetNowUs();
BandwidthEntry entry;
entry.mTimestampUs = nowUs;
entry.mDelayUs = delayUs;
entry.mNumBytes = numBytes;
mTotalTransferTimeUs += delayUs;
mTotalTransferBytes += numBytes;
mBandwidthHistory.push_back(entry);
mHasNewSample = true;
// Remove no more than 10% of total transfer time at a time
// to avoid sudden jump on bandwidth estimation. There might
// be long blocking reads that takes up signification time,
// we have to keep a longer window in that case.
int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
} else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
}
// trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
// and total transfer time at least kMaxBandwidthHistoryWindowUs.
while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
// remove sample if either absolute age or total transfer time is
// over kMaxBandwidthHistoryWindowUs
if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
break;
}
mTotalTransferTimeUs -= it->mDelayUs;
mTotalTransferBytes -= it->mNumBytes;
mBandwidthHistory.erase(mBandwidthHistory.begin());
}
}
bool LiveSession::BandwidthEstimator::estimateBandwidth(
int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
AutoMutex autoLock(mLock);
if (mBandwidthHistory.size() < 2) {
return false;
}
if (!mHasNewSample) {
*bandwidthBps = *(--mPrevEstimates.end());
if (isStable) {
*isStable = mIsStable;
}
if (shortTermBps) {
*shortTermBps = mShortTermEstimate;
}
return true;
}
*bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
mPrevEstimates.push_back(*bandwidthBps);
while (mPrevEstimates.size() > 3) {
mPrevEstimates.erase(mPrevEstimates.begin());
}
mHasNewSample = false;
int64_t totalTimeUs = 0;
size_t totalBytes = 0;
if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
totalTimeUs += it->mDelayUs;
totalBytes += it->mNumBytes;
}
}
mShortTermEstimate = totalTimeUs > 0 ?
(totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
if (shortTermBps) {
*shortTermBps = mShortTermEstimate;
}
int64_t minEstimate = -1, maxEstimate = -1;
List<int32_t>::iterator it;
for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
int32_t estimate = *it;
if (minEstimate < 0 || minEstimate > estimate) {
minEstimate = estimate;
}
if (maxEstimate < 0 || maxEstimate < estimate) {
maxEstimate = estimate;
}
}
// consider it stable if long-term average is not jumping a lot
// and short-term average is not much lower than long-term average
mIsStable = (maxEstimate <= minEstimate * 4 / 3)
&& mShortTermEstimate > minEstimate * 7 / 10;
if (isStable) {
*isStable = mIsStable;
}
#if 0
{
char dumpStr[1024] = {0};
size_t itemIdx = 0;
size_t histSize = mBandwidthHistory.size();
sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
*bandwidthBps, mIsStable, histSize);
List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
for (; it != mBandwidthHistory.end(); ++it) {
if (itemIdx > 50) {
sprintf(dumpStr + strlen(dumpStr),
"...(%zd more items)... }", histSize - itemIdx);
break;
}
sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
it->mNumBytes / 1024,
(double)it->mDelayUs * 1.0e-6,
(it == (--mBandwidthHistory.end())) ? "}" : ", ");
itemIdx++;
}
ALOGE(dumpStr);
}
#endif
return true;
}
//static
const char *LiveSession::getKeyForStream(StreamType type) {
switch (type) {
case STREAMTYPE_VIDEO:
return "timeUsVideo";
case STREAMTYPE_AUDIO:
return "timeUsAudio";
case STREAMTYPE_SUBTITLES:
return "timeUsSubtitle";
case STREAMTYPE_METADATA:
return "timeUsMetadata"; // unused
default:
TRESPASS();
}
return NULL;
}
//static
const char *LiveSession::getNameForStream(StreamType type) {
switch (type) {
case STREAMTYPE_VIDEO:
return "video";
case STREAMTYPE_AUDIO:
return "audio";
case STREAMTYPE_SUBTITLES:
return "subs";
case STREAMTYPE_METADATA:
return "metadata";
default:
break;
}
return "unknown";
}
//static
ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
switch (type) {
case STREAMTYPE_VIDEO:
return ATSParser::VIDEO;
case STREAMTYPE_AUDIO:
return ATSParser::AUDIO;
case STREAMTYPE_METADATA:
return ATSParser::META;
case STREAMTYPE_SUBTITLES:
default:
TRESPASS();
}
return ATSParser::NUM_SOURCE_TYPES; // should not reach here
}
LiveSession::LiveSession(
const sp<AMessage> &notify, uint32_t flags,
const sp<MediaHTTPService> &httpService)
: mNotify(notify),
mFlags(flags),
mHTTPService(httpService),
mBuffering(false),
mInPreparationPhase(true),
mPollBufferingGeneration(0),
mPrevBufferPercentage(-1),
mCurBandwidthIndex(-1),
mOrigBandwidthIndex(-1),
mLastBandwidthBps(-1LL),
mLastBandwidthStable(false),
mBandwidthEstimator(new BandwidthEstimator()),
mMaxWidth(720),
mMaxHeight(480),
mStreamMask(0),
mNewStreamMask(0),
mSwapMask(0),
mSwitchGeneration(0),
mSubtitleGeneration(0),
mLastDequeuedTimeUs(0LL),
mRealTimeBaseUs(0LL),
mReconfigurationInProgress(false),
mSwitchInProgress(false),
mUpSwitchMark(kUpSwitchMarkUs),
mDownSwitchMark(kDownSwitchMarkUs),
mUpSwitchMargin(kUpSwitchMarginUs),
mFirstTimeUsValid(false),
mFirstTimeUs(0),
mLastSeekTimeUs(0),
mHasMetadata(false) {
mStreams[kAudioIndex] = StreamItem("audio");
mStreams[kVideoIndex] = StreamItem("video");
mStreams[kSubtitleIndex] = StreamItem("subtitles");
for (size_t i = 0; i < kNumSources; ++i) {
mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
}
}
LiveSession::~LiveSession() {
if (mFetcherLooper != NULL) {
mFetcherLooper->stop();
}
}
int64_t LiveSession::calculateMediaTimeUs(
int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
if (timeUs >= firstTimeUs) {
timeUs -= firstTimeUs;
} else {
timeUs = 0;
}
timeUs += mLastSeekTimeUs;
if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
}
return timeUs;
}
status_t LiveSession::dequeueAccessUnit(
StreamType stream, sp<ABuffer> *accessUnit) {
status_t finalResult = OK;
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
ssize_t streamIdx = typeToIndex(stream);
if (streamIdx < 0) {
return BAD_VALUE;
}
const char *streamStr = getNameForStream(stream);
// Do not let client pull data if we don't have data packets yet.
// We might only have a format discontinuity queued without data.
// When NuPlayerDecoder dequeues the format discontinuity, it will
// immediately try to getFormat. If we return NULL, NuPlayerDecoder
// thinks it can do seamless change, so will not shutdown decoder.
// When the actual format arrives, it can't handle it and get stuck.
if (!packetSource->hasDataBufferAvailable(&finalResult)) {
ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
streamStr, finalResult);
if (finalResult == OK) {
return -EAGAIN;
} else {
return finalResult;
}
}
// Let the client dequeue as long as we have buffers available
// Do not make pause/resume decisions here.
status_t err = packetSource->dequeueAccessUnit(accessUnit);
if (err == INFO_DISCONTINUITY) {
// adaptive streaming, discontinuities in the playlist
int32_t type;
CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
sp<AMessage> extra;
if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
extra.clear();
}
ALOGI("[%s] read discontinuity of type %d, extra = %s",
streamStr,
type,
extra == NULL ? "NULL" : extra->debugString().c_str());
} else if (err == OK) {
if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
int64_t timeUs, originalTimeUs;
int32_t discontinuitySeq = 0;
StreamItem& strm = mStreams[streamIdx];
CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
originalTimeUs = timeUs;
(*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
int64_t offsetTimeUs;
if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
} else {
offsetTimeUs = 0;
}
if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
&& strm.mLastDequeuedTimeUs >= 0) {
int64_t firstTimeUs;
firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
offsetTimeUs += strm.mLastSampleDurationUs;
} else {
offsetTimeUs += strm.mLastSampleDurationUs;
}
mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
strm.mCurDiscontinuitySeq = discontinuitySeq;
}
int32_t discard = 0;
int64_t firstTimeUs;
if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
int64_t durUs; // approximate sample duration
if (timeUs > strm.mLastDequeuedTimeUs) {
durUs = timeUs - strm.mLastDequeuedTimeUs;
} else {
durUs = strm.mLastDequeuedTimeUs - timeUs;
}
strm.mLastSampleDurationUs = durUs;
firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
} else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
firstTimeUs = timeUs;
} else {
mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
firstTimeUs = timeUs;
}
strm.mLastDequeuedTimeUs = timeUs;
timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
streamStr, (long long)timeUs, (long long)originalTimeUs);
(*accessUnit)->meta()->setInt64("timeUs", timeUs);
mLastDequeuedTimeUs = timeUs;
mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
} else if (stream == STREAMTYPE_SUBTITLES) {
int32_t subtitleGeneration;
if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
&& subtitleGeneration != mSubtitleGeneration) {
return -EAGAIN;
};
(*accessUnit)->meta()->setInt32(
"track-index", mPlaylist->getSelectedIndex());
(*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
} else if (stream == STREAMTYPE_METADATA) {
HLSTime mdTime((*accessUnit)->meta());
if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
packetSource->requeueAccessUnit((*accessUnit));
return -EAGAIN;
} else {
int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
(*accessUnit)->meta()->setInt64("timeUs", timeUs);
(*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
}
}
} else {
ALOGI("[%s] encountered error %d", streamStr, err);
}
return err;
}
status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) {
if (!(mStreamMask & stream)) {
return UNKNOWN_ERROR;
}
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
*meta = packetSource->getFormat();
if (*meta == NULL) {
return -EWOULDBLOCK;
}
if (stream == STREAMTYPE_AUDIO) {
// set AAC input buffer size to 32K bytes (256kbps x 1sec)
(*meta)->setInt32(kKeyMaxInputSize, 32 * 1024);
} else if (stream == STREAMTYPE_VIDEO) {
(*meta)->setInt32(kKeyMaxWidth, mMaxWidth);
(*meta)->setInt32(kKeyMaxHeight, mMaxHeight);
}
return OK;
}
sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
return new HTTPDownloader(mHTTPService, mExtraHeaders);
}
void LiveSession::setBufferingSettings(
const BufferingSettings &buffering) {
sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this);
writeToAMessage(msg, buffering);
msg->post();
}
void LiveSession::connectAsync(
const char *url, const KeyedVector<String8, String8> *headers) {
sp<AMessage> msg = new AMessage(kWhatConnect, this);
msg->setString("url", url);
if (headers != NULL) {
msg->setPointer(
"headers",
new KeyedVector<String8, String8>(*headers));
}
msg->post();
}
status_t LiveSession::disconnect() {
sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
sp<AMessage> response;
status_t err = msg->postAndAwaitResponse(&response);
return err;
}
status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) {
sp<AMessage> msg = new AMessage(kWhatSeek, this);
msg->setInt64("timeUs", timeUs);
msg->setInt32("mode", mode);
sp<AMessage> response;
status_t err = msg->postAndAwaitResponse(&response);
return err;
}
bool LiveSession::checkSwitchProgress(
sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
AString newUri;
CHECK(stopParams->findString("uri", &newUri));
*needResumeUntil = false;
sp<AMessage> firstNewMeta[kMaxStreams];
for (size_t i = 0; i < kMaxStreams; ++i) {
StreamType stream = indexToType(i);
if (!(mSwapMask & mNewStreamMask & stream)
|| (mStreams[i].mNewUri != newUri)) {
continue;
}
if (stream == STREAMTYPE_SUBTITLES) {
continue;
}
sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
// First, get latest dequeued meta, which is where the decoder is at.
// (when upswitching, we take the meta after a certain delay, so that
// the decoder is left with some cushion)
sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
if (delayUs > 0) {
lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
if (lastDequeueMeta == NULL) {
// this means we don't have enough cushion, try again later
ALOGV("[%s] up switching failed due to insufficient buffer",
getNameForStream(stream));
return false;
}
} else {
// It's okay for lastDequeueMeta to be NULL here, it means the
// decoder hasn't even started dequeueing
lastDequeueMeta = source->getLatestDequeuedMeta();
}
// Then, trim off packets at beginning of mPacketSources2 that's before
// the latest dequeued time. These samples are definitely too late.
firstNewMeta[i] = mPacketSources2.editValueAt(i)
->trimBuffersBeforeMeta(lastDequeueMeta);
// Now firstNewMeta[i] is the first sample after the trim.
// If it's NULL, we failed because dequeue already past all samples
// in mPacketSource2, we have to try again.
if (firstNewMeta[i] == NULL) {
HLSTime dequeueTime(lastDequeueMeta);
ALOGV("[%s] dequeue time (%d, %lld) past start time",
getNameForStream(stream),
dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
return false;
}
// Otherwise, we check if mPacketSources2 overlaps with what old fetcher
// already fetched, and see if we need to resumeUntil
lastEnqueueMeta = source->getLatestEnqueuedMeta();
// lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
// boundary, no need to resume as the content will look different anyways
if (lastEnqueueMeta != NULL) {
HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
// no need to resume old fetcher if new fetcher started in different
// discontinuity sequence, as the content will look different.
*needResumeUntil |= (startTime.mSeq == lastTime.mSeq
&& startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
// update the stopTime for resumeUntil
stopParams->setInt32("discontinuitySeq", startTime.mSeq);
stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
}
}
// if we're here, it means dequeue progress hasn't passed some samples in
// mPacketSource2, we can trim off the excess in mPacketSource.
// (old fetcher might still need to resumeUntil the start time of new fetcher)
for (size_t i = 0; i < kMaxStreams; ++i) {
StreamType stream = indexToType(i);
if (!(mSwapMask & mNewStreamMask & stream)
|| (newUri != mStreams[i].mNewUri)
|| stream == STREAMTYPE_SUBTITLES) {
continue;
}
mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
}
// no resumeUntil if already underflow
*needResumeUntil &= !mBuffering;
return true;
}
void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatSetBufferingSettings:
{
readFromAMessage(msg, &mBufferingSettings);
break;
}
case kWhatConnect:
{
onConnect(msg);
break;
}
case kWhatDisconnect:
{
CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
if (mReconfigurationInProgress) {
break;
}
finishDisconnect();
break;
}
case kWhatSeek:
{
if (mReconfigurationInProgress) {
msg->post(50000);
break;
}
CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
mSeekReply = new AMessage;
onSeek(msg);
break;
}
case kWhatFetcherNotify:
{
int32_t what;
CHECK(msg->findInt32("what", &what));
switch (what) {
case PlaylistFetcher::kWhatStarted:
break;
case PlaylistFetcher::kWhatPaused:
case PlaylistFetcher::kWhatStopped:
{
AString uri;
CHECK(msg->findString("uri", &uri));
ssize_t index = mFetcherInfos.indexOfKey(uri);
if (index < 0) {
// ignore msgs from fetchers that's already gone
break;
}
ALOGV("fetcher-%d %s",
mFetcherInfos[index].mFetcher->getFetcherID(),
what == PlaylistFetcher::kWhatPaused ?
"paused" : "stopped");
if (what == PlaylistFetcher::kWhatStopped) {
mFetcherLooper->unregisterHandler(
mFetcherInfos[index].mFetcher->id());
mFetcherInfos.removeItemsAt(index);
} else if (what == PlaylistFetcher::kWhatPaused) {
int32_t seekMode;
CHECK(msg->findInt32("seekMode", &seekMode));
for (size_t i = 0; i < kMaxStreams; ++i) {
if (mStreams[i].mUri == uri) {
mStreams[i].mSeekMode = (SeekMode) seekMode;
}
}
}
if (mContinuation != NULL) {
CHECK_GT(mContinuationCounter, 0u);
if (--mContinuationCounter == 0) {
mContinuation->post();
}
ALOGV("%zu fetcher(s) left", mContinuationCounter);
}
break;
}
case PlaylistFetcher::kWhatDurationUpdate:
{
AString uri;
CHECK(msg->findString("uri", &uri));
int64_t durationUs;
CHECK(msg->findInt64("durationUs", &durationUs));
ssize_t index = mFetcherInfos.indexOfKey(uri);
if (index >= 0) {
FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
info->mDurationUs = durationUs;
}
break;
}
case PlaylistFetcher::kWhatTargetDurationUpdate:
{
int64_t targetDurationUs;
CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
break;
}
case PlaylistFetcher::kWhatError:
{
status_t err;
CHECK(msg->findInt32("err", &err));
ALOGE("XXX Received error %d from PlaylistFetcher.", err);
// handle EOS on subtitle tracks independently
AString uri;
if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
ssize_t i = mFetcherInfos.indexOfKey(uri);
if (i >= 0) {
const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
if (fetcher != NULL) {
uint32_t type = fetcher->getStreamTypeMask();
if (type == STREAMTYPE_SUBTITLES) {
mPacketSources.valueFor(
STREAMTYPE_SUBTITLES)->signalEOS(err);;
break;
}
}
}
}
// remember the failure index (as mCurBandwidthIndex will be restored
// after cancelBandwidthSwitch()), and record last fail time
size_t failureIndex = mCurBandwidthIndex;
mBandwidthItems.editItemAt(
failureIndex).mLastFailureUs = ALooper::GetNowUs();
if (mSwitchInProgress) {
// if error happened when we switch to a variant, try fallback
// to other variant to save the session
if (tryBandwidthFallback()) {
break;
}
}
if (mInPreparationPhase) {
postPrepared(err);
}
cancelBandwidthSwitch();
mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
mPacketSources.valueFor(
STREAMTYPE_SUBTITLES)->signalEOS(err);
postError(err);
break;
}
case PlaylistFetcher::kWhatStopReached:
{
ALOGV("kWhatStopReached");
AString oldUri;
CHECK(msg->findString("uri", &oldUri));
ssize_t index = mFetcherInfos.indexOfKey(oldUri);
if (index < 0) {
break;
}
tryToFinishBandwidthSwitch(oldUri);
break;
}
case PlaylistFetcher::kWhatStartedAt:
{
int32_t switchGeneration;
CHECK(msg->findInt32("switchGeneration", &switchGeneration));
ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
switchGeneration, mSwitchGeneration);
if (switchGeneration != mSwitchGeneration) {
break;
}
AString uri;
CHECK(msg->findString("uri", &uri));
// mark new fetcher mToBeResumed
ssize_t index = mFetcherInfos.indexOfKey(uri);
if (index >= 0) {
mFetcherInfos.editValueAt(index).mToBeResumed = true;
}
// temporarily disable packet sources to be swapped to prevent
// NuPlayerDecoder from dequeuing while we check progress
for (size_t i = 0; i < mPacketSources.size(); ++i) {
if ((mSwapMask & mPacketSources.keyAt(i))
&& uri == mStreams[i].mNewUri) {
mPacketSources.editValueAt(i)->enable(false);
}
}
bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
// If switching up, require a cushion bigger than kUnderflowMark
// to avoid buffering immediately after the switch.
// (If we don't have that cushion we'd rather cancel and try again.)
int64_t delayUs =
switchUp ?
(kUnderflowMarkMs * 1000LL + 1000000LL)
: 0;
bool needResumeUntil = false;
sp<AMessage> stopParams = msg;
if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
// playback time hasn't passed startAt time
if (!needResumeUntil) {
ALOGV("finish switch");
for (size_t i = 0; i < kMaxStreams; ++i) {
if ((mSwapMask & indexToType(i))
&& uri == mStreams[i].mNewUri) {
// have to make a copy of mStreams[i].mUri because
// tryToFinishBandwidthSwitch is modifying mStreams[]
AString oldURI = mStreams[i].mUri;
tryToFinishBandwidthSwitch(oldURI);
break;
}
}
} else {
// startAt time is after last enqueue time
// Resume fetcher for the original variant; the resumed fetcher should
// continue until the timestamps found in msg, which is stored by the
// new fetcher to indicate where the new variant has started buffering.
ALOGV("finish switch with resumeUntilAsync");
for (size_t i = 0; i < mFetcherInfos.size(); i++) {
const FetcherInfo &info = mFetcherInfos.valueAt(i);
if (info.mToBeRemoved) {
info.mFetcher->resumeUntilAsync(stopParams);
}
}
}
} else {
// playback time passed startAt time
if (switchUp) {
// if switching up, cancel and retry if condition satisfies again
ALOGV("cancel up switch because we're too late");
cancelBandwidthSwitch(true /* resume */);
} else {
ALOGV("retry down switch at next sample");
resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
}
}
// re-enable all packet sources
for (size_t i = 0; i < mPacketSources.size(); ++i) {
mPacketSources.editValueAt(i)->enable(true);
}
break;
}
case PlaylistFetcher::kWhatPlaylistFetched:
{
onMasterPlaylistFetched(msg);
break;
}
case PlaylistFetcher::kWhatMetadataDetected:
{
if (!mHasMetadata) {
mHasMetadata = true;
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatMetadataDetected);
notify->post();
}
break;
}
default:
TRESPASS();
}
break;
}
case kWhatChangeConfiguration:
{
onChangeConfiguration(msg);
break;
}
case kWhatChangeConfiguration2:
{
onChangeConfiguration2(msg);
break;
}
case kWhatChangeConfiguration3:
{
onChangeConfiguration3(msg);
break;
}
case kWhatPollBuffering:
{
int32_t generation;
CHECK(msg->findInt32("generation", &generation));
if (generation == mPollBufferingGeneration) {
onPollBuffering();
}
break;
}
default:
TRESPASS();
break;
}
}
// static
bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
static const int64_t kBlacklistWindowUs = 300 * 1000000LL;
return item.mLastFailureUs < 0
|| ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
}
// static
int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
if (a->mBandwidth < b->mBandwidth) {
return -1;
} else if (a->mBandwidth == b->mBandwidth) {
return 0;
}
return 1;
}
// static
LiveSession::StreamType LiveSession::indexToType(int idx) {
CHECK(idx >= 0 && idx < kNumSources);
return (StreamType)(1 << idx);
}
// static
ssize_t LiveSession::typeToIndex(int32_t type) {
switch (type) {
case STREAMTYPE_AUDIO:
return 0;
case STREAMTYPE_VIDEO:
return 1;
case STREAMTYPE_SUBTITLES:
return 2;
case STREAMTYPE_METADATA:
return 3;
default:
return -1;
};
return -1;
}
void LiveSession::onConnect(const sp<AMessage> &msg) {
CHECK(msg->findString("url", &mMasterURL));
// TODO currently we don't know if we are coming here from incognito mode
ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
KeyedVector<String8, String8> *headers = NULL;
if (!msg->findPointer("headers", (void **)&headers)) {
mExtraHeaders.clear();
} else {
mExtraHeaders = *headers;
delete headers;
headers = NULL;
}
// create looper for fetchers
if (mFetcherLooper == NULL) {
mFetcherLooper = new ALooper();
mFetcherLooper->setName("Fetcher");
mFetcherLooper->start(false, /* runOnCallingThread */
true /* canCallJava */);
}
// create fetcher to fetch the master playlist
addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
}
void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
AString uri;
CHECK(msg->findString("uri", &uri));
ssize_t index = mFetcherInfos.indexOfKey(uri);
if (index < 0) {
ALOGW("fetcher for master playlist is gone.");
return;
}
// no longer useful, remove
mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
mFetcherInfos.removeItemsAt(index);
CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
if (mPlaylist == NULL) {
ALOGE("unable to fetch master playlist %s.",
uriDebugString(mMasterURL).c_str());
postPrepared(ERROR_IO);
return;
}
// We trust the content provider to make a reasonable choice of preferred
// initial bandwidth by listing it first in the variant playlist.
// At startup we really don't have a good estimate on the available
// network bandwidth since we haven't tranferred any data yet. Once
// we have we can make a better informed choice.
size_t initialBandwidth = 0;
size_t initialBandwidthIndex = 0;
int32_t maxWidth = 0;
int32_t maxHeight = 0;
if (mPlaylist->isVariantPlaylist()) {
Vector<BandwidthItem> itemsWithVideo;
for (size_t i = 0; i < mPlaylist->size(); ++i) {
BandwidthItem item;
item.mPlaylistIndex = i;
item.mLastFailureUs = -1LL;
sp<AMessage> meta;
AString uri;
mPlaylist->itemAt(i, &uri, &meta);
CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
int32_t width, height;
if (meta->findInt32("width", &width)) {
maxWidth = max(maxWidth, width);
}
if (meta->findInt32("height", &height)) {
maxHeight = max(maxHeight, height);
}
mBandwidthItems.push(item);
if (mPlaylist->hasType(i, "video")) {
itemsWithVideo.push(item);
}
}
// remove the audio-only variants if we have at least one with video
if (!itemsWithVideo.empty()
&& itemsWithVideo.size() < mBandwidthItems.size()) {
mBandwidthItems.clear();
for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
mBandwidthItems.push(itemsWithVideo[i]);
}
}
CHECK_GT(mBandwidthItems.size(), 0u);
initialBandwidth = mBandwidthItems[0].mBandwidth;
mBandwidthItems.sort(SortByBandwidth);
for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
initialBandwidthIndex = i;
break;
}
}
} else {
// dummy item.
BandwidthItem item;
item.mPlaylistIndex = 0;
item.mBandwidth = 0;
mBandwidthItems.push(item);
}
mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
mPlaylist->pickRandomMediaItems();
changeConfiguration(
0LL /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
}
void LiveSession::finishDisconnect() {
ALOGV("finishDisconnect");
// No reconfiguration is currently pending, make sure none will trigger
// during disconnection either.
cancelBandwidthSwitch();
// cancel buffer polling
cancelPollBuffering();
// TRICKY: don't wait for all fetcher to be stopped when disconnecting
//
// Some fetchers might be stuck in connect/getSize at this point. These
// operations will eventually timeout (as we have a timeout set in
// MediaHTTPConnection), but we don't want to block the main UI thread
// until then. Here we just need to make sure we clear all references
// to the fetchers, so that when they finally exit from the blocking
// operation, they can be destructed.
//
// There is one very tricky point though. For this scheme to work, the
// fecther must hold a reference to LiveSession, so that LiveSession is
// destroyed after fetcher. Otherwise LiveSession would get stuck in its
// own destructor when it waits for mFetcherLooper to stop, which still
// blocks main UI thread.
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
mFetcherInfos.valueAt(i).mFetcher->stopAsync();
mFetcherLooper->unregisterHandler(
mFetcherInfos.valueAt(i).mFetcher->id());
}
mFetcherInfos.clear();
mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
mPacketSources.valueFor(
STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
sp<AMessage> response = new AMessage;
response->setInt32("err", OK);
response->postReply(mDisconnectReplyID);
mDisconnectReplyID.clear();
}
sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
ssize_t index = mFetcherInfos.indexOfKey(uri);
if (index >= 0) {
return NULL;
}
sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
notify->setString("uri", uri);
notify->setInt32("switchGeneration", mSwitchGeneration);
FetcherInfo info;
info.mFetcher = new PlaylistFetcher(
notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
info.mDurationUs = -1LL;
info.mToBeRemoved = false;
info.mToBeResumed = false;
mFetcherLooper->registerHandler(info.mFetcher);
mFetcherInfos.add(uri, info);
return info.mFetcher;
}
#if 0
static double uniformRand() {
return (double)rand() / RAND_MAX;
}
#endif
bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
newUri ? "true" : "false",
newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
return i >= 0
&& ((!newUri && uri == mStreams[i].mUri)
|| (newUri && uri == mStreams[i].mNewUri));
}
sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
size_t trackIndex, bool newUri) {
StreamType type = indexToType(trackIndex);
sp<AnotherPacketSource> source = NULL;
if (newUri) {
source = mPacketSources2.valueFor(type);
source->clear();
} else {
source = mPacketSources.valueFor(type);
};
return source;
}
sp<AnotherPacketSource> LiveSession::getMetadataSource(
sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
// todo: One case where the following strategy can fail is when audio and video
// are in separate playlists, both are transport streams, and the metadata
// is actually contained in the audio stream.
ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
streamMask, newUri ? "true" : "false");
if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
|| (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
// ... audio fetcher for audio only variant
return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
}
return NULL;
}
bool LiveSession::resumeFetcher(
const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
ssize_t index = mFetcherInfos.indexOfKey(uri);
if (index < 0) {
ALOGE("did not find fetcher for uri: %s", uriDebugString(uri).c_str());
return false;
}
bool resume = false;
sp<AnotherPacketSource> sources[kNumSources];
for (size_t i = 0; i < kMaxStreams; ++i) {
if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
resume = true;
sources[i] = getPacketSourceForStreamIndex(i, newUri);
}
}
if (resume) {
sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
fetcher->getFetcherID(), (long long)timeUs, seekMode);
fetcher->startAsync(
sources[kAudioIndex],
sources[kVideoIndex],
sources[kSubtitleIndex],
getMetadataSource(sources, streamMask, newUri),
timeUs, -1, -1, seekMode);
}
return resume;
}
float LiveSession::getAbortThreshold(
ssize_t currentBWIndex, ssize_t targetBWIndex) const {
float abortThreshold = -1.0f;
if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
/*
If we're switching down, we need to decide whether to
1) finish last segment of high-bandwidth variant, or
2) abort last segment of high-bandwidth variant, and fetch an
overlapping portion from low-bandwidth variant.
Here we try to maximize the amount of buffer left when the
switch point is met. Given the following parameters:
B: our current buffering level in seconds
T: target duration in seconds
X: sample duration in seconds remain to fetch in last segment
bw0: bandwidth of old variant (as specified in playlist)
bw1: bandwidth of new variant (as specified in playlist)
bw: measured bandwidth available
If we choose 1), when switch happens at the end of current
segment, our buffering will be
B + X - X * bw0 / bw
If we choose 2), when switch happens where we aborted current
segment, our buffering will be
B - (T - X) * bw1 / bw
We should only choose 1) if
X/T < bw1 / (bw1 + bw0 - bw)
*/
// abort old bandwidth immediately if bandwidth is fluctuating a lot.
// our estimate could be far off, and fetching old bandwidth could
// take too long.
if (!mLastBandwidthStable) {
return 0.0f;
}
// Taking the measured current bandwidth at 50% face value only,
// as our bandwidth estimation is a lagging indicator. Being
// conservative on this, we prefer switching to lower bandwidth
// unless we're really confident finishing up the last segment
// of higher bandwidth will be fast.
CHECK(mLastBandwidthBps >= 0);
abortThreshold =
(float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
/ ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
+ (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
- (float)mLastBandwidthBps * 0.5f);
if (abortThreshold < 0.0f) {
abortThreshold = -1.0f; // do not abort
}
ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
mLastBandwidthBps,
abortThreshold);
}
return abortThreshold;
}
void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}
ssize_t LiveSession::getLowestValidBandwidthIndex() const {
for (size_t index = 0; index < mBandwidthItems.size(); index++) {
if (isBandwidthValid(mBandwidthItems[index])) {
return index;
}
}
// if playlists are all blacklisted, return 0 and hope it's alive
return 0;
}
size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
if (mBandwidthItems.size() < 2) {
// shouldn't be here if we only have 1 bandwidth, check
// logic to get rid of redundant bandwidth polling
ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
return 0;
}
#if 1
char value[PROPERTY_VALUE_MAX];
ssize_t index = -1;
if (property_get("media.httplive.bw-index", value, NULL)) {
char *end;
index = strtol(value, &end, 10);
CHECK(end > value && *end == '\0');
if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
index = mBandwidthItems.size() - 1;
}
}
if (index < 0) {
char value[PROPERTY_VALUE_MAX];
if (property_get("media.httplive.max-bw", value, NULL)) {
char *end;
long maxBw = strtoul(value, &end, 10);
if (end > value && *end == '\0') {
if (maxBw > 0 && bandwidthBps > maxBw) {
ALOGV("bandwidth capped to %ld bps", maxBw);
bandwidthBps = maxBw;
}
}
}
// Pick the highest bandwidth stream that's not currently blacklisted
// below or equal to estimated bandwidth.
index = mBandwidthItems.size() - 1;
ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
while (index > lowestBandwidth) {
// be conservative (70%) to avoid overestimating and immediately
// switching down again.
size_t adjustedBandwidthBps = bandwidthBps * .7f;
const BandwidthItem &item = mBandwidthItems[index];
if (item.mBandwidth <= adjustedBandwidthBps
&& isBandwidthValid(item)) {
break;
}
--index;
}
}
#elif 0
// Change bandwidth at random()
size_t index = uniformRand() * mBandwidthItems.size();
#elif 0
// There's a 50% chance to stay on the current bandwidth and
// a 50% chance to switch to the next higher bandwidth (wrapping around
// to lowest)
const size_t kMinIndex = 0;
static ssize_t mCurBandwidthIndex = -1;
size_t index;
if (mCurBandwidthIndex < 0) {
index = kMinIndex;
} else if (uniformRand() < 0.5) {
index = (size_t)mCurBandwidthIndex;
} else {
index = mCurBandwidthIndex + 1;
if (index == mBandwidthItems.size()) {
index = kMinIndex;
}
}
mCurBandwidthIndex = index;
#elif 0
// Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
size_t index = mBandwidthItems.size() - 1;
while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
--index;
}
#elif 1
char value[PROPERTY_VALUE_MAX];
size_t index;
if (property_get("media.httplive.bw-index", value, NULL)) {
char *end;
index = strtoul(value, &end, 10);
CHECK(end > value && *end == '\0');
if (index >= mBandwidthItems.size()) {
index = mBandwidthItems.size() - 1;
}
} else {
index = 0;
}
#else
size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream
#endif
CHECK_GE(index, 0);
return index;
}
HLSTime LiveSession::latestMediaSegmentStartTime() const {
HLSTime audioTime(mPacketSources.valueFor(
STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
HLSTime videoTime(mPacketSources.valueFor(
STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
return audioTime < videoTime ? videoTime : audioTime;
}
void LiveSession::onSeek(const sp<AMessage> &msg) {
int64_t timeUs;
int32_t mode;
CHECK(msg->findInt64("timeUs", &timeUs));
CHECK(msg->findInt32("mode", &mode));
// TODO: add "mode" to changeConfiguration.
changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */);
}
status_t LiveSession::getDuration(int64_t *durationUs) const {
int64_t maxDurationUs = -1LL;
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
if (fetcherDurationUs > maxDurationUs) {
maxDurationUs = fetcherDurationUs;
}
}
*durationUs = maxDurationUs;
return OK;
}
bool LiveSession::isSeekable() const {
int64_t durationUs;
return getDuration(&durationUs) == OK && durationUs >= 0;
}
bool LiveSession::hasDynamicDuration() const {
return false;
}
size_t LiveSession::getTrackCount() const {
if (mPlaylist == NULL) {
return 0;
} else {
return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
}
}
sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
if (mPlaylist == NULL) {
return NULL;
} else {
if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
sp<AMessage> format = new AMessage();
format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
format->setString("language", "und");
format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
return format;
}
return mPlaylist->getTrackInfo(trackIndex);
}
}
status_t LiveSession::selectTrack(size_t index, bool select) {
if (mPlaylist == NULL) {
return INVALID_OPERATION;
}
ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
index, select, mSubtitleGeneration);
++mSubtitleGeneration;
status_t err = mPlaylist->selectTrack(index, select);
if (err == OK) {
sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
msg->setInt32("pickTrack", select);
msg->post();
}
return err;
}
ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
if (mPlaylist == NULL) {
return -1;
} else {
return mPlaylist->getSelectedTrack(type);
}
}
void LiveSession::changeConfiguration(
int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
(long long)timeUs, bandwidthIndex, pickTrack);
cancelBandwidthSwitch();
CHECK(!mReconfigurationInProgress);
mReconfigurationInProgress = true;
if (bandwidthIndex >= 0) {
mOrigBandwidthIndex = mCurBandwidthIndex;
mCurBandwidthIndex = bandwidthIndex;
if (mOrigBandwidthIndex != mCurBandwidthIndex) {
ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
mOrigBandwidthIndex, mCurBandwidthIndex);
}
}
CHECK_LT((size_t)mCurBandwidthIndex, mBandwidthItems.size());
const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
AString URIs[kMaxStreams];
for (size_t i = 0; i < kMaxStreams; ++i) {
if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
streamMask |= indexToType(i);
}
}
// Step 1, stop and discard fetchers that are no longer needed.
// Pause those that we'll reuse.
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
// skip fetchers that are marked mToBeRemoved,
// these are done and can't be reused
if (mFetcherInfos[i].mToBeRemoved) {
continue;
}
const AString &uri = mFetcherInfos.keyAt(i);
sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
bool discardFetcher = true, delayRemoval = false;
for (size_t j = 0; j < kMaxStreams; ++j) {
StreamType type = indexToType(j);
if ((streamMask & type) && uri == URIs[j]) {
resumeMask |= type;
streamMask &= ~type;
discardFetcher = false;
}
}
// Delay fetcher removal if not picking tracks, AND old fetcher
// has stream mask that overlaps new variant. (Okay to discard
// old fetcher now, if completely no overlap.)
if (discardFetcher && timeUs < 0LL && !pickTrack
&& (fetcher->getStreamTypeMask() & streamMask)) {
discardFetcher = false;
delayRemoval = true;
}
if (discardFetcher) {
ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
fetcher->stopAsync();
} else {
float threshold = 0.0f; // default to pause after current block (47Kbytes)
bool disconnect = false;
if (timeUs >= 0LL) {
// seeking, no need to finish fetching
disconnect = true;
} else if (delayRemoval) {
// adapting, abort if remaining of current segment is over threshold
threshold = getAbortThreshold(
mOrigBandwidthIndex, mCurBandwidthIndex);
}
ALOGV("pausing fetcher-%d, threshold=%.2f",
fetcher->getFetcherID(), threshold);
fetcher->pauseAsync(threshold, disconnect);
}
}
sp<AMessage> msg;
if (timeUs < 0LL) {
// skip onChangeConfiguration2 (decoder destruction) if not seeking.
msg = new AMessage(kWhatChangeConfiguration3, this);
} else {
msg = new AMessage(kWhatChangeConfiguration2, this);
}
msg->setInt32("streamMask", streamMask);
msg->setInt32("resumeMask", resumeMask);
msg->setInt32("pickTrack", pickTrack);
msg->setInt64("timeUs", timeUs);
for (size_t i = 0; i < kMaxStreams; ++i) {
if ((streamMask | resumeMask) & indexToType(i)) {
msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
}
}
// Every time a fetcher acknowledges the stopAsync or pauseAsync request
// we'll decrement mContinuationCounter, once it reaches zero, i.e. all
// fetchers have completed their asynchronous operation, we'll post
// mContinuation, which then is handled below in onChangeConfiguration2.
mContinuationCounter = mFetcherInfos.size();
mContinuation = msg;
if (mContinuationCounter == 0) {
msg->post();
}
}
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
ALOGV("onChangeConfiguration");
if (!mReconfigurationInProgress) {
int32_t pickTrack = 0;
msg->findInt32("pickTrack", &pickTrack);
changeConfiguration(-1LL /* timeUs */, -1, pickTrack);
} else {
msg->post(1000000LL); // retry in 1 sec
}
}
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
ALOGV("onChangeConfiguration2");
mContinuation.clear();
// All fetchers are either suspended or have been removed now.
// If we're seeking, clear all packet sources before we report
// seek complete, to prevent decoder from pulling stale data.
int64_t timeUs;
CHECK(msg->findInt64("timeUs", &timeUs));
if (timeUs >= 0) {
mLastSeekTimeUs = timeUs;
mLastDequeuedTimeUs = timeUs;
for (size_t i = 0; i < mPacketSources.size(); i++) {
sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
sp<MetaData> format = packetSource->getFormat();
packetSource->clear();
// Set a tentative format here such that HTTPLiveSource will always have
// a format available when NuPlayer queries. Without an available video
// format when setting a surface NuPlayer might disable video decoding
// altogether. The tentative format will be overwritten by the
// authoritative (and possibly same) format once content from the new
// position is dequeued.
packetSource->setFormat(format);
}
for (size_t i = 0; i < kMaxStreams; ++i) {
mStreams[i].reset();
}
mDiscontinuityOffsetTimesUs.clear();
mDiscontinuityAbsStartTimesUs.clear();
if (mSeekReplyID != NULL) {
CHECK(mSeekReply != NULL);
mSeekReply->setInt32("err", OK);
mSeekReply->postReply(mSeekReplyID);
mSeekReplyID.clear();
mSeekReply.clear();
}
// restart buffer polling after seek becauese previous
// buffering position is no longer valid.
restartPollBuffering();
}
uint32_t streamMask, resumeMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
streamMask |= resumeMask;
AString URIs[kMaxStreams];
for (size_t i = 0; i < kMaxStreams; ++i) {
if (streamMask & indexToType(i)) {
const AString &uriKey = mStreams[i].uriKey();
CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
}
}
uint32_t changedMask = 0;
for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
// stream URI could change even if onChangeConfiguration2 is only
// used for seek. Seek could happen during a bw switch, in this
// case bw switch will be cancelled, but the seekTo position will
// fetch from the new URI.
if ((mStreamMask & streamMask & indexToType(i))
&& !mStreams[i].mUri.empty()
&& !(URIs[i] == mStreams[i].mUri)) {
ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
mStreams[i].mUri.c_str(), URIs[i].c_str());
sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
if (source->getLatestDequeuedMeta() != NULL) {
source->queueDiscontinuity(
ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
}
}
// Determine which decoders to shutdown on the player side,
// a decoder has to be shutdown if its streamtype was active
// before but now longer isn't.
if ((mStreamMask & ~streamMask & indexToType(i))) {
changedMask |= indexToType(i);
}
}
if (changedMask == 0) {
// If nothing changed as far as the audio/video decoders
// are concerned we can proceed.
onChangeConfiguration3(msg);
return;
}
// Something changed, inform the player which will shutdown the
// corresponding decoders and will post the reply once that's done.
// Handling the reply will continue executing below in
// onChangeConfiguration3.
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatStreamsChanged);
notify->setInt32("changedMask", changedMask);
msg->setWhat(kWhatChangeConfiguration3);
msg->setTarget(this);
notify->setMessage("reply", msg);
notify->post();
}
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
mContinuation.clear();
// All remaining fetchers are still suspended, the player has shutdown
// any decoders that needed it.
uint32_t streamMask, resumeMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
mNewStreamMask = streamMask | resumeMask;
int64_t timeUs;
int32_t pickTrack;
bool switching = false;
CHECK(msg->findInt64("timeUs", &timeUs));
CHECK(msg->findInt32("pickTrack", &pickTrack));
if (timeUs < 0LL) {
if (!pickTrack) {
// mSwapMask contains streams that are in both old and new variant,
// (in mNewStreamMask & mStreamMask) but with different URIs
// (not in resumeMask).
// For example, old variant has video and audio in two separate
// URIs, and new variant has only audio with unchanged URI. mSwapMask
// should be 0 as there is nothing to swap. We only need to stop video,
// and resume audio.
mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask;
switching = (mSwapMask != 0);
}
mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
} else {
mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
}
ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
"mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
(long long)timeUs, switching, pickTrack,
mStreamMask, mNewStreamMask, mSwapMask);
for (size_t i = 0; i < kMaxStreams; ++i) {
if (streamMask & indexToType(i)) {
if (switching) {
CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
} else {
CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
}
}
}
// Of all existing fetchers:
// * Resume fetchers that are still needed and assign them original packet sources.
// * Mark otherwise unneeded fetchers for removal.
ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
const AString &uri = mFetcherInfos.keyAt(i);
if (!resumeFetcher(uri, resumeMask, timeUs)) {
ALOGV("marking fetcher-%d to be removed",
mFetcherInfos[i].mFetcher->getFetcherID());
mFetcherInfos.editValueAt(i).mToBeRemoved = true;
}
}
// streamMask now only contains the types that need a new fetcher created.
if (streamMask != 0) {
ALOGV("creating new fetchers for mask 0x%08x", streamMask);
}
// Find out when the original fetchers have buffered up to and start the new fetchers
// at a later timestamp.
for (size_t i = 0; i < kMaxStreams; i++) {
if (!(indexToType(i) & streamMask)) {
continue;
}
AString uri;
uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
CHECK(fetcher != NULL);
HLSTime startTime;
SeekMode seekMode = kSeekModeExactPosition;
sp<AnotherPacketSource> sources[kNumSources];
if (i == kSubtitleIndex || (!pickTrack && !switching)) {
startTime = latestMediaSegmentStartTime();
}
// TRICKY: looping from i as earlier streams are already removed from streamMask
for (size_t j = i; j < kMaxStreams; ++j) {
const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
if ((streamMask & indexToType(j)) && uri == streamUri) {
sources[j] = mPacketSources.valueFor(indexToType(j));
if (timeUs >= 0) {
startTime.mTimeUs = timeUs;
} else {
int32_t type;
sp<AMessage> meta;
if (!switching) {
// selecting, or adapting but no swap required
meta = sources[j]->getLatestDequeuedMeta();
} else {
// adapting and swap required
meta = sources[j]->getLatestEnqueuedMeta();
if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
// switching up
meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
}
}
if ((j == kAudioIndex || j == kVideoIndex)
&& meta != NULL && !meta->findInt32("discontinuity", &type)) {
HLSTime tmpTime(meta);
if (startTime < tmpTime) {
startTime = tmpTime;
}
}
if (!switching) {
// selecting, or adapting but no swap required
sources[j]->clear();
if (j == kSubtitleIndex) {
break;
}
ALOGV("stream[%zu]: queue format change", j);
sources[j]->queueDiscontinuity(
ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
} else {
// switching, queue discontinuities after resume
sources[j] = mPacketSources2.valueFor(indexToType(j));
sources[j]->clear();
// the new fetcher might be providing streams that used to be
// provided by two different fetchers, if one of the fetcher
// paused in the middle while the other somehow paused in next
// seg, we have to start from next seg.
if (seekMode < mStreams[j].mSeekMode) {
seekMode = mStreams[j].mSeekMode;
}
}
}
streamMask &= ~indexToType(j);
}
}
ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
"segmentStartTimeUs %lld seekMode %d",
fetcher->getFetcherID(),
(long long)startTime.mTimeUs,
(long long)mLastSeekTimeUs,
(long long)startTime.getSegmentTimeUs(),
seekMode);
// Set the target segment start time to the middle point of the
// segment where the last sample was.
// This gives a better guess if segments of the two variants are not
// perfectly aligned. (If the corresponding segment in new variant
// starts slightly later than that in the old variant, we still want
// to pick that segment, not the one before)
fetcher->startAsync(
sources[kAudioIndex],
sources[kVideoIndex],
sources[kSubtitleIndex],
getMetadataSource(sources, mNewStreamMask, switching),
startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
startTime.getSegmentTimeUs(),
startTime.mSeq,
seekMode);
}
// All fetchers have now been started, the configuration change
// has completed.
mReconfigurationInProgress = false;
if (switching) {
mSwitchInProgress = true;
} else {
mStreamMask = mNewStreamMask;
if (mOrigBandwidthIndex != mCurBandwidthIndex) {
ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
mOrigBandwidthIndex, mCurBandwidthIndex);
mOrigBandwidthIndex = mCurBandwidthIndex;
}
}
ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
mSwitchInProgress, mStreamMask);
if (mDisconnectReplyID != NULL) {
finishDisconnect();
}
}
void LiveSession::swapPacketSource(StreamType stream) {
ALOGV("[%s] swapPacketSource", getNameForStream(stream));
// transfer packets from source2 to source
sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
// queue discontinuity in mPacketSource
aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
// queue packets in mPacketSource2 to mPacketSource
status_t finalResult = OK;
sp<ABuffer> accessUnit;
while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
OK == aps2->dequeueAccessUnit(&accessUnit)) {
aps->queueAccessUnit(accessUnit);
}
aps2->clear();
}
void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
if (!mSwitchInProgress) {
return;
}
ssize_t index = mFetcherInfos.indexOfKey(oldUri);
if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
return;
}
// Swap packet source of streams provided by old variant
for (size_t idx = 0; idx < kMaxStreams; idx++) {
StreamType stream = indexToType(idx);
if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
swapPacketSource(stream);
if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
ALOGW("swapping stream type %d %s to empty stream",
stream, uriDebugString(mStreams[idx].mUri).c_str());
}
mStreams[idx].mUri = mStreams[idx].mNewUri;
mStreams[idx].mNewUri.clear();
mSwapMask &= ~stream;
}
}
mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
if (mSwapMask != 0) {
return;
}
// Check if new variant contains extra streams.
uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
while (extraStreams) {
StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
extraStreams &= ~stream;
swapPacketSource(stream);
ssize_t idx = typeToIndex(stream);
CHECK(idx >= 0);
if (mStreams[idx].mNewUri.empty()) {
ALOGW("swapping extra stream type %d %s to empty stream",
stream, uriDebugString(mStreams[idx].mUri).c_str());
}
mStreams[idx].mUri = mStreams[idx].mNewUri;
mStreams[idx].mNewUri.clear();
}
// Restart new fetcher (it was paused after the first 47k block)
// and let it fetch into mPacketSources (not mPacketSources2)
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
FetcherInfo &info = mFetcherInfos.editValueAt(i);
if (info.mToBeResumed) {
resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
info.mToBeResumed = false;
}
}
ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
mOrigBandwidthIndex, mCurBandwidthIndex);
mStreamMask = mNewStreamMask;
mSwitchInProgress = false;
mOrigBandwidthIndex = mCurBandwidthIndex;
restartPollBuffering();
}
void LiveSession::schedulePollBuffering() {
sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
msg->setInt32("generation", mPollBufferingGeneration);
msg->post(1000000LL);
}
void LiveSession::cancelPollBuffering() {
++mPollBufferingGeneration;
mPrevBufferPercentage = -1;
}
void LiveSession::restartPollBuffering() {
cancelPollBuffering();
onPollBuffering();
}
void LiveSession::onPollBuffering() {
ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
"mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
mSwitchInProgress, mReconfigurationInProgress,
mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
bool underflow, ready, down, up;
if (checkBuffering(underflow, ready, down, up)) {
if (mInPreparationPhase) {
// Allow down switch even if we're still preparing.
//
// Some streams have a high bandwidth index as default,
// when bandwidth is low, it takes a long time to buffer
// to ready mark, then it immediately pauses after start
// as we have to do a down switch. It's better experience
// to restart from a lower index, if we detect low bw.
if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
postPrepared(OK);
}
}
if (!mInPreparationPhase) {
if (ready) {
stopBufferingIfNecessary();
} else if (underflow) {
startBufferingIfNecessary();
}
switchBandwidthIfNeeded(up, down);
}
}
schedulePollBuffering();
}
void LiveSession::cancelBandwidthSwitch(bool resume) {
ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
if (!mSwitchInProgress) {
return;
}
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
FetcherInfo& info = mFetcherInfos.editValueAt(i);
if (info.mToBeRemoved) {
info.mToBeRemoved = false;
if (resume) {
resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
}
}
}
for (size_t i = 0; i < kMaxStreams; ++i) {
AString newUri = mStreams[i].mNewUri;
if (!newUri.empty()) {
// clear all mNewUri matching this newUri
for (size_t j = i; j < kMaxStreams; ++j) {
if (mStreams[j].mNewUri == newUri) {
mStreams[j].mNewUri.clear();
}
}
ALOGV("stopping newUri = %s", newUri.c_str());
ssize_t index = mFetcherInfos.indexOfKey(newUri);
if (index < 0) {
ALOGE("did not find fetcher for newUri: %s", uriDebugString(newUri).c_str());
continue;
}
FetcherInfo &info = mFetcherInfos.editValueAt(index);
info.mToBeRemoved = true;
info.mFetcher->stopAsync();
}
}
ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
mOrigBandwidthIndex, mCurBandwidthIndex);
mSwitchGeneration++;
mSwitchInProgress = false;
mCurBandwidthIndex = mOrigBandwidthIndex;
mSwapMask = 0;
}
bool LiveSession::checkBuffering(
bool &underflow, bool &ready, bool &down, bool &up) {
underflow = ready = down = up = false;
if (mReconfigurationInProgress) {
ALOGV("Switch/Reconfig in progress, defer buffer polling");
return false;
}
size_t activeCount, underflowCount, readyCount, downCount, upCount;
activeCount = underflowCount = readyCount = downCount = upCount =0;
int32_t minBufferPercent = -1;
int64_t durationUs;
if (getDuration(&durationUs) != OK) {
durationUs = -1;
}
for (size_t i = 0; i < mPacketSources.size(); ++i) {
// we don't check subtitles for buffering level
if (!(mStreamMask & mPacketSources.keyAt(i)
& (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
continue;
}
// ignore streams that never had any packet queued.
// (it's possible that the variant only has audio or video)
sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
if (meta == NULL) {
continue;
}
status_t finalResult;
int64_t bufferedDurationUs =
mPacketSources[i]->getBufferedDurationUs(&finalResult);
ALOGV("[%s] buffered %lld us",
getNameForStream(mPacketSources.keyAt(i)),
(long long)bufferedDurationUs);
if (durationUs >= 0) {
int32_t percent;
if (mPacketSources[i]->isFinished(0 /* duration */)) {
percent = 100;
} else {
percent = (int32_t)(100.0 *
(mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
}
if (minBufferPercent < 0 || percent < minBufferPercent) {
minBufferPercent = percent;
}
}
++activeCount;
int64_t readyMarkUs =
(mInPreparationPhase ?
mBufferingSettings.mInitialMarkMs :
mBufferingSettings.mResumePlaybackMarkMs) * 1000LL;
if (bufferedDurationUs > readyMarkUs
|| mPacketSources[i]->isFinished(0)) {
++readyCount;
}
if (!mPacketSources[i]->isFinished(0)) {
if (bufferedDurationUs < kUnderflowMarkMs * 1000LL) {
++underflowCount;
}
if (bufferedDurationUs > mUpSwitchMark) {
++upCount;
}
if (bufferedDurationUs < mDownSwitchMark) {
++downCount;
}
}
}
if (minBufferPercent >= 0) {
notifyBufferingUpdate(minBufferPercent);
}
if (activeCount > 0) {
up = (upCount == activeCount);
down = (downCount > 0);
ready = (readyCount == activeCount);
underflow = (underflowCount > 0);
return true;
}
return false;
}
void LiveSession::startBufferingIfNecessary() {
ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
mInPreparationPhase, mBuffering);
if (!mBuffering) {
mBuffering = true;
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatBufferingStart);
notify->post();
}
}
void LiveSession::stopBufferingIfNecessary() {
ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
mInPreparationPhase, mBuffering);
if (mBuffering) {
mBuffering = false;
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatBufferingEnd);
notify->post();
}
}
void LiveSession::notifyBufferingUpdate(int32_t percentage) {
if (percentage < mPrevBufferPercentage) {
percentage = mPrevBufferPercentage;
} else if (percentage > 100) {
percentage = 100;
}
mPrevBufferPercentage = percentage;
ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatBufferingUpdate);
notify->setInt32("percentage", percentage);
notify->post();
}
bool LiveSession::tryBandwidthFallback() {
if (mInPreparationPhase || mReconfigurationInProgress) {
// Don't try fallback during prepare or reconfig.
// If error happens there, it's likely unrecoverable.
return false;
}
if (mCurBandwidthIndex > mOrigBandwidthIndex) {
// if we're switching up, simply cancel and resume old variant
cancelBandwidthSwitch(true /* resume */);
return true;
} else {
// if we're switching down, we're likely about to underflow (if
// not already underflowing). try the lowest viable bandwidth if
// not on that variant already.
ssize_t lowestValid = getLowestValidBandwidthIndex();
if (mCurBandwidthIndex > lowestValid) {
cancelBandwidthSwitch();
changeConfiguration(-1LL, lowestValid);
return true;
}
}
// return false if we couldn't find any fallback
return false;
}
/*
* returns true if a bandwidth switch is actually needed (and started),
* returns false otherwise
*/
bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
// no need to check bandwidth if we only have 1 bandwidth settings
if (mBandwidthItems.size() < 2) {
return false;
}
if (mSwitchInProgress) {
if (mBuffering) {
tryBandwidthFallback();
}
return false;
}
int32_t bandwidthBps, shortTermBps;
bool isStable;
if (mBandwidthEstimator->estimateBandwidth(
&bandwidthBps, &isStable, &shortTermBps)) {
ALOGV("bandwidth estimated at %.2f kbps, "
"stable %d, shortTermBps %.2f kbps",
bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
mLastBandwidthBps = bandwidthBps;
mLastBandwidthStable = isStable;
} else {
ALOGV("no bandwidth estimate.");
return false;
}
int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
// canSwithDown and canSwitchUp can't both be true.
// we only want to switch up when measured bw is 120% higher than current variant,
// and we only want to switch down when measured bw is below current variant.
bool canSwitchDown = bufferLow
&& (bandwidthBps < (int32_t)curBandwidth);
bool canSwitchUp = bufferHigh
&& (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
if (canSwitchDown || canSwitchUp) {
// bandwidth estimating has some delay, if we have to downswitch when
// it hasn't stabilized, use the short term to guess real bandwidth,
// since it may be dropping too fast.
// (note this doesn't apply to upswitch, always use longer average there)
if (!isStable && canSwitchDown) {
if (shortTermBps < bandwidthBps) {
bandwidthBps = shortTermBps;
}
}
ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
// it's possible that we're checking for canSwitchUp case, but the returned
// bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
// of measured bw. In that case we don't want to do anything, since we have
// both enough buffer and enough bw.
if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
|| (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
// if not yet prepared, just restart again with new bw index.
// this is faster and playback experience is cleaner.
changeConfiguration(
mInPreparationPhase ? 0 : -1LL, bandwidthIndex);
return true;
}
}
return false;
}
void LiveSession::postError(status_t err) {
// if we reached EOS, notify buffering of 100%
if (err == ERROR_END_OF_STREAM) {
notifyBufferingUpdate(100);
}
// we'll stop buffer polling now, before that notify
// stop buffering to stop the spinning icon
stopBufferingIfNecessary();
cancelPollBuffering();
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatError);
notify->setInt32("err", err);
notify->post();
}
void LiveSession::postPrepared(status_t err) {
CHECK(mInPreparationPhase);
sp<AMessage> notify = mNotify->dup();
if (err == OK || err == ERROR_END_OF_STREAM) {
notify->setInt32("what", kWhatPrepared);
} else {
cancelPollBuffering();
notify->setInt32("what", kWhatPreparationFailed);
notify->setInt32("err", err);
}
notify->post();
mInPreparationPhase = false;
}
} // namespace android