[vulkan] Improve pipe performance on Fuchsia.

This makes use of the new Call method to write and
read back using a single IPC call. The IOStream
interface has been updated to allow for this optimization.

It also implements buffered reads. Each time we read back
data on Fuchsia, we attempt to read a larger, fixed-size chunk
(up to 512 KiB), which we then consume until another read is needed.

Change-Id: Ia7aabb5849c651f6d777da996e9d6d271722d0c8
diff --git a/host/include/libOpenglRender/IOStream.h b/host/include/libOpenglRender/IOStream.h
index d06440e..ad82072 100644
--- a/host/include/libOpenglRender/IOStream.h
+++ b/host/include/libOpenglRender/IOStream.h
@@ -33,6 +33,7 @@
     virtual void *allocBuffer(size_t minSize) = 0;
     virtual int commitBuffer(size_t size) = 0;
     virtual const unsigned char *readFully( void *buf, size_t len) = 0;
+    virtual const unsigned char *commitBufferAndReadFully(size_t size, void *buf, size_t len) = 0;
     virtual const unsigned char *read( void *buf, size_t *inout_len) = 0;
     virtual int writeFully(const void* buf, size_t len) = 0;
 
@@ -79,7 +80,12 @@
     }
 
     const unsigned char *readback(void *buf, size_t len) {
-        flush();
+        if (m_buf && m_free != m_bufsize) {
+            size_t size = m_bufsize - m_free;
+            m_buf = NULL;
+            m_free = 0;
+            return commitBufferAndReadFully(size, buf, len);
+        }
         return readFully(buf, len);
     }
 
diff --git a/shared/OpenglCodecCommon/SocketStream.cpp b/shared/OpenglCodecCommon/SocketStream.cpp
index e813142..067a75f 100644
--- a/shared/OpenglCodecCommon/SocketStream.cpp
+++ b/shared/OpenglCodecCommon/SocketStream.cpp
@@ -130,6 +130,11 @@
     return (const unsigned char *)buf;
 }
 
+const unsigned char *SocketStream::commitBufferAndReadFully(size_t size, void *buf, size_t len)
+{   // Implemented as two separate calls: commitBuffer(size), then readFully(buf, len).
+    return commitBuffer(size) ? nullptr : readFully(buf, len);  // non-zero commit result => nullptr, no read
+}
+
 const unsigned char *SocketStream::read( void *buf, size_t *inout_len)
 {
     if (!valid()) return NULL;
diff --git a/shared/OpenglCodecCommon/SocketStream.h b/shared/OpenglCodecCommon/SocketStream.h
index 3a501b4..3d8f5f5 100644
--- a/shared/OpenglCodecCommon/SocketStream.h
+++ b/shared/OpenglCodecCommon/SocketStream.h
@@ -33,6 +33,7 @@
     virtual void *allocBuffer(size_t minSize);
     virtual int commitBuffer(size_t size);
     virtual const unsigned char *readFully(void *buf, size_t len);
+    virtual const unsigned char *commitBufferAndReadFully(size_t size, void *buf, size_t len);
     virtual const unsigned char *read(void *buf, size_t *inout_len);
 
     bool valid() { return m_sock >= 0; }
diff --git a/system/OpenglSystemCommon/ProcessPipe.cpp b/system/OpenglSystemCommon/ProcessPipe.cpp
index 5fe9fa3..3f203f1 100644
--- a/system/OpenglSystemCommon/ProcessPipe.cpp
+++ b/system/OpenglSystemCommon/ProcessPipe.cpp
@@ -29,6 +29,7 @@
 #include <fuchsia/hardware/goldfish/pipe/cpp/fidl.h>
 #include <lib/fdio/fdio.h>
 #include <lib/zx/vmo.h>
+static QEMU_PIPE_HANDLE   sProcDevice = 0;
 #endif
 
 static QEMU_PIPE_HANDLE   sProcPipe = 0;
@@ -66,9 +67,12 @@
     fuchsia::hardware::goldfish::pipe::DeviceSyncPtr device;
     device.Bind(std::move(channel));
 
+    fuchsia::hardware::goldfish::pipe::PipeSyncPtr pipe;
+    device->OpenPipe(pipe.NewRequest());
+
     zx_status_t status2 = ZX_OK;
     zx::vmo vmo;
-    status = device->GetBuffer(&status2, &vmo);
+    status = pipe->GetBuffer(&status2, &vmo);
     if (status != ZX_OK || status2 != ZX_OK) {
         ALOGE("%s: failed to get buffer: %d:%d", __FUNCTION__, status, status2);
         return;
@@ -81,31 +85,23 @@
         return;
     }
     uint64_t actual;
-    status = device->Write(len + 1, 0, &status2, &actual);
+    status = pipe->Write(len + 1, 0, &status2, &actual);
     if (status != ZX_OK || status2 != ZX_OK) {
         ALOGD("%s: connecting to pipe service failed: %d:%d", __FUNCTION__,
               status, status2);
         return;
     }
 
-    // Send a confirmation int to the host
+    // Send a confirmation int to the host and get per-process unique ID back
     int32_t confirmInt = 100;
     status = vmo.write(&confirmInt, 0, sizeof(confirmInt));
     if (status != ZX_OK) {
         ALOGE("%s: failed write confirm int", __FUNCTION__);
         return;
     }
-    status = device->Write(sizeof(confirmInt), 0, &status2, &actual);
+    status = pipe->Call(sizeof(confirmInt), 0, sizeof(sProcUID), 0, &status2, &actual);
     if (status != ZX_OK || status2 != ZX_OK) {
-        ALOGD("%s: failed to send confirm value: %d:%d", __FUNCTION__,
-              status, status2);
-        return;
-    }
-
-    // Ask the host for per-process unique ID
-    status = device->Read(sizeof(sProcUID), 0, &status2, &actual);
-    if (status != ZX_OK || status2 != ZX_OK) {
-        ALOGD("%s: failed to recv per-process ID: %d:%d", __FUNCTION__,
+        ALOGD("%s: failed to get per-process ID: %d:%d", __FUNCTION__,
               status, status2);
         return;
     }
@@ -114,7 +110,8 @@
         ALOGE("%s: failed read per-process ID: %d", __FUNCTION__, status);
         return;
     }
-    sProcPipe = device.Unbind().TakeChannel().release();
+    sProcDevice = device.Unbind().TakeChannel().release();
+    sProcPipe = pipe.Unbind().TakeChannel().release();
 }
 #else
 static void processPipeInitOnce() {
diff --git a/system/OpenglSystemCommon/QemuPipeStream.cpp b/system/OpenglSystemCommon/QemuPipeStream.cpp
index fb515c9..ea1dda7 100644
--- a/system/OpenglSystemCommon/QemuPipeStream.cpp
+++ b/system/OpenglSystemCommon/QemuPipeStream.cpp
@@ -175,6 +175,11 @@
     return (const unsigned char *)buf;
 }
 
+const unsigned char *QemuPipeStream::commitBufferAndReadFully(size_t size, void *buf, size_t len)
+{   // Implemented as two separate calls: commitBuffer(size), then readFully(buf, len).
+    return commitBuffer(size) ? nullptr : readFully(buf, len);  // non-zero commit result => nullptr, no read
+}
+
 const unsigned char *QemuPipeStream::read( void *buf, size_t *inout_len)
 {
     //DBG(">> QemuPipeStream::read %d\n", *inout_len);
diff --git a/system/OpenglSystemCommon/QemuPipeStream.h b/system/OpenglSystemCommon/QemuPipeStream.h
index a6a3d88..f8d6fef 100644
--- a/system/OpenglSystemCommon/QemuPipeStream.h
+++ b/system/OpenglSystemCommon/QemuPipeStream.h
@@ -42,6 +42,7 @@
     virtual void *allocBuffer(size_t minSize);
     virtual int commitBuffer(size_t size);
     virtual const unsigned char *readFully( void *buf, size_t len);
+    virtual const unsigned char *commitBufferAndReadFully(size_t size, void *buf, size_t len);
     virtual const unsigned char *read( void *buf, size_t *inout_len);
 
     bool valid() { return qemu_pipe_valid(m_sock); }
@@ -56,8 +57,11 @@
     unsigned char *m_buf;
 #ifdef __Fuchsia__
     fuchsia::hardware::goldfish::pipe::DeviceSyncPtr m_device;
+    fuchsia::hardware::goldfish::pipe::PipeSyncPtr m_pipe;
     zx::event m_event;
     zx::vmo m_vmo;
+    size_t m_read = 0;
+    size_t m_readLeft = 0;
 #endif
     QemuPipeStream(QEMU_PIPE_HANDLE sock, size_t bufSize);
 };
diff --git a/system/OpenglSystemCommon/QemuPipeStreamFuchsia.cpp b/system/OpenglSystemCommon/QemuPipeStreamFuchsia.cpp
index 4377d40..de95a8d 100644
--- a/system/OpenglSystemCommon/QemuPipeStreamFuchsia.cpp
+++ b/system/OpenglSystemCommon/QemuPipeStreamFuchsia.cpp
@@ -27,6 +27,9 @@
 
 #include <utility>
 
+constexpr size_t kReadSize = 512 * 1024;
+constexpr size_t kWriteOffset = kReadSize;
+
 QemuPipeStream::QemuPipeStream(size_t bufSize) :
     IOStream(bufSize),
     m_sock(-1),
@@ -78,6 +81,7 @@
         return -1;
     }
     m_device.Bind(std::move(channel));
+    m_device->OpenPipe(m_pipe.NewRequest());
 
     zx::event event;
     status = zx::event::create(0, &event);
@@ -92,29 +96,27 @@
         return -1;
     }
 
-    status = m_device->SetEvent(std::move(event_copy));
+    status = m_pipe->SetEvent(std::move(event_copy));
     if (status != ZX_OK) {
         ALOGE("%s: failed to set event: %d:%d", __FUNCTION__, status);
         return -1;
     }
 
-    zx_status_t status2 = ZX_OK;
-    zx::vmo vmo;
-    status = m_device->GetBuffer(&status2, &vmo);
-    if (status != ZX_OK || status2 != ZX_OK) {
-        ALOGE("%s: failed to get buffer: %d:%d", __FUNCTION__, status, status2);
+    if (!allocBuffer(m_bufsize)) {
+        ALOGE("%s: failed allocate initial buffer", __FUNCTION__);
         return -1;
     }
 
     size_t len = strlen("pipe:opengles");
-    status = vmo.write("pipe:opengles", 0, len + 1);
+    status = m_vmo.write("pipe:opengles", 0, len + 1);
     if (status != ZX_OK) {
         ALOGE("%s: failed write pipe name", __FUNCTION__);
         return -1;
     }
 
     uint64_t actual;
-    status = m_device->Write(len + 1, 0, &status2, &actual);
+    zx_status_t status2 = ZX_OK;
+    status = m_pipe->Write(len + 1, 0, &status2, &actual);
     if (status != ZX_OK || status2 != ZX_OK) {
         ALOGD("%s: connecting to pipe service failed: %d:%d", __FUNCTION__,
               status, status2);
@@ -122,16 +124,18 @@
     }
 
     m_event = std::move(event);
-    m_vmo = std::move(vmo);
     return 0;
 }
 
 void *QemuPipeStream::allocBuffer(size_t minSize)
 {
+    // Add dedicated read buffer space at the front of buffer.
+    minSize += kReadSize;
+
     zx_status_t status;
     if (m_buf) {
         if (minSize <= m_bufsize) {
-            return m_buf;
+            return m_buf + kWriteOffset;
         }
         status = zx_vmar_unmap(zx_vmar_root_self(),
                                reinterpret_cast<zx_vaddr_t>(m_buf),
@@ -146,14 +150,14 @@
     size_t allocSize = m_bufsize < minSize ? minSize : m_bufsize;
 
     zx_status_t status2 = ZX_OK;
-    status = m_device->SetBufferSize(allocSize, &status2);
+    status = m_pipe->SetBufferSize(allocSize, &status2);
     if (status != ZX_OK || status2 != ZX_OK) {
         ALOGE("%s: failed to get buffer: %d:%d", __FUNCTION__, status, status2);
         return nullptr;
     }
 
     zx::vmo vmo;
-    status = m_device->GetBuffer(&status2, &vmo);
+    status = m_pipe->GetBuffer(&status2, &vmo);
     if (status != ZX_OK || status2 != ZX_OK) {
         ALOGE("%s: failed to get buffer: %d:%d", __FUNCTION__, status, status2);
         return nullptr;
@@ -171,44 +175,19 @@
     m_buf = reinterpret_cast<unsigned char*>(mapped_addr);
     m_bufsize = allocSize;
     m_vmo = std::move(vmo);
-    return m_buf;
+    return m_buf + kWriteOffset;
 }
 
 int QemuPipeStream::commitBuffer(size_t size)
 {
     if (size == 0) return 0;
 
-    size_t remaining = size;
-    while (remaining) {
-        zx_status_t status2 = ZX_OK;
-        uint64_t actual = 0;
-        zx_status_t status = m_device->Write(
-            remaining, size - remaining, &status2, &actual);
-        if (status != ZX_OK) {
-            ALOGD("%s: Failed writing to pipe: %d", __FUNCTION__, status);
-            return -1;
-        }
-        if (actual) {
-            remaining -= actual;
-            continue;
-        }
-        if (status2 != ZX_ERR_SHOULD_WAIT) {
-            ALOGD("%s: Error writing to pipe: %d", __FUNCTION__, status2);
-            return -1;
-        }
-        zx_signals_t observed = ZX_SIGNAL_NONE;
-        status = m_event.wait_one(
-            fuchsia::hardware::goldfish::pipe::SIGNAL_WRITABLE |
-            fuchsia::hardware::goldfish::pipe::SIGNAL_HANGUP,
-            zx::time::infinite(), &observed);
-        if (status != ZX_OK) {
-            ALOGD("%s: wait_one failed: %d", __FUNCTION__, status);
-            return -1;
-        }
-        if (observed & fuchsia::hardware::goldfish::pipe::SIGNAL_HANGUP) {
-            ALOGD("%s: Remote end hungup", __FUNCTION__);
-            return -1;
-        }
+    uint64_t actual = 0;
+    zx_status_t status2 = ZX_OK;
+    zx_status_t status = m_pipe->Call(size, kWriteOffset, 0, 0, &status2, &actual);
+    if (status != ZX_OK || status2 != ZX_OK) {
+        ALOGD("%s: Pipe call failed: %d:%d", __FUNCTION__, status, status2);
+        return -1;
     }
 
     return 0;
@@ -227,30 +206,74 @@
 
 const unsigned char *QemuPipeStream::readFully(void *buf, size_t len)
 {
+    return commitBufferAndReadFully(0, buf, len);
+}
+
+const unsigned char *QemuPipeStream::commitBufferAndReadFully(size_t size, void *buf, size_t len)
+{
     if (!m_device.is_bound()) return nullptr;
 
     if (!buf) {
         if (len > 0) {
-            ALOGE("QemuPipeStream::readFully failed, buf=NULL, len %zu, lethal"
+            ALOGE("QemuPipeStream::commitBufferAndReadFully failed, buf=NULL, len %zu, lethal"
                     " error, exiting.", len);
             abort();
         }
+        if (!size) {
+            return nullptr;
+        }
+    }
+
+    // Advance buffered read if not yet consumed.
+    size_t remaining = len;
+    size_t readSize = m_readLeft < remaining ? m_readLeft : remaining;
+    if (readSize) {
+        memcpy(static_cast<char*>(buf), m_buf + (m_read - m_readLeft), readSize);
+        remaining -= readSize;
+        m_readLeft -= readSize;
+    }
+
+    // Early out if nothing left to do.
+    if  (!size && !remaining) {
+        return static_cast<const unsigned char *>(buf);
+    }
+
+    // Read up to kReadSize bytes if all buffered read has been consumed.
+    size_t maxRead = (m_readLeft || !remaining) ? 0 : kReadSize;
+    uint64_t actual = 0;
+    zx_status_t status2 = ZX_OK;
+    zx_status_t status = m_pipe->Call(size, kWriteOffset, maxRead, 0, &status2, &actual);
+    if (status != ZX_OK) {
+        ALOGD("%s: Pipe call failed: %d", __FUNCTION__, status);
         return nullptr;
     }
 
-    size_t remaining = len;
+    // Update buffered read size.
+    if (actual) {
+        m_read = m_readLeft = actual;
+    }
+
+    // Consume buffered read and read more if necessary.
     while (remaining) {
-        size_t readSize = m_bufsize < remaining ? m_bufsize : remaining;
-        zx_status_t status2 = ZX_OK;
-        uint64_t actual = 0;
-        zx_status_t status = m_device->Read(readSize, 0, &status2, &actual);
+        readSize = m_readLeft < remaining ? m_readLeft : remaining;
+        if (readSize) {
+            memcpy(static_cast<char*>(buf) + (len - remaining),
+                   m_buf + (m_read - m_readLeft),
+                   readSize);
+            remaining -= readSize;
+            m_readLeft -= readSize;
+            continue;
+        }
+
+        status2 = ZX_OK;
+        actual = 0;
+        status = m_pipe->Read(kReadSize, 0, &status2, &actual);
         if (status != ZX_OK) {
             ALOGD("%s: Failed reading from pipe: %d", __FUNCTION__, status);
             return nullptr;
         }
         if (actual) {
-            m_vmo.read(static_cast<char *>(buf) + (len - remaining), 0, actual);
-            remaining -= actual;
+            m_read = m_readLeft = actual;
             continue;
         }
         if (status2 != ZX_ERR_SHOULD_WAIT) {
diff --git a/system/OpenglSystemCommon/VirtioGpuStream.h b/system/OpenglSystemCommon/VirtioGpuStream.h
index 9d6faa5..5d44bcc 100644
--- a/system/OpenglSystemCommon/VirtioGpuStream.h
+++ b/system/OpenglSystemCommon/VirtioGpuStream.h
@@ -69,6 +69,10 @@
     virtual int writeFully(const void *buf, size_t len);
     virtual const unsigned char *readFully(void *buf, size_t len);
     virtual int commitBuffer(size_t size);
+    virtual const unsigned char *commitBufferAndReadFully(size_t size, void *buf, size_t len)
+    {   // Implemented as two separate calls: commitBuffer(size), then readFully(buf, len).
+        return commitBuffer(size) ? nullptr : readFully(buf, len);  // non-zero commit result => nullptr, no read
+    }
     virtual const unsigned char *read(void *buf, size_t *inout_len) final
     {
         return readFully(buf, *inout_len);
diff --git a/system/vulkan_enc/VulkanStreamGuest.cpp b/system/vulkan_enc/VulkanStreamGuest.cpp
index d6ebf50..43f3706 100644
--- a/system/vulkan_enc/VulkanStreamGuest.cpp
+++ b/system/vulkan_enc/VulkanStreamGuest.cpp
@@ -53,8 +53,7 @@
     }
 
     ssize_t read(void *buffer, size_t size) override {
-        commitWrite();
-        if (!mStream->readFully(buffer, size)) {
+        if (!mStream->readback(buffer, size)) {
             ALOGE("FATAL: Could not read back %zu bytes", size);
             abort();
         }