Align all message buffers to 64 bytes

This means that asking flatbuffers for an aligned pointer in the message
actually works. Flatbuffers only aligns data relative to the end of the
buffer, and assumes that the end itself is sufficiently aligned.

Change-Id: Ia055fddefea277697c37abafbac6f533fb8ec02e
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 57adc6d..91b050c 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -204,6 +204,7 @@
     visibility = ["//visibility:public"],
     deps = [
         ":aos_sync",
+        ":data_alignment",
         ":index",
         "//aos:realtime",
         "//aos/time",
@@ -259,3 +260,14 @@
         "//aos/testing:test_logging",
     ],
 )
+
+cc_library(
+    name = "data_alignment",
+    hdrs = [
+        "data_alignment.h",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        "@com_github_google_glog//:glog",
+    ],
+)
diff --git a/aos/ipc_lib/data_alignment.h b/aos/ipc_lib/data_alignment.h
new file mode 100644
index 0000000..2f59b78
--- /dev/null
+++ b/aos/ipc_lib/data_alignment.h
@@ -0,0 +1,55 @@
+#ifndef AOS_IPC_LIB_DATA_ALIGNMENT_H_
+#define AOS_IPC_LIB_DATA_ALIGNMENT_H_
+
+#include <cstddef>
+#include <cstdint>
+
+#include "glog/logging.h"
+
+namespace aos {
+
+// All data buffers sent over or received from a channel will guarantee this
+// alignment for their end. Flatbuffers aligns from the end, so this is what
+// matters.
+//
+// 64 is a reasonable choice for now:
+//   Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
+//   cache lines.
+//   V4L2 requires 64 byte alignment for USERPTR buffers.
+static constexpr size_t kChannelDataAlignment = 64;
+
+// Crashes (via CHECK) unless the end of the buffer, data + size, is a multiple
+// of kChannelDataAlignment.
+//
+// data is the start of the buffer and size is its length in bytes.
+template <typename T>
+inline void CheckChannelDataAlignment(T *data, size_t size) {
+  // Stream the pointer as void* so that character pointer types are logged as
+  // an address rather than being read as a NUL-terminated string.
+  CHECK_EQ((reinterpret_cast<uintptr_t>(data) + size) % kChannelDataAlignment,
+           0u)
+      << ": data pointer is not end aligned as it should be: "
+      << static_cast<const void *>(data) << " + " << size;
+}
+
+// Aligns the beginning of a channel data buffer. There must be
+// kChannelDataAlignment-1 extra bytes beyond the end to potentially use after
+// aligning it.
+//
+// Returns the lowest address >= data for which the returned pointer plus size
+// is a multiple of kChannelDataAlignment.
+inline char *RoundChannelData(char *data, size_t size) {
+  const uintptr_t data_value = reinterpret_cast<uintptr_t>(data);
+  const uintptr_t data_end = data_value + size;
+  // Round the end of the buffer up to the next multiple of the alignment.
+  const uintptr_t data_end_max = data_end + (kChannelDataAlignment - 1);
+  const uintptr_t rounded_data_end =
+      data_end_max - (data_end_max % kChannelDataAlignment);
+  // Step back by size so the aligned end determines the returned start.
+  const uintptr_t rounded_data = rounded_data_end - size;
+  return reinterpret_cast<char *>(rounded_data);
+}
+
+}  // namespace aos
+
+#endif  // AOS_IPC_LIB_DATA_ALIGNMENT_H_
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 903150b..c323b8b 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -241,7 +241,8 @@
 
 size_t LocklessQueueConfiguration::message_size() const {
   // Round up the message size so following data is aligned appropriately.
-  return LocklessQueueMemory::AlignmentRoundUp(message_data_size) +
+  return LocklessQueueMemory::AlignmentRoundUp(message_data_size +
+                                               (kChannelDataAlignment - 1)) +
          sizeof(Message);
 }
 
@@ -549,7 +550,7 @@
   Message *message = memory_->GetMessage(scratch_index);
   message->header.queue_index.Invalidate();
 
-  return &message->data[0];
+  return message->data(memory_->message_data_size());
 }
 
 void LocklessQueue::Sender::Send(
@@ -788,7 +789,7 @@
   }
   *monotonic_remote_time = m->header.monotonic_remote_time;
   *realtime_remote_time = m->header.realtime_remote_time;
-  memcpy(data, &m->data[0], message_data_size());
+  memcpy(data, m->data(memory_->message_data_size()), message_data_size());
   *length = m->header.length;
 
   // And finally, confirm that the message *still* points to the queue index we
@@ -891,8 +892,9 @@
     ::std::cout << "      }" << ::std::endl;
     ::std::cout << "      data: {";
 
+    const char *const m_data = m->data(memory->message_data_size());
     for (size_t j = 0; j < m->header.length; ++j) {
-      char data = m->data[j];
+      char data = m_data[j];
       if (j != 0) {
         ::std::cout << " ";
       }
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 976f758..0384aa8 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -7,6 +7,7 @@
 #include <vector>
 
 #include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/data_alignment.h"
 #include "aos/ipc_lib/index.h"
 #include "aos/time/time.h"
 
@@ -76,7 +77,19 @@
     size_t length;
   } header;
 
-  char data[];
+  char *data(size_t message_size) { return RoundedData(message_size); }
+  const char *data(size_t message_size) const {
+    return RoundedData(message_size);
+  }
+
+ private:
+  // This returns a non-const pointer into a const object. Be very careful about
+  // const correctness in publicly accessible APIs using it.
+  char *RoundedData(size_t message_size) const {
+    return RoundChannelData(const_cast<char *>(&data_pointer[0]), message_size);
+  }
+
+  char data_pointer[];
 };
 
 struct LocklessQueueConfiguration {
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index 0c0973c..cbe76a7 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -89,18 +89,24 @@
 
   // Getters for each of the 4 lists.
   Sender *GetSender(size_t sender_index) {
+    static_assert(alignof(Sender) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<Sender *>(&data[0] + SizeOfQueue() +
                                       SizeOfMessages() + SizeOfWatchers() +
                                       sender_index * sizeof(Sender));
   }
 
   Watcher *GetWatcher(size_t watcher_index) {
+    static_assert(alignof(Watcher) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<Watcher *>(&data[0] + SizeOfQueue() +
                                        SizeOfMessages() +
                                        watcher_index * sizeof(Watcher));
   }
 
   AtomicIndex *GetQueue(uint32_t index) {
+    static_assert(alignof(AtomicIndex) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<AtomicIndex *>(&data[0] +
                                            sizeof(AtomicIndex) * index);
   }
@@ -109,6 +115,8 @@
   // sender list, since those are messages available to be filled in and sent.
   // This removes the need to find lost messages when a sender dies.
   Message *GetMessage(Index index) {
+    static_assert(alignof(Message) <= kDataAlignment,
+                  "kDataAlignment is too small");
     return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
                                        index.message_index() * message_size());
   }