Circular shared memory queue

Change-Id: I82198baf2a9333539df64b1df057f11fa1f6a24b
diff --git a/common/vsoc/lib/BUILD b/common/vsoc/lib/BUILD
index 7b4c6ee..cb85970 100644
--- a/common/vsoc/lib/BUILD
+++ b/common/vsoc/lib/BUILD
@@ -1,4 +1,17 @@
 cc_test(
+    name = "circqueue_test",
+    srcs = [
+        "circqueue_test.cpp",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":vsoc_common",
+        "//common/vsoc/shm",
+        "@gtest_repo//:gtest_main",
+    ],
+)
+
+cc_test(
     name = "vsoc_graphics_test",
     srcs = [
         "graphics_test.cpp",
@@ -17,6 +30,8 @@
         "region_common.cpp",
     ],
     hdrs = [
+        "circqueue_impl.h",
+        "compat.h",
         "graphics_common.h",
         "region.h",
     ],
diff --git a/common/vsoc/lib/circqueue_impl.h b/common/vsoc/lib/circqueue_impl.h
new file mode 100644
index 0000000..c590270
--- /dev/null
+++ b/common/vsoc/lib/circqueue_impl.h
@@ -0,0 +1,212 @@
+#pragma once
+
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// For _mm_pause()
+#include <immintrin.h>
+#include "common/vsoc/lib/region.h"
+#include "common/vsoc/shm/circqueue.h"
+
+namespace {
+// Increases the given index until it is naturally aligned for T.
+template <typename T>
+uintptr_t align(uintptr_t index) {
+  return (index + sizeof(T) - 1) & ~(sizeof(T) - 1);
+}
+}  // namespace
+
+namespace vsoc {
+class RegionBase;
+namespace layout {
+
+template <uint32_t SizeLog2>
+void CircularQueueBase<SizeLog2>::Lock() {
+  while (lock_.exchange(1)) {
+    _mm_pause();
+  }
+}
+
+template <uint32_t SizeLog2>
+void CircularQueueBase<SizeLog2>::Unlock() {
+  lock_ = 0;
+}
+
+template <uint32_t SizeLog2>
+void CircularQueueBase<SizeLog2>::CopyInRange(const char* buffer_in,
+                                              const Range& t) {
+  size_t bytes = t.end_idx - t.start_idx;
+  uint32_t index = t->start_idx & (BufferSize - 1);
+  if (index + bytes < BufferSize) {
+    memcpy(buffer_ + index, buffer_in, bytes);
+  } else {
+    size_t part1_size = BufferSize - index;
+    size_t part2_size = bytes - part1_size;
+    mempcy(buffer_ + index, buffer_in, part1_size);
+    memcpy(buffer_, buffer_in + part1_size, part2_size);
+  }
+}
+
+template <uint32_t SizeLog2>
+void CircularQueueBase<SizeLog2>::CopyOutRange(const Range& t,
+                                               char* buffer_out) {
+  uint32_t index = t.start_idx & (BufferSize - 1);
+  size_t total_size = t.end_idx - t.start_idx;
+  if (index + total_size <= BufferSize) {
+    memcpy(buffer_out, buffer_ + index, total_size);
+  } else {
+    uint32_t part1_size = BufferSize - index;
+    uint32_t part2_size = total_size - part1_size;
+    memcpy(buffer_out, buffer_ + index, part1_size);
+    memcpy(buffer_out + part1_size, buffer_, part2_size);
+  }
+}
+
+template <uint32_t SizeLog2>
+void CircularQueueBase<SizeLog2>::WaitForDataLocked(RegionBase* r) {
+  while (1) {
+    uint32_t o_w_pub = w_pub_;
+    // We don't have data. Wait until some appears and try again
+    if (r_released_ != o_w_pub) {
+      return;
+    }
+    Unlock();
+    r->WaitForSignal(&w_pub_, o_w_pub);
+    Lock();
+  }
+}
+
+template <uint32_t SizeLog2>
+intptr_t CircularQueueBase<SizeLog2>::WriteReserveLocked(RegionBase* r,
+                                                         size_t bytes,
+                                                         Range* t) {
+  // Can't write more than the buffer will hold
+  if (bytes > BufferSize) {
+    return -ENOSPC;
+  }
+  while (true) {
+    t->start_idx = w_pub_;
+    uint32_t o_r_release = r_released_;
+    size_t available = BufferSize - t->start_idx + o_r_release;
+    if (available >= bytes) {
+      break;
+    }
+    // If we can't write at the moment wait for a reader to release
+    // some bytes.
+    Unlock();
+    r->WaitForSignal(&r_released_, o_r_release);
+    Lock();
+  }
+  t->end_idx = t->start_idx + bytes;
+  return t->end_idx - t->start_idx;
+}
+
+template <uint32_t SizeLog2>
+intptr_t CircularByteQueue<SizeLog2>::Read(RegionBase* r, char* buffer_out,
+                                           size_t max_size) {
+  this->Lock();
+  this->WaitForDataLocked();
+  Range t;
+  t.start_idx = this->r_released_;
+  t.end_idx = this->w_pub_;
+  // The lock is still held here...
+  // Trim the range if we got more than the reader wanted
+  if ((t.end_idx - t.start_idx) > max_size) {
+    t.end_idx = t.start_idx + max_size;
+  }
+  CopyOutRange(t, buffer_out, max_size);
+  this->r_released_ = t.end_idx;
+  this->Unlock();
+  r->SendSignal(layout::Sides::Both, &this->r_released_);
+  return t->end_idx - t->start_idx;
+}
+
+template <uint32_t SizeLog2>
+intptr_t CircularByteQueue<SizeLog2>::Write(RegionBase* r,
+                                            const char* buffer_in,
+                                            size_t bytes) {
+  Range range;
+  this->Lock();
+  intptr_t rval = WriteReserveLocked(r, bytes, &range);
+  if (rval < 0) {
+    this->Unlock();
+    return rval;
+  }
+  this->CopyInRange(buffer_in, range);
+  // We can't publish until all of the previous write allocations where
+  // published.
+  this->w_pub_ = range.end_idx;
+  this->Unlock();
+  r->SendSignal(layout::Sides::Both, &this->w_pub_);
+  return bytes;
+}
+
+template <uint32_t SizeLog2, uint32_t MaxPacketSize>
+intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::CalculateBufferedSize(
+    size_t payload) {
+  return align<uint32_t>(sizeof(uint32_t) + payload);
+}
+
+template <uint32_t SizeLog2, uint32_t MaxPacketSize>
+intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::Read(RegionBase* r,
+                                                            char* buffer_out,
+                                                            size_t max_size) {
+  this->Lock();
+  this->WaitForDataLocked();
+  uint32_t packet_size = *reinterpret_cast<uint32_t*>(
+      this->buffer_ + (this->r_released_ & (this->BufferSize - 1)));
+  if (packet_size > max_size) {
+    this->Unlock();
+    return -ENOSPC;
+  }
+  Range t;
+  t.start_idx = this->r_released_ + sizeof(uint32_t);
+  t.end_idx = t.start_idx + this->packet_size;
+  CopyOutRange(t, buffer_out);
+  this->r_released_ += this->CalculateBufferedSize(packet_size);
+  this->Unlock();
+  r->SendSignal(layout::Sides::Both, &this->r_released_);
+  return packet_size;
+}
+
+template <uint32_t SizeLog2, uint32_t MaxPacketSize>
+intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::Write(
+    RegionBase* r, const char* buffer_in, uint32_t bytes) {
+  if (bytes > MaxPacketSize) {
+    return -ENOSPC;
+  }
+  Range range;
+  size_t buffered_size = this->CalculateBufferedSize(bytes);
+  this->Lock();
+  intptr_t rval = this->WriteReserveLocked(r, buffered_size, &range);
+  if (rval < 0) {
+    this->Unlock();
+    return rval;
+  }
+  Range header = range;
+  header.end_idx = header.start_idx + sizeof(uint32_t);
+  Range payload{range.start_idx + sizeof(uint32_t),
+                range.start_idx + sizeof(uint32_t) + bytes};
+  this->CopyInRange(&bytes, header);
+  this->CopyInRange(buffer_in, payload);
+  this->w_pub_ = range.end_idx;
+  this->Unlock();
+  r->SendSignal(layout::Sides::Both, &this->w_pub_);
+  return bytes;
+}
+
+}  // namespace layout
+}  // namespace vsoc
diff --git a/common/vsoc/lib/circqueue_test.cpp b/common/vsoc/lib/circqueue_test.cpp
new file mode 100644
index 0000000..059a293
--- /dev/null
+++ b/common/vsoc/lib/circqueue_test.cpp
@@ -0,0 +1,21 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "common/vsoc/lib/circqueue_impl.h"
+
+#include <gtest/gtest.h>
+
+TEST(CircQueueTest, Basic) {
+}
diff --git a/common/vsoc/shm/BUILD b/common/vsoc/shm/BUILD
index 4b0ef9b..4fbdccb 100644
--- a/common/vsoc/shm/BUILD
+++ b/common/vsoc/shm/BUILD
@@ -2,6 +2,7 @@
     name = "shm",
     hdrs = [
         "base.h",
+        "circqueue.h",
         "e2e_test_region.h",
         "graphics.h",
         "version.h",
diff --git a/common/vsoc/shm/circqueue.h b/common/vsoc/shm/circqueue.h
new file mode 100644
index 0000000..7a1e94a
--- /dev/null
+++ b/common/vsoc/shm/circqueue.h
@@ -0,0 +1,161 @@
+#pragma once
+
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Memory layout for byte-oriented circular queues
+
+#include <atomic>
+#include <cstdint>
+
+#include "common/vsoc/shm/base.h"
+
+namespace vsoc {
+class RegionBase;
+namespace layout {
+
+/**
+ * Base classes for all spinlock protected circular queues.
+ * This class should be embedded in the per-reion data structure that is used
+ * as the parameter to TypedRegion.
+ */
+template <uint32_t SizeLog2>
+class CircularQueueBase {
+  CircularQueueBase() = delete;
+  CircularQueueBase(const CircularQueueBase&) = delete;
+  CircularQueueBase& operator=(const CircularQueueBase&) = delete;
+ protected:
+  /**
+   * Specifies a part of the queue. Note, the given indexes must be masked
+   * before they can be used against buffer_
+   */
+  struct Range {
+    // Points to the first bytes that is part of the range
+    uint32_t start_idx;
+    // Points to the first byte that is not in the range. This is similar to
+    // the STL end iterator.
+    uint32_t end_idx;
+  };
+  static const uintptr_t BufferSize = (1 << SizeLog2);
+
+  /**
+   * Acquire the spinlock on the queue. This will effectively block all
+   * readers and writers.
+   */
+  void Lock();
+
+  /**
+   * Release the spinlock.
+   */
+  void Unlock();
+
+  /**
+   * Copy bytes from buffer_in into the part of the queue specified by Range.
+   */
+  void CopyInRange(const char* buffer_in, const Range& t);
+
+  /**
+   * Copy the bytes specified by range to the given buffer. They caller must
+   * ensure that the buffer is large enough to hold the content of the range.
+   */
+  void CopyOutRange(const Range& t, char* buffer_out);
+
+  /**
+   * Wait until data becomes available in the queue. The caller must have
+   * called Lock() before invoking this. The caller must call Unlock()
+   * after this returns.
+   */
+  void WaitForDataLocked(RegionBase* r);
+
+  /**
+   * Reserve space in the queue for writing. The caller must have called Lock()
+   * before invoking this. The caller must call Unlock() after this returns.
+   * Indexes pointing to the reserved space will be placed in range.
+   * On success this returns bytes.
+   * On failure a negative errno indicates the problem. -ENOSPC indicates that
+   * bytes > the queue size
+   */
+  intptr_t WriteReserveLocked(RegionBase* r, size_t bytes, Range* t);
+
+  // Advances when a reader has finished with buffer space
+  uint32_t r_released_;
+  // Advances when buffer space is filled and ready for a reader
+  uint32_t w_pub_;
+  // Spinlock that protects the region. 0 means unlocked
+  std::atomic<uint32_t> lock_;
+  // The actual memory in the buffer
+  char buffer_[BufferSize];
+};
+using CircularQueueBase64k = CircularQueueBase<16>;
+ASSERT_SHM_COMPATIBLE(CircularQueueBase64k, multi_region);
+
+/**
+ * Byte oriented circular queue. Reads will always return some data, but
+ * may return less data than requested. Writes will always write all of the
+ * data or return an error.
+ */
+template <uint32_t SizeLog2>
+class CircularByteQueue : public CircularQueueBase<SizeLog2> {
+ public:
+  /**
+   * Read at most max_size bytes from the qeueue, placing them in buffer_out
+   */
+  intptr_t Read(RegionBase* r, char* buffer_out, std::size_t max_size);
+  /**
+   * Write all of the given bytes into the queue. On success the return value
+   * will match bytes. On failure a negative errno is returned.
+   */
+  intptr_t Write(RegionBase* r, const char* buffer_in, std::size_t bytes);
+
+ protected:
+  using Range = typename CircularQueueBase<SizeLog2>::Range;
+};
+using CircularByteQueue64k = CircularByteQueue<16>;
+ASSERT_SHM_COMPATIBLE(CircularByteQueue64k, multi_region);
+
+/**
+ * Packet oriented circular queue. Reads will either return data or an error.
+ * Each return from read corresponds to a call to write and returns all of the
+ * data from that corresponding Write().
+ */
+template <uint32_t SizeLog2, uint32_t MaxPacketSize>
+class CircularPacketQueue : public CircularQueueBase<SizeLog2> {
+ public:
+  /**
+   * Read a single packet from the queue, placing its data into buffer_out.
+   * If max_size indicates that buffer_out cannot hold the entire packet
+   * this function will return -ENOSPC.
+   */
+  intptr_t Read(RegionBase* r, char* buffer_out, std::size_t max_size);
+
+  /**
+   * Writes [buffer_in, buffer_in + bytes) to the queue.
+   * If the number of bytes to be written exceeds the size of the queue
+   * -ENOSPC will be returned.
+   */
+  intptr_t Write(RegionBase* r, const char* buffer_in, uint32_t bytes);
+
+ protected:
+  static_assert(CircularQueueBase<SizeLog2>::BufferSize >= MaxPacketSize,
+                "Buffer is too small to hold the maximum sized packet");
+  using Range = typename CircularQueueBase<SizeLog2>::Range;
+  intptr_t CalculateBufferedSize(size_t payload);
+};
+using CircularPacketQueue64k = CircularPacketQueue<16, 1024>;
+ASSERT_SHM_COMPATIBLE(CircularPacketQueue64k, multi_region);
+
+}  // namespace layout
+}  // namespace vsoc
diff --git a/common/vsoc/shm/version.h b/common/vsoc/shm/version.h
index fe78e8a..6d09e80 100644
--- a/common/vsoc/shm/version.h
+++ b/common/vsoc/shm/version.h
@@ -16,13 +16,25 @@
  * limitations under the License.
  */
 
-// Version information for the vsoc layouts. This protects multiple things:
+// Version information for structures that are present in VSoC shared memory
+// windows. The proper use of this file will:
 //
-//   It ensures that the guest and host builds agree on the sizes of the shared
-//   structures.
+//   * ensure that the guest and host builds agree on the sizes of the shared
+//     structures.
 //
-//   It provides a single version code for the entire vsoc layout, assuming
-//   that reviewers excercise some care.
+//   * provides a single version code for the entire vsoc layout, assuming
+//     that reviewers excercise some care.
+//
+//
+//  Use:
+//
+//    Every new class / structure in the shm folder needs to add a size
+//    entry here, #include the base.h file, and add a ASSERT_SHM_COMPATIBLE
+//    instantiation just below the class definition,
+//
+//    For templatized classes / structs the author should choose a fixed size,
+//    create a using alias, and instantiate the checks on the alias.
+//    See CircularByteQueue64k for an example of this usage.
 //
 //   Note to reviewers:
 //
@@ -32,7 +44,8 @@
 //     However, the version must increment for any change in the value of a
 //     constant.
 //
-//     #ifdef, etc is absolutely forbidden
+//     #ifdef, etc is absolutely forbidden in this file and highly discouraged
+//     in the other vsoc/shm files.
 
 namespace vsoc {
 namespace layout {
@@ -48,6 +61,9 @@
 }  // namespace
 
 static const std::size_t Base_size = 1;
+static const std::size_t CircularQueueBase64k_size = 65548;
+static const std::size_t CircularByteQueue64k_size = 65548;
+static const std::size_t CircularPacketQueue64k_size = 65548;
 static const std::size_t PixelFormatRegister_size = 4;
 static const std::size_t PixelFormatMaskRegister_size = 8;
 static const std::size_t Sides_size = 4;