Move log file manipulation logic out of LogReader
LogReader really had four separate layers that should each have been a
class.  Split them out in preparation for multi-file logs; a rough usage
sketch of the new layering follows the list below.
1) Read chunks of data from a file.
2) Read the header and messages from a file.
3) Sort those messages.
4) Send them over the event loop.
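
Roughly, the first three layers compose like this (a hypothetical usage
sketch; the filename and loop are made up, not code from this change):

    // SortedMessageReader drives a MessageReader, which drives a SpanReader.
    aos::logger::SortedMessageReader reader("/tmp/test.bfbs");
    while (reader.active_channel_count() > 0u) {
      // Timestamp, channel index, and data of the globally oldest message.
      auto [timestamp, channel, message] = reader.PopOldestChannel();
      // Layer 4, sending over the event loop, stays in LogReader.
    }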
Change-Id: Ib885e6f0ed027851a4d7faea71b9391c1b60cf19
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index dc1801d..714796e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -8,8 +8,11 @@
#include <vector>
+#include "aos/configuration.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
DEFINE_int32(flush_size, 1000000,
"Number of outstanding bytes to allow before flushing to disk.");
@@ -17,6 +20,8 @@
namespace aos {
namespace logger {
+namespace chrono = std::chrono;
+
DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
: fd_(open(std::string(filename).c_str(),
O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
@@ -116,5 +121,220 @@
return message_header_builder.Finish();
}
+SpanReader::SpanReader(std::string_view filename)
+ : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+ PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+absl::Span<const uint8_t> SpanReader::ReadMessage() {
+ // Make sure we have enough for the size.
+ if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+ if (!ReadBlock()) {
+ return absl::Span<const uint8_t>();
+ }
+ }
+
+ // Now make sure we have enough for the message.
+ const size_t data_size =
+ flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+ sizeof(flatbuffers::uoffset_t);
+ while (data_.size() < consumed_data_ + data_size) {
+ if (!ReadBlock()) {
+ return absl::Span<const uint8_t>();
+ }
+ }
+
+ // And return it, consuming the data.
+ const uint8_t *data_ptr = data_.data() + consumed_data_;
+
+ consumed_data_ += data_size;
+
+ return absl::Span<const uint8_t>(data_ptr, data_size);
+}
+
+bool SpanReader::MessageAvailable() {
+ // Are we big enough to read the size?
+ if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+ return false;
+ }
+
+ // Then, are we big enough to read the full message?
+ const size_t data_size =
+ flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+ sizeof(flatbuffers::uoffset_t);
+ if (data_.size() < consumed_data_ + data_size) {
+ return false;
+ }
+
+ return true;
+}
+
+bool SpanReader::ReadBlock() {
+ if (end_of_file_) {
+ return false;
+ }
+
+ // Appends 256k. This is enough that the read call is efficient. We don't
+ // want to spend too much time reading small chunks because the syscalls for
+ // that will be expensive.
+ constexpr size_t kReadSize = 256 * 1024;
+
+ // Strip off any unused data at the front.
+ if (consumed_data_ != 0) {
+ data_.erase(data_.begin(), data_.begin() + consumed_data_);
+ consumed_data_ = 0;
+ }
+
+ const size_t starting_size = data_.size();
+
+ // This should automatically grow the backing store. It won't shrink if we
+ // get a small chunk later. This reduces allocations when we want to append
+ // more data.
+ data_.resize(data_.size() + kReadSize);
+
+ ssize_t count = read(fd_, &data_[starting_size], kReadSize);
+ data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+ if (count == 0) {
+ end_of_file_ = true;
+ return false;
+ }
+ PCHECK(count > 0);
+
+ return true;
+}
+
+MessageReader::MessageReader(std::string_view filename)
+ : span_reader_(filename) {
+  // Read the header, which is the first message in the log file.
+ absl::Span<const uint8_t> config_data = span_reader_.ReadMessage();
+
+ // Make sure something was read.
+ CHECK(config_data != absl::Span<const uint8_t>());
+
+ // And copy the config so we have it forever.
+ configuration_ = std::vector<uint8_t>(config_data.begin(), config_data.end());
+
+ max_out_of_order_duration_ = std::chrono::nanoseconds(
+ flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+ ->max_out_of_order_duration());
+}
+
+std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
+ absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
+ if (msg_data == absl::Span<const uint8_t>()) {
+ return std::nullopt;
+ }
+
+ FlatbufferVector<MessageHeader> result{std::vector<uint8_t>(
+ msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end())};
+
+ const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+ chrono::nanoseconds(result.message().monotonic_sent_time()));
+
+ newest_timestamp_ = std::max(newest_timestamp_, timestamp);
+ return result;
+}
+
+SortedMessageReader::SortedMessageReader(std::string_view filename)
+ : message_reader_(filename) {
+ channels_.resize(configuration()->channels()->size());
+
+ QueueMessages();
+}
+
+void SortedMessageReader::EmplaceDataBack(
+ FlatbufferVector<MessageHeader> &&new_data) {
+ const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+ chrono::nanoseconds(new_data.message().monotonic_sent_time()));
+ const size_t channel_index = new_data.message().channel_index();
+ CHECK_LT(channel_index, channels_.size());
+
+ if (channels_[channel_index].data.size() == 0) {
+ channels_[channel_index].oldest_timestamp = timestamp;
+ PushChannelHeap(timestamp, channel_index);
+ }
+ channels_[channel_index].data.emplace_back(std::move(new_data));
+}
+
+namespace {
+
+bool ChannelHeapCompare(
+ const std::pair<monotonic_clock::time_point, int> first,
+ const std::pair<monotonic_clock::time_point, int> second) {
+ if (first.first > second.first) {
+ return true;
+ } else if (first.first == second.first) {
+ return first.second > second.second;
+ } else {
+ return false;
+ }
+}
+
+} // namespace
+
+void SortedMessageReader::PushChannelHeap(monotonic_clock::time_point timestamp,
+ int channel_index) {
+ channel_heap_.push_back(std::make_pair(timestamp, channel_index));
+
+  // std::push_heap builds a max-heap by default, which would put the newest
+  // message first. Use a greater-than comparator to keep the oldest first.
+ std::push_heap(channel_heap_.begin(), channel_heap_.end(),
+ ChannelHeapCompare);
+}
+
+void SortedMessageReader::QueueMessages() {
+ while (true) {
+ // Don't queue if we have enough data already.
+ // When a log file starts, there should be a message from each channel.
+ // Those messages might be very old. Make sure to read a chunk past the
+ // starting time.
+ if (channel_heap_.size() > 0 &&
+ message_reader_.newest_timestamp() >
+ std::max(oldest_message().first, monotonic_start_time()) +
+ message_reader_.max_out_of_order_duration()) {
+ break;
+ }
+
+ if (std::optional<FlatbufferVector<MessageHeader>> msg =
+ message_reader_.ReadMessage()) {
+ EmplaceDataBack(std::move(msg.value()));
+ } else {
+ break;
+ }
+ }
+}
+
+std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
+SortedMessageReader::PopOldestChannel() {
+ std::pair<monotonic_clock::time_point, int> oldest_channel_data =
+ channel_heap_.front();
+ std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
+ &ChannelHeapCompare);
+ channel_heap_.pop_back();
+
+ struct ChannelData &channel = channels_[oldest_channel_data.second];
+
+ FlatbufferVector<MessageHeader> front = std::move(channel.front());
+
+ channel.data.pop_front();
+
+ // Re-push it and update the oldest timestamp.
+ if (channel.data.size() != 0) {
+ const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+ chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
+ PushChannelHeap(timestamp, oldest_channel_data.second);
+ channel.oldest_timestamp = timestamp;
+ } else {
+ channel.oldest_timestamp = monotonic_clock::min_time;
+ }
+
+ if (oldest_channel_data.first > message_reader_.queue_data_time()) {
+ QueueMessages();
+ }
+
+ return std::make_tuple(oldest_channel_data.first, oldest_channel_data.second,
+ std::move(front));
+}
+
} // namespace logger
} // namespace aos
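
To make the QueueMessages() stopping condition concrete, here is a
standalone sketch with made-up numbers (the real values come from the log
file header and the messages themselves):

    #include <algorithm>
    #include <chrono>

    int main() {
      using namespace std::chrono_literals;
      // Oldest queued message at 5s, log start at 2s, and a 1s
      // max_out_of_order_duration from the header.
      const auto bound = std::max(5s, 2s) + 1s;  // 6s
      // QueueMessages() keeps reading until it sees a message newer than the
      // bound. Past that point no message older than 6s - 1s = 5s can still
      // appear in the file, so popping the 5s message is safe.
      return bound == 6s ? 0 : 1;
    }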
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 0892290..285fe05 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -3,9 +3,12 @@
#include <sys/uio.h>
+#include <deque>
+#include <optional>
#include <string_view>
#include <vector>
+#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
@@ -62,15 +65,232 @@
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type);
-// TODO(austin): 3 objects:
-// 1) log chunk reader. Returns span.
-// 2) Sorted message header reader. Returns sorted messages.
-// 3) LogReader, which does all the registration.
-//
-// Then, we can do a multi-node sim which forwards data nicely, try logging it, and then try replaying it.
+// Class to read chunks out of a log file.
+class SpanReader {
+ public:
+ SpanReader(std::string_view filename);
-// Optimization:
-// Allocate the 256k blocks like we do today. But, refcount them with shared_ptr pointed to by the messageheader that is returned. This avoids the copy.
+ ~SpanReader() { close(fd_); }
+
+  // Returns a span with the data for a message from the log file, including
+  // the size prefix.
+ absl::Span<const uint8_t> ReadMessage();
+
+  // Returns true if a full message is available in the buffer; returns false
+  // if more data would have to be read from disk first.
+ bool MessageAvailable();
+
+ private:
+ // TODO(austin): Optimization:
+ // Allocate the 256k blocks like we do today. But, refcount them with
+  // shared_ptr pointed to by the MessageHeader that is returned. This avoids
+ // the copy. Need to do more benchmarking.
+
+ // Reads a chunk of data into data_. Returns false if no data was read.
+ bool ReadBlock();
+
+ // File descriptor for the log file.
+ int fd_ = -1;
+
+ // Allocator which doesn't zero initialize memory.
+ template <typename T>
+ struct DefaultInitAllocator {
+ typedef T value_type;
+
+ template <typename U>
+ void construct(U *p) {
+ ::new (static_cast<void *>(p)) U;
+ }
+
+ template <typename U, typename... Args>
+ void construct(U *p, Args &&... args) {
+ ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
+ }
+
+ T *allocate(std::size_t n) {
+ return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
+ }
+
+ template <typename U>
+ void deallocate(U *p, std::size_t /*n*/) {
+ ::operator delete(static_cast<void *>(p));
+ }
+ };
+
+ // Vector to read into. This uses an allocator which doesn't zero
+ // initialize the memory.
+ std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+
+ // Amount of data consumed already in data_.
+ size_t consumed_data_ = 0;
+
+  // Cached bit for whether we have reached the end of the file. Without it,
+  // we would hammer on the kernel asking for more data on every read.
+ bool end_of_file_ = false;
+};
+
+// Class which handles reading the header and messages from the log file.
+// This holds any per-file state needed before the merging below.
+class MessageReader {
+ public:
+ MessageReader(std::string_view filename);
+
+ // Returns the header from the log file.
+ const LogFileHeader *log_file_header() const {
+ return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
+ configuration_.data());
+ }
+
+  // Returns the minimum amount of data needed to queue up for sorting before
+  // we are guaranteed to not see data out of order.
+ std::chrono::nanoseconds max_out_of_order_duration() const {
+ return max_out_of_order_duration_;
+ }
+
+ monotonic_clock::time_point newest_timestamp() const {
+ return newest_timestamp_;
+ }
+
+ // Returns the next message if there is one.
+ std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
+
+ // The time at which we need to read another chunk from the logfile.
+ monotonic_clock::time_point queue_data_time() const {
+ return newest_timestamp() - max_out_of_order_duration();
+ }
+
+ private:
+ // Log chunk reader.
+ SpanReader span_reader_;
+
+ // Vector holding the data for the configuration.
+ std::vector<uint8_t> configuration_;
+
+  // Minimum amount of data to queue up for sorting before we are guaranteed
+  // to not see data out of order.
+ std::chrono::nanoseconds max_out_of_order_duration_;
+
+ // Timestamp of the newest message in a channel queue.
+ monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+};
+
+// We need to read a large chunk at a time, then split it up into parts and
+// sort.
+//
+// We want to read 256 KB chunks at a time. This is the fastest read size.
+// This leaves us with a fragmentation problem though.
+//
+// The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
+// chunks into single flatbuffer messages and manage them in a sorted queue.
+// Everything is copied three times (into the 256 KB buffer, then into a
+// separate buffer, then into the sender), but none is that expensive. We can
+// optimize if it is slow later.
+//
+// As we place the elements in the sorted list of times, keep doing this
+// until we read a message that is newer than the threshold.
+//
+// Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
+// small state machine so we can resume), and keep pulling messages back out
+// and sending.
+//
+// For sorting, we want to use the fact that each channel is sorted, and
+// then merge sort the channels. Have a vector of deques, and then hold a
+// sorted list of pointers to those.
+class SortedMessageReader {
+ public:
+ SortedMessageReader(std::string_view filename);
+
+ // Returns the header from the log file.
+ const LogFileHeader *log_file_header() const {
+ return message_reader_.log_file_header();
+ }
+
+  // Returns the timestamp and channel index of the oldest message across all
+  // channels.
+ const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
+ return channel_heap_.front();
+ }
+
+ // Returns the number of channels with data still in them.
+ size_t active_channel_count() const { return channel_heap_.size(); }
+
+ // Returns the configuration from the log file header.
+ const Configuration *configuration() const {
+ return log_file_header()->configuration();
+ }
+
+ // Returns the start time on both the monotonic and realtime clocks.
+ monotonic_clock::time_point monotonic_start_time() {
+ return monotonic_clock::time_point(
+ std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
+ }
+ realtime_clock::time_point realtime_start_time() {
+ return realtime_clock::time_point(
+ std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
+ }
+
+  // Returns the node whose point of view this log file is from. Make sure this
+ // is a pointer in the configuration() nodes list so it can be consumed
+ // elsewhere.
+ const Node *node() const {
+ if (configuration()->has_nodes()) {
+ CHECK(log_file_header()->has_node());
+ CHECK(log_file_header()->node()->has_name());
+ return configuration::GetNode(
+ configuration(), log_file_header()->node()->name()->string_view());
+ } else {
+ CHECK(!log_file_header()->has_node());
+ return nullptr;
+ }
+ }
+
+  // Pops the oldest message across all channels, returning its timestamp,
+  // channel index, and data.
+ std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
+ PopOldestChannel();
+
+ private:
+ // Adds more messages to the sorted list.
+ void QueueMessages();
+
+ // Moves the message to the correct channel queue.
+ void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
+
+ // Pushes a pointer to the channel for the given timestamp to the sorted
+ // channel list.
+ void PushChannelHeap(monotonic_clock::time_point timestamp,
+ int channel_index);
+
+  // Data structure to hold the queue of messages for a channel and the
+  // cached timestamp of the oldest message.
+ struct ChannelData {
+ monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
+ std::deque<FlatbufferVector<MessageHeader>> data;
+
+ // Returns the oldest message.
+ const FlatbufferVector<MessageHeader> &front() { return data.front(); }
+
+ // Returns the timestamp for the oldest message.
+ const monotonic_clock::time_point front_timestamp() {
+ return monotonic_clock::time_point(
+ std::chrono::nanoseconds(front().message().monotonic_sent_time()));
+ }
+ };
+
+ MessageReader message_reader_;
+
+ // TODO(austin): Multithreaded read at some point. Gotta go faster!
+ // Especially if we start compressing.
+
+ // List of channels and messages for them.
+ std::vector<ChannelData> channels_;
+
+ // Heap of channels so we can track which channel to send next.
+ std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+};
} // namespace logger
} // namespace aos
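
The DefaultInitAllocator above exists because std::vector zero-fills new
elements on resize(). A sketch of the difference (assuming the allocator is
hoisted to namespace scope for illustration):

    #include <cstdint>
    #include <vector>

    void Grow() {
      // Default allocator: resize() value-initializes, memsetting a quarter
      // megabyte to zero right before read() would overwrite it anyway.
      std::vector<uint8_t> zeroed;
      zeroed.resize(256 * 1024);

      // DefaultInitAllocator: construct(p) placement-news with no
      // initializer, so the new bytes are left indeterminate and the memset
      // is skipped.
      std::vector<uint8_t, DefaultInitAllocator<uint8_t>> raw;
      raw.resize(256 * 1024);
    }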
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e7d0d3b..22c1a6e 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -7,7 +7,6 @@
#include <sys/uio.h>
#include <vector>
-#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
@@ -220,172 +219,27 @@
writer_->Flush();
}
-LogReader::LogReader(absl::string_view filename)
- : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
- PCHECK(fd_ != -1) << ": Failed to open " << filename;
-
- // Make sure we have enough to read the size.
- absl::Span<const uint8_t> config_data = ReadMessage();
-
- // Make sure something was read.
- CHECK(config_data != absl::Span<const uint8_t>());
-
- // And copy the config so we have it forever.
- configuration_ = std::vector<uint8_t>(config_data.begin(), config_data.end());
-
- max_out_of_order_duration_ = std::chrono::nanoseconds(
- flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
- ->max_out_of_order_duration());
-
+LogReader::LogReader(std::string_view filename)
+ : sorted_message_reader_(filename) {
channels_.resize(configuration()->channels()->size());
-
- QueueMessages();
}
LogReader::~LogReader() {
- CHECK(!event_loop_unique_ptr_) << "Did you remember to call Deregister?";
-}
-
-bool LogReader::ReadBlock() {
- if (end_of_file_) {
- return false;
- }
-
- // Appends 256k. This is enough that the read call is efficient. We don't
- // want to spend too much time reading small chunks because the syscalls for
- // that will be expensive.
- constexpr size_t kReadSize = 256 * 1024;
-
- // Strip off any unused data at the front.
- if (consumed_data_ != 0) {
- data_.erase(data_.begin(), data_.begin() + consumed_data_);
- consumed_data_ = 0;
- }
-
- const size_t starting_size = data_.size();
-
- // This should automatically grow the backing store. It won't shrink if we
- // get a small chunk later. This reduces allocations when we want to append
- // more data.
- data_.resize(data_.size() + kReadSize);
-
- ssize_t count = read(fd_, &data_[starting_size], kReadSize);
- data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
- if (count == 0) {
- end_of_file_ = true;
- return false;
- }
- PCHECK(count > 0);
-
- return true;
-}
-
-bool LogReader::MessageAvailable() {
- // Are we big enough to read the size?
- if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
- return false;
- }
-
- // Then, are we big enough to read the full message?
- const size_t data_size =
- flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
- sizeof(flatbuffers::uoffset_t);
- if (data_.size() < consumed_data_ + data_size) {
- return false;
- }
-
- return true;
-}
-
-absl::Span<const uint8_t> LogReader::ReadMessage() {
- // Make sure we have enough for the size.
- if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
- if (!ReadBlock()) {
- return absl::Span<const uint8_t>();
- }
- }
-
- // Now make sure we have enough for the message.
- const size_t data_size =
- flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
- sizeof(flatbuffers::uoffset_t);
- while (data_.size() < consumed_data_ + data_size) {
- if (!ReadBlock()) {
- return absl::Span<const uint8_t>();
- }
- }
-
- // And return it, consuming the data.
- const uint8_t *data_ptr = data_.data() + consumed_data_;
-
- consumed_data_ += data_size;
-
- return absl::Span<const uint8_t>(data_ptr, data_size);
-}
-
-void LogReader::QueueMessages() {
- while (true) {
- // Don't queue if we have enough data already.
- // When a log file starts, there should be a message from each channel.
- // Those messages might be very old. Make sure to read a chunk past the
- // starting time.
- if (channel_heap_.size() > 0 &&
- newest_timestamp_ >
- std::max(oldest_message().first, monotonic_start_time()) +
- max_out_of_order_duration_) {
- break;
- }
-
- absl::Span<const uint8_t> msg_data = ReadMessage();
- if (msg_data == absl::Span<const uint8_t>()) {
- break;
- }
-
- FlatbufferVector<MessageHeader> msg(std::vector<uint8_t>(
- msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end()));
-
- EmplaceDataBack(std::move(msg));
- }
-
- queue_data_time_ = newest_timestamp_ - max_out_of_order_duration_;
+ CHECK(!event_loop_unique_ptr_) << ": Did you remember to call Deregister?";
}
const Configuration *LogReader::configuration() const {
- return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
- ->configuration();
+ return sorted_message_reader_.configuration();
}
-const Node *LogReader::node() const {
- if (configuration()->has_nodes()) {
- CHECK(flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
- ->has_node());
- CHECK(flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
- ->node()
- ->has_name());
- return configuration::GetNode(
- configuration(),
- flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
- ->node()
- ->name()
- ->string_view());
- } else {
- CHECK(
- !flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
- ->has_node());
- return nullptr;
- }
-}
+const Node *LogReader::node() const { return sorted_message_reader_.node(); }
monotonic_clock::time_point LogReader::monotonic_start_time() {
- return monotonic_clock::time_point(std::chrono::nanoseconds(
- flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
- ->monotonic_start_time()));
+ return sorted_message_reader_.monotonic_start_time();
}
realtime_clock::time_point LogReader::realtime_start_time() {
- return realtime_clock::time_point(std::chrono::nanoseconds(
- flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
- ->realtime_start_time()));
+ return sorted_message_reader_.realtime_start_time();
}
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
@@ -413,88 +267,75 @@
CHECK_EQ(configuration()->channels()->Get(i)->type(),
event_loop_->configuration()->channels()->Get(i)->type());
- channels_[i].raw_sender = event_loop_->MakeRawSender(
+ channels_[i] = event_loop_->MakeRawSender(
event_loop_->configuration()->channels()->Get(i));
}
timer_handler_ = event_loop_->AddTimer([this]() {
- std::pair<monotonic_clock::time_point, int> oldest_channel_index =
- PopOldestChannel();
+ monotonic_clock::time_point channel_timestamp;
+ int channel_index;
+ FlatbufferVector<MessageHeader> channel_data =
+ FlatbufferVector<MessageHeader>::Empty();
+
+ std::tie(channel_timestamp, channel_index, channel_data) =
+ sorted_message_reader_.PopOldestChannel();
+
const monotonic_clock::time_point monotonic_now =
event_loop_->context().monotonic_event_time;
- CHECK(monotonic_now == oldest_channel_index.first)
+ CHECK(monotonic_now == channel_timestamp)
<< ": Now " << monotonic_now.time_since_epoch().count()
- << " trying to send "
- << oldest_channel_index.first.time_since_epoch().count();
+ << " trying to send " << channel_timestamp.time_since_epoch().count();
- struct LogReader::ChannelData &channel =
- channels_[oldest_channel_index.second];
-
- FlatbufferVector<MessageHeader> front = std::move(channel.front());
-
- if (oldest_channel_index.first > monotonic_start_time() ||
+ if (channel_timestamp > monotonic_start_time() ||
event_loop_factory_ != nullptr) {
if (!FLAGS_skip_missing_forwarding_entries ||
- front.message().data() != nullptr) {
- CHECK(front.message().data() != nullptr)
- << ": Got a message without data. Forwarding entry which was not "
+ channel_data.message().data() != nullptr) {
+ CHECK(channel_data.message().data() != nullptr)
+ << ": Got a message without data. Forwarding entry which was "
+ "not "
"matched? Use --skip_missing_forwarding_entries to ignore "
"this.";
// If we have access to the factory, use it to fix the realtime time.
if (event_loop_factory_ != nullptr) {
event_loop_factory_->SetRealtimeOffset(
- monotonic_clock::time_point(
- chrono::nanoseconds(front.message().monotonic_sent_time())),
- realtime_clock::time_point(
- chrono::nanoseconds(front.message().realtime_sent_time())));
+ monotonic_clock::time_point(chrono::nanoseconds(
+ channel_data.message().monotonic_sent_time())),
+ realtime_clock::time_point(chrono::nanoseconds(
+ channel_data.message().realtime_sent_time())));
}
- channel.raw_sender->Send(
- front.message().data()->Data(), front.message().data()->size(),
- monotonic_clock::time_point(
- chrono::nanoseconds(front.message().monotonic_remote_time())),
- realtime_clock::time_point(
- chrono::nanoseconds(front.message().realtime_remote_time())),
- front.message().remote_queue_index());
+ channels_[channel_index]->Send(
+ channel_data.message().data()->Data(),
+ channel_data.message().data()->size(),
+ monotonic_clock::time_point(chrono::nanoseconds(
+ channel_data.message().monotonic_remote_time())),
+ realtime_clock::time_point(chrono::nanoseconds(
+ channel_data.message().realtime_remote_time())),
+ channel_data.message().remote_queue_index());
}
} else {
LOG(WARNING) << "Not sending data from before the start of the log file. "
- << oldest_channel_index.first.time_since_epoch().count()
- << " start "
+ << channel_timestamp.time_since_epoch().count() << " start "
<< monotonic_start_time().time_since_epoch().count() << " "
- << FlatbufferToJson(front);
- }
- channel.data.pop_front();
-
- // Re-push it and update the oldest timestamp.
- if (channel.data.size() != 0) {
- const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
- chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
- PushChannelHeap(timestamp, oldest_channel_index.second);
- channel.oldest_timestamp = timestamp;
- } else {
- channel.oldest_timestamp = monotonic_clock::min_time;
+ << FlatbufferToJson(channel_data);
}
- if (monotonic_now > queue_data_time_) {
- QueueMessages();
- }
-
- if (channel_heap_.size() != 0) {
- timer_handler_->Setup(oldest_message().first);
+ if (sorted_message_reader_.active_channel_count() > 0u) {
+ timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
}
});
- if (channel_heap_.size() > 0u) {
- event_loop_->OnRun(
- [this]() { timer_handler_->Setup(oldest_message().first); });
+ if (sorted_message_reader_.active_channel_count() > 0u) {
+ event_loop_->OnRun([this]() {
+ timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+ });
}
}
void LogReader::Deregister() {
for (size_t i = 0; i < channels_.size(); ++i) {
- channels_[i].raw_sender.reset();
+ channels_[i].reset();
}
event_loop_factory_ = nullptr;
@@ -502,52 +343,5 @@
event_loop_ = nullptr;
}
-void LogReader::EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data) {
- const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
- chrono::nanoseconds(new_data.message().monotonic_sent_time()));
- const size_t channel_index = new_data.message().channel_index();
- CHECK_LT(channel_index, channels_.size());
- newest_timestamp_ = std::max(newest_timestamp_, timestamp);
- if (channels_[channel_index].data.size() == 0) {
- channels_[channel_index].oldest_timestamp = timestamp;
- PushChannelHeap(timestamp, channel_index);
- }
- channels_[channel_index].data.emplace_back(std::move(new_data));
-}
-
-namespace {
-
-bool ChannelHeapCompare(
- const std::pair<monotonic_clock::time_point, int> first,
- const std::pair<monotonic_clock::time_point, int> second) {
- if (first.first > second.first) {
- return true;
- } else if (first.first == second.first) {
- return first.second > second.second;
- } else {
- return false;
- }
-}
-
-} // namespace
-
-void LogReader::PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index) {
- channel_heap_.push_back(std::make_pair(timestamp, channel_index));
-
- // The default sort puts the newest message first. Use a custom comparator to
- // put the oldest message first.
- std::push_heap(channel_heap_.begin(), channel_heap_.end(),
- ChannelHeapCompare);
-}
-
-std::pair<monotonic_clock::time_point, int> LogReader::PopOldestChannel() {
- std::pair<monotonic_clock::time_point, int> result = channel_heap_.front();
- std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
- &ChannelHeapCompare);
- channel_heap_.pop_back();
- return result;
-}
-
} // namespace logger
} // namespace aos
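
The channel heap moved into SortedMessageReader unchanged. As a standalone
illustration of why the greater-than comparator turns std::push_heap's
max-heap into the oldest-first min-heap we want (made-up values, with the
channel index breaking timestamp ties just like ChannelHeapCompare):

    #include <algorithm>
    #include <utility>
    #include <vector>

    int main() {
      std::vector<std::pair<int, int>> heap;  // (timestamp, channel index)
      const auto oldest_first = [](const std::pair<int, int> &a,
                                   const std::pair<int, int> &b) {
        return a > b;
      };

      heap.emplace_back(7, 0);
      std::push_heap(heap.begin(), heap.end(), oldest_first);
      heap.emplace_back(3, 1);
      std::push_heap(heap.begin(), heap.end(), oldest_first);
      // heap.front() is now {3, 1}, the oldest entry.

      std::pop_heap(heap.begin(), heap.end(), oldest_first);
      heap.pop_back();  // {3, 1} removed; {7, 0} is back at front().
      return heap.front().first == 7 ? 0 : 1;
    }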
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e44f65f..a35b453 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -3,8 +3,8 @@
#include <deque>
+#include <string_view>
#include <vector>
-#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logfile_utils.h"
@@ -59,7 +59,7 @@
// Replays all the channels in the logfile to the event loop.
class LogReader {
public:
- LogReader(absl::string_view filename);
+ LogReader(std::string_view filename);
~LogReader();
// Registers the timer and senders used to resend the messages from the log
@@ -89,143 +89,18 @@
// happened.
private:
- // Reads a chunk of data into data_. Returns false if no data was read.
- bool ReadBlock();
-
- // Returns true if there is a full message available in the buffer, or if we
- // will have to read more data from disk.
- bool MessageAvailable();
-
- // Returns a span with the data for a message from the log file, excluding
- // the size.
- absl::Span<const uint8_t> ReadMessage();
-
-  // Queues at least max_out_of_order_duration_ messages into channels_.
-  void QueueMessages();
- // We need to read a large chunk at a time, then kit it up into parts and
- // sort.
- //
- // We want to read 256 KB chunks at a time. This is the fastest read size.
- // This leaves us with a fragmentation problem though.
- //
- // The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
- // chunks into single flatbuffer messages and manage them in a sorted queue.
- // Everything is copied three times (into 256 kb buffer, then into separate
- // buffer, then into sender), but none of it is all that expensive. We can
- // optimize if it is slow later.
- //
- // As we place the elements in the sorted list of times, keep doing this
- // until we read a message that is newer than the threshold.
- //
- // Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
- // small state machine so we can resume), and keep pulling messages back out
- // and sending.
- //
- // For sorting, we want to use the fact that each channel is sorted, and
- // then merge sort the channels. Have a vector of deques, and then hold a
- // sorted list of pointers to those.
- //
- // TODO(austin): Multithreaded read at some point. Gotta go faster!
- // Especially if we start compressing.
+  // Reads and sorts the messages from the log file.
+ SortedMessageReader sorted_message_reader_;
- // Allocator which doesn't zero initialize memory.
- template <typename T>
- struct DefaultInitAllocator {
- typedef T value_type;
-
- template <typename U>
- void construct(U *p) {
- ::new (static_cast<void *>(p)) U;
- }
-
- template <typename U, typename... Args>
- void construct(U *p, Args &&... args) {
- ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
- }
-
- T *allocate(std::size_t n) {
- return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
- }
-
- template <typename U>
- void deallocate(U *p, std::size_t /*n*/) {
- ::operator delete(static_cast<void *>(p));
- }
- };
-
- // Minimum amount of data to queue up for sorting before we are guarenteed
- // to not see data out of order.
- std::chrono::nanoseconds max_out_of_order_duration_;
-
- // File descriptor for the log file.
- int fd_ = -1;
+  // Senders to replay each channel's messages onto the event loop, indexed
+  // by channel index.
+  std::vector<std::unique_ptr<RawSender>> channels_;
SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
std::unique_ptr<EventLoop> event_loop_unique_ptr_;
EventLoop *event_loop_ = nullptr;
TimerHandler *timer_handler_;
-
- // Vector to read into. This uses an allocator which doesn't zero
- // initialize the memory.
- std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
-
- // Amount of data consumed already in data_.
- size_t consumed_data_ = 0;
-
- // Vector holding the data for the configuration.
- std::vector<uint8_t> configuration_;
-
- // Moves the message to the correct channel queue.
- void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
-
- // Pushes a pointer to the channel for the given timestamp to the sorted
- // channel list.
- void PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index);
-
- // Returns a pointer to the channel with the oldest message in it, and the
- // timestamp.
- const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
- return channel_heap_.front();
- }
-
- // Pops a pointer to the channel with the oldest message in it, and the
- // timestamp.
- std::pair<monotonic_clock::time_point, int> PopOldestChannel();
-
- // Datastructure to hold the list of messages, cached timestamp for the
- // oldest message, and sender to send with.
- struct ChannelData {
- monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
- std::deque<FlatbufferVector<MessageHeader>> data;
- std::unique_ptr<RawSender> raw_sender;
-
- // Returns the oldest message.
- const FlatbufferVector<MessageHeader> &front() { return data.front(); }
-
- // Returns the timestamp for the oldest message.
- const monotonic_clock::time_point front_timestamp() {
- return monotonic_clock::time_point(
- std::chrono::nanoseconds(front().message().monotonic_sent_time()));
- }
- };
-
- // List of channels and messages for them.
- std::vector<ChannelData> channels_;
-
- // Heap of channels so we can track which channel to send next.
- std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
-
- // Timestamp of the newest message in a channel queue.
- monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
-
- // The time at which we need to read another chunk from the logfile.
- monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
-
- // Cached bit for if we have reached the end of the file. Otherwise we will
- // hammer on the kernel asking for more data each time we send.
- bool end_of_file_ = false;
};
} // namespace logger
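
MessageAvailable() and ReadMessage() both lean on size-prefixed flatbuffer
framing. A minimal sketch of that framing (FrameSize is a hypothetical
helper; it assumes a little-endian host, matching what flatbuffers stores):

    #include <cstddef>
    #include <cstdint>
    #include <cstring>

    // A frame on disk is a 4-byte length prefix (flatbuffers::uoffset_t)
    // followed by that many bytes of flatbuffer payload. GetPrefixedSize()
    // reads the prefix, and prefix + payload is exactly the span that
    // SpanReader::ReadMessage() returns and consumes.
    size_t FrameSize(const uint8_t *p) {
      uint32_t payload_size;
      std::memcpy(&payload_size, p, sizeof(payload_size));
      return sizeof(payload_size) + payload_size;
    }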
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 630741f..4e4520c 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -171,6 +171,11 @@
return *this;
}
+ // Constructs an empty flatbuffer of type T.
+ static FlatbufferVector<T> Empty() {
+ return FlatbufferVector<T>(std::vector<uint8_t>{});
+ }
+
virtual ~FlatbufferVector() override {}
const uint8_t *data() const override { return data_.data(); }
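
Empty() exists because std::tie assigns into already-constructed objects,
and LogReader::Register() above needs a valid FlatbufferVector to assign
PopOldestChannel()'s result into. A sketch mirroring that call site:

    monotonic_clock::time_point channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();
    std::tie(channel_timestamp, channel_index, channel_data) =
        sorted_message_reader_.PopOldestChannel();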