Align all message buffers to 64 bytes
This means asking flatbuffers for an aligned pointer in the message
actually works. flatbuffers only aligns relative to the end of the
buffer, and assumes that's aligned enough.
Change-Id: Ia055fddefea277697c37abafbac6f533fb8ec02e
diff --git a/aos/events/BUILD b/aos/events/BUILD
index fb0f356..a4c24ce 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -58,6 +58,7 @@
"//aos:configuration",
"//aos:configuration_fbs",
"//aos:flatbuffers",
+ "//aos/ipc_lib:data_alignment",
"//aos/time",
"//aos/util:phased_loop",
"@com_github_google_flatbuffers//:flatbuffers",
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 4a12096..9618a32 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -11,6 +11,7 @@
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
#include "aos/flatbuffers.h"
+#include "aos/ipc_lib/data_alignment.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/time/time.h"
#include "aos/util/phased_loop.h"
@@ -141,6 +142,13 @@
// Returns the queue index that this was sent with.
uint32_t sent_queue_index() const { return sent_queue_index_; }
+ // Returns the associated flatbuffers-style allocator. This must be
+ // deallocated before the message is sent.
+ PreallocatedAllocator *fbb_allocator() {
+ fbb_allocator_ = PreallocatedAllocator(data(), size());
+ return &fbb_allocator_;
+ }
+
protected:
EventLoop *event_loop() { return event_loop_; }
@@ -166,6 +174,8 @@
const Channel *channel_;
internal::RawSenderTiming timing_;
+
+ PreallocatedAllocator fbb_allocator_{nullptr, 0};
};
// Fetches the newest message from a channel.
@@ -177,12 +187,26 @@
// Fetches the next message. Returns true if it fetched a new message. This
// method will only return messages sent after the Fetcher was created.
- bool FetchNext() { return fetcher_->FetchNext(); }
+ bool FetchNext() {
+ const bool result = fetcher_->FetchNext();
+ if (result) {
+ CheckChannelDataAlignment(fetcher_->context().data,
+ fetcher_->context().size);
+ }
+ return result;
+ }
// Fetches the most recent message. Returns true if it fetched a new message.
// This will return the latest message regardless of if it was sent before or
// after the fetcher was created.
- bool Fetch() { return fetcher_->Fetch(); }
+ bool Fetch() {
+ const bool result = fetcher_->Fetch();
+ if (result) {
+ CheckChannelDataAlignment(fetcher_->context().data,
+ fetcher_->context().size);
+ }
+ return result;
+ }
// Returns a pointer to the contained flatbuffer, or nullptr if there is no
// available message.
@@ -222,10 +246,17 @@
// builder.Send(t_builder.Finish());
class Builder {
public:
- Builder(RawSender *sender, void *data, size_t size)
- : alloc_(data, size), fbb_(size, &alloc_), sender_(sender) {
+ Builder(RawSender *sender, PreallocatedAllocator *allocator)
+ : fbb_(allocator->size(), allocator), sender_(sender) {
+ CheckChannelDataAlignment(allocator->data(), allocator->size());
fbb_.ForceDefaults(1);
}
+ Builder() {}
+ Builder(const Builder &) = delete;
+ Builder(Builder &&) = default;
+
+ Builder &operator=(const Builder &) = delete;
+ Builder &operator=(Builder &&) = default;
flatbuffers::FlatBufferBuilder *fbb() { return &fbb_; }
@@ -243,7 +274,6 @@
void CheckSent() { fbb_.Finished(); }
private:
- PreallocatedAllocator alloc_;
flatbuffers::FlatBufferBuilder fbb_;
RawSender *sender_;
};
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index c2c9884..dfb5a6d 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -23,7 +23,7 @@
template <typename T>
typename Sender<T>::Builder Sender<T>::MakeBuilder() {
- return Builder(sender_.get(), sender_->data(), sender_->size());
+ return Builder(sender_.get(), sender_->fbb_allocator());
}
template <typename Watch>
@@ -193,6 +193,7 @@
// context.
void DoCallCallback(std::function<monotonic_clock::time_point()> get_time,
Context context) {
+ CheckChannelDataAlignment(context.data, context.size);
const monotonic_clock::time_point monotonic_start_time = get_time();
{
const float start_latency =
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index cc11520..0db7291 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -184,8 +184,8 @@
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_(lockless_queue_memory_.memory(),
lockless_queue_memory_.config()),
- data_storage_(static_cast<AlignedChar *>(aligned_alloc(
- alignof(AlignedChar), channel->max_size())),
+ data_storage_(static_cast<char *>(malloc(channel->max_size() +
+ kChannelDataAlignment - 1)),
&free) {
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
@@ -217,7 +217,7 @@
actual_queue_index_.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.size, reinterpret_cast<char *>(data_storage_.get()));
+ &context_.size, data_storage_start());
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
context_.queue_index = actual_queue_index_.index();
if (context_.remote_queue_index == 0xffffffffu) {
@@ -229,7 +229,7 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+ context_.data = data_storage_start() +
lockless_queue_.message_data_size() - context_.size;
actual_queue_index_ = actual_queue_index_.Increment();
}
@@ -267,7 +267,7 @@
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.size, reinterpret_cast<char *>(data_storage_.get()));
+ &context_.size, data_storage_start());
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
context_.queue_index = queue_index.index();
if (context_.remote_queue_index == 0xffffffffu) {
@@ -279,7 +279,7 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+ context_.data = data_storage_start() +
lockless_queue_.message_data_size() - context_.size;
actual_queue_index_ = queue_index.Increment();
}
@@ -315,6 +315,10 @@
void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
private:
+ char *data_storage_start() {
+ return RoundChannelData(data_storage_.get(), channel_->max_size());
+ }
+
const Channel *const channel_;
MMapedQueue lockless_queue_memory_;
ipc_lib::LocklessQueue lockless_queue_;
@@ -322,14 +326,7 @@
ipc_lib::QueueIndex actual_queue_index_ =
ipc_lib::LocklessQueue::empty_queue_index();
- struct AlignedChar {
- // Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
- // cache lines.
- // V4L2 requires 64 byte alignment for USERPTR.
- alignas(64) char data;
- };
-
- std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
+ std::unique_ptr<char, decltype(&free)> data_storage_;
Context context_;
};
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 108a485..c857bb6 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -14,19 +14,17 @@
// Container for both a message, and the context for it for simulation. This
// makes tracking the timestamps associated with the data easy.
struct SimulatedMessage {
- // Struct to let us force data to be well aligned.
- struct OveralignedChar {
- char data alignas(64);
- };
-
// Context for the data.
Context context;
// The data.
- char *data() { return reinterpret_cast<char *>(&actual_data[0]); }
+ char *data(size_t buffer_size) {
+ return RoundChannelData(&actual_data[0], buffer_size);
+ }
- // Then the data.
- OveralignedChar actual_data[];
+ // Then the data, including padding on the end so we can align the buffer we
+ // actually return from data().
+ char actual_data[];
};
class SimulatedEventLoop;
@@ -125,9 +123,9 @@
// This is a shared_ptr so we don't have to implement refcounting or copying.
std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
- malloc(sizeof(SimulatedMessage) + size));
+ malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
message->context.size = size;
- message->context.data = message->data();
+ message->context.data = message->data(size);
return std::shared_ptr<SimulatedMessage>(message, free);
}
@@ -144,7 +142,7 @@
if (!message_) {
message_ = MakeSimulatedMessage(simulated_channel_->max_size());
}
- return message_->data();
+ return message_->data(simulated_channel_->max_size());
}
size_t size() override { return simulated_channel_->max_size(); }
@@ -187,7 +185,9 @@
// Now fill in the message. size is already populated above, and
// queue_index will be populated in simulated_channel_. Put this at the
// back of the data segment.
- memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
+ memcpy(message_->data(simulated_channel_->max_size()) +
+ simulated_channel_->max_size() - size,
+ msg, size);
return DoSend(size, monotonic_remote_time, realtime_remote_time,
remote_queue_index);
@@ -606,8 +606,8 @@
uint32_t SimulatedChannel::Send(std::shared_ptr<SimulatedMessage> message) {
const uint32_t queue_index = next_queue_index_.index();
message->context.queue_index = queue_index;
- message->context.data =
- message->data() + channel()->max_size() - message->context.size;
+ message->context.data = message->data(channel()->max_size()) +
+ channel()->max_size() - message->context.size;
next_queue_index_ = next_queue_index_.Increment();
latest_message_ = message;
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index c1c0db0..d9fcab6 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -12,6 +12,8 @@
// This class is a base class for all sizes of array backed allocators.
class FixedAllocatorBase : public flatbuffers::Allocator {
public:
+ ~FixedAllocatorBase() override { CHECK(!is_allocated_); }
+
// TODO(austin): Read the contract for these.
uint8_t *allocate(size_t) override;
@@ -25,6 +27,7 @@
virtual size_t size() const = 0;
void Reset() { is_allocated_ = false; }
+ bool is_allocated() const { return is_allocated_; }
private:
bool is_allocated_ = false;
@@ -51,14 +54,32 @@
class PreallocatedAllocator : public FixedAllocatorBase {
public:
PreallocatedAllocator(void *data, size_t size) : data_(data), size_(size) {}
- uint8_t *data() override { return reinterpret_cast<uint8_t *>(data_); }
- const uint8_t *data() const override {
- return reinterpret_cast<const uint8_t *>(data_);
+ PreallocatedAllocator(const PreallocatedAllocator&) = delete;
+ PreallocatedAllocator(PreallocatedAllocator &&other)
+ : data_(other.data_), size_(other.size_) {
+ CHECK(!is_allocated());
+ CHECK(!other.is_allocated());
}
- size_t size() const override { return size_; }
+
+ PreallocatedAllocator &operator=(const PreallocatedAllocator &) = delete;
+ PreallocatedAllocator &operator=(PreallocatedAllocator &&other) {
+ CHECK(!is_allocated());
+ CHECK(!other.is_allocated());
+ data_ = other.data_;
+ size_ = other.size_;
+ return *this;
+ }
+
+ uint8_t *data() final {
+ return reinterpret_cast<uint8_t *>(CHECK_NOTNULL(data_));
+ }
+ const uint8_t *data() const final {
+ return reinterpret_cast<const uint8_t *>(CHECK_NOTNULL(data_));
+ }
+ size_t size() const final { return size_; }
private:
- void* data_;
+ void *data_;
size_t size_;
};
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 57adc6d..91b050c 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -204,6 +204,7 @@
visibility = ["//visibility:public"],
deps = [
":aos_sync",
+ ":data_alignment",
":index",
"//aos:realtime",
"//aos/time",
@@ -259,3 +260,14 @@
"//aos/testing:test_logging",
],
)
+
+cc_library(
+ name = "data_alignment",
+ hdrs = [
+ "data_alignment.h",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "@com_github_google_glog//:glog",
+ ],
+)
diff --git a/aos/ipc_lib/data_alignment.h b/aos/ipc_lib/data_alignment.h
new file mode 100644
index 0000000..2f59b78
--- /dev/null
+++ b/aos/ipc_lib/data_alignment.h
@@ -0,0 +1,41 @@
+#ifndef AOS_IPC_LIB_DATA_ALIGNMENT_H_
+#define AOS_IPC_LIB_DATA_ALIGNMENT_H_
+
+#include "glog/logging.h"
+
+namespace aos {
+
+// All data buffers sent over or received from a channel will guarantee this
+// alignment for their end. Flatbuffers aligns from the end, so this is what
+// matters.
+//
+// 64 is a reasonable choice for now:
+// Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
+// cache lines.
+// V4L2 requires 64 byte alignment for USERPTR buffers.
+static constexpr size_t kChannelDataAlignment = 64;
+
+template <typename T>
+inline void CheckChannelDataAlignment(T *data, size_t size) {
+ CHECK_EQ((reinterpret_cast<uintptr_t>(data) + size) % kChannelDataAlignment,
+ 0u)
+ << ": data pointer is not end aligned as it should be: " << data << " + "
+ << size;
+}
+
+// Aligns the beginning of a channel data buffer. There must be
+// kChannelDataAlignment-1 extra bytes beyond the end to potentially use after
+// aligning it.
+inline char *RoundChannelData(char *data, size_t size) {
+ const uintptr_t data_value = reinterpret_cast<uintptr_t>(data);
+ const uintptr_t data_end = data_value + size;
+ const uintptr_t data_end_max = data_end + (kChannelDataAlignment - 1);
+ const uintptr_t rounded_data_end =
+ data_end_max - (data_end_max % kChannelDataAlignment);
+ const uintptr_t rounded_data = rounded_data_end - size;
+ return reinterpret_cast<char *>(rounded_data);
+}
+
+} // namespace aos
+
+#endif // AOS_IPC_LIB_DATA_ALIGNMENT_H_
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 903150b..c323b8b 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -241,7 +241,8 @@
size_t LocklessQueueConfiguration::message_size() const {
// Round up the message size so following data is aligned appropriately.
- return LocklessQueueMemory::AlignmentRoundUp(message_data_size) +
+ return LocklessQueueMemory::AlignmentRoundUp(message_data_size +
+ (kChannelDataAlignment - 1)) +
sizeof(Message);
}
@@ -549,7 +550,7 @@
Message *message = memory_->GetMessage(scratch_index);
message->header.queue_index.Invalidate();
- return &message->data[0];
+ return message->data(memory_->message_data_size());
}
void LocklessQueue::Sender::Send(
@@ -788,7 +789,7 @@
}
*monotonic_remote_time = m->header.monotonic_remote_time;
*realtime_remote_time = m->header.realtime_remote_time;
- memcpy(data, &m->data[0], message_data_size());
+ memcpy(data, m->data(memory_->message_data_size()), message_data_size());
*length = m->header.length;
// And finally, confirm that the message *still* points to the queue index we
@@ -891,8 +892,9 @@
::std::cout << " }" << ::std::endl;
::std::cout << " data: {";
+ const char *const m_data = m->data(memory->message_data_size());
for (size_t j = 0; j < m->header.length; ++j) {
- char data = m->data[j];
+ char data = m_data[j];
if (j != 0) {
::std::cout << " ";
}
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 976f758..0384aa8 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -7,6 +7,7 @@
#include <vector>
#include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/data_alignment.h"
#include "aos/ipc_lib/index.h"
#include "aos/time/time.h"
@@ -76,7 +77,19 @@
size_t length;
} header;
- char data[];
+ char *data(size_t message_size) { return RoundedData(message_size); }
+ const char *data(size_t message_size) const {
+ return RoundedData(message_size);
+ }
+
+ private:
+ // This returns a non-const pointer into a const object. Be very careful about
+ // const correctness in publicly accessible APIs using it.
+ char *RoundedData(size_t message_size) const {
+ return RoundChannelData(const_cast<char *>(&data_pointer[0]), message_size);
+ }
+
+ char data_pointer[];
};
struct LocklessQueueConfiguration {
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index 0c0973c..cbe76a7 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -89,18 +89,24 @@
// Getters for each of the 4 lists.
Sender *GetSender(size_t sender_index) {
+ static_assert(alignof(Sender) <= kDataAlignment,
+ "kDataAlignment is too small");
return reinterpret_cast<Sender *>(&data[0] + SizeOfQueue() +
SizeOfMessages() + SizeOfWatchers() +
sender_index * sizeof(Sender));
}
Watcher *GetWatcher(size_t watcher_index) {
+ static_assert(alignof(Watcher) <= kDataAlignment,
+ "kDataAlignment is too small");
return reinterpret_cast<Watcher *>(&data[0] + SizeOfQueue() +
SizeOfMessages() +
watcher_index * sizeof(Watcher));
}
AtomicIndex *GetQueue(uint32_t index) {
+ static_assert(alignof(AtomicIndex) <= kDataAlignment,
+ "kDataAlignment is too small");
return reinterpret_cast<AtomicIndex *>(&data[0] +
sizeof(AtomicIndex) * index);
}
@@ -109,6 +115,8 @@
// sender list, since those are messages available to be filled in and sent.
// This removes the need to find lost messages when a sender dies.
Message *GetMessage(Index index) {
+ static_assert(alignof(Message) <= kDataAlignment,
+ "kDataAlignment is too small");
return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
index.message_index() * message_size());
}