Add multi-node local logging to the logger
This is not yet able to forward messages, but is able to log messages
that have been forwarded. Create a log file and test that the
timestamps are getting recorded correctly.
Change-Id: Ica891dbc560543546f6ee594438cebb03672190e
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 497dabb..1e9a218 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -46,10 +46,17 @@
std::vector<struct iovec> iovec_;
};
-// Packes a message pointed to by the context into a MessageHeader.
-flatbuffers::Offset<MessageHeader> PackMessage(
- flatbuffers::FlatBufferBuilder *fbb, const Context &context,
- int channel_index);
+enum class LogType : uint8_t {  // What gets written to the log for a message.
+ // The message originated on this node; log the message itself here.
+ kLogMessage,
+ // The message originated on another node; only its delivery times are
+ // logged here, not the message itself.
+ kLogDeliveryTimeOnly,
+ // The message originated on another node. Log both the message and its
+ // delivery times here. The message_gateway is responsible for logging any
+ // messages which didn't get delivered.
+ kLogMessageAndDeliveryTime
+};
// Logs all channels available in the event loop to disk every 100 ms.
// Start by logging one message per channel to capture any state and
@@ -73,6 +80,8 @@
struct FetcherStruct {
std::unique_ptr<RawFetcher> fetcher;
bool written = false;
+
+ // What to record for messages obtained through this fetcher (see LogType).
+ // Given a default so that a default-constructed FetcherStruct never holds
+ // an indeterminate value, matching the initializer on `written` above.
+ LogType log_type = LogType::kLogMessage;
};
std::vector<FetcherStruct> fetchers_;
@@ -89,6 +98,11 @@
size_t max_header_size_ = 0;
};
+// Packs the message pointed to by the context into a MessageHeader.
+flatbuffers::Offset<MessageHeader> PackMessage(
+ flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+ int channel_index, LogType log_type);  // NOTE(review): log_type presumably selects what is packed, per LogType -- confirm in logger.cc.
+
// Replays all the channels in the logfile to the event loop.
class LogReader {
public:
@@ -106,7 +120,10 @@
// TODO(austin): Remap channels?
// Returns the configuration from the log file.
- const Configuration *configuration();
+ const Configuration *configuration() const;
+
+ // Returns the node that this log file was created on.
+ const Node *node() const;
// Returns the starting timestamp for the log file.
monotonic_clock::time_point monotonic_start_time();
@@ -125,8 +142,8 @@
// will have to read more data from disk.
bool MessageAvailable();
- // Returns a span with the data for a message from the log file, excluding the
- // size.
+ // Returns a span with the data for a message from the log file, excluding
+ // the size.
absl::Span<const uint8_t> ReadMessage();
// Queues at least max_out_of_order_duration_ messages into channels_.
@@ -144,16 +161,16 @@
// buffer, then into sender), but none of it is all that expensive. We can
// optimize if it is slow later.
//
- // As we place the elements in the sorted list of times, keep doing this until
- // we read a message that is newer than the threshold.
+ // As we place the elements in the sorted list of times, keep doing this
+ // until we read a message that is newer than the threshold.
//
// Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
// small state machine so we can resume), and keep pulling messages back out
// and sending.
//
- // For sorting, we want to use the fact that each channel is sorted, and then
- // merge sort the channels. Have a vector of deques, and then hold a sorted
- // list of pointers to those.
+ // For sorting, we want to use the fact that each channel is sorted, and
+ // then merge sort the channels. Have a vector of deques, and then hold a
+ // sorted list of pointers to those.
//
// TODO(austin): Multithreaded read at some point. Gotta go faster!
// Especially if we start compressing.
@@ -183,8 +200,8 @@
}
};
- // Minimum amount of data to queue up for sorting before we are guarenteed to
- // not see data out of order.
+ // Minimum amount of data to queue up for sorting before we are guaranteed
+ // to not see data out of order.
std::chrono::nanoseconds max_out_of_order_duration_;
// File descriptor for the log file.
@@ -195,8 +212,8 @@
EventLoop *event_loop_ = nullptr;
TimerHandler *timer_handler_;
- // Vector to read into. This uses an allocator which doesn't zero initialize
- // the memory.
+ // Vector to read into. This uses an allocator which doesn't zero
+ // initialize the memory.
std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
// Amount of data consumed already in data_.
@@ -223,8 +240,8 @@
// timestamp.
std::pair<monotonic_clock::time_point, int> PopOldestChannel();
- // Datastructure to hold the list of messages, cached timestamp for the oldest
- // message, and sender to send with.
+ // Data structure to hold the list of messages, cached timestamp for the
+ // oldest message, and sender to send with.
struct ChannelData {
monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
std::deque<FlatbufferVector<MessageHeader>> data;