GenericSource: Allow multiple buffer reads for video

Bug: 29125703
Change-Id: I23490a65ad3968d7be561805c9fa742320b5c78a
diff --git a/include/media/IMediaSource.h b/include/media/IMediaSource.h
index 7a7599d..2ff42ec 100644
--- a/include/media/IMediaSource.h
+++ b/include/media/IMediaSource.h
@@ -131,6 +131,10 @@
     // Returns true if |readMultiple| is supported, otherwise false.
     virtual bool supportReadMultiple() = 0;
 
+    // Returns true if |read| supports nonblocking option, otherwise false.
+    // |readMultiple| if supported, always allows the nonblocking option.
+    virtual bool supportNonblockingRead() = 0;
+
     // Causes this source to suspend pulling data from its upstream source
     // until a subsequent read-with-seek. Currently only supported by
     // OMXCodec.
@@ -161,6 +165,7 @@
         return ERROR_UNSUPPORTED;
     }
 
+    // TODO: Implement this for local media sources.
     virtual status_t readMultiple(
             Vector<MediaBuffer *> * /* buffers */, uint32_t /* maxNumBuffers = 1 */,
             const ReadOptions * /* options = nullptr */) {
@@ -171,7 +176,15 @@
         return false;
     }
 
+    // Override in source if nonblocking reads are supported.
+    virtual bool supportNonblockingRead() {
+        return false;
+    }
+
     static const size_t kBinderMediaBuffers = 4; // buffers managed by BnMediaSource
+    static const size_t kTransferSharedAsSharedThreshold = 4 * 1024;  // if >= shared, else inline
+    static const size_t kTransferInlineAsSharedThreshold = 64 * 1024; // if >= shared, else inline
+    static const size_t kInlineMaxTransfer = 256 * 1024; // Binder size limited to BINDER_VM_SIZE.
 
 protected:
     virtual ~BnMediaSource();
diff --git a/include/media/stagefright/MediaBufferGroup.h b/include/media/stagefright/MediaBufferGroup.h
index d1f59f4..dfa31b2 100644
--- a/include/media/stagefright/MediaBufferGroup.h
+++ b/include/media/stagefright/MediaBufferGroup.h
@@ -30,10 +30,16 @@
 class MediaBufferGroup : public MediaBufferObserver {
 public:
     MediaBufferGroup(size_t growthLimit = 0);
+
+    // create a media buffer group with preallocated buffers
+    MediaBufferGroup(size_t buffers, size_t buffer_size, size_t growthLimit = 0);
+
     ~MediaBufferGroup();
 
     void add_buffer(MediaBuffer *buffer);
 
+    bool has_buffers();
+
     // If nonBlocking is false, it blocks until a buffer is available and
     // passes it to the caller in *buffer, while returning OK.
     // The returned buffer will have a reference count of 1.
diff --git a/media/libmedia/IMediaSource.cpp b/media/libmedia/IMediaSource.cpp
index 2adce19..dd94ccf 100644
--- a/media/libmedia/IMediaSource.cpp
+++ b/media/libmedia/IMediaSource.cpp
@@ -38,7 +38,8 @@
     GETFORMAT,
     // READ, deprecated
     READMULTIPLE,
-    RELEASE_BUFFER
+    RELEASE_BUFFER,
+    SUPPORT_NONBLOCKING_READ,
 };
 
 enum {
@@ -186,10 +187,22 @@
         return ret;
     }
 
-    bool supportReadMultiple() {
+    // Binder proxy adds readMultiple support.
+    virtual bool supportReadMultiple() {
         return true;
     }
 
+    virtual bool supportNonblockingRead() {
+        ALOGV("supportNonblockingRead");
+        Parcel data, reply;
+        data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
+        status_t ret = remote()->transact(SUPPORT_NONBLOCKING_READ, data, &reply);
+        if (ret == NO_ERROR) {
+            return reply.readInt32() != 0;
+        }
+        return false;
+    }
+
     virtual status_t pause() {
         ALOGV("pause");
         Parcel data, reply;
@@ -325,6 +338,7 @@
 
             mGroup->gc(kBinderMediaBuffers /* freeBuffers */);
             mIndexCache.gc();
+            size_t inlineTransferSize = 0;
             status_t ret = NO_ERROR;
             uint32_t bufferCount = 0;
             for (; bufferCount < maxNumBuffers; ++bufferCount, ++mBuffersSinceStop) {
@@ -344,7 +358,8 @@
                 MediaBuffer *transferBuf = nullptr;
                 const size_t length = buf->range_length();
                 size_t offset = buf->range_offset();
-                if (length >= MediaBuffer::kSharedMemThreshold) {
+                if (length >= (supportNonblockingRead() && buf->mMemory != nullptr ?
+                        kTransferSharedAsSharedThreshold : kTransferInlineAsSharedThreshold)) {
                     if (buf->mMemory != nullptr) {
                         ALOGV("Use shared memory: %zu", length);
                         transferBuf = buf;
@@ -366,6 +381,9 @@
                         } else {
                             memcpy(transferBuf->data(), (uint8_t*)buf->data() + offset, length);
                             offset = 0;
+                            if (!mGroup->has_buffers()) {
+                                maxNumBuffers = 0; // No more MediaBuffers, stop readMultiple.
+                            }
                         }
                     }
                 }
@@ -395,6 +413,8 @@
                     buf->meta_data()->writeToParcel(*reply);
                     if (transferBuf != buf) {
                         buf->release();
+                    } else if (!supportNonblockingRead()) {
+                        maxNumBuffers = 0; // stop readMultiple with one shared buffer.
                     }
                 } else {
                     ALOGV_IF(buf->mMemory != nullptr,
@@ -404,6 +424,10 @@
                     reply->writeByteArray(length, (uint8_t*)buf->data() + offset);
                     buf->meta_data()->writeToParcel(*reply);
                     buf->release();
+                    inlineTransferSize += length;
+                    if (inlineTransferSize > kInlineMaxTransfer) {
+                        maxNumBuffers = 0; // stop readMultiple if inline transfer is too large.
+                    }
                 }
             }
             reply->writeInt32(NULL_BUFFER); // Indicate no more MediaBuffers.
@@ -412,6 +436,12 @@
                     ret, bufferCount, mBuffersSinceStop);
             return NO_ERROR;
         }
+        case SUPPORT_NONBLOCKING_READ: {
+            ALOGV("supportNonblockingRead");
+            CHECK_INTERFACE(IMediaSource, data, reply);
+            reply->writeInt32((int32_t)supportNonblockingRead());
+            return NO_ERROR;
+        }
         default:
             return BBinder::onTransact(code, data, reply, flags);
     }
diff --git a/media/libmediaplayerservice/nuplayer/GenericSource.cpp b/media/libmediaplayerservice/nuplayer/GenericSource.cpp
index efe82ba..af2d0f3 100644
--- a/media/libmediaplayerservice/nuplayer/GenericSource.cpp
+++ b/media/libmediaplayerservice/nuplayer/GenericSource.cpp
@@ -1385,7 +1385,7 @@
             if (mIsWidevine) {
                 maxBuffers = 2;
             } else {
-                maxBuffers = 4;
+                maxBuffers = 8;  // too large of a number may influence seeks
             }
             break;
         case MEDIA_TRACK_TYPE_AUDIO:
@@ -1417,25 +1417,24 @@
     MediaSource::ReadOptions options;
 
     bool seeking = false;
-
     if (seekTimeUs >= 0) {
         options.setSeekTo(seekTimeUs, MediaSource::ReadOptions::SEEK_PREVIOUS_SYNC);
         seeking = true;
     }
 
-    if (mIsWidevine) {
+    const bool couldReadMultiple = (!mIsWidevine && track->mSource->supportReadMultiple());
+
+    if (mIsWidevine || couldReadMultiple) {
         options.setNonBlocking();
     }
 
-    bool couldReadMultiple =
-        (!mIsWidevine && trackType == MEDIA_TRACK_TYPE_AUDIO
-                && track->mSource->supportReadMultiple());
     for (size_t numBuffers = 0; numBuffers < maxBuffers; ) {
         Vector<MediaBuffer *> mediaBuffers;
         status_t err = NO_ERROR;
 
-        if (!seeking && couldReadMultiple) {
-            err = track->mSource->readMultiple(&mediaBuffers, (maxBuffers - numBuffers));
+        if (couldReadMultiple) {
+            err = track->mSource->readMultiple(
+                    &mediaBuffers, maxBuffers - numBuffers, &options);
         } else {
             MediaBuffer *mbuf = NULL;
             err = track->mSource->read(&mbuf, &options);
diff --git a/media/libstagefright/MPEG4Extractor.cpp b/media/libstagefright/MPEG4Extractor.cpp
index 58448010..bbc75d6 100644
--- a/media/libstagefright/MPEG4Extractor.cpp
+++ b/media/libstagefright/MPEG4Extractor.cpp
@@ -75,6 +75,7 @@
     virtual sp<MetaData> getFormat();
 
     virtual status_t read(MediaBuffer **buffer, const ReadOptions *options = NULL);
+    virtual bool supportNonblockingRead() { return true; }
     virtual status_t fragmentedRead(MediaBuffer **buffer, const ReadOptions *options = NULL);
 
 protected:
@@ -3563,13 +3564,20 @@
 
     // A somewhat arbitrary limit that should be sufficient for 8k video frames
     // If you see the message below for a valid input stream: increase the limit
-    if (max_size > 64 * 1024 * 1024) {
-        ALOGE("bogus max input size: %zu", max_size);
+    const size_t kMaxBufferSize = 64 * 1024 * 1024;
+    if (max_size > kMaxBufferSize) {
+        ALOGE("bogus max input size: %zu > %zu", max_size, kMaxBufferSize);
         return ERROR_MALFORMED;
     }
-    mGroup = new MediaBufferGroup;
-    mGroup->add_buffer(new MediaBuffer(max_size));
+    if (max_size == 0) {
+        ALOGE("zero max input size");
+        return ERROR_MALFORMED;
+    }
 
+    // Allow up to kMaxBuffers, but not if the total exceeds kMaxBufferSize.
+    const size_t kMaxBuffers = 8;
+    const size_t buffers = min(kMaxBufferSize / max_size, kMaxBuffers);
+    mGroup = new MediaBufferGroup(buffers, max_size);
     mSrcBuffer = new (std::nothrow) uint8_t[max_size];
     if (mSrcBuffer == NULL) {
         // file probably specified a bad max size
@@ -4196,6 +4204,11 @@
 
     CHECK(mStarted);
 
+    if (options != nullptr && options->getNonBlocking() && !mGroup->has_buffers()) {
+        *out = nullptr;
+        return WOULD_BLOCK;
+    }
+
     if (mFirstMoofOffset > 0) {
         return fragmentedRead(out, options);
     }
diff --git a/media/libstagefright/WAVExtractor.cpp b/media/libstagefright/WAVExtractor.cpp
index 38a2a06..1f04fa0 100644
--- a/media/libstagefright/WAVExtractor.cpp
+++ b/media/libstagefright/WAVExtractor.cpp
@@ -70,6 +70,8 @@
     virtual status_t read(
             MediaBuffer **buffer, const ReadOptions *options = NULL);
 
+    virtual bool supportNonblockingRead() { return true; }
+
 protected:
     virtual ~WAVSource();
 
@@ -377,8 +379,8 @@
 
     CHECK(!mStarted);
 
-    mGroup = new MediaBufferGroup;
-    mGroup->add_buffer(new MediaBuffer(kMaxFrameSize));
+    // some WAV files may have large audio buffers that use shared memory transfer.
+    mGroup = new MediaBufferGroup(4 /* buffers */, kMaxFrameSize);
 
     if (mBitsPerSample == 8) {
         // As a temporary buffer for 8->16 bit conversion.
@@ -415,6 +417,10 @@
         MediaBuffer **out, const ReadOptions *options) {
     *out = NULL;
 
+    if (options != nullptr && options->getNonBlocking() && !mGroup->has_buffers()) {
+        return WOULD_BLOCK;
+    }
+
     int64_t seekTimeUs;
     ReadOptions::SeekMode mode;
     if (options != NULL && options->getSeekTo(&seekTimeUs, &mode)) {
diff --git a/media/libstagefright/foundation/MediaBufferGroup.cpp b/media/libstagefright/foundation/MediaBufferGroup.cpp
index 8be8e8b..cb78879 100644
--- a/media/libstagefright/foundation/MediaBufferGroup.cpp
+++ b/media/libstagefright/foundation/MediaBufferGroup.cpp
@@ -23,10 +23,57 @@
 
 namespace android {
 
+// std::min is not constexpr in C++11
+template<typename T>
+constexpr T MIN(const T &a, const T &b) { return a <= b ? a : b; }
+
+// MediaBufferGroup may create shared memory buffers at a
+// smaller threshold than an isolated new MediaBuffer.
+static const size_t kSharedMemoryThreshold = MIN(
+        (size_t)MediaBuffer::kSharedMemThreshold, (size_t)(4 * 1024));
+
 MediaBufferGroup::MediaBufferGroup(size_t growthLimit) :
     mGrowthLimit(growthLimit) {
 }
 
+MediaBufferGroup::MediaBufferGroup(size_t buffers, size_t buffer_size, size_t growthLimit)
+    : mGrowthLimit(growthLimit) {
+
+    if (buffer_size >= kSharedMemoryThreshold) {
+        ALOGD("creating MemoryDealer");
+        // Using a single MemoryDealer is efficient for a group of shared memory objects.
+        // This loop guarantees that we use shared memory (no fallback to malloc).
+
+        size_t alignment = MemoryDealer::getAllocationAlignment();
+        size_t augmented_size = buffer_size + sizeof(MediaBuffer::SharedControl);
+        size_t total = (augmented_size + alignment - 1) / alignment * alignment * buffers;
+        sp<MemoryDealer> memoryDealer = new MemoryDealer(total, "MediaBufferGroup");
+
+        for (size_t i = 0; i < buffers; ++i) {
+            sp<IMemory> mem = memoryDealer->allocate(augmented_size);
+            if (mem.get() == nullptr) {
+                ALOGW("Only allocated %zu shared buffers of size %zu", i, buffer_size);
+                break;
+            }
+            MediaBuffer *buffer = new MediaBuffer(mem);
+            buffer->getSharedControl()->clear();
+            add_buffer(buffer);
+        }
+        return;
+    }
+
+    // Non-shared memory allocation.
+    for (size_t i = 0; i < buffers; ++i) {
+        MediaBuffer *buffer = new MediaBuffer(buffer_size);
+        if (buffer->data() == nullptr) {
+            delete buffer; // don't call release, it's not properly formed
+            ALOGW("Only allocated %zu malloc buffers of size %zu", i, buffer_size);
+            break;
+        }
+        add_buffer(buffer);
+    }
+}
+
 MediaBufferGroup::~MediaBufferGroup() {
     for (MediaBuffer *buffer : mBuffers) {
         buffer->resolvePendingRelease();
@@ -67,6 +114,19 @@
     }
 }
 
+bool MediaBufferGroup::has_buffers() {
+    if (mBuffers.size() < mGrowthLimit) {
+        return true; // We can add more buffers internally.
+    }
+    for (MediaBuffer *buffer : mBuffers) {
+        buffer->resolvePendingRelease();
+        if (buffer->refcount() == 0) {
+            return true;
+        }
+    }
+    return false;
+}
+
 status_t MediaBufferGroup::acquire_buffer(
         MediaBuffer **out, bool nonBlocking, size_t requestedSize) {
     Mutex::Autolock autoLock(mLock);