Add multi-node log file reading
This handles timestamps, sorting, and merging with data.
For simplicity, we read the log files once per node. Once benchmarks
show if this is a bad idea, we can fix it.
Change-Id: I445ac5bfc7186bda25cc899602ac8d95a4cb946d
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 5b2bfa6..9a849b2 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -26,10 +26,11 @@
// The message originated on another node. Log it and the delivery times
// together. The message_gateway is responsible for logging any messages
// which didn't get delivered.
- kLogMessageAndDeliveryTime
+ kLogMessageAndDeliveryTime,
+ // The message originated on the other node and should be logged on this node.
+ kLogRemoteMessage
};
-
// This class manages efficiently writing a sequence of detached buffers to a
// file. It queues them up and batches the write operation.
class DetachedBufferWriter {
@@ -37,6 +38,8 @@
DetachedBufferWriter(std::string_view filename);
~DetachedBufferWriter();
+ std::string_view filename() const { return filename_; }
+
// TODO(austin): Snappy compress the log file if it ends with .snappy!
// Queues up a finished FlatBufferBuilder to be written. Steals the detached
@@ -51,6 +54,8 @@
void Flush();
private:
+ const std::string filename_;
+
int fd_ = -1;
// Size of all the data in the queue.
@@ -68,6 +73,8 @@
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type);
+FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
+
// Class to read chunks out of a log file.
class SpanReader {
public:
@@ -75,6 +82,8 @@
~SpanReader() { close(fd_); }
+ std::string_view filename() const { return filename_; }
+
// Returns a span with the data for a message from the log file, excluding
// the size.
absl::Span<const uint8_t> ReadMessage();
@@ -92,6 +101,8 @@
// Reads a chunk of data into data_. Returns false if no data was read.
bool ReadBlock();
+ const std::string filename_;
+
// File descriptor for the log file.
int fd_ = -1;
@@ -138,6 +149,8 @@
public:
MessageReader(std::string_view filename);
+ std::string_view filename() const { return span_reader_.filename(); }
+
// Returns the header from the log file.
const LogFileHeader *log_file_header() const {
return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
@@ -177,52 +190,64 @@
monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
};
-// We need to read a large chunk at a time, then kit it up into parts and
-// sort.
-//
-// We want to read 256 KB chunks at a time. This is the fastest read size.
-// This leaves us with a fragmentation problem though.
-//
-// The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
-// chunks into single flatbuffer messages and manage them in a sorted queue.
-// Everything is copied three times (into 256 kb buffer, then into separate
-// buffer, then into sender), but none of it is all that expensive. We can
-// optimize if it is slow later.
-//
-// As we place the elements in the sorted list of times, keep doing this
-// until we read a message that is newer than the threshold.
-//
-// Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
-// small state machine so we can resume), and keep pulling messages back out
-// and sending.
-//
-// For sorting, we want to use the fact that each channel is sorted, and
-// then merge sort the channels. Have a vector of deques, and then hold a
-// sorted list of pointers to those.
-class SortedMessageReader {
- public:
- SortedMessageReader(const std::vector<std::string> &filenames);
+class TimestampMerger;
- // Returns the header from the log file.
+// A design requirement is that the relevant data for a channel is not more than
+// max_out_of_order_duration out of order. We approach sorting in layers.
+//
+// 1) Split each (maybe chunked) log file into one queue per channel. Read this
+// log file looking for data pertaining to a specific node.
+// (SplitMessageReader)
+// 2) Merge all the data per channel from the different log files into a sorted
+// list of timestamps and messages. (TimestampMerger)
+// 3) Combine the timestamps and messages. (TimestampMerger)
+// 4) Merge all the channels to produce the next message on a node.
+// (ChannelMerger)
+// 5) Duplicate this entire stack per node.
+
+// This class splits messages and timestamps up into a queue per channel, and
+// handles reading data from multiple chunks.
+class SplitMessageReader {
+ public:
+ SplitMessageReader(const std::vector<std::string> &filenames);
+
+ // Sets the TimestampMerger that gets notified for each channel. The node
+ // that the TimestampMerger is merging as needs to be passed in.
+ void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
+ const Node *target_node);
+
+ // Returns the (timestamp, queue_index) for the oldest message in a channel, or
+ // max_time if there is nothing in the channel.
+ std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
+ int channel) {
+ return channels_[channel].data.front_timestamp();
+ }
+
+ // Returns the (timestamp, queue_index) for the oldest delivery time in a
+ // channel, or max_time if there is nothing in the channel.
+ std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
+ int channel, int destination_node) {
+ return channels_[channel].timestamps[destination_node].front_timestamp();
+ }
+
+ // Returns the timestamp, queue_index, and message for the oldest data on a
+ // channel. Requeues data as needed.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ PopOldest(int channel_index);
+
+ // Returns the timestamp, queue_index, and message for the oldest timestamp on
+ // a channel delivered to a node. Requeues data as needed.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ PopOldest(int channel, int node_index);
+
+ // Returns the header for the log files.
const LogFileHeader *log_file_header() const {
return &log_file_header_.message();
}
- // Returns a pointer to the channel with the oldest message in it, and the
- // timestamp.
- const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
- return channel_heap_.front();
- }
-
- // Returns the number of channels with data still in them.
- size_t active_channel_count() const { return channel_heap_.size(); }
-
- // Returns the configuration from the log file header.
- const Configuration *configuration() const {
- return log_file_header()->configuration();
- }
-
- // Returns the start time on both the monotonic and realtime clocks.
+ // Returns the starting time for this set of log files.
monotonic_clock::time_point monotonic_start_time() {
return monotonic_clock::time_point(
std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
@@ -232,74 +257,304 @@
std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
}
+ // Returns the configuration from the log file header.
+ const Configuration *configuration() const {
+ return log_file_header()->configuration();
+ }
+
// Returns the node who's point of view this log file is from. Make sure this
// is a pointer in the configuration() nodes list so it can be consumed
// elsewhere.
const Node *node() const {
if (configuration()->has_nodes()) {
- CHECK(log_file_header()->has_node());
- CHECK(log_file_header()->node()->has_name());
- return configuration::GetNode(
- configuration(), log_file_header()->node()->name()->string_view());
+ return configuration::GetNodeOrDie(configuration(),
+ log_file_header()->node());
} else {
CHECK(!log_file_header()->has_node());
return nullptr;
}
}
- // Pops a pointer to the channel with the oldest message in it, and the
- // timestamp.
- std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
- PopOldestChannel();
+ // Returns the timestamp of the newest message read from the log file, and the
+ // timestamp at which we need to re-queue data.
+ monotonic_clock::time_point newest_timestamp() const {
+ return message_reader_->newest_timestamp();
+ }
+ monotonic_clock::time_point queue_data_time() const {
+ return message_reader_->queue_data_time();
+ }
+
+
+ // Adds more messages to the sorted list. This reads enough data such that
+ // oldest_message_time can be replayed safely. Returns false if the log file
+ // has all been read.
+ bool QueueMessages(monotonic_clock::time_point oldest_message_time);
private:
+ // TODO(austin): Need to copy or refcount the message instead of running
+ // multiple copies of the reader. Or maybe have a "as_node" index and hide it
+ // inside.
+
// Moves to the next log file in the list.
bool NextLogFile();
- // Adds more messages to the sorted list.
- void QueueMessages();
+ // Filenames of the log files.
+ std::vector<std::string> filenames_;
+ // And the index of the next file to open.
+ size_t next_filename_index_ = 0;
- // Moves the message to the correct channel queue.
- void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
-
- // Pushes a pointer to the channel for the given timestamp to the sorted
- // channel list.
- void PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index);
-
+ // Log file header to report. This is a copy.
+ FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+ // Current log file being read.
+ std::unique_ptr<MessageReader> message_reader_;
// Datastructure to hold the list of messages, cached timestamp for the
// oldest message, and sender to send with.
- struct ChannelData {
- monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
- std::deque<FlatbufferVector<MessageHeader>> data;
- std::unique_ptr<RawSender> raw_sender;
+ struct MessageHeaderQueue {
+ // If true, this is a timestamp queue.
+ bool timestamps = false;
- // Returns the oldest message.
- const FlatbufferVector<MessageHeader> &front() { return data.front(); }
-
- // Returns the timestamp for the oldest message.
- const monotonic_clock::time_point front_timestamp() {
- return monotonic_clock::time_point(
- std::chrono::nanoseconds(front().message().monotonic_sent_time()));
+ // Returns a reference to the oldest message.
+ FlatbufferVector<MessageHeader> &front() {
+ CHECK_GT(data_.size(), 0u);
+ return data_.front();
}
+
+ // Adds a message to the back of the queue.
+ void emplace_back(FlatbufferVector<MessageHeader> &&msg);
+
+ // Drops the front message. Invalidates the front() reference.
+ void pop_front();
+
+ // The size of the queue.
+ size_t size() { return data_.size(); }
+
+ // Returns the (timestamp, queue_index) for the oldest message.
+ const std::tuple<monotonic_clock::time_point, uint32_t> front_timestamp() {
+ CHECK_GT(data_.size(), 0u);
+ return std::make_tuple(
+ monotonic_clock::time_point(std::chrono::nanoseconds(
+ front().message().monotonic_sent_time())),
+ front().message().queue_index());
+ }
+
+ // Pointer to the timestamp merger for this queue if available.
+ TimestampMerger *timestamp_merger = nullptr;
+ // Pointer to the reader which feeds this queue.
+ SplitMessageReader *split_reader = nullptr;
+
+ private:
+ // The data.
+ std::deque<FlatbufferVector<MessageHeader>> data_;
};
- std::vector<std::string> filenames_;
- size_t next_filename_index_ = 0;
+ // All the queues needed for a channel. There isn't going to be data in all
+ // of these.
+ struct ChannelData {
+ // The data queue for the channel.
+ MessageHeaderQueue data;
+ // Queues for timestamps for each node.
+ std::vector<MessageHeaderQueue> timestamps;
+ };
- FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
- std::unique_ptr<MessageReader> message_reader_;
-
- // TODO(austin): Multithreaded read at some point. Gotta go faster!
- // Especially if we start compressing.
-
- // List of channels and messages for them.
+ // Data for all the channels.
std::vector<ChannelData> channels_;
- // Heap of channels so we can track which channel to send next.
+ // Once we know the node that this SplitMessageReader will be writing as,
+ // there will be only one MessageHeaderQueue that a specific channel matches.
+ // Precompute this here for efficiency.
+ std::vector<MessageHeaderQueue *> channels_to_write_;
+
+ // Number of messages queued.
+ size_t queued_messages_ = 0;
+};
+
+class ChannelMerger;
+
+// Sorts messages (and timestamps) from multiple log files for a single channel.
+class TimestampMerger {
+ public:
+ TimestampMerger(const Configuration *configuration,
+ std::vector<SplitMessageReader *> split_message_readers,
+ int channel_index, const Node *target_node,
+ ChannelMerger *channel_merger);
+
+ // Metadata used to schedule the message.
+ struct DeliveryTimestamp {
+ monotonic_clock::time_point monotonic_event_time =
+ monotonic_clock::min_time;
+ realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+
+ monotonic_clock::time_point monotonic_remote_time =
+ monotonic_clock::min_time;
+ realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
+ uint32_t remote_queue_index = 0xffffffff;
+ };
+
+ // Pushes SplitMessageReader onto the timestamp heap. This should only be
+ // called when timestamps are placed in the channel this class is merging for
+ // the reader.
+ void UpdateTimestamp(
+ SplitMessageReader *split_message_reader,
+ std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+ PushTimestampHeap(oldest_message_time, split_message_reader);
+ }
+ // Pushes SplitMessageReader onto the message heap. This should only be
+ // called when data is placed in the channel this class is merging for the
+ // reader.
+ void Update(
+ SplitMessageReader *split_message_reader,
+ std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+ PushMessageHeap(oldest_message_time, split_message_reader);
+ }
+
+ // Returns the oldest combined timestamp and data for this channel.
+ std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
+
+ // Tracks if the channel merger has pushed this onto its heap or not.
+ bool pushed() { return pushed_; }
+ // Sets if this has been pushed to the channel merger heap. Should only be
+ // called by the channel merger.
+ void set_pushed(bool pushed) { pushed_ = pushed; }
+
+ private:
+ // Pushes messages and timestamps to the corresponding heaps.
+ void PushMessageHeap(
+ std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+ SplitMessageReader *split_message_reader);
+ void PushTimestampHeap(
+ std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+ SplitMessageReader *split_message_reader);
+
+ // Pops a message from the message heap. This automatically triggers the
+ // split message reader to re-fetch any new data.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ PopMessageHeap();
+ // Pops a message from the timestamp heap. This automatically triggers the
+ // split message reader to re-fetch any new data.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ PopTimestampHeap();
+
+ const Configuration *configuration_;
+
+ // If true, this is a forwarded channel and timestamps should be matched.
+ bool has_timestamps_ = false;
+
+ // Tracks if the ChannelMerger has pushed this onto its queue.
+ bool pushed_ = false;
+
+ // The split message readers used for source data.
+ std::vector<SplitMessageReader *> split_message_readers_;
+
+ // The channel to merge.
+ int channel_index_;
+
+ // Our node.
+ int node_index_;
+
+ // Heaps for messages and timestamps.
+ std::vector<
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+ message_heap_;
+ std::vector<
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+ timestamp_heap_;
+
+ // Parent channel merger.
+ ChannelMerger *channel_merger_;
+};
+
+// This class handles constructing all the split message readers, timestamp
+// mergers, and combining the results.
+class ChannelMerger {
+ public:
+ // Builds a ChannelMerger around a set of log files. These are of the format:
+ // {
+ // {log1_part0, log1_part1, ...},
+ // {log2}
+ // }
+ // The inner vector is a list of log file chunks which form up a log file.
+ // The outer vector is a list of log files with subsets of the messages, or
+ // messages from different nodes.
+ ChannelMerger(const std::vector<std::vector<std::string>> &filenames);
+
+ // Returns the nodes that we know how to merge.
+ const std::vector<const Node *> nodes() const;
+ // Sets the node that we will return messages as. Returns true if the node
+ // has log files and will produce data. This can only be called once, and
+ // will likely corrupt state if called a second time.
+ bool SetNode(const Node *target_node);
+
+ // Everything else needs the node set before it works.
+
+ // Returns a timestamp for the oldest message in this group of logfiles.
+ monotonic_clock::time_point OldestMessage() const;
+ // Pops the oldest message.
+ std::tuple<TimestampMerger::DeliveryTimestamp, int,
+ FlatbufferVector<MessageHeader>>
+ PopOldest();
+
+ // Returns the config for this set of log files.
+ const Configuration *configuration() const {
+ return log_file_header()->configuration();
+ }
+
+ const LogFileHeader *log_file_header() const {
+ return &log_file_header_.message();
+ }
+
+ // Returns the start times for the configured node's log files.
+ monotonic_clock::time_point monotonic_start_time() {
+ return monotonic_clock::time_point(
+ std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
+ }
+ realtime_clock::time_point realtime_start_time() {
+ return realtime_clock::time_point(
+ std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
+ }
+
+ // Returns the node set by SetNode above.
+ const Node *node() const { return node_; }
+
+ // Called by the TimestampMerger when new data is available with the provided
+ // timestamp and channel_index.
+ void Update(monotonic_clock::time_point timestamp, int channel_index) {
+ PushChannelHeap(timestamp, channel_index);
+ }
+
+ private:
+ // Queues messages from each SplitMessageReader until enough data is queued
+ // such that we can guarantee all sorting has happened.
+ void QueueMessages(monotonic_clock::time_point oldest_message_time);
+
+ // Pushes the timestamp for new data on the provided channel.
+ void PushChannelHeap(monotonic_clock::time_point timestamp,
+ int channel_index);
+
+ // All the message readers.
+ std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
+
+ // The log header we are claiming to be.
+ FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+
+ // The timestamp mergers which combine data from the split message readers.
+ std::vector<TimestampMerger> timestamp_mergers_;
+
+ // A heap of the channel readers and timestamps for the oldest data in each.
std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+ // This holds a heap of split_message_readers sorted by the time at which they
+ // need to have QueueMessages called on them.
+ std::vector<std::pair<monotonic_clock::time_point, int>>
+ split_message_reader_heap_;
+
+ // Configured node.
+ const Node *node_;
+
+ // Cached copy of the list of nodes.
+ std::vector<const Node *> nodes_;
};
} // namespace logger