Merge "Add a utility to print out contents of queue memory"
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 8e5e27c..7125bda 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -461,7 +461,7 @@
}
} else {
if (!NextLogFile()) {
- VLOG(1) << "End of log file " << filenames_.back();
+ VLOG(1) << "No more files, last was " << filenames_.back();
at_end_ = true;
for (MessageHeaderQueue *queue : channels_to_write_) {
if (queue == nullptr || queue->timestamp_merger == nullptr) {
@@ -541,7 +541,8 @@
std::move(channels_[channel_index].data.front());
channels_[channel_index].data.pop_front();
- VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+ VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp) << " for "
+ << channel_index;
QueueMessages(std::get<0>(timestamp));
@@ -559,7 +560,8 @@
std::move(channels_[channel].timestamps[node_index].front());
channels_[channel].timestamps[node_index].pop_front();
- VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+ VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp) << " for "
+ << channel << " on " << node_index;
QueueMessages(std::get<0>(timestamp));
@@ -693,7 +695,7 @@
// If we are just a data merger, don't wait for timestamps.
if (!has_timestamps_) {
- channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+ channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
pushed_ = true;
}
}
@@ -737,7 +739,7 @@
// If we are a timestamp merger, don't wait for data. Missing data will be
// caught at read time.
if (has_timestamps_) {
- channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+ channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
pushed_ = true;
}
}
@@ -766,7 +768,7 @@
CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));
// Now, keep reading until we have found all duplicates.
- while (message_heap_.size() > 0u) {
+ while (!message_heap_.empty()) {
// See if it is a duplicate.
std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
next_oldest_message_reader = message_heap_.front();
@@ -888,7 +890,7 @@
std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
// See if we have any data. If not, pass the problem up the chain.
- if (message_heap_.size() == 0u) {
+ if (message_heap_.empty()) {
VLOG(1) << "No data to match timestamp on "
<< configuration::CleanedChannelToString(
configuration_->channels()->Get(channel_index_));
@@ -1072,14 +1074,14 @@
}
monotonic_clock::time_point ChannelMerger::OldestMessage() const {
- if (channel_heap_.size() == 0u) {
+ if (channel_heap_.empty()) {
return monotonic_clock::max_time;
}
return channel_heap_.front().first;
}
TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
- if (timestamp_heap_.size() == 0u) {
+ if (timestamp_heap_.empty()) {
return TimestampMerger::DeliveryTimestamp{};
}
return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
@@ -1101,20 +1103,34 @@
// Pop and recreate the heap if it has already been pushed. And since we are
// pushing again, we don't need to clear pushed.
if (timestamp_mergers_[channel_index].pushed()) {
- channel_heap_.erase(std::find_if(
+ const auto channel_iterator = std::find_if(
channel_heap_.begin(), channel_heap_.end(),
[channel_index](const std::pair<monotonic_clock::time_point, int> x) {
return x.second == channel_index;
- }));
+ });
+ DCHECK(channel_iterator != channel_heap_.end());
+ if (std::get<0>(*channel_iterator) == timestamp) {
+ // It's already in the heap, in the correct spot, so nothing
+ // more for us to do here.
+ return;
+ }
+ channel_heap_.erase(channel_iterator);
std::make_heap(channel_heap_.begin(), channel_heap_.end(),
ChannelHeapCompare);
if (timestamp_mergers_[channel_index].has_timestamps()) {
- timestamp_heap_.erase(std::find_if(
+ const auto timestamp_iterator = std::find_if(
timestamp_heap_.begin(), timestamp_heap_.end(),
[channel_index](const std::pair<monotonic_clock::time_point, int> x) {
return x.second == channel_index;
- }));
+ });
+ DCHECK(timestamp_iterator != timestamp_heap_.end());
+ if (std::get<0>(*timestamp_iterator) == timestamp) {
+ // It's already in the heap, in the correct spot, so nothing
+ // more for us to do here.
+ return;
+ }
+ timestamp_heap_.erase(timestamp_iterator);
std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
ChannelHeapCompare);
}
@@ -1164,6 +1180,10 @@
std::tuple<TimestampMerger::DeliveryTimestamp,
FlatbufferVector<MessageHeader>>
message = merger->PopOldest();
+ DCHECK_EQ(std::get<0>(message).monotonic_event_time,
+ oldest_channel_data.first)
+ << ": channel_heap_ was corrupted for " << channel_index << ": "
+ << DebugString();
return std::make_tuple(std::get<0>(message), channel_index,
std::move(std::get<1>(message)));
@@ -1247,7 +1267,7 @@
std::vector<
std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
message_heap = message_heap_;
- while (message_heap.size() > 0u) {
+ while (!message_heap.empty()) {
std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
oldest_message_reader = message_heap.front();
@@ -1276,7 +1296,7 @@
ss << "channel_heap {\n";
std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
channel_heap_;
- while (channel_heap.size() > 0u) {
+ while (!channel_heap.empty()) {
std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
ss << " " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
<< configuration::CleanedChannelToString(
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e767427..8969ee2 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -599,7 +599,9 @@
// to timestamps on log files where the timestamp log file starts before the
// data. In this case, it is reasonable to expect missing data.
ignore_missing_data_ = true;
+ VLOG(1) << "Running until start time: " << start_time;
event_loop_factory_->RunFor(start_time.time_since_epoch());
+ VLOG(1) << "At start time";
// Now that we are running for real, missing data means that the log file is
// corrupted or went wrong.
ignore_missing_data_ = false;
diff --git a/aos/events/pingpong.json b/aos/events/pingpong.json
index 0cdba93..db727c6 100644
--- a/aos/events/pingpong.json
+++ b/aos/events/pingpong.json
@@ -2,11 +2,13 @@
"channels": [
{
"name": "/test",
- "type": "aos.examples.Ping"
+ "type": "aos.examples.Ping",
+ "frequency": 2000
},
{
"name": "/test",
- "type": "aos.examples.Pong"
+ "type": "aos.examples.Pong",
+ "frequency": 2000
}
],
"imports": [
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 2383050..d2c9112 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -190,8 +190,7 @@
class SimpleShmFetcher {
public:
- explicit SimpleShmFetcher(ShmEventLoop *event_loop, const Channel *channel,
- bool copy_data)
+ explicit SimpleShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
: event_loop_(event_loop),
channel_(channel),
lockless_queue_memory_(
@@ -200,10 +199,6 @@
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_(lockless_queue_memory_.memory(),
lockless_queue_memory_.config()) {
- if (copy_data) {
- data_storage_.reset(static_cast<char *>(
- malloc(channel->max_size() + kChannelDataAlignment - 1)));
- }
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
// makes it such that FetchNext will read the next message sent after
@@ -213,6 +208,13 @@
~SimpleShmFetcher() {}
+ // Sets this object to copy data out of the shared memory into a private
+ // buffer when fetching.
+ void CopyDataOnFetch() {
+ data_storage_.reset(static_cast<char *>(
+ malloc(channel_->max_size() + kChannelDataAlignment - 1)));
+ }
+
// Points the next message to fetch at the queue index which will be
// populated next.
void PointAtNextQueueIndex() {
@@ -228,47 +230,8 @@
}
bool FetchNext() {
- // TODO(austin): Get behind and make sure it dies both here and with
- // Fetch.
- ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
- actual_queue_index_.index(), &context_.monotonic_event_time,
- &context_.realtime_event_time, &context_.monotonic_remote_time,
- &context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.size, data_storage_start());
- if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
- context_.queue_index = actual_queue_index_.index();
- if (context_.remote_queue_index == 0xffffffffu) {
- context_.remote_queue_index = context_.queue_index;
- }
- if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
- context_.monotonic_remote_time = context_.monotonic_event_time;
- }
- if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
- context_.realtime_remote_time = context_.realtime_event_time;
- }
- if (copy_data()) {
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
- } else {
- context_.data = nullptr;
- }
- actual_queue_index_ = actual_queue_index_.Increment();
- }
-
- // Make sure the data wasn't modified while we were reading it. This
- // can only happen if you are reading the last message *while* it is
- // being written to, which means you are pretty far behind.
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
- << ": Got behind while reading and the last message was modified "
- "out from under us while we were reading it. Don't get so far "
- "behind. "
- << configuration::CleanedChannelToString(channel_);
-
- if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
- event_loop_->SendTimingReport();
- LOG(FATAL) << "The next message is no longer available. "
- << configuration::CleanedChannelToString(channel_);
- }
+ const ipc_lib::LocklessQueue::ReadResult read_result =
+ DoFetch(actual_queue_index_);
return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
}
@@ -287,53 +250,12 @@
return false;
}
- ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
- queue_index.index(), &context_.monotonic_event_time,
- &context_.realtime_event_time, &context_.monotonic_remote_time,
- &context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.size, data_storage_start());
- if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
- context_.queue_index = queue_index.index();
- if (context_.remote_queue_index == 0xffffffffu) {
- context_.remote_queue_index = context_.queue_index;
- }
- if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
- context_.monotonic_remote_time = context_.monotonic_event_time;
- }
- if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
- context_.realtime_remote_time = context_.realtime_event_time;
- }
- if (copy_data()) {
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
- } else {
- context_.data = nullptr;
- }
- actual_queue_index_ = queue_index.Increment();
- }
-
- // Make sure the data wasn't modified while we were reading it. This
- // can only happen if you are reading the last message *while* it is
- // being written to, which means you are pretty far behind.
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
- << ": Got behind while reading and the last message was modified "
- "out from under us while we were reading it. Don't get so far "
- "behind."
- << configuration::CleanedChannelToString(channel_);
+ const ipc_lib::LocklessQueue::ReadResult read_result = DoFetch(queue_index);
CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
<< ": Queue index went backwards. This should never happen. "
<< configuration::CleanedChannelToString(channel_);
- // We fell behind between when we read the index and read the value.
- // This isn't worth recovering from since this means we went to sleep
- // for a long time in the middle of this function.
- if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
- event_loop_->SendTimingReport();
- LOG(FATAL) << "The next message is no longer available. "
- << configuration::CleanedChannelToString(channel_);
- }
-
return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
}
@@ -350,17 +272,83 @@
}
absl::Span<char> GetPrivateMemory() const {
- CHECK(copy_data());
+ // Can't usefully expose this for pinning, because the buffer changes
+ // address for each message. Callers who want to work with that should just
+ // grab the whole shared memory buffer instead.
return absl::Span<char>(
const_cast<SimpleShmFetcher *>(this)->data_storage_start(),
lockless_queue_.message_data_size());
}
private:
- char *data_storage_start() {
- if (!copy_data()) return nullptr;
+ ipc_lib::LocklessQueue::ReadResult DoFetch(ipc_lib::QueueIndex queue_index) {
+ // TODO(austin): Get behind and make sure it dies.
+ char *copy_buffer = nullptr;
+ if (copy_data()) {
+ copy_buffer = data_storage_start();
+ }
+ ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+ queue_index.index(), &context_.monotonic_event_time,
+ &context_.realtime_event_time, &context_.monotonic_remote_time,
+ &context_.realtime_remote_time, &context_.remote_queue_index,
+ &context_.size, copy_buffer);
+
+ if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+ context_.queue_index = queue_index.index();
+ if (context_.remote_queue_index == 0xffffffffu) {
+ context_.remote_queue_index = context_.queue_index;
+ }
+ if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
+ context_.monotonic_remote_time = context_.monotonic_event_time;
+ }
+ if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
+ context_.realtime_remote_time = context_.realtime_event_time;
+ }
+ const char *const data = DataBuffer();
+ if (data) {
+ context_.data =
+ data + lockless_queue_.message_data_size() - context_.size;
+ } else {
+ context_.data = nullptr;
+ }
+ actual_queue_index_ = queue_index.Increment();
+ }
+
+ // Make sure the data wasn't modified while we were reading it. This
+ // can only happen if you are reading the last message *while* it is
+ // being written to, which means you are pretty far behind.
+ CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+ << ": Got behind while reading and the last message was modified "
+ "out from under us while we were reading it. Don't get so far "
+ "behind on: "
+ << configuration::CleanedChannelToString(channel_);
+
+ // We fell behind between when we read the index and read the value.
+ // This isn't worth recovering from since this means we went to sleep
+ // for a long time in the middle of this function.
+ if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
+ event_loop_->SendTimingReport();
+ LOG(FATAL) << "The next message is no longer available. "
+ << configuration::CleanedChannelToString(channel_);
+ }
+
+ return read_result;
+ }
+
+ char *data_storage_start() const {
+ CHECK(copy_data());
return RoundChannelData(data_storage_.get(), channel_->max_size());
}
+
+ // Note that for some modes the return value will change as new messages are
+ // read.
+ const char *DataBuffer() const {
+ if (copy_data()) {
+ return data_storage_start();
+ }
+ return nullptr;
+ }
+
bool copy_data() const { return static_cast<bool>(data_storage_); }
aos::ShmEventLoop *event_loop_;
@@ -381,7 +369,9 @@
public:
explicit ShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
: RawFetcher(event_loop, channel),
- simple_shm_fetcher_(event_loop, channel, true) {}
+ simple_shm_fetcher_(event_loop, channel) {
+ simple_shm_fetcher_.CopyDataOnFetch();
+ }
~ShmFetcher() { context_.data = nullptr; }
@@ -487,7 +477,11 @@
: WatcherState(event_loop, channel, std::move(fn)),
event_loop_(event_loop),
event_(this),
- simple_shm_fetcher_(event_loop, channel, copy_data) {}
+ simple_shm_fetcher_(event_loop, channel) {
+ if (copy_data) {
+ simple_shm_fetcher_.CopyDataOnFetch();
+ }
+ }
~ShmWatcherState() override { event_loop_->RemoveEvent(&event_); }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index ee4e6d6..bff04b9 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -3,21 +3,38 @@
#include <algorithm>
#include <deque>
#include <string_view>
+#include <vector>
#include "absl/container/btree_map.h"
+#include "aos/events/aos_logging.h"
#include "aos/events/simulated_network_bridge.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/util/phased_loop.h"
-#include "aos/events/aos_logging.h"
namespace aos {
+class SimulatedEventLoop;
+class SimulatedFetcher;
+class SimulatedChannel;
+
+namespace {
+
// Container for both a message, and the context for it for simulation. This
// makes tracking the timestamps associated with the data easy.
-struct SimulatedMessage {
+struct SimulatedMessage final {
+ SimulatedMessage(const SimulatedMessage &) = delete;
+ SimulatedMessage &operator=(const SimulatedMessage &) = delete;
+
+ // Creates a SimulatedMessage with channel->max_size() bytes of storage.
+ // This is a shared_ptr so we don't have to implement refcounting or copying.
+ static std::shared_ptr<SimulatedMessage> Make(SimulatedChannel *channel);
+
// Context for the data.
Context context;
+ SimulatedChannel *const channel = nullptr;
+ int buffer_index;
+
// The data.
char *data(size_t buffer_size) {
return RoundChannelData(&actual_data[0], buffer_size);
@@ -26,12 +43,21 @@
// Then the data, including padding on the end so we can align the buffer we
// actually return from data().
char actual_data[];
+
+ private:
+ SimulatedMessage(SimulatedChannel *channel_in);
+ ~SimulatedMessage();
+
+ static void DestroyAndFree(SimulatedMessage *p) {
+ p->~SimulatedMessage();
+ free(p);
+ }
};
-class SimulatedEventLoop;
-class SimulatedFetcher;
-class SimulatedChannel;
+} // namespace
+// TODO(Brian): This should be in the anonymous namespace, but that annoys GCC
+// for some reason...
class SimulatedWatcher : public WatcherState {
public:
SimulatedWatcher(
@@ -65,12 +91,59 @@
class SimulatedChannel {
public:
- explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler)
+ explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler,
+ std::chrono::nanoseconds channel_storage_duration)
: channel_(channel),
scheduler_(scheduler),
- next_queue_index_(ipc_lib::QueueIndex::Zero(channel->max_size())) {}
+ channel_storage_duration_(channel_storage_duration),
+ next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+ available_buffer_indices_.reserve(number_buffers());
+ for (int i = 0; i < number_buffers(); ++i) {
+ available_buffer_indices_.push_back(i);
+ }
+ }
- ~SimulatedChannel() { CHECK_EQ(0u, fetchers_.size()); }
+ ~SimulatedChannel() {
+ latest_message_.reset();
+ CHECK_EQ(static_cast<size_t>(number_buffers()),
+ available_buffer_indices_.size());
+ CHECK_EQ(0u, fetchers_.size());
+ CHECK_EQ(0u, watchers_.size());
+ CHECK_EQ(0, sender_count_);
+ }
+
+ // The number of messages we pretend to have in the queue.
+ int queue_size() const {
+ return channel()->frequency() *
+ std::chrono::duration_cast<std::chrono::duration<double>>(
+ channel_storage_duration_)
+ .count();
+ }
+
+ // The number of extra buffers (beyond the queue) we pretend to have.
+ int number_scratch_buffers() const {
+ // We need to start creating messages before we know how many
+ // senders+readers we'll have, so we need to just pick something which is
+ // always big enough.
+ return 50;
+ }
+
+ int number_buffers() const { return queue_size() + number_scratch_buffers(); }
+
+ int GetBufferIndex() {
+ CHECK(!available_buffer_indices_.empty()) << ": This should be impossible";
+ const int result = available_buffer_indices_.back();
+ available_buffer_indices_.pop_back();
+ return result;
+ }
+
+ void FreeBufferIndex(int i) {
+ DCHECK(std::find(available_buffer_indices_.begin(),
+ available_buffer_indices_.end(),
+ i) == available_buffer_indices_.end())
+ << ": Buffer is not in use: " << i;
+ available_buffer_indices_.push_back(i);
+ }
// Makes a connected raw sender which calls Send below.
::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
@@ -103,6 +176,7 @@
const Channel *channel() const { return channel_; }
void CountSenderCreated() {
+ CheckBufferCount();
if (sender_count_ >= channel()->num_senders()) {
LOG(FATAL) << "Failed to create sender on "
<< configuration::CleanedChannelToString(channel())
@@ -116,7 +190,11 @@
}
private:
- const Channel *channel_;
+ void CheckBufferCount() { CHECK_LT(sender_count_, number_scratch_buffers()); }
+
+ const Channel *const channel_;
+ EventScheduler *const scheduler_;
+ const std::chrono::nanoseconds channel_storage_duration_;
// List of all watchers.
::std::vector<SimulatedWatcher *> watchers_;
@@ -124,24 +202,36 @@
// List of all fetchers.
::std::vector<SimulatedFetcher *> fetchers_;
std::shared_ptr<SimulatedMessage> latest_message_;
- EventScheduler *scheduler_;
ipc_lib::QueueIndex next_queue_index_;
int sender_count_ = 0;
+
+ std::vector<uint16_t> available_buffer_indices_;
};
namespace {
-// Creates a SimulatedMessage with size bytes of storage.
-// This is a shared_ptr so we don't have to implement refcounting or copying.
-std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
- SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
+std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
+ SimulatedChannel *channel) {
+ const size_t size = channel->max_size();
+ SimulatedMessage *const message = reinterpret_cast<SimulatedMessage *>(
malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
+ new (message) SimulatedMessage(channel);
message->context.size = size;
message->context.data = message->data(size);
- return std::shared_ptr<SimulatedMessage>(message, free);
+ return std::shared_ptr<SimulatedMessage>(message,
+ &SimulatedMessage::DestroyAndFree);
+}
+
+SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
+ : channel(channel_in) {
+ buffer_index = channel->GetBufferIndex();
+}
+
+SimulatedMessage::~SimulatedMessage() {
+ channel->FreeBufferIndex(buffer_index);
}
class SimulatedSender : public RawSender {
@@ -156,7 +246,7 @@
void *data() override {
if (!message_) {
- message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+ message_ = SimulatedMessage::Make(simulated_channel_);
}
return message_->data(simulated_channel_->max_size());
}
@@ -196,7 +286,7 @@
// This is wasteful, but since flatbuffers fill from the back end of the
// queue, we need it to be full sized.
- message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+ message_ = SimulatedMessage::Make(simulated_channel_);
// Now fill in the message. size is already populated above, and
// queue_index will be populated in simulated_channel_. Put this at the
@@ -230,6 +320,8 @@
return std::make_pair(false, monotonic_clock::min_time);
}
+ CHECK(!fell_behind_) << ": Got behind";
+
SetMsg(msgs_.front());
msgs_.pop_front();
return std::make_pair(true, event_loop()->monotonic_now());
@@ -251,6 +343,7 @@
// latest message from before we started.
SetMsg(msgs_.back());
msgs_.clear();
+ fell_behind_ = false;
return std::make_pair(true, event_loop()->monotonic_now());
}
@@ -275,6 +368,14 @@
// Internal method for Simulation to add a message to the buffer.
void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
msgs_.emplace_back(buffer);
+ if (fell_behind_ ||
+ msgs_.size() > static_cast<size_t>(simulated_channel_->queue_size())) {
+ fell_behind_ = true;
+ // Might as well empty out all the intermediate messages now.
+ while (msgs_.size() > 1) {
+ msgs_.pop_front();
+ }
+ }
}
SimulatedChannel *simulated_channel_;
@@ -282,6 +383,9 @@
// Messages queued up but not in use.
::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+
+ // Whether we're "behind"; if set, the next FetchNext call will CHECK-fail.
+ bool fell_behind_ = false;
};
class SimulatedTimerHandler : public TimerHandler {
@@ -332,8 +436,7 @@
class SimulatedEventLoop : public EventLoop {
public:
explicit SimulatedEventLoop(
- EventScheduler *scheduler,
- NodeEventLoopFactory *node_event_loop_factory,
+ EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
*channels,
const Configuration *configuration,
@@ -532,8 +635,10 @@
if (it == channels_->end()) {
it = channels_
->emplace(SimpleChannel(channel),
- std::unique_ptr<SimulatedChannel>(
- new SimulatedChannel(channel, scheduler_)))
+ std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+ channel, scheduler_,
+ std::chrono::nanoseconds(
+ configuration()->channel_storage_duration()))))
.first;
}
return it->second.get();
diff --git a/aos/flatbuffers.cc b/aos/flatbuffers.cc
index 06d11f4..4f6497e 100644
--- a/aos/flatbuffers.cc
+++ b/aos/flatbuffers.cc
@@ -4,11 +4,12 @@
namespace aos {
-uint8_t *FixedAllocatorBase::allocate(size_t) {
+uint8_t *FixedAllocatorBase::allocate(size_t allocated_size) {
if (is_allocated_) {
LOG(FATAL) << "Can't allocate more memory with a fixed size allocator. "
"Increase the memory reserved.";
}
+ CHECK_LE(allocated_size, size());
is_allocated_ = true;
return data();
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 4aa0ff8..e016770 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -19,7 +19,12 @@
// TODO(austin): Read the contract for these.
uint8_t *allocate(size_t) override;
- void deallocate(uint8_t *, size_t) override { is_allocated_ = false; }
+ void deallocate(uint8_t *allocated_data, size_t allocated_size) override {
+ DCHECK_LE(allocated_size, size());
+ DCHECK_EQ(allocated_data, data());
+ CHECK(is_allocated_);
+ is_allocated_ = false;
+ }
uint8_t *reallocate_downward(uint8_t *, size_t, size_t, size_t,
size_t) override;
@@ -28,7 +33,10 @@
virtual uint8_t *data() = 0;
virtual size_t size() const = 0;
- void Reset() { is_allocated_ = false; }
+ void Reset() {
+ CHECK(!is_allocated_);
+ is_allocated_ = false;
+ }
bool is_allocated() const { return is_allocated_; }
bool allocated() { return is_allocated_; }
@@ -241,31 +249,38 @@
template <typename T, size_t Size>
class FlatbufferFixedAllocatorArray final : public Flatbuffer<T> {
public:
- FlatbufferFixedAllocatorArray() : buffer_(), allocator_(&buffer_[0], Size) {
- builder_ = flatbuffers::FlatBufferBuilder(Size, &allocator_);
- builder_.ForceDefaults(true);
+ FlatbufferFixedAllocatorArray() : buffer_(), allocator_(&buffer_[0], Size) {}
+
+ FlatbufferFixedAllocatorArray(const FlatbufferFixedAllocatorArray &) = delete;
+ void operator=(const Flatbuffer<T> &) = delete;
+
+ void CopyFrom(const Flatbuffer<T> &other) {
+ CHECK(!allocator_.is_allocated()) << ": May not overwrite while building";
+ memcpy(buffer_.begin(), other.data(), other.size());
+ data_ = buffer_.begin();
+ size_ = other.size();
}
void Reset() {
- allocator_.Reset();
+ CHECK(!allocator_.is_allocated()) << ": May not reset while building";
builder_ = flatbuffers::FlatBufferBuilder(Size, &allocator_);
builder_.ForceDefaults(true);
}
flatbuffers::FlatBufferBuilder *Builder() {
- if (allocator_.allocated()) {
- LOG(FATAL) << "Array backed flatbuffer can only be built once";
- }
+ CHECK(!allocator_.allocated())
+ << ": Array backed flatbuffer can only be built once";
+ builder_ = flatbuffers::FlatBufferBuilder(Size, &allocator_);
+ builder_.ForceDefaults(true);
return &builder_;
}
void Finish(flatbuffers::Offset<T> root) {
- if (!allocator_.allocated()) {
- LOG(FATAL) << "Cannot finish if never building";
- }
+ CHECK(allocator_.allocated()) << ": Cannot finish if not building";
builder_.Finish(root);
data_ = builder_.GetBufferPointer();
size_ = builder_.GetSize();
+ DCHECK_LE(size_, Size);
}
const uint8_t *data() const override {
@@ -284,8 +299,6 @@
flatbuffers::FlatBufferBuilder builder_;
uint8_t *data_ = nullptr;
size_t size_ = 0;
-
- DISALLOW_COPY_AND_ASSIGN(FlatbufferFixedAllocatorArray);
};
// This object associates the message type with the memory storing the
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index a450560..d9a1a71 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -38,7 +38,9 @@
LocklessQueueMemory *const memory_;
};
-void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
+// Returns true if it succeeded. Returns false if another sender died in the
+// middle.
+bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
// Make sure we start looking at shared memory fresh right now. We'll handle
// people dying partway through by either cleaning up after them or not, but
// we want to ensure we clean up after anybody who has already died when we
@@ -66,6 +68,8 @@
// queue is active while we do this, it may take a couple of go arounds to see
// everything.
+ ::std::vector<bool> need_recovery(num_senders, false);
+
// Do the easy case. Find all senders who have died. See if they are either
// consistent already, or if they have copied over to_replace to the scratch
// index, but haven't cleared to_replace. Count them.
@@ -74,65 +78,71 @@
Sender *sender = memory->GetSender(i);
const uint32_t tid =
__atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
- if (tid & FUTEX_OWNER_DIED) {
- VLOG(3) << "Found an easy death for sender " << i;
- // We can do a relaxed load here because we're the only person touching
- // this sender at this point.
- const Index to_replace = sender->to_replace.RelaxedLoad();
- const Index scratch_index = sender->scratch_index.Load();
-
- // I find it easiest to think about this in terms of the set of observable
- // states. The main code progresses through the following states:
-
- // 1) scratch_index = xxx
- // to_replace = invalid
- // This is unambiguous. Already good.
-
- // 2) scratch_index = xxx
- // to_replace = yyy
- // Very ambiguous. Is xxx or yyy the correct one? Need to either roll
- // this forwards or backwards.
-
- // 3) scratch_index = yyy
- // to_replace = yyy
- // We are in the act of moving to_replace to scratch_index, but didn't
- // finish. Easy.
-
- // 4) scratch_index = yyy
- // to_replace = invalid
- // Finished, but died. Looks like 1)
-
- // Any cleanup code needs to follow the same set of states to be robust to
- // death, so death can be restarted.
-
- // Could be 2) or 3).
- if (to_replace.valid()) {
- // 3)
- if (to_replace == scratch_index) {
- // Just need to invalidate to_replace to finish.
- sender->to_replace.Invalidate();
-
- // And mark that we succeeded.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
- ++valid_senders;
- }
- } else {
- // 1) or 4). Make sure we aren't corrupted and declare victory.
- CHECK(scratch_index.valid());
-
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
- ++valid_senders;
- }
- } else {
+ if (!(tid & FUTEX_OWNER_DIED)) {
// Not dead.
++valid_senders;
+ continue;
}
+ VLOG(3) << "Found an easy death for sender " << i;
+ // We can do a relaxed load here because we're the only person touching
+ // this sender at this point.
+ const Index to_replace = sender->to_replace.RelaxedLoad();
+ const Index scratch_index = sender->scratch_index.Load();
+
+ // I find it easiest to think about this in terms of the set of observable
+ // states. The main code progresses through the following states:
+
+ // 1) scratch_index = xxx
+ // to_replace = invalid
+ // This is unambiguous. Already good.
+
+ // 2) scratch_index = xxx
+ // to_replace = yyy
+ // Very ambiguous. Is xxx or yyy the correct one? Need to either roll
+ // this forwards or backwards.
+
+ // 3) scratch_index = yyy
+ // to_replace = yyy
+ // We are in the act of moving to_replace to scratch_index, but didn't
+ // finish. Easy.
+
+ // 4) scratch_index = yyy
+ // to_replace = invalid
+ // Finished, but died. Looks like 1)
+
+ // Any cleanup code needs to follow the same set of states to be robust to
+ // death, so death can be restarted.
+
+ if (!to_replace.valid()) {
+ // 1) or 4). Make sure we aren't corrupted and declare victory.
+ CHECK(scratch_index.valid());
+
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ ++valid_senders;
+ continue;
+ }
+
+ // Could be 2) or 3) at this point.
+
+ if (to_replace == scratch_index) {
+ // 3) for sure.
+ // Just need to invalidate to_replace to finish.
+ sender->to_replace.Invalidate();
+
+ // And mark that we succeeded.
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ ++valid_senders;
+ continue;
+ }
+
+ // Must be 2). Mark it for later.
+ need_recovery[i] = true;
}
// If all the senders are (or were made) good, there is no need to do the hard
// case.
if (valid_senders == num_senders) {
- return;
+ return true;
}
VLOG(3) << "Starting hard cleanup";
@@ -148,8 +158,12 @@
const uint32_t tid =
__atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
if (tid & FUTEX_OWNER_DIED) {
+ if (!need_recovery[i]) {
+ return false;
+ }
++num_missing;
} else {
+ CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
// We can do a relaxed load here because we're the only person touching
// this sender at this point, if it matters. If it's not a dead sender,
// then any message it every has will already be accounted for, so this
@@ -170,6 +184,8 @@
}
accounted_for[index.message_index()] = true;
}
+
+ CHECK_LE(num_accounted_for + num_missing, num_messages);
}
while (num_missing != 0) {
@@ -178,67 +194,83 @@
Sender *sender = memory->GetSender(i);
const uint32_t tid =
__atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
- if (tid & FUTEX_OWNER_DIED) {
- // We can do relaxed loads here because we're the only person touching
- // this sender at this point.
- const Index scratch_index = sender->scratch_index.RelaxedLoad();
- const Index to_replace = sender->to_replace.RelaxedLoad();
+ if (!(tid & FUTEX_OWNER_DIED)) {
+ CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
+ continue;
+ }
+ if (!need_recovery[i]) {
+ return false;
+ }
+ // We can do relaxed loads here because we're the only person touching
+ // this sender at this point.
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ const Index to_replace = sender->to_replace.RelaxedLoad();
- // Candidate.
- if (to_replace.valid()) {
- CHECK_LE(to_replace.message_index(), accounted_for.size());
- }
- if (scratch_index.valid()) {
- CHECK_LE(scratch_index.message_index(), accounted_for.size());
- }
- if (!to_replace.valid() || accounted_for[to_replace.message_index()]) {
- CHECK(scratch_index.valid());
- VLOG(3) << "Sender " << i
- << " died, to_replace is already accounted for";
- // If both are accounted for, we are corrupt...
- CHECK(!accounted_for[scratch_index.message_index()]);
+ // Candidate.
+ if (to_replace.valid()) {
+ CHECK_LE(to_replace.message_index(), accounted_for.size());
+ }
+ if (scratch_index.valid()) {
+ CHECK_LE(scratch_index.message_index(), accounted_for.size());
+ }
+ if (!to_replace.valid() || accounted_for[to_replace.message_index()]) {
+ CHECK(scratch_index.valid());
+ VLOG(3) << "Sender " << i
+ << " died, to_replace is already accounted for";
+ // If both are accounted for, we are corrupt...
+ CHECK(!accounted_for[scratch_index.message_index()]);
- // to_replace is already accounted for. This means that we didn't
- // atomically insert scratch_index into the queue yet. So
- // invalidate to_replace.
- sender->to_replace.Invalidate();
+ // to_replace is already accounted for. This means that we didn't
+ // atomically insert scratch_index into the queue yet. So
+ // invalidate to_replace.
+ sender->to_replace.Invalidate();
- // And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ // And then mark this sender clean.
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ need_recovery[i] = false;
- // And account for scratch_index.
- accounted_for[scratch_index.message_index()] = true;
- --num_missing;
- ++num_accounted_for;
- } else if (!scratch_index.valid() ||
- accounted_for[scratch_index.message_index()]) {
- VLOG(3) << "Sender " << i
- << " died, scratch_index is already accounted for";
- // scratch_index is accounted for. That means we did the insert,
- // but didn't record it.
- CHECK(to_replace.valid());
- // Finish the transaction. Copy to_replace, then clear it.
+ // And account for scratch_index.
+ accounted_for[scratch_index.message_index()] = true;
+ --num_missing;
+ ++num_accounted_for;
+ } else if (!scratch_index.valid() ||
+ accounted_for[scratch_index.message_index()]) {
+ VLOG(3) << "Sender " << i
+ << " died, scratch_index is already accounted for";
+ // scratch_index is accounted for. That means we did the insert,
+ // but didn't record it.
+ CHECK(to_replace.valid());
+ // Finish the transaction. Copy to_replace, then clear it.
- sender->scratch_index.Store(to_replace);
- sender->to_replace.Invalidate();
+ sender->scratch_index.Store(to_replace);
+ sender->to_replace.Invalidate();
- // And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ // And then mark this sender clean.
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ need_recovery[i] = false;
- // And account for to_replace.
- accounted_for[to_replace.message_index()] = true;
- --num_missing;
- ++num_accounted_for;
- } else {
- VLOG(3) << "Sender " << i << " died, neither is accounted for";
- // Ambiguous. There will be an unambiguous one somewhere that we
- // can do first.
- }
+ // And account for to_replace.
+ accounted_for[to_replace.message_index()] = true;
+ --num_missing;
+ ++num_accounted_for;
+ } else {
+ VLOG(3) << "Sender " << i << " died, neither is accounted for";
+ // Ambiguous. There will be an unambiguous one somewhere that we
+ // can do first.
}
}
// CHECK that we are making progress.
CHECK_NE(num_missing, starting_num_missing);
}
+ return true;
+}
+
+void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &lock) {
+  // DoCleanup returns false if a sender it had not flagged for recovery is
+  // found dead; retry. Bounded: finite senders, and the lock blocks new ones.
+  for (;;) {
+    if (DoCleanup(memory, lock)) return;
+  }
+}
// Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target