Merge changes If2fc227f,Ib83228bd
* changes:
Fix race in localizer_test
Add logger and log reader classes.
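A minimal usage sketch, condensed from logger_test.cc in this change. The
filename, the run durations, and the pre-existing SimulatedEventLoopFactory
`factory` are illustrative, not part of the API:

    // Write a log: poll all channels every 100ms, batch, and flush to disk.
    {
      aos::logger::DetachedBufferWriter writer("/tmp/logfile.bfbs");
      std::unique_ptr<aos::EventLoop> log_event_loop = factory.MakeEventLoop();
      aos::logger::Logger logger(&writer, log_event_loop.get(),
                                 std::chrono::milliseconds(100));
      factory.RunFor(std::chrono::seconds(20));
    }

    // Read it back: replay every channel onto a fresh simulated event loop.
    aos::logger::LogReader reader("/tmp/logfile.bfbs");
    aos::SimulatedEventLoopFactory reader_factory(reader.configuration());
    std::unique_ptr<aos::EventLoop> reader_event_loop =
        reader_factory.MakeEventLoop();
    reader.Register(reader_event_loop.get());
    reader_factory.RunFor(std::chrono::seconds(100));
    reader.Deregister();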
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 53b3b05..3377d67 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -209,3 +209,40 @@
"@com_google_absl//absl/container:btree",
],
)
+
+flatbuffer_cc_library(
+ name = "logger_fbs",
+ srcs = ["logger.fbs"],
+ gen_reflections = 1,
+ includes = [
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
+cc_library(
+ name = "logger",
+ srcs = ["logger.cc"],
+ hdrs = ["logger.h"],
+ deps = [
+ ":event_loop",
+ ":logger_fbs",
+ "//aos:flatbuffer_merge",
+ "//aos/time",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ "@com_google_absl//absl/container:inlined_vector",
+ "@com_google_absl//absl/strings",
+ ],
+)
+
+cc_test(
+ name = "logger_test",
+ srcs = ["logger_test.cc"],
+ data = ["pingpong_config.json"],
+ deps = [
+ ":logger",
+ ":ping_lib",
+ ":pong_lib",
+ ":simulated_event_loop",
+ "//aos/testing:googletest",
+ ],
+)
diff --git a/aos/events/logger.cc b/aos/events/logger.cc
new file mode 100644
index 0000000..6b659c0
--- /dev/null
+++ b/aos/events/logger.cc
@@ -0,0 +1,494 @@
+#include "aos/events/logger.h"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "aos/events/event_loop.h"
+#include "aos/events/logger_generated.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+
+DEFINE_int32(flush_size, 1000000,
+ "Number of outstanding bytes to allow before flushing to disk.");
+
+namespace aos {
+namespace logger {
+
+namespace chrono = std::chrono;
+
+DetachedBufferWriter::DetachedBufferWriter(absl::string_view filename)
+ : fd_(open(std::string(filename).c_str(),
+ O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
+ PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+DetachedBufferWriter::~DetachedBufferWriter() {
+ Flush();
+ PLOG_IF(ERROR, close(fd_) == -1) << ": Failed to close logfile";
+}
+
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+ flatbuffers::FlatBufferBuilder *fbb) {
+ QueueSizedFlatbuffer(fbb->Release());
+}
+
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+ flatbuffers::DetachedBuffer &&buffer) {
+ queued_size_ += buffer.size();
+ queue_.emplace_back(std::move(buffer));
+
+ if (queued_size_ > static_cast<size_t>(FLAGS_flush_size)) {
+ Flush();
+ }
+}
+
+void DetachedBufferWriter::Flush() {
+ if (queue_.size() == 0u) {
+ return;
+ }
+ iovec_.clear();
+ iovec_.reserve(queue_.size());
+ size_t counted_size = 0;
+ for (size_t i = 0; i < queue_.size(); ++i) {
+ struct iovec n;
+ n.iov_base = queue_[i].data();
+ n.iov_len = queue_[i].size();
+ counted_size += n.iov_len;
+ iovec_.emplace_back(std::move(n));
+ }
+ CHECK_EQ(counted_size, queued_size_);
+ const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
+
+ PCHECK(written == static_cast<ssize_t>(queued_size_))
+ << ": Wrote " << written << " expected " << queued_size_;
+
+ queued_size_ = 0;
+ queue_.clear();
+ // TODO(austin): Handle partial writes in some way other than crashing...
+}
+
+Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+ std::chrono::milliseconds polling_period)
+ : event_loop_(event_loop),
+ writer_(writer),
+ timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
+ polling_period_(polling_period) {
+ for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ FetcherStruct fs;
+ fs.fetcher = event_loop->MakeRawFetcher(channel);
+ fs.written = false;
+ fetchers_.emplace_back(std::move(fs));
+ }
+
+ // When things start, we want to log the header, then the most recent messages
+ // available on each fetcher to capture the previous state, then start
+ // polling.
+ event_loop_->OnRun([this, polling_period]() {
+ // Grab data from each channel right before we declare the log file started
+ // so we capture the latest message on each channel. This lets us log the
+ // most recent message on non-periodic channels, such as configuration.
+ for (FetcherStruct &f : fetchers_) {
+ f.written = !f.fetcher->Fetch();
+ }
+
+ // We need to pick a point in time to declare the log file "started". This
+ // starts here. It needs to be after everything is fetched so that the
+ // fetchers are all pointed at the most recent message before the start
+ // time.
+ const monotonic_clock::time_point monotonic_now =
+ event_loop_->monotonic_now();
+ const realtime_clock::time_point realtime_now = event_loop_->realtime_now();
+ last_synchronized_time_ = monotonic_now;
+
+ {
+ // Now write the header with this timestamp in it.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ flatbuffers::Offset<aos::Configuration> configuration_offset =
+ CopyFlatBuffer(event_loop_->configuration(), &fbb);
+
+ aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
+
+ log_file_header_builder.add_configuration(configuration_offset);
+ // The theoretical worst case out of order duration is twice the polling
+ // period: one message could get logged right after the boundary but carry
+ // a timestamp from right before the next boundary, and the reverse could
+ // happen for another message. Report back 3x to be extra safe, and because
+ // the cost isn't huge on the read side.
+ log_file_header_builder.add_max_out_of_order_duration(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(3 *
+ polling_period)
+ .count());
+
+ log_file_header_builder.add_monotonic_sent_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_now.time_since_epoch())
+ .count());
+ log_file_header_builder.add_realtime_sent_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_now.time_since_epoch())
+ .count());
+
+ fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+ writer_->QueueSizedFlatbuffer(&fbb);
+ }
+
+ timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
+ polling_period);
+ });
+}
+
+void Logger::DoLogData() {
+ // We want to guarantee that messages aren't out of order by more than
+ // max_out_of_order_duration. To do this, we need sync points. Every write
+ // cycle should be a sync point.
+ const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+
+ do {
+ // Move the sync point up by at most polling_period. This forces one sync
+ // per iteration, even if it is small.
+ last_synchronized_time_ =
+ std::min(last_synchronized_time_ + polling_period_, monotonic_now);
+ size_t channel_index = 0;
+ // Write each channel to disk, one at a time.
+ for (FetcherStruct &f : fetchers_) {
+ while (true) {
+ if (f.written) {
+ if (!f.fetcher->FetchNext()) {
+ VLOG(1) << "No new data on "
+ << FlatbufferToJson(f.fetcher->channel());
+ break;
+ } else {
+ f.written = false;
+ }
+ }
+
+ if (!f.written && f.fetcher->context().monotonic_sent_time <
+ last_synchronized_time_) {
+ // Write!
+ flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+ max_header_size_);
+ fbb.ForceDefaults(true);
+
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+ fbb.CreateVector(
+ static_cast<uint8_t *>(f.fetcher->context().data),
+ f.fetcher->context().size);
+
+ VLOG(1) << "Writing data for channel "
+ << FlatbufferToJson(f.fetcher->channel());
+
+ MessageHeader::Builder message_header_builder(fbb);
+ message_header_builder.add_channel_index(channel_index);
+ message_header_builder.add_monotonic_sent_time(
+ f.fetcher->context()
+ .monotonic_sent_time.time_since_epoch()
+ .count());
+ message_header_builder.add_realtime_sent_time(
+ f.fetcher->context()
+ .realtime_sent_time.time_since_epoch()
+ .count());
+
+ message_header_builder.add_queue_index(
+ f.fetcher->context().queue_index);
+
+ message_header_builder.add_data(data_offset);
+
+ fbb.FinishSizePrefixed(message_header_builder.Finish());
+ max_header_size_ = std::max(
+ max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+ writer_->QueueSizedFlatbuffer(&fbb);
+
+ f.written = true;
+ } else {
+ // Everything left on this channel is newer than our sync point. Leave
+ // it for the next cycle so it isn't written out of order.
+ break;
+ }
+ }
+
+ ++channel_index;
+ }
+
+ CHECK_EQ(channel_index, fetchers_.size());
+
+ // If we missed cycles, we could be pretty far behind. Spin until we are
+ // caught up.
+ } while (last_synchronized_time_ + polling_period_ < monotonic_now);
+
+ writer_->Flush();
+}
+
+LogReader::LogReader(absl::string_view filename)
+ : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+ PCHECK(fd_ != -1) << ": Failed to open " << filename;
+
+ // Read the size-prefixed log file header.
+ absl::Span<const uint8_t> config_data = ReadMessage();
+
+ // Make sure something was read.
+ CHECK(config_data != absl::Span<const uint8_t>());
+
+ // And copy the header (which contains the config) so we have it forever.
+ configuration_ = std::vector<uint8_t>(config_data.begin(), config_data.end());
+
+ max_out_of_order_duration_ = std::chrono::nanoseconds(
+ flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+ ->max_out_of_order_duration());
+
+ channels_.resize(configuration()->channels()->size());
+
+ QueueMessages();
+}
+
+bool LogReader::ReadBlock() {
+ if (end_of_file_) {
+ return false;
+ }
+
+ // Appends 256 KB. This is enough that the read call is efficient. We don't
+ // want to spend too much time reading small chunks because the syscalls for
+ // that will be expensive.
+ constexpr size_t kReadSize = 256 * 1024;
+
+ // Strip off any unused data at the front.
+ if (consumed_data_ != 0) {
+ data_.erase(data_.begin(), data_.begin() + consumed_data_);
+ consumed_data_ = 0;
+ }
+
+ const size_t starting_size = data_.size();
+
+ // This should automatically grow the backing store. It won't shrink if we
+ // get a small chunk later. This reduces allocations when we want to append
+ // more data.
+ data_.resize(data_.size() + kReadSize);
+
+ ssize_t count = read(fd_, &data_[starting_size], kReadSize);
+ data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+ if (count == 0) {
+ end_of_file_ = true;
+ return false;
+ }
+ PCHECK(count > 0);
+
+ return true;
+}
+
+bool LogReader::MessageAvailable() {
+ // Are we big enough to read the size?
+ if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+ return false;
+ }
+
+ // Then, are we big enough to read the full message?
+ const size_t data_size =
+ flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+ sizeof(flatbuffers::uoffset_t);
+ if (data_.size() < consumed_data_ + data_size) {
+ return false;
+ }
+
+ return true;
+}
+
+absl::Span<const uint8_t> LogReader::ReadMessage() {
+ // Make sure we have enough for the size.
+ if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+ if (!ReadBlock()) {
+ return absl::Span<const uint8_t>();
+ }
+ }
+
+ // Now make sure we have enough for the message.
+ const size_t data_size =
+ flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+ sizeof(flatbuffers::uoffset_t);
+ while (data_.size() < consumed_data_ + data_size) {
+ if (!ReadBlock()) {
+ return absl::Span<const uint8_t>();
+ }
+ }
+
+ // And return it, consuming the data.
+ const uint8_t *data_ptr = data_.data() + consumed_data_;
+
+ consumed_data_ += data_size;
+
+ return absl::Span<const uint8_t>(data_ptr, data_size);
+}
+
+void LogReader::QueueMessages() {
+ while (true) {
+ // Don't queue if we have enough data already.
+ // When a log file starts, there should be a message from each channel.
+ // Those messages might be very old. Make sure to read a chunk past the
+ // starting time.
+ if (channel_heap_.size() > 0 &&
+ newest_timestamp_ >
+ std::max(oldest_message().first, monotonic_start_time()) +
+ max_out_of_order_duration_) {
+ break;
+ }
+
+ absl::Span<const uint8_t> msg_data = ReadMessage();
+ if (msg_data == absl::Span<const uint8_t>()) {
+ break;
+ }
+
+ FlatbufferVector<MessageHeader> msg(std::vector<uint8_t>(
+ msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end()));
+
+ EmplaceDataBack(std::move(msg));
+ }
+
+ queue_data_time_ = newest_timestamp_ - max_out_of_order_duration_;
+}
+
+const Configuration *LogReader::configuration() {
+ return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+ ->configuration();
+}
+
+monotonic_clock::time_point LogReader::monotonic_start_time() {
+ return monotonic_clock::time_point(std::chrono::nanoseconds(
+ flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+ ->monotonic_sent_time()));
+}
+
+realtime_clock::time_point LogReader::realtime_start_time() {
+ return realtime_clock::time_point(std::chrono::nanoseconds(
+ flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+ ->realtime_sent_time()));
+}
+
+void LogReader::Register(EventLoop *event_loop) {
+ event_loop_ = event_loop;
+
+ for (size_t i = 0; i < channels_.size(); ++i) {
+ CHECK_EQ(configuration()->channels()->Get(i)->name(),
+ event_loop_->configuration()->channels()->Get(i)->name());
+ CHECK_EQ(configuration()->channels()->Get(i)->type(),
+ event_loop_->configuration()->channels()->Get(i)->type());
+
+ channels_[i].raw_sender = event_loop_->MakeRawSender(
+ event_loop_->configuration()->channels()->Get(i));
+ }
+
+ timer_handler_ = event_loop_->AddTimer([this]() {
+ std::pair<monotonic_clock::time_point, int> oldest_channel_index =
+ PopOldestChannel();
+ const monotonic_clock::time_point monotonic_now =
+ event_loop_->monotonic_now();
+ CHECK(monotonic_now == oldest_channel_index.first)
+ << ": Now " << monotonic_now.time_since_epoch().count()
+ << " trying to send "
+ << oldest_channel_index.first.time_since_epoch().count();
+
+ struct LogReader::ChannelData &channel =
+ channels_[oldest_channel_index.second];
+
+ FlatbufferVector<MessageHeader> front = std::move(channel.front());
+
+ CHECK(front.message().data() != nullptr);
+ if (oldest_channel_index.first > monotonic_start_time()) {
+ channel.raw_sender->Send(front.message().data()->Data(),
+ front.message().data()->size());
+ } else {
+ LOG(WARNING) << "Not sending data from before the start of the log file. "
+ << oldest_channel_index.first.time_since_epoch().count()
+ << " start "
+ << monotonic_start_time().time_since_epoch().count() << " "
+ << FlatbufferToJson(front);
+ }
+ channel.data.pop_front();
+
+ // Re-push it and update the oldest timestamp.
+ if (channel.data.size() != 0) {
+ const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+ chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
+ PushChannelHeap(timestamp, oldest_channel_index.second);
+ channel.oldest_timestamp = timestamp;
+ } else {
+ channel.oldest_timestamp = monotonic_clock::min_time;
+ }
+
+ if (monotonic_now > queue_data_time_) {
+ QueueMessages();
+ }
+
+ if (channel_heap_.size() != 0) {
+ timer_handler_->Setup(oldest_message().first);
+ }
+ });
+
+ if (channel_heap_.size() > 0u) {
+ event_loop_->OnRun(
+ [this]() { timer_handler_->Setup(oldest_message().first); });
+ }
+}
+
+void LogReader::Deregister() {
+ for (size_t i = 0; i < channels_.size(); ++i) {
+ channels_[i].raw_sender.reset();
+ }
+}
+
+void LogReader::EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data) {
+ const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+ chrono::nanoseconds(new_data.message().monotonic_sent_time()));
+ const size_t channel_index = new_data.message().channel_index();
+ CHECK_LT(channel_index, channels_.size());
+ newest_timestamp_ = std::max(newest_timestamp_, timestamp);
+ if (channels_[channel_index].data.size() == 0) {
+ channels_[channel_index].oldest_timestamp = timestamp;
+ PushChannelHeap(timestamp, channel_index);
+ }
+ channels_[channel_index].data.emplace_back(std::move(new_data));
+}
+
+namespace {
+
+bool ChannelHeapCompare(
+ const std::pair<monotonic_clock::time_point, int> first,
+ const std::pair<monotonic_clock::time_point, int> second) {
+ if (first.first > second.first) {
+ return true;
+ } else if (first.first == second.first) {
+ return first.second > second.second;
+ } else {
+ return false;
+ }
+}
+
+} // namespace
+
+void LogReader::PushChannelHeap(monotonic_clock::time_point timestamp,
+ int channel_index) {
+ channel_heap_.push_back(std::make_pair(timestamp, channel_index));
+
+ // std::push_heap defaults to a max-heap, which would put the newest message
+ // first. Use a greater-than comparator to put the oldest message first.
+ std::push_heap(channel_heap_.begin(), channel_heap_.end(),
+ ChannelHeapCompare);
+}
+
+std::pair<monotonic_clock::time_point, int> LogReader::PopOldestChannel() {
+ std::pair<monotonic_clock::time_point, int> result = channel_heap_.front();
+ std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
+ &ChannelHeapCompare);
+ channel_heap_.pop_back();
+ return result;
+}
+
+} // namespace logger
+} // namespace aos
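A note on the replay ordering above: std::push_heap and std::pop_heap build a
max-heap with respect to their comparator, so ChannelHeapCompare's
greater-than comparison yields a min-heap keyed on (timestamp, channel index),
and oldest_message() is always the heap front. (With the default 100ms
polling period, the header advertises a 300ms max_out_of_order_duration, so
the reader only ever needs to merge within that window.) A standalone sketch
of the same heap pattern, with illustrative timestamps:

    #include <algorithm>
    #include <cstdio>
    #include <utility>
    #include <vector>

    // Min-heap of (timestamp, channel_index) pairs via a greater-than
    // comparator, matching ChannelHeapCompare's ordering.
    int main() {
      std::vector<std::pair<int, int>> heap;
      const auto oldest_first = [](const std::pair<int, int> &a,
                                   const std::pair<int, int> &b) {
        return a > b;
      };

      const std::pair<int, int> entries[] = {{30, 0}, {10, 1}, {20, 2}};
      for (const std::pair<int, int> &entry : entries) {
        heap.push_back(entry);
        std::push_heap(heap.begin(), heap.end(), oldest_first);
      }

      // Pops in timestamp order: (10, 1), (20, 2), (30, 0).
      while (!heap.empty()) {
        std::pop_heap(heap.begin(), heap.end(), oldest_first);
        std::printf("(%d, %d)\n", heap.back().first, heap.back().second);
        heap.pop_back();
      }
    }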
diff --git a/aos/events/logger.fbs b/aos/events/logger.fbs
new file mode 100644
index 0000000..36055a6
--- /dev/null
+++ b/aos/events/logger.fbs
@@ -0,0 +1,53 @@
+include "aos/configuration.fbs";
+
+namespace aos.logger;
+
+// A log file is a sequence of size prefixed flatbuffers.
+// The first flatbuffer will be the LogFileHeader, followed by an arbitrary
+// number of MessageHeaders.
+//
+// The log file starts at the time demarcated in the header on the monotonic
+// clock. There will be any number of messages per channel logged before the
+// start time. These messages are logged so that fetchers can retrieve the
+// state of the system at the start of the logfile for things like capturing
+// parameters. In replay, they should be made available to fetchers, but not
+// trigger watchers.
+
+table LogFileHeader {
+ // Time this log file started on the monotonic clock in nanoseconds.
+ monotonic_sent_time:long;
+ // Time this log file started on the realtime clock in nanoseconds.
+ realtime_sent_time:long;
+
+ // Messages are not written to disk in order. They will be out of order by
+ // at most this duration (in nanoseconds). If the log reader buffers until
+ // it finds messages this much newer than its simulation time, it will never
+ // find a message out of order.
+ max_out_of_order_duration:long;
+
+ // The configuration of the channels.
+ configuration:aos.Configuration;
+
+ // TODO(austin): Node!
+}
+
+// Table holding a message.
+table MessageHeader {
+ // Index into the channel datastructure in the log file header. This
+ // provides the data type.
+ channel_index:uint;
+ // Time this message was sent on the monotonic clock in nanoseconds.
+ monotonic_sent_time:long;
+ // Time this message was sent on the realtime clock in nanoseconds.
+ realtime_sent_time:long;
+ // Index into the ipc queue of this message. This should start with 0 and
+ // always monotonically increment if no messages were ever lost. It will
+ // wrap at a multiple of the queue size.
+ queue_index:uint;
+ // TODO(austin): Node.
+
+ // TODO(austin): Format? Compressed?
+
+ // The nested flatbuffer.
+ data:[ubyte];
+}
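A sketch of walking the on-disk layout described above: one size-prefixed
LogFileHeader followed by size-prefixed MessageHeaders until end of file. It
assumes the whole file is already in memory; the real reader
(LogReader::ReadBlock) buffers 256 KB chunks and handles short reads, which
this omits:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    #include "aos/events/logger_generated.h"
    #include "flatbuffers/flatbuffers.h"

    void WalkLog(const std::vector<uint8_t> &file) {
      size_t offset = 0;
      // Returns the next size-prefixed flatbuffer and advances past it.
      // GetPrefixedSize reads the leading uoffset_t length field.
      const auto next = [&file, &offset]() {
        const uint8_t *data = file.data() + offset;
        offset += flatbuffers::GetPrefixedSize(data) +
                  sizeof(flatbuffers::uoffset_t);
        return data;
      };

      const aos::logger::LogFileHeader *header =
          flatbuffers::GetSizePrefixedRoot<aos::logger::LogFileHeader>(next());
      (void)header->max_out_of_order_duration();

      while (offset < file.size()) {
        const aos::logger::MessageHeader *message =
            flatbuffers::GetSizePrefixedRoot<aos::logger::MessageHeader>(
                next());
        (void)message->channel_index();
      }
    }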
diff --git a/aos/events/logger.h b/aos/events/logger.h
new file mode 100644
index 0000000..7873a42
--- /dev/null
+++ b/aos/events/logger.h
@@ -0,0 +1,252 @@
+#ifndef AOS_EVENTS_LOGGER_H_
+#define AOS_EVENTS_LOGGER_H_
+
+#include <deque>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "aos/events/event_loop.h"
+#include "aos/events/logger_generated.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// This class manages efficiently writing a sequence of detached buffers to a
+// file. It queues them up and batches the write operation.
+class DetachedBufferWriter {
+ public:
+ DetachedBufferWriter(absl::string_view filename);
+ ~DetachedBufferWriter();
+
+ // TODO(austin): Snappy compress the log file if it ends with .snappy!
+
+ // Queues up a finished FlatBufferBuilder to be written. Steals the detached
+ // buffer from it.
+ void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
+ // Queues up a detached buffer directly.
+ void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
+
+ // Triggers data to be provided to the kernel and written.
+ void Flush();
+
+ private:
+ int fd_ = -1;
+
+ // Size of all the data in the queue.
+ size_t queued_size_ = 0;
+
+ // List of buffers to flush.
+ std::vector<flatbuffers::DetachedBuffer> queue_;
+ // List of iovecs to use with writev. This is a member variable to avoid
+ // churn.
+ std::vector<struct iovec> iovec_;
+};
+
+// Logs all channels available in the event loop to disk every 100 ms.
+// It starts by logging one message per channel to capture any state and
+// configuration that is sent rarely on a channel and would affect execution.
+class Logger {
+ public:
+ Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+ std::chrono::milliseconds polling_period =
+ std::chrono::milliseconds(100));
+
+ private:
+ void DoLogData();
+
+ EventLoop *event_loop_;
+ DetachedBufferWriter *writer_;
+
+ // Structure to track both a fetcher and whether the fetched data has been
+ // written. We may want to delay writing data to disk so messages don't get
+ // too far out of order on disk, which would make them harder to sort when
+ // reading.
+ struct FetcherStruct {
+ std::unique_ptr<RawFetcher> fetcher;
+ bool written = false;
+ };
+
+ std::vector<FetcherStruct> fetchers_;
+ TimerHandler *timer_handler_;
+
+ // Period to poll the channels.
+ const std::chrono::milliseconds polling_period_;
+
+ // Last time that data was written for all channels to disk.
+ monotonic_clock::time_point last_synchronized_time_;
+
+ // Max size that the header has consumed. This much extra data will be
+ // reserved in the builder to avoid reallocating.
+ size_t max_header_size_ = 0;
+};
+
+// Replays all the channels in the logfile to the event loop.
+class LogReader {
+ public:
+ LogReader(absl::string_view filename);
+
+ // Registers the timer and senders used to resend the messages from the log
+ // file.
+ void Register(EventLoop *event_loop);
+ // Unregisters the senders.
+ void Deregister();
+
+ // TODO(austin): Remap channels?
+
+ // Returns the configuration from the log file.
+ const Configuration *configuration();
+
+ // Returns the starting timestamp for the log file.
+ monotonic_clock::time_point monotonic_start_time();
+ realtime_clock::time_point realtime_start_time();
+
+ // TODO(austin): Add the ability to re-publish the fetched messages. Add 2
+ // options, one which publishes them *now*, and another which publishes them
+ // to the simulated event loop factory back in time where they actually
+ // happened.
+
+ private:
+ // Reads a chunk of data into data_. Returns false if no data was read.
+ bool ReadBlock();
+
+ // Returns true if there is a full message available in the buffer without
+ // reading more data from disk.
+ bool MessageAvailable();
+
+ // Returns a span with the data for a message from the log file, including
+ // the size prefix. Returns an empty span when no more data is available.
+ absl::Span<const uint8_t> ReadMessage();
+
+ // Queues at least max_out_of_order_duration_ messages into channels_.
+ void QueueMessages();
+
+ // We need to read a large chunk at a time, then split it up into parts and
+ // sort.
+ //
+ // We want to read 256 KB chunks at a time. This is the fastest read size.
+ // This leaves us with a fragmentation problem though.
+ //
+ // The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
+ // chunks into single flatbuffer messages and manage them in a sorted queue.
+ // Everything is copied three times (into 256 KB buffer, then into separate
+ // buffer, then into sender), but none of it is all that expensive. We can
+ // optimize if it is slow later.
+ //
+ // Keep placing elements into the sorted list of times until we read a
+ // message that is newer than the threshold.
+ //
+ // Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
+ // small state machine so we can resume), and keep pulling messages back out
+ // and sending.
+ //
+ // For sorting, we want to use the fact that each channel is sorted, and then
+ // merge sort the channels. Have a vector of deques, and then hold a sorted
+ // list of pointers to those.
+ //
+ // TODO(austin): Multithreaded read at some point. Gotta go faster!
+ // Especially if we start compressing.
+
+ // Allocator which doesn't zero initialize memory.
+ template <typename T>
+ struct DefaultInitAllocator {
+ typedef T value_type;
+
+ template <typename U>
+ void construct(U *p) {
+ ::new (static_cast<void *>(p)) U;
+ }
+
+ template <typename U, typename... Args>
+ void construct(U *p, Args &&... args) {
+ ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
+ }
+
+ T *allocate(std::size_t n) {
+ return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
+ }
+
+ template <typename U>
+ void deallocate(U *p, std::size_t /*n*/) {
+ ::operator delete(static_cast<void *>(p));
+ }
+ };
+
+ // Minimum amount of data to queue up for sorting before we are guaranteed
+ // not to see data out of order.
+ std::chrono::nanoseconds max_out_of_order_duration_;
+
+ // File descriptor for the log file.
+ int fd_ = -1;
+
+ EventLoop *event_loop_;
+ TimerHandler *timer_handler_;
+
+ // Vector to read into. This uses an allocator which doesn't zero initialize
+ // the memory.
+ std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+
+ // Amount of data consumed already in data_.
+ size_t consumed_data_ = 0;
+
+ // Vector holding the data for the log file header, including the config.
+ std::vector<uint8_t> configuration_;
+
+ // Moves the message to the correct channel queue.
+ void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
+
+ // Pushes a pointer to the channel for the given timestamp to the sorted
+ // channel list.
+ void PushChannelHeap(monotonic_clock::time_point timestamp,
+ int channel_index);
+
+ // Returns a pointer to the channel with the oldest message in it, and the
+ // timestamp.
+ const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
+ return channel_heap_.front();
+ }
+
+ // Pops a pointer to the channel with the oldest message in it, and the
+ // timestamp.
+ std::pair<monotonic_clock::time_point, int> PopOldestChannel();
+
+ // Datastructure to hold the list of messages, cached timestamp for the oldest
+ // message, and sender to send with.
+ struct ChannelData {
+ monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
+ std::deque<FlatbufferVector<MessageHeader>> data;
+ std::unique_ptr<RawSender> raw_sender;
+
+ // Returns the oldest message.
+ const FlatbufferVector<MessageHeader> &front() { return data.front(); }
+
+ // Returns the timestamp for the oldest message.
+ const monotonic_clock::time_point front_timestamp() {
+ return monotonic_clock::time_point(
+ std::chrono::nanoseconds(front().message().monotonic_sent_time()));
+ }
+ };
+
+ // List of channels and messages for them.
+ std::vector<ChannelData> channels_;
+
+ // Heap of channels so we can track which channel to send next.
+ std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+
+ // Timestamp of the newest message in a channel queue.
+ monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+
+ // The time at which we need to read another chunk from the logfile.
+ monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
+
+ // Cached bit for if we have reached the end of the file. Otherwise we will
+ // hammer on the kernel asking for more data each time we send.
+ bool end_of_file_ = false;
+};
+
+} // namespace logger
+} // namespace aos
+
+#endif // AOS_EVENTS_LOGGER_H_
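A note on DefaultInitAllocator above: with the default allocator,
std::vector::resize value-initializes (zeroes) the new elements, which is
wasted work when read() is about to overwrite them. The single-argument
construct() uses `::new (p) U;`, which default-initializes and is a no-op for
uint8_t. A minimal standalone sketch, with the allocator hoisted out of
LogReader purely for illustration:

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <new>
    #include <utility>
    #include <vector>

    // Hoisted copy of LogReader::DefaultInitAllocator.
    template <typename T>
    struct DefaultInitAllocator {
      typedef T value_type;

      // Default-initialize: no zeroing for trivial types like uint8_t.
      template <typename U>
      void construct(U *p) {
        ::new (static_cast<void *>(p)) U;
      }

      template <typename U, typename... Args>
      void construct(U *p, Args &&... args) {
        ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
      }

      T *allocate(std::size_t n) {
        return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
      }

      template <typename U>
      void deallocate(U *p, std::size_t /*n*/) {
        ::operator delete(static_cast<void *>(p));
      }
    };

    int main() {
      std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data;
      // With std::allocator this resize would memset 256 KB to zero; here
      // the new bytes are left uninitialized for read() to fill.
      data.resize(256 * 1024);
      std::printf("%zu bytes appended without zero-fill\n", data.size());
    }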
diff --git a/aos/events/logger_test.cc b/aos/events/logger_test.cc
new file mode 100644
index 0000000..71c640d
--- /dev/null
+++ b/aos/events/logger_test.cc
@@ -0,0 +1,111 @@
+#include "aos/events/logger.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
+#include "aos/events/simulated_event_loop.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace logger {
+namespace testing {
+
+namespace chrono = std::chrono;
+
+class LoggerTest : public ::testing::Test {
+ public:
+ LoggerTest()
+ : config_(
+ aos::configuration::ReadConfig("aos/events/pingpong_config.json")),
+ event_loop_factory_(&config_.message()),
+ ping_event_loop_(event_loop_factory_.MakeEventLoop()),
+ ping_(ping_event_loop_.get()),
+ pong_event_loop_(event_loop_factory_.MakeEventLoop()),
+ pong_(pong_event_loop_.get()) {}
+
+ // Config and factory.
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+ SimulatedEventLoopFactory event_loop_factory_;
+
+ // Event loop and app for Ping
+ std::unique_ptr<EventLoop> ping_event_loop_;
+ Ping ping_;
+
+ // Event loop and app for Pong
+ std::unique_ptr<EventLoop> pong_event_loop_;
+ Pong pong_;
+};
+
+// Tests that we can start up at all. This confirms that the channels are all in
+// the config.
+TEST_F(LoggerTest, Starts) {
+ const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+ const ::std::string logfile = tmpdir + "/logfile.bfbs";
+ // Remove it.
+ unlink(logfile.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile;
+
+ {
+ DetachedBufferWriter writer(logfile);
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory_.MakeEventLoop();
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ Logger logger(&writer, logger_event_loop.get(),
+ std::chrono::milliseconds(100));
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader reader(logfile);
+
+ LOG(INFO) << "Config " << FlatbufferToJson(reader.configuration());
+
+ // TODO(austin): Figure out what the API needs to look like. How do we replay
+ // the data that was fetched before it all starts? How do we set the starting
+ // time from the log file? Probably need to let the reader do more if it
+ // knows about the factory.
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ std::unique_ptr<EventLoop> reader_event_loop =
+ log_reader_factory.MakeEventLoop();
+
+ reader.Register(reader_event_loop.get());
+
+ // Capture monotonic start time in OnRun and offset from there? Let the user
+ // configure the factory if they want time to match?
+ /*
+ log_reader_factory.InitializeTime(log_reader.monotonic_start_time(),
+ log_reader.realtime_start_time());
+ */
+ std::unique_ptr<EventLoop> test_event_loop =
+ log_reader_factory.MakeEventLoop();
+
+ int ping_count = 10;
+ int pong_count = 10;
+
+ // Confirm that the ping value matches.
+ test_event_loop->MakeWatcher("/test",
+ [&ping_count](const examples::Ping &ping) {
+ EXPECT_EQ(ping.value(), ping_count + 1);
+ ++ping_count;
+ });
+ // Confirm that the ping and pong counts both match, and the value also
+ // matches.
+ test_event_loop->MakeWatcher(
+ "/test", [&pong_count, &ping_count](const examples::Pong &pong) {
+ EXPECT_EQ(pong.value(), pong_count + 1);
+ ++pong_count;
+ EXPECT_EQ(ping_count, pong_count);
+ });
+
+ log_reader_factory.RunFor(std::chrono::seconds(100));
+ EXPECT_EQ(ping_count, 2010);
+
+ reader.Deregister();
+}
+
+} // namespace testing
+} // namespace logger
+} // namespace aos
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index ff2dced..1050417 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -151,6 +151,33 @@
std::string data_;
};
+// Vector backed flatbuffer.
+template <typename T>
+class FlatbufferVector : public Flatbuffer<T> {
+ public:
+ // Builds a Flatbuffer around a vector.
+ FlatbufferVector(std::vector<uint8_t> &&data) : data_(std::move(data)) {}
+
+ // Builds a Flatbuffer by copying the data from the other flatbuffer.
+ FlatbufferVector(const Flatbuffer<T> &other)
+ : data_(other.data(), other.data() + other.size()) {}
+
+ // Copies the data from the other flatbuffer.
+ FlatbufferVector &operator=(const Flatbuffer<T> &other) {
+ data_ = std::vector<uint8_t>(other.data(), other.data() + other.size());
+ return *this;
+ }
+
+ virtual ~FlatbufferVector() override {}
+
+ const uint8_t *data() const override { return data_.data(); }
+ uint8_t *data() override { return data_.data(); }
+ size_t size() const override { return data_.size(); }
+
+ private:
+ std::vector<uint8_t> data_;
+};
+
// This object associates the message type with the memory storing the
// flatbuffer. This only stores root tables.
//
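How LogReader uses this class (condensed from QueueMessages in logger.cc
above): the size prefix is stripped before the bytes are handed over, since
the Flatbuffer<T> accessors expect the root table's buffer at data():

    // Take ownership of one log message's bytes, minus the size prefix.
    absl::Span<const uint8_t> msg_data = ReadMessage();
    FlatbufferVector<MessageHeader> msg(std::vector<uint8_t>(
        msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end()));
    // msg.message() now returns the MessageHeader table, backed by msg's
    // own vector.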
diff --git a/frc971/control_loops/drivetrain/splinedrivetrain.cc b/frc971/control_loops/drivetrain/splinedrivetrain.cc
index 1222ae0..c467e1f 100644
--- a/frc971/control_loops/drivetrain/splinedrivetrain.cc
+++ b/frc971/control_loops/drivetrain/splinedrivetrain.cc
@@ -170,6 +170,8 @@
has_started_execution_ = false;
}
mutex_.Unlock();
+ } else {
+ VLOG(1) << "Failed to acquire trajectory lock.";
}
}
diff --git a/y2019/control_loops/drivetrain/localizer_test.cc b/y2019/control_loops/drivetrain/localizer_test.cc
index d9a55f6..b126c6c 100644
--- a/y2019/control_loops/drivetrain/localizer_test.cc
+++ b/y2019/control_loops/drivetrain/localizer_test.cc
@@ -141,6 +141,9 @@
}
void SetUp() {
+ // Turn on -v 1
+ FLAGS_v = std::max(FLAGS_v, 1);
+
flatbuffers::DetachedBuffer goal_buffer;
{
flatbuffers::FlatBufferBuilder fbb;
@@ -185,10 +188,13 @@
aos::FlatbufferDetachedBuffer<frc971::control_loops::drivetrain::Goal> goal(
std::move(goal_buffer));
- spline_drivetrain_.SetGoal(&goal.message());
-
// Let the spline drivetrain compute the spline.
while (true) {
+ // We need to keep sending the goal. There are conditions where the
+ // trajectory lock isn't acquired the first time, so keep retrying it.
+ // Otherwise we deadlock.
+ spline_drivetrain_.SetGoal(&goal.message());
+
::std::this_thread::sleep_for(::std::chrono::milliseconds(5));
flatbuffers::FlatBufferBuilder fbb;