Revert "Revert "put gl pipes in nonblocking mode""

This reverts commit cb81ca116ceaeb36606bb86f4a5e1746f07fdfd5.

Reason for revert: good for CtsOpenGlPerf2TestCases again after https://android-review.googlesource.com/c/platform/external/qemu/+/1094454,
req'd to run vulkan cts without timeout :p

Change-Id: I1aeab61b4fbc647d913448c5b67a23eb04bbda2a
diff --git a/shared/OpenglCodecCommon/qemu_pipe.h b/shared/OpenglCodecCommon/qemu_pipe.h
index 429b3fe..82c4a94 100644
--- a/shared/OpenglCodecCommon/qemu_pipe.h
+++ b/shared/OpenglCodecCommon/qemu_pipe.h
@@ -41,6 +41,16 @@
 
 #define QEMU_PIPE_INVALID_HANDLE (-1)
 
+#ifndef QEMU_PIPE_RETRY
+#define QEMU_PIPE_RETRY(exp) ({ \
+    __typeof__(exp) _rc; \
+    do { \
+        _rc = (exp); \
+    } while (_rc == -1 && (errno == EINTR || errno == EAGAIN)); \
+    _rc; }) \
+
+#endif
+
 #ifndef QEMU_PIPE_PATH
 #define QEMU_PIPE_PATH "/dev/qemu_pipe"
 #endif
@@ -80,7 +90,7 @@
   const uint8_t* p = (const uint8_t*)(data);
   size_t remaining = byte_count;
   while (remaining > 0) {
-    ssize_t n = TEMP_FAILURE_RETRY(write(fd, p, remaining));
+    ssize_t n = QEMU_PIPE_RETRY(write(fd, p, remaining));
     if (n == -1) return false;
     p += n;
     remaining -= n;
@@ -123,9 +133,10 @@
 
     snprintf(buff, sizeof buff, "pipe:%s", pipeName);
 
-    fd = TEMP_FAILURE_RETRY(open(QEMU_PIPE_PATH, O_RDWR));
-    if (fd < 0 && errno == ENOENT)
-        fd = TEMP_FAILURE_RETRY(open("/dev/goldfish_pipe", O_RDWR));
+    fd = QEMU_PIPE_RETRY(open(QEMU_PIPE_PATH, O_RDWR | O_NONBLOCK));
+    if (fd < 0 && errno == ENOENT) {
+        fd = QEMU_PIPE_RETRY(open("/dev/goldfish_pipe", O_RDWR | O_NONBLOCK));
+    }
     if (fd < 0) {
         D("%s: Could not open " QEMU_PIPE_PATH ": %s", __FUNCTION__, strerror(errno));
         //errno = ENOSYS;
@@ -159,7 +170,7 @@
 
 static __inline__ bool
 qemu_pipe_try_again() {
-    return errno == EINTR;
+    return errno == EINTR || errno == EAGAIN;
 }
 
 static __inline__ bool
diff --git a/system/OpenglSystemCommon/ProcessPipe.cpp b/system/OpenglSystemCommon/ProcessPipe.cpp
index 3f203f1..e04d153 100644
--- a/system/OpenglSystemCommon/ProcessPipe.cpp
+++ b/system/OpenglSystemCommon/ProcessPipe.cpp
@@ -142,7 +142,7 @@
         stat =
             qemu_pipe_read(sProcPipe, (char*)&sProcUID,
                 sizeof(sProcUID));
-    } while (stat < 0 && errno == EINTR);
+    } while (stat < 0 && (errno == EINTR || errno == EAGAIN));
 
     if (stat != sizeof(sProcUID)) {
         qemu_pipe_close(sProcPipe);
diff --git a/system/OpenglSystemCommon/QemuPipeStream.cpp b/system/OpenglSystemCommon/QemuPipeStream.cpp
index ea1dda7..4f5bd24 100644
--- a/system/OpenglSystemCommon/QemuPipeStream.cpp
+++ b/system/OpenglSystemCommon/QemuPipeStream.cpp
@@ -26,11 +26,16 @@
 #include <unistd.h>
 #include <string.h>
 
+static const size_t kReadSize = 512 * 1024;
+static const size_t kWriteOffset = kReadSize;
+
 QemuPipeStream::QemuPipeStream(size_t bufSize) :
     IOStream(bufSize),
     m_sock((QEMU_PIPE_HANDLE)(-1)),
     m_bufsize(bufSize),
-    m_buf(NULL)
+    m_buf(NULL),
+    m_read(0),
+    m_readLeft(0)
 {
 }
 
@@ -38,7 +43,9 @@
     IOStream(bufSize),
     m_sock(sock),
     m_bufsize(bufSize),
-    m_buf(NULL)
+    m_buf(NULL),
+    m_read(0),
+    m_readLeft(0)
 {
 }
 
@@ -67,6 +74,9 @@
 
 void *QemuPipeStream::allocBuffer(size_t minSize)
 {
+    // Add dedicated read buffer space at the front of the buffer.
+    minSize += kReadSize;
+
     size_t allocSize = (m_bufsize < minSize ? minSize : m_bufsize);
     if (!m_buf) {
         m_buf = (unsigned char *)malloc(allocSize);
@@ -84,13 +94,13 @@
         }
     }
 
-    return m_buf;
+    return m_buf + kWriteOffset;
 };
 
 int QemuPipeStream::commitBuffer(size_t size)
 {
     if (size == 0) return 0;
-    return writeFully(m_buf, size);
+    return writeFully(m_buf + kWriteOffset, size);
 }
 
 int QemuPipeStream::writeFully(const void *buf, size_t len)
@@ -140,44 +150,96 @@
 
 const unsigned char *QemuPipeStream::readFully(void *buf, size_t len)
 {
-    //DBG(">> QemuPipeStream::readFully %d\n", len);
-    if (!valid()) return NULL;
-    if (!buf) {
-        if (len > 0) {
-            // If len is non-zero, buf must not be NULL. Otherwise the pipe would be
-            // in a corrupted state, which is lethal for the emulator.
-            ERR("QemuPipeStream::readFully failed, buf=NULL, len %zu, lethal"
-                    " error, exiting.", len);
-            abort();
-        }
-        return NULL;  // do not allow NULL buf in that implementation
-    }
-    size_t res = len;
-    while (res > 0) {
-        ssize_t stat = qemu_pipe_read(m_sock, (char *)(buf) + len - res, res);
-        if (stat == 0) {
-            // client shutdown;
-            return NULL;
-        } else if (stat < 0) {
-            if (qemu_pipe_try_again()) {
-                continue;
-            } else {
-                ERR("QemuPipeStream::readFully failed (buf %p, len %zu"
-                    ", res %zu): %s, lethal error, exiting.", buf, len, res,
-                    strerror(errno));
-                abort();
-            }
-        } else {
-            res -= stat;
-        }
-    }
-    //DBG("<< QemuPipeStream::readFully %d\n", len);
-    return (const unsigned char *)buf;
+    return commitBufferAndReadFully(0, buf, len);
 }
 
-const unsigned char *QemuPipeStream::commitBufferAndReadFully(size_t size, void *buf, size_t len)
-{
-    return commitBuffer(size) ? nullptr : readFully(buf, len);
+const unsigned char *QemuPipeStream::commitBufferAndReadFully(size_t writeSize, void *userReadBufPtr, size_t totalReadSize) {
+
+    unsigned char* userReadBuf = static_cast<unsigned char*>(userReadBufPtr);
+
+    if (!valid()) return NULL;
+
+    if (!userReadBuf) {
+        if (totalReadSize > 0) {
+            ALOGE("QemuPipeStream::commitBufferAndReadFully failed, userReadBuf=NULL, totalReadSize %zu, lethal"
+                    " error, exiting.", totalReadSize);
+            abort();
+        }
+        if (!writeSize) {
+            return NULL;
+        }
+    }
+
+    // Advance buffered read if not yet consumed.
+    size_t remaining = totalReadSize;
+    size_t bufferedReadSize = m_readLeft < remaining ? m_readLeft : remaining;
+    if (bufferedReadSize) {
+        memcpy(userReadBuf, m_buf + (m_read - m_readLeft), bufferedReadSize);
+        remaining -= bufferedReadSize;
+        m_readLeft -= bufferedReadSize;
+    }
+
+    // Early out if nothing left to do.
+    if (!writeSize && !remaining) {
+        return userReadBuf;
+    }
+
+    int writeFullyRes = writeFully(m_buf + kWriteOffset, writeSize);
+
+    // Now done writing. Early out if no reading left to do.
+    if (!remaining) {
+        return userReadBuf;
+    }
+
+    // Read up to kReadSize bytes if all buffered read has been consumed.
+    size_t maxRead = m_readLeft ? 0 : kReadSize;
+
+    ssize_t actual = 0;
+
+    if (maxRead) {
+        actual = qemu_pipe_read(m_sock, m_buf, maxRead);
+        // Updated buffered read size.
+        if (actual > 0) {
+            m_read = m_readLeft = actual;
+        }
+
+        if (actual == 0) {
+            ALOGD("%s: end of pipe", __FUNCTION__);
+            return NULL;
+        }
+    }
+
+    // Consume buffered read and read more if necessary.
+    while (remaining) {
+        bufferedReadSize = m_readLeft < remaining ? m_readLeft : remaining;
+        if (bufferedReadSize) {
+            memcpy(userReadBuf + (totalReadSize - remaining),
+                   m_buf + (m_read - m_readLeft),
+                   bufferedReadSize);
+            remaining -= bufferedReadSize;
+            m_readLeft -= bufferedReadSize;
+            continue;
+        }
+
+        actual = qemu_pipe_read(m_sock, m_buf, kReadSize);
+
+        if (actual == 0) {
+            ALOGD("%s: Failed reading from pipe: %d", __FUNCTION__,  errno);
+            return NULL;
+        }
+
+        if (actual > 0) {
+            m_read = m_readLeft = actual;
+            continue;
+        }
+
+        if (!qemu_pipe_try_again()) {
+            ALOGD("%s: Error reading from pipe: %d", __FUNCTION__, errno);
+            return NULL;
+        }
+    }
+
+    return userReadBuf;
 }
 
 const unsigned char *QemuPipeStream::read( void *buf, size_t *inout_len)
@@ -216,8 +278,9 @@
         if (res == 0) { /* EOF */
              break;
         }
-        if (qemu_pipe_try_again())
+        if (qemu_pipe_try_again()) {
             continue;
+        }
 
         /* A real error */
         if (ret == 0)
diff --git a/system/OpenglSystemCommon/QemuPipeStream.h b/system/OpenglSystemCommon/QemuPipeStream.h
index f8d6fef..65755da 100644
--- a/system/OpenglSystemCommon/QemuPipeStream.h
+++ b/system/OpenglSystemCommon/QemuPipeStream.h
@@ -55,13 +55,13 @@
     QEMU_PIPE_HANDLE m_sock;
     size_t m_bufsize;
     unsigned char *m_buf;
+    size_t m_read;
+    size_t m_readLeft;
 #ifdef __Fuchsia__
     fuchsia::hardware::goldfish::pipe::DeviceSyncPtr m_device;
     fuchsia::hardware::goldfish::pipe::PipeSyncPtr m_pipe;
     zx::event m_event;
     zx::vmo m_vmo;
-    size_t m_read = 0;
-    size_t m_readLeft = 0;
 #endif
     QemuPipeStream(QEMU_PIPE_HANDLE sock, size_t bufSize);
 };
diff --git a/system/OpenglSystemCommon/QemuPipeStreamFuchsia.cpp b/system/OpenglSystemCommon/QemuPipeStreamFuchsia.cpp
index de95a8d..e0aa0b5 100644
--- a/system/OpenglSystemCommon/QemuPipeStreamFuchsia.cpp
+++ b/system/OpenglSystemCommon/QemuPipeStreamFuchsia.cpp
@@ -234,7 +234,7 @@
     }
 
     // Early out if nothing left to do.
-    if  (!size && !remaining) {
+    if (!size && !remaining) {
         return static_cast<const unsigned char *>(buf);
     }