Add logger and log reader classes.
These work on EventLoop so they will work both in simulation and in
reality.
There is still some design work to deal with the realtime clock, and
some work to deal with messages from before the official start of the
log file.
Change-Id: Ib83228bdf1282fed626c61fcb6ed6fd86d213afd
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 53b3b05..3377d67 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -209,3 +209,40 @@
"@com_google_absl//absl/container:btree",
],
)
+
+# C++ code generated from the log file schema in logger.fbs.  logger.fbs
+# includes aos/configuration.fbs, so the configuration schema's includes are
+# passed in here.
+flatbuffer_cc_library(
+ name = "logger_fbs",
+ srcs = ["logger.fbs"],
+ gen_reflections = 1,
+ includes = [
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
+# DetachedBufferWriter, Logger and LogReader (logger.h / logger.cc).
+cc_library(
+    name = "logger",
+    srcs = ["logger.cc"],
+    hdrs = ["logger.h"],
+    deps = [
+        ":event_loop",
+        ":logger_fbs",
+        "//aos:flatbuffer_merge",
+        "//aos/time",
+        "@com_github_google_flatbuffers//:flatbuffers",
+        "@com_google_absl//absl/container:inlined_vector",
+        "@com_google_absl//absl/strings",
+        # logger.h and logger.cc include absl/types/span.h directly, so depend
+        # on it explicitly instead of relying on a transitive dependency.
+        "@com_google_absl//absl/types:span",
+    ],
+)
+
+# Logs a simulated ping/pong run to a file and replays it (logger_test.cc).
+# pingpong_config.json is read at runtime by the test fixture.
+cc_test(
+ name = "logger_test",
+ srcs = ["logger_test.cc"],
+ data = ["pingpong_config.json"],
+ deps = [
+ ":logger",
+ ":ping_lib",
+ ":pong_lib",
+ ":simulated_event_loop",
+ "//aos/testing:googletest",
+ ],
+)
diff --git a/aos/events/logger.cc b/aos/events/logger.cc
new file mode 100644
index 0000000..6b659c0
--- /dev/null
+++ b/aos/events/logger.cc
@@ -0,0 +1,498 @@
+#include "aos/events/logger.h"
+
+#include <fcntl.h>
+#include <limits.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#include <algorithm>
+#include <cerrno>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "aos/events/event_loop.h"
+#include "aos/events/logger_generated.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+
+// Threshold checked by DetachedBufferWriter::QueueSizedFlatbuffer(): once more
+// than this many bytes are queued, the queue is flushed to the file.
+DEFINE_int32(flush_size, 1000000,
+ "Number of outstanding bytes to allow before flushing to disk.");
+
+namespace aos {
+namespace logger {
+
+namespace chrono = std::chrono;
+
+// Creates `filename` for writing.  O_EXCL makes open() fail if the file already
+// exists, in which case the PCHECK below aborts.
+DetachedBufferWriter::DetachedBufferWriter(absl::string_view filename) {
+  // open() needs a NUL terminated path, which a string_view doesn't guarantee.
+  const std::string path(filename);
+  fd_ = open(path.c_str(), O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
+  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+// Writes out whatever is still queued, then closes the file.  A failed close is
+// logged but not fatal.
+DetachedBufferWriter::~DetachedBufferWriter() {
+  Flush();
+  const int close_result = close(fd_);
+  PLOG_IF(ERROR, close_result == -1) << " Failed to close logfile";
+}
+
+// Releases the finished buffer out of the builder and queues it for writing.
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+    flatbuffers::FlatBufferBuilder *fbb) {
+  flatbuffers::DetachedBuffer released = fbb->Release();
+  QueueSizedFlatbuffer(std::move(released));
+}
+
+// Takes ownership of `buffer`, appends it to the write queue, and flushes once
+// more than --flush_size bytes are waiting.
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+    flatbuffers::DetachedBuffer &&buffer) {
+  const size_t flush_threshold = static_cast<size_t>(FLAGS_flush_size);
+
+  // Account for the size before the buffer is moved from.
+  queued_size_ += buffer.size();
+  queue_.emplace_back(std::move(buffer));
+
+  if (queued_size_ <= flush_threshold) {
+    return;
+  }
+  Flush();
+}
+
+// Writes every queued buffer to the file, in order, and empties the queue.
+//
+// writev() accepts at most IOV_MAX buffers per call and is allowed to write
+// fewer bytes than requested, so the queue is written in chunks and a short
+// write is resumed where it stopped.  Real I/O errors still abort.
+void DetachedBufferWriter::Flush() {
+  if (queue_.empty()) {
+    return;
+  }
+
+  iovec_.clear();
+  iovec_.reserve(queue_.size());
+  size_t counted_size = 0;
+  for (flatbuffers::DetachedBuffer &buffer : queue_) {
+    // Empty buffers contribute nothing; leaving them out guarantees that every
+    // writev() call below has at least one byte to write.
+    if (buffer.size() == 0u) {
+      continue;
+    }
+    struct iovec n;
+    n.iov_base = buffer.data();
+    n.iov_len = buffer.size();
+    counted_size += n.iov_len;
+    iovec_.emplace_back(n);
+  }
+  CHECK_EQ(counted_size, queued_size_);
+
+  // Index of the first iovec which still has unwritten bytes.
+  size_t next = 0;
+  size_t remaining_bytes = queued_size_;
+  while (remaining_bytes > 0u) {
+    const size_t chunk =
+        std::min(iovec_.size() - next, static_cast<size_t>(IOV_MAX));
+    const ssize_t written =
+        writev(fd_, iovec_.data() + next, static_cast<int>(chunk));
+    if (written == -1 && errno == EINTR) {
+      // Interrupted before anything was written.  Try again.
+      continue;
+    }
+    PCHECK(written != -1) << ": Wrote " << written << " expected "
+                          << remaining_bytes;
+    CHECK_GT(written, 0) << ": writev made no progress with "
+                         << remaining_bytes << " bytes left";
+
+    size_t consumed = static_cast<size_t>(written);
+    remaining_bytes -= consumed;
+
+    // Skip the buffers which were written completely...
+    while (next < iovec_.size() && consumed >= iovec_[next].iov_len) {
+      consumed -= iovec_[next].iov_len;
+      ++next;
+    }
+    // ... and trim the front of one which was only partially written.
+    if (consumed > 0u) {
+      iovec_[next].iov_base =
+          static_cast<uint8_t *>(iovec_[next].iov_base) + consumed;
+      iovec_[next].iov_len -= consumed;
+    }
+  }
+
+  queued_size_ = 0;
+  queue_.clear();
+}
+
+// Creates one raw fetcher per channel in the event loop's configuration and the
+// timer which drives DoLogData().  The log file header is written, and the
+// timer armed, from the OnRun callback rather than here.
+Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+               std::chrono::milliseconds polling_period)
+    : event_loop_(event_loop),
+      writer_(writer),
+      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
+      polling_period_(polling_period) {
+  const Configuration *const config = event_loop_->configuration();
+  for (const Channel *channel : *config->channels()) {
+    fetchers_.emplace_back();
+    FetcherStruct &entry = fetchers_.back();
+    entry.fetcher = event_loop->MakeRawFetcher(channel);
+    entry.written = false;
+  }
+
+  // Startup sequence: fetch the newest message already on each channel (to
+  // capture prior state), write the header, then begin polling.
+  event_loop_->OnRun([this, polling_period]() {
+    // Fetch before picking the start time so every fetcher points at the most
+    // recent message from before the start.  A channel with nothing on it is
+    // marked as written so that there is nothing pending to log for it.
+    for (FetcherStruct &f : fetchers_) {
+      const bool fetched = f.fetcher->Fetch();
+      f.written = !fetched;
+    }
+
+    // This is the instant the log file is declared "started".
+    const monotonic_clock::time_point monotonic_now =
+        event_loop_->monotonic_now();
+    const realtime_clock::time_point realtime_now = event_loop_->realtime_now();
+    last_synchronized_time_ = monotonic_now;
+
+    {
+      // Build and queue the header carrying the start timestamps.
+      flatbuffers::FlatBufferBuilder fbb;
+      fbb.ForceDefaults(1);
+
+      const flatbuffers::Offset<aos::Configuration> configuration_offset =
+          CopyFlatBuffer(event_loop_->configuration(), &fbb);
+
+      // In theory messages can be out of order by up to 2 polling periods: one
+      // message can be logged right after a boundary while belonging just
+      // before the next one, and the reverse can happen to another.  Report 3x
+      // for margin; it is cheap on the read side.
+      const auto max_out_of_order_ns =
+          chrono::duration_cast<chrono::nanoseconds>(3 * polling_period)
+              .count();
+      const auto monotonic_start_ns =
+          chrono::duration_cast<chrono::nanoseconds>(
+              monotonic_now.time_since_epoch())
+              .count();
+      const auto realtime_start_ns =
+          chrono::duration_cast<chrono::nanoseconds>(
+              realtime_now.time_since_epoch())
+              .count();
+
+      aos::logger::LogFileHeader::Builder header_builder(fbb);
+      header_builder.add_configuration(configuration_offset);
+      header_builder.add_max_out_of_order_duration(max_out_of_order_ns);
+      header_builder.add_monotonic_sent_time(monotonic_start_ns);
+      header_builder.add_realtime_sent_time(realtime_start_ns);
+
+      fbb.FinishSizePrefixed(header_builder.Finish());
+      writer_->QueueSizedFlatbuffer(&fbb);
+    }
+
+    const monotonic_clock::time_point first_poll =
+        event_loop_->monotonic_now() + polling_period;
+    timer_handler_->Setup(first_poll, polling_period);
+  });
+}
+
+// Timer callback: writes every message older than the current sync point to the
+// writer, channel by channel, then flushes.
+//
+// To guarantee that messages are never out of order by more than
+// max_out_of_order_duration, the log needs sync points.  Every pass of the
+// do/while loop below is one: all messages sent before last_synchronized_time_
+// are written before the sync point advances again.
+void Logger::DoLogData() {
+  // Use the event loop's clock.  last_synchronized_time_ was initialized from
+  // it in OnRun, and in simulation it differs from the system clock.
+  const monotonic_clock::time_point monotonic_now =
+      event_loop_->monotonic_now();
+
+  do {
+    // Move the sync point up by at most polling_period.  This forces one sync
+    // per iteration, even if it is small.
+    last_synchronized_time_ =
+        std::min(last_synchronized_time_ + polling_period_, monotonic_now);
+    size_t channel_index = 0;
+    // Write each channel to disk, one at a time.
+    for (FetcherStruct &f : fetchers_) {
+      // Fail loudly instead of dereferencing a null fetcher below.
+      CHECK(f.fetcher != nullptr)
+          << ": No fetcher for channel index " << channel_index;
+
+      while (true) {
+        // The current message (if any) is already on disk; move to the next.
+        if (f.written) {
+          if (!f.fetcher->FetchNext()) {
+            VLOG(1) << "No new data on "
+                    << FlatbufferToJson(f.fetcher->channel());
+            break;
+          }
+          f.written = false;
+        }
+
+        if (f.fetcher->context().monotonic_sent_time >=
+            last_synchronized_time_) {
+          // Not old enough yet.  Keep it pending (written stays false) and
+          // revisit it once the sync point has moved past it.  Without this
+          // break the loop would spin forever on the same message.
+          break;
+        }
+
+        // Write!
+        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+                                           max_header_size_);
+        fbb.ForceDefaults(1);
+
+        flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+            fbb.CreateVector(
+                static_cast<uint8_t *>(f.fetcher->context().data),
+                f.fetcher->context().size);
+
+        VLOG(1) << "Writing data for channel "
+                << FlatbufferToJson(f.fetcher->channel());
+
+        MessageHeader::Builder message_header_builder(fbb);
+        message_header_builder.add_channel_index(channel_index);
+        message_header_builder.add_monotonic_sent_time(
+            f.fetcher->context()
+                .monotonic_sent_time.time_since_epoch()
+                .count());
+        message_header_builder.add_realtime_sent_time(
+            f.fetcher->context()
+                .realtime_sent_time.time_since_epoch()
+                .count());
+
+        message_header_builder.add_queue_index(
+            f.fetcher->context().queue_index);
+
+        message_header_builder.add_data(data_offset);
+
+        fbb.FinishSizePrefixed(message_header_builder.Finish());
+        // Remember the largest overhead seen so later builders can be sized to
+        // avoid reallocating.
+        max_header_size_ = std::max(
+            max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+        writer_->QueueSizedFlatbuffer(&fbb);
+
+        f.written = true;
+      }
+
+      ++channel_index;
+    }
+
+    CHECK_EQ(channel_index, fetchers_.size());
+
+    // If we missed cycles, we could be pretty far behind.  Spin until we are
+    // caught up.
+  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
+
+  writer_->Flush();
+}
+
+// Opens the log file read-only, reads and keeps the header, and queues the
+// first batch of messages.  Aborts if the file can't be opened or holds no
+// complete header.
+LogReader::LogReader(absl::string_view filename)
+    : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+
+  // The first size prefixed flatbuffer in the file is the LogFileHeader.
+  const absl::Span<const uint8_t> header_data = ReadMessage();
+
+  // An empty span means nothing could be read.
+  CHECK(header_data != absl::Span<const uint8_t>());
+
+  // The span points into the read buffer, so take a private copy (size prefix
+  // included) which stays valid for the life of the reader.
+  configuration_ =
+      std::vector<uint8_t>(header_data.begin(), header_data.end());
+
+  const LogFileHeader *const log_file_header =
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data());
+  max_out_of_order_duration_ =
+      std::chrono::nanoseconds(log_file_header->max_out_of_order_duration());
+
+  // One queue per logged channel.
+  channels_.resize(configuration()->channels()->size());
+
+  QueueMessages();
+}
+
+// Appends up to 256 KB from the file to data_, after dropping the bytes which
+// were already consumed.  Returns false at (or after) end of file; aborts on a
+// read error.
+bool LogReader::ReadBlock() {
+  if (end_of_file_) {
+    return false;
+  }
+
+  // Large enough that the read call is efficient; lots of small reads would
+  // spend too much time in syscalls.
+  constexpr size_t kReadSize = 256 * 1024;
+
+  // Drop whatever has been consumed from the front of the buffer.
+  if (consumed_data_ != 0) {
+    data_.erase(data_.begin(), data_.begin() + consumed_data_);
+    consumed_data_ = 0;
+  }
+
+  // Grow the buffer to make room for a full block.  The backing store won't
+  // shrink when a smaller chunk arrives later, which saves allocations on
+  // subsequent appends.
+  const size_t starting_size = data_.size();
+  data_.resize(starting_size + kReadSize);
+
+  const ssize_t count = read(fd_, data_.data() + starting_size, kReadSize);
+
+  // Trim the buffer back to what was actually read (nothing on error or EOF).
+  const size_t bytes_read = count > 0 ? static_cast<size_t>(count) : 0u;
+  data_.resize(starting_size + bytes_read);
+
+  if (count == 0) {
+    end_of_file_ = true;
+    return false;
+  }
+  PCHECK(count > 0);
+
+  return true;
+}
+
+// Returns true if the unconsumed part of data_ already holds a complete size
+// prefixed message, and false if more data would have to be read first.
+bool LogReader::MessageAvailable() {
+  // The size prefix itself has to be there before it can be decoded.
+  const size_t available = data_.size() - consumed_data_;
+  if (available < sizeof(flatbuffers::uoffset_t)) {
+    return false;
+  }
+
+  // Total size of the message on disk: payload plus the prefix.
+  const size_t message_size =
+      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+      sizeof(flatbuffers::uoffset_t);
+  return data_.size() >= consumed_data_ + message_size;
+}
+
+// Returns the next message in the file and marks it consumed.
+//
+// The returned span covers the whole size prefixed flatbuffer, i.e. it starts
+// at the size prefix.  It points into data_, so it is only valid until the next
+// call which reads from the file.  Returns an empty span when the file ends
+// before a complete message is available.
+absl::Span<const uint8_t> LogReader::ReadMessage() {
+  // Make sure we have enough for the size.  read() may return fewer bytes than
+  // needed, so keep reading until the prefix is complete or the file ends.
+  while (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+    if (!ReadBlock()) {
+      return absl::Span<const uint8_t>();
+    }
+  }
+
+  // Now make sure we have enough for the message.  ReadBlock() compacts the
+  // buffer and resets consumed_data_, so the comparison stays valid even
+  // though data_size was computed before the read.
+  const size_t data_size =
+      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+      sizeof(flatbuffers::uoffset_t);
+  while (data_.size() < consumed_data_ + data_size) {
+    if (!ReadBlock()) {
+      return absl::Span<const uint8_t>();
+    }
+  }
+
+  // And return it, consuming the data.
+  const uint8_t *data_ptr = data_.data() + consumed_data_;
+
+  consumed_data_ += data_size;
+
+  return absl::Span<const uint8_t>(data_ptr, data_size);
+}
+
+// Reads messages from the file into the per-channel queues until the newest
+// queued message is more than max_out_of_order_duration_ past the oldest one
+// (or the start of the log), or the file runs out.
+void LogReader::QueueMessages() {
+  while (true) {
+    // Stop once enough is buffered.  A log starts with one (possibly very old)
+    // message per channel, so measure from the start time when the oldest
+    // message predates it; that way a chunk past the start is always read.
+    if (channel_heap_.size() > 0) {
+      const monotonic_clock::time_point queue_until =
+          std::max(oldest_message().first, monotonic_start_time()) +
+          max_out_of_order_duration_;
+      if (newest_timestamp_ > queue_until) {
+        break;
+      }
+    }
+
+    const absl::Span<const uint8_t> msg_data = ReadMessage();
+    if (msg_data == absl::Span<const uint8_t>()) {
+      // Nothing more to read.
+      break;
+    }
+
+    // Copy the message, minus its size prefix, into its own buffer.
+    std::vector<uint8_t> payload(
+        msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end());
+    FlatbufferVector<MessageHeader> msg(std::move(payload));
+
+    EmplaceDataBack(std::move(msg));
+  }
+
+  // Once replay passes this time, more data has to be queued.
+  queue_data_time_ = newest_timestamp_ - max_out_of_order_duration_;
+}
+
+// Returns the configuration stored in the log file header.
+const Configuration *LogReader::configuration() {
+  const LogFileHeader *const header =
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data());
+  return header->configuration();
+}
+
+// Returns the monotonic clock time at which the log file was started, as
+// recorded in the header.
+monotonic_clock::time_point LogReader::monotonic_start_time() {
+  const LogFileHeader *const header =
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data());
+  const std::chrono::nanoseconds since_epoch(header->monotonic_sent_time());
+  return monotonic_clock::time_point(since_epoch);
+}
+
+// Returns the realtime clock time at which the log file was started, as
+// recorded in the header.
+realtime_clock::time_point LogReader::realtime_start_time() {
+  const LogFileHeader *const header =
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data());
+  const std::chrono::nanoseconds since_epoch(header->realtime_sent_time());
+  return realtime_clock::time_point(since_epoch);
+}
+
+// Hooks the reader up to `event_loop`: creates one raw sender per logged
+// channel and a timer which re-sends the queued messages at their logged
+// monotonic times.
+//
+// The event loop's configuration must list the same channels (name and type),
+// in the same order, as the configuration stored in the log file.
+void LogReader::Register(EventLoop *event_loop) {
+  event_loop_ = event_loop;
+
+  // Channels are matched up by index below, so the event loop has to have at
+  // least as many as the log file.
+  CHECK_GE(
+      static_cast<size_t>(event_loop_->configuration()->channels()->size()),
+      channels_.size())
+      << ": Event loop has fewer channels than the log file";
+
+  for (size_t i = 0; i < channels_.size(); ++i) {
+    const Channel *const logged_channel = configuration()->channels()->Get(i);
+    const Channel *const replay_channel =
+        event_loop_->configuration()->channels()->Get(i);
+
+    // Compare the strings themselves.  name() and type() return pointers, and
+    // comparing those only succeeds when both sides are the very same
+    // configuration buffer.
+    CHECK_EQ(logged_channel->name()->str(), replay_channel->name()->str());
+    CHECK_EQ(logged_channel->type()->str(), replay_channel->type()->str());
+
+    channels_[i].raw_sender = event_loop_->MakeRawSender(replay_channel);
+  }
+
+  // Each timer expiration sends (or skips) exactly one message: the oldest one
+  // across all channels.
+  timer_handler_ = event_loop_->AddTimer([this]() {
+    std::pair<monotonic_clock::time_point, int> oldest_channel_index =
+        PopOldestChannel();
+    const monotonic_clock::time_point monotonic_now =
+        event_loop_->monotonic_now();
+    // The timer was set for exactly this message's timestamp.
+    CHECK(monotonic_now == oldest_channel_index.first)
+        << ": Now " << monotonic_now.time_since_epoch().count()
+        << " trying to send "
+        << oldest_channel_index.first.time_since_epoch().count();
+
+    struct LogReader::ChannelData &channel =
+        channels_[oldest_channel_index.second];
+
+    FlatbufferVector<MessageHeader> front = std::move(channel.front());
+
+    CHECK(front.message().data() != nullptr);
+    if (oldest_channel_index.first > monotonic_start_time()) {
+      channel.raw_sender->Send(front.message().data()->Data(),
+                               front.message().data()->size());
+    } else {
+      LOG(WARNING) << "Not sending data from before the start of the log file. "
+                   << oldest_channel_index.first.time_since_epoch().count()
+                   << " start "
+                   << monotonic_start_time().time_since_epoch().count() << " "
+                   << FlatbufferToJson(front);
+    }
+    channel.data.pop_front();
+
+    // Re-push it and update the oldest timestamp.
+    if (channel.data.size() != 0) {
+      const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+          chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
+      PushChannelHeap(timestamp, oldest_channel_index.second);
+      channel.oldest_timestamp = timestamp;
+    } else {
+      channel.oldest_timestamp = monotonic_clock::min_time;
+    }
+
+    // Top the queues back up once replay has caught up with what was read.
+    if (monotonic_now > queue_data_time_) {
+      QueueMessages();
+    }
+
+    // Schedule the next message, if there is one.
+    if (channel_heap_.size() != 0) {
+      timer_handler_->Setup(oldest_message().first);
+    }
+  });
+
+  // Start replaying as soon as the event loop runs, if there is anything to
+  // replay.
+  if (channel_heap_.size() > 0u) {
+    event_loop_->OnRun(
+        [this]() { timer_handler_->Setup(oldest_message().first); });
+  }
+}
+
+// Destroys the senders created by Register().
+void LogReader::Deregister() {
+  for (ChannelData &channel : channels_) {
+    channel.raw_sender.reset();
+  }
+}
+
+// Appends `new_data` to the queue of the channel it was logged on.  A channel
+// whose queue was empty is (re)inserted into the channel heap with the
+// message's timestamp.
+void LogReader::EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data) {
+  const size_t channel_index = new_data.message().channel_index();
+  CHECK_LT(channel_index, channels_.size());
+
+  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+      chrono::nanoseconds(new_data.message().monotonic_sent_time()));
+  newest_timestamp_ = std::max(newest_timestamp_, timestamp);
+
+  ChannelData &channel = channels_[channel_index];
+  if (channel.data.size() == 0) {
+    // This message becomes the oldest one on its channel.
+    channel.oldest_timestamp = timestamp;
+    PushChannelHeap(timestamp, channel_index);
+  }
+  channel.data.emplace_back(std::move(new_data));
+}
+
+namespace {
+
+// Heap ordering for (timestamp, channel index) pairs: `first` sorts after
+// `second` when its timestamp is newer, with ties broken by the larger channel
+// index.  Used with the std heap functions this keeps the oldest entry at the
+// front.
+bool ChannelHeapCompare(
+    const std::pair<monotonic_clock::time_point, int> first,
+    const std::pair<monotonic_clock::time_point, int> second) {
+  if (first.first == second.first) {
+    return first.second > second.second;
+  }
+  return first.first > second.first;
+}
+
+} // namespace
+
+// Adds a (timestamp, channel index) entry to the channel heap.
+void LogReader::PushChannelHeap(monotonic_clock::time_point timestamp,
+                                int channel_index) {
+  channel_heap_.emplace_back(timestamp, channel_index);
+
+  // std::push_heap's default ordering would put the newest entry at the front.
+  // The custom comparator puts the oldest one there instead.
+  std::push_heap(channel_heap_.begin(), channel_heap_.end(),
+                 &ChannelHeapCompare);
+}
+
+// Removes and returns the heap entry with the oldest timestamp.  The heap must
+// not be empty.
+std::pair<monotonic_clock::time_point, int> LogReader::PopOldestChannel() {
+  // pop_heap moves the front (oldest) entry to the back of the vector.
+  std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
+                &ChannelHeapCompare);
+  std::pair<monotonic_clock::time_point, int> oldest = channel_heap_.back();
+  channel_heap_.pop_back();
+  return oldest;
+}
+
+} // namespace logger
+} // namespace aos
diff --git a/aos/events/logger.fbs b/aos/events/logger.fbs
new file mode 100644
index 0000000..36055a6
--- /dev/null
+++ b/aos/events/logger.fbs
@@ -0,0 +1,53 @@
+include "aos/configuration.fbs";
+
+namespace aos.logger;
+
+// A log file is a sequence of size prefixed flatbuffers.
+// The first flatbuffer will be the LogFileHeader, followed by an arbitrary
+// number of MessageHeaders.
+//
+// The log file starts at the time demarcated in the header on the monotonic
+// clock. There will be any number of messages per channel logged before the
+// start time. These messages are logged so that fetchers can retrieve the
+// state of the system at the start of the logfile for things like capturing
+// parameters. In replay, they should be made available to fetchers, but not
+// trigger watchers.
+
+// First flatbuffer in every log file.  Records when the log started, how far
+// out of order the following messages may be, and the channel configuration.
+table LogFileHeader {
+ // Time this log file started on the monotonic clock in nanoseconds.
+ monotonic_sent_time:long;
+ // Time this log file started on the realtime clock in nanoseconds.
+ realtime_sent_time:long;
+
+ // Messages are not written in order to disk. They will be out of order by
+ // at most this duration (in nanoseconds). If the log reader buffers until
+ // it finds messages this much newer than its simulation time, it will never
+ // find a message out of order.
+ max_out_of_order_duration:long;
+
+ // The configuration of the channels.
+ configuration:aos.Configuration;
+
+ // TODO(austin): Node!
+}
+
+// Table holding a single logged message together with its metadata.
+table MessageHeader {
+ // Index into the list of channels in the configuration stored in the log
+ // file header. This provides the data type.
+ channel_index:uint;
+ // Time this message was sent on the monotonic clock in nanoseconds.
+ monotonic_sent_time:long;
+ // Time this message was sent on the realtime clock in nanoseconds.
+ realtime_sent_time:long;
+ // Index into the ipc queue of this message. This should start with 0 and
+ // always monotonically increment if no messages were ever lost. It will
+ // wrap at a multiple of the queue size.
+ queue_index:uint;
+ // TODO(austin): Node.
+
+ // TODO(austin): Format? Compressed?
+
+ // The nested flatbuffer: the raw bytes of the message as it was sent.
+ data:[ubyte];
+}
diff --git a/aos/events/logger.h b/aos/events/logger.h
new file mode 100644
index 0000000..7873a42
--- /dev/null
+++ b/aos/events/logger.h
@@ -0,0 +1,252 @@
+#ifndef AOS_EVENTS_LOGGER_H_
+#define AOS_EVENTS_LOGGER_H_
+
+#include <deque>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "aos/events/event_loop.h"
+#include "aos/events/logger_generated.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// This class manages efficiently writing a sequence of detached buffers to a
+// file. It queues them up and batches the write operation.
+class DetachedBufferWriter {
+ public:
+ // Creates the file. The file must not exist yet; failing to create it is
+ // fatal.
+ DetachedBufferWriter(absl::string_view filename);
+ // Flushes anything still queued and closes the file.
+ ~DetachedBufferWriter();
+
+ // TODO(austin): Snappy compress the log file if it ends with .snappy!
+
+ // Queues up a finished FlatBufferBuilder to be written. Steals the detached
+ // buffer from it.
+ void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
+ // Queues up a detached buffer directly. Flushes automatically once more than
+ // --flush_size bytes are queued.
+ void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
+
+ // Triggers data to be provided to the kernel and written.
+ void Flush();
+
+ private:
+ // File descriptor of the file being written.
+ int fd_ = -1;
+
+ // Size of all the data in the queue.
+ size_t queued_size_ = 0;
+
+ // List of buffers to flush.
+ std::vector<flatbuffers::DetachedBuffer> queue_;
+ // List of iovecs to use with writev. This is a member variable to avoid
+ // churn.
+ std::vector<struct iovec> iovec_;
+};
+
+// Logs all channels available in the event loop to disk once per polling
+// period (100 ms by default).
+// Start by logging one message per channel to capture any state and
+// configuration that is sent rarely on a channel and would affect execution.
+class Logger {
+ public:
+ // Logs the channels of event_loop through writer. Neither pointer is owned
+ // by the Logger. Logging starts when the event loop starts running.
+ Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+ std::chrono::milliseconds polling_period =
+ std::chrono::milliseconds(100));
+
+ private:
+ // Timer callback which writes the pending messages and flushes the writer.
+ void DoLogData();
+
+ EventLoop *event_loop_;
+ DetachedBufferWriter *writer_;
+
+ // Structure to track both a fetcher, and if the data fetched has been
+ // written. We may want to delay writing data to disk so that we don't let
+ // data get too far out of order when written to disk so we can avoid making
+ // it too hard to sort when reading.
+ struct FetcherStruct {
+ std::unique_ptr<RawFetcher> fetcher;
+ bool written = false;
+ };
+
+ // One entry per channel, in the order of the channels in the configuration.
+ std::vector<FetcherStruct> fetchers_;
+ // Timer which calls DoLogData().
+ TimerHandler *timer_handler_;
+
+ // Period to poll the channels.
+ const std::chrono::milliseconds polling_period_;
+
+ // Last time that data was written for all channels to disk.
+ monotonic_clock::time_point last_synchronized_time_;
+
+ // Max size that the header has consumed. This much extra data will be
+ // reserved in the builder to avoid reallocating.
+ size_t max_header_size_ = 0;
+};
+
+// Replays all the channels in the logfile to the event loop.
+//
+// NOTE(review): no destructor is declared here, so the file descriptor opened
+// by the constructor does not appear to be closed anywhere -- confirm.
+class LogReader {
+ public:
+ // Opens the log file and reads its header. Failing to open the file or to
+ // read a header is fatal.
+ LogReader(absl::string_view filename);
+
+ // Registers the timer and senders used to resend the messages from the log
+ // file.
+ void Register(EventLoop *event_loop);
+ // Unregisters the senders.
+ void Deregister();
+
+ // TODO(austin): Remap channels?
+
+ // Returns the configuration from the log file.
+ const Configuration *configuration();
+
+ // Returns the starting timestamp for the log file.
+ monotonic_clock::time_point monotonic_start_time();
+ realtime_clock::time_point realtime_start_time();
+
+ // TODO(austin): Add the ability to re-publish the fetched messages. Add 2
+ // options, one which publishes them *now*, and another which publishes them
+ // to the simulated event loop factory back in time where they actually
+ // happened.
+
+ private:
+ // Reads a chunk of data into data_. Returns false if no data was read.
+ bool ReadBlock();
+
+ // Returns true if there is a full message available in the buffer, and false
+ // if we will have to read more data from disk first.
+ bool MessageAvailable();
+
+ // Returns a span with the data for the next message from the log file,
+ // including its size prefix. The span points into data_ and is empty if no
+ // complete message could be read.
+ absl::Span<const uint8_t> ReadMessage();
+
+ // Queues at least max_out_of_order_duration_ worth of messages into
+ // channels_.
+ void QueueMessages();
+
+ // We need to read a large chunk at a time, then kit it up into parts and
+ // sort.
+ //
+ // We want to read 256 KB chunks at a time. This is the fastest read size.
+ // This leaves us with a fragmentation problem though.
+ //
+ // The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
+ // chunks into single flatbuffer messages and manage them in a sorted queue.
+ // Everything is copied three times (into 256 kb buffer, then into separate
+ // buffer, then into sender), but none of it is all that expensive. We can
+ // optimize if it is slow later.
+ //
+ // As we place the elements in the sorted list of times, keep doing this until
+ // we read a message that is newer than the threshold.
+ //
+ // Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
+ // small state machine so we can resume), and keep pulling messages back out
+ // and sending.
+ //
+ // For sorting, we want to use the fact that each channel is sorted, and then
+ // merge sort the channels. Have a vector of deques, and then hold a sorted
+ // list of pointers to those.
+ //
+ // TODO(austin): Multithreaded read at some point. Gotta go faster!
+ // Especially if we start compressing.
+
+ // Allocator which doesn't zero initialize memory.
+ template <typename T>
+ struct DefaultInitAllocator {
+ typedef T value_type;
+
+ // Default-initializes (rather than value-initializes) the element, which
+ // leaves trivial types such as uint8_t uninitialized.
+ template <typename U>
+ void construct(U *p) {
+ ::new (static_cast<void *>(p)) U;
+ }
+
+ template <typename U, typename... Args>
+ void construct(U *p, Args &&... args) {
+ ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
+ }
+
+ T *allocate(std::size_t n) {
+ return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
+ }
+
+ template <typename U>
+ void deallocate(U *p, std::size_t /*n*/) {
+ ::operator delete(static_cast<void *>(p));
+ }
+ };
+
+ // Minimum amount of data to queue up for sorting before we are guaranteed to
+ // not see data out of order.
+ std::chrono::nanoseconds max_out_of_order_duration_;
+
+ // File descriptor for the log file.
+ int fd_ = -1;
+
+ // Event loop and replay timer. Both are set by Register().
+ EventLoop *event_loop_;
+ TimerHandler *timer_handler_;
+
+ // Vector to read into. This uses an allocator which doesn't zero initialize
+ // the memory.
+ std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+
+ // Amount of data consumed already in data_.
+ size_t consumed_data_ = 0;
+
+ // Vector holding the size prefixed LogFileHeader (which contains the
+ // configuration).
+ std::vector<uint8_t> configuration_;
+
+ // Moves the message to the correct channel queue.
+ void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
+
+ // Pushes the index of the channel, together with the given timestamp, to the
+ // sorted channel list.
+ void PushChannelHeap(monotonic_clock::time_point timestamp,
+ int channel_index);
+
+ // Returns the timestamp and index of the channel with the oldest message in
+ // it. The heap must not be empty.
+ const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
+ return channel_heap_.front();
+ }
+
+ // Pops the timestamp and index of the channel with the oldest message in it.
+ std::pair<monotonic_clock::time_point, int> PopOldestChannel();
+
+ // Datastructure to hold the list of messages, cached timestamp for the oldest
+ // message, and sender to send with.
+ struct ChannelData {
+ monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
+ std::deque<FlatbufferVector<MessageHeader>> data;
+ std::unique_ptr<RawSender> raw_sender;
+
+ // Returns the oldest message.
+ const FlatbufferVector<MessageHeader> &front() { return data.front(); }
+
+ // Returns the timestamp for the oldest message.
+ const monotonic_clock::time_point front_timestamp() {
+ return monotonic_clock::time_point(
+ std::chrono::nanoseconds(front().message().monotonic_sent_time()));
+ }
+ };
+
+ // List of channels and messages for them.
+ std::vector<ChannelData> channels_;
+
+ // Heap of channels so we can track which channel to send next.
+ std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+
+ // Timestamp of the newest message in a channel queue.
+ monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+
+ // The time at which we need to read another chunk from the logfile.
+ monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
+
+ // Cached bit for if we have reached the end of the file. Otherwise we will
+ // hammer on the kernel asking for more data each time we send.
+ bool end_of_file_ = false;
+};
+
+} // namespace logger
+} // namespace aos
+
+#endif // AOS_EVENTS_LOGGER_H_
diff --git a/aos/events/logger_test.cc b/aos/events/logger_test.cc
new file mode 100644
index 0000000..71c640d
--- /dev/null
+++ b/aos/events/logger_test.cc
@@ -0,0 +1,111 @@
+#include "aos/events/logger.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
+#include "aos/events/simulated_event_loop.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace logger {
+namespace testing {
+
+namespace chrono = std::chrono;
+
+// Fixture which runs the Ping and Pong applications on simulated event loops
+// created from the pingpong configuration.
+class LoggerTest : public ::testing::Test {
+ public:
+ LoggerTest()
+ : config_(
+ aos::configuration::ReadConfig("aos/events/pingpong_config.json")),
+ event_loop_factory_(&config_.message()),
+ ping_event_loop_(event_loop_factory_.MakeEventLoop()),
+ ping_(ping_event_loop_.get()),
+ pong_event_loop_(event_loop_factory_.MakeEventLoop()),
+ pong_(pong_event_loop_.get()) {}
+
+ // Config and factory.
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+ SimulatedEventLoopFactory event_loop_factory_;
+
+ // Event loop and app for Ping
+ std::unique_ptr<EventLoop> ping_event_loop_;
+ Ping ping_;
+
+ // Event loop and app for Pong
+ std::unique_ptr<EventLoop> pong_event_loop_;
+ Pong pong_;
+};
+
+// Tests that we can startup at all. This confirms that the channels are all in
+// the config. Logs a simulated ping/pong run to a file, then replays the file
+// into a second simulation and checks the replayed values.
+TEST_F(LoggerTest, Starts) {
+ const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+ const ::std::string logfile = tmpdir + "/logfile.bfbs";
+ // Remove it.
+ unlink(logfile.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile;
+
+ // Scope the writer and logger so the file is flushed and closed before it is
+ // read back.
+ {
+ DetachedBufferWriter writer(logfile);
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory_.MakeEventLoop();
+
+ // Let ping and pong run for a while before the logger exists, so the log
+ // starts in the middle of the conversation.
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ Logger logger(&writer, logger_event_loop.get(),
+ std::chrono::milliseconds(100));
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader reader(logfile);
+
+ LOG(INFO) << "Config " << FlatbufferToJson(reader.configuration());
+
+ // TODO(austin): Figure out what the API needs to look like. How do we replay
+ // the data that was fetched before it all starts? How do we set the starting
+ // time from the log file? Probably need to let the reader do more if it
+ // knows about the factory.
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ std::unique_ptr<EventLoop> reader_event_loop =
+ log_reader_factory.MakeEventLoop();
+
+ reader.Register(reader_event_loop.get());
+
+ // Capture monotonic start time in OnRun and offset from there? Let the user
+ // configure the factory if they want time to match?
+ /*
+ log_reader_factory.InitializeTime(log_reader.monotonic_start_time(),
+ log_reader.realtime_start_time());
+ */
+ std::unique_ptr<EventLoop> test_event_loop =
+ log_reader_factory.MakeEventLoop();
+
+ // Value of the last ping/pong expected to have been sent before the log
+ // started.
+ // NOTE(review): 10 presumably follows from the 95 ms head start above and the
+ // ping rate -- confirm against ping_lib.
+ int ping_count = 10;
+ int pong_count = 10;
+
+ // Confirm that the ping value matches.
+ test_event_loop->MakeWatcher("/test",
+ [&ping_count](const examples::Ping &ping) {
+ EXPECT_EQ(ping.value(), ping_count + 1);
+ ++ping_count;
+ });
+ // Confirm that the ping and pong counts both match, and the value also
+ // matches.
+ test_event_loop->MakeWatcher(
+ "/test", [&pong_count, &ping_count](const examples::Pong &pong) {
+ EXPECT_EQ(pong.value(), pong_count + 1);
+ ++pong_count;
+ EXPECT_EQ(ping_count, pong_count);
+ });
+
+ // Replay for much longer than the log lasts so that everything gets sent.
+ log_reader_factory.RunFor(std::chrono::seconds(100));
+ EXPECT_EQ(ping_count, 2010);
+
+ reader.Deregister();
+}
+
+} // namespace testing
+} // namespace logger
+} // namespace aos
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index b48dc1f..b951564 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -155,6 +155,33 @@
std::string data_;
};
+// Flatbuffer whose backing storage is a std::vector<uint8_t>.
+template <typename T>
+class FlatbufferVector : public Flatbuffer<T> {
+ public:
+  // Takes ownership of a vector which already holds the serialized flatbuffer.
+  FlatbufferVector(std::vector<uint8_t> &&data) : data_(std::move(data)) {}
+
+  // Makes a copy of the bytes of another flatbuffer.
+  FlatbufferVector(const Flatbuffer<T> &other)
+      : data_(other.data(), other.data() + other.size()) {}
+
+  // Replaces the contents with a copy of the bytes of another flatbuffer.
+  FlatbufferVector &operator=(const Flatbuffer<T> &other) {
+    // Build the copy first so that assigning from an alias of *this is safe.
+    std::vector<uint8_t> copy(other.data(), other.data() + other.size());
+    data_ = std::move(copy);
+    return *this;
+  }
+
+  ~FlatbufferVector() override {}
+
+  // Access to the serialized bytes.
+  const uint8_t *data() const override { return data_.data(); }
+  uint8_t *data() override { return data_.data(); }
+  size_t size() const override { return data_.size(); }
+
+ private:
+  std::vector<uint8_t> data_;
+};
+
// This object associates the message type with the memory storing the
// flatbuffer. This only stores root tables.
//