Split logger.h into log_reader and log_writer
Much simpler!
Change-Id: I6c4ee363b56b67dac40c456261bbed79d01b8eb6
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
new file mode 100644
index 0000000..875c90d
--- /dev/null
+++ b/aos/events/logging/log_reader.h
@@ -0,0 +1,472 @@
+#ifndef AOS_EVENTS_LOGGING_LOG_READER_H_
+#define AOS_EVENTS_LOGGING_LOG_READER_H_
+
+#include <chrono>
+#include <deque>
+#include <string_view>
+#include <tuple>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/logfile_sorting.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/multinode_timestamp_filter.h"
+#include "aos/network/remote_message_generated.h"
+#include "aos/network/timestamp_filter.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// We end up with one of the following 3 log file types.
+//
+// Single node logged as the source node.
+// -> Replayed just on the source node.
+//
+// Forwarding timestamps only logged from the perspective of the destination
+// node.
+// -> Matched with data on source node and logged.
+//
+// Forwarding timestamps with data logged as the destination node.
+// -> Replayed just as the destination
+// -> Replayed as the source (Much harder, ordering is not defined)
+//
+// Duplicate data logged. -> CHECK that it matches and explode otherwise.
+//
+// This can be boiled down to a set of constraints and tools.
+//
+// 1) Forwarding timestamps and data need to be logged separately.
+// 2) Any forwarded data logged on the destination node needs to be logged
+// separately such that it can be sorted.
+//
+// 1) Log reader needs to be able to sort a list of log files.
+// 2) Log reader needs to be able to merge sorted lists of log files.
+// 3) Log reader needs to be able to match timestamps with messages.
+//
+// We also need to be able to generate multiple views of a log file depending on
+// the target.
+
+// Replays all the channels in the logfile to the event loop.
+class LogReader {
+ public:
+  // If you want to supply a new configuration that will be used for replay
+  // (e.g., to change message rates, or to populate an updated schema), then
+  // pass it in here. It must provide all the channels that the original logged
+  // config did.
+  //
+  // The single file constructor calls SortParts internally.
+  LogReader(std::string_view filename,
+            const Configuration *replay_configuration = nullptr);
+  LogReader(std::vector<LogFile> log_files,
+            const Configuration *replay_configuration = nullptr);
+  ~LogReader();
+
+  // Registers all the callbacks to send the log file data out on an event loop
+  // created in event_loop_factory. This also updates time to be at the start
+  // of the log file by running until the log file starts.
+  // Note: the configuration used in the factory should be configuration()
+  // below, but can be anything as long as the locations needed to send
+  // everything are available.
+  void Register(SimulatedEventLoopFactory *event_loop_factory);
+  // Creates a SimulatedEventLoopFactory accessible via event_loop_factory(),
+  // and then calls Register.
+  void Register();
+  // Registers callbacks for all the events after the log file starts. This is
+  // only useful when replaying live.
+  void Register(EventLoop *event_loop);
+
+  // Unregisters the senders. You only need to call this if you separately
+  // supplied an event loop or event loop factory and the lifetimes are such
+  // that they need to be explicitly destroyed before the LogReader destructor
+  // gets called.
+  void Deregister();
+
+  // Returns the configuration exactly as it was logged in the log file.
+  // Channel remapping done by RemapLoggedChannel is not reflected here; the
+  // channel indices used for remapping refer to this configuration.
+  // To create a SimulatedEventLoopFactory for replay, use configuration()
+  // below instead.
+  const Configuration *logged_configuration() const;
+  // Returns the configuration being used for replay, including any channels
+  // remapped by RemapLoggedChannel. Note that this may be different from the
+  // configuration actually used for handling events. You should generally
+  // only use this to create a SimulatedEventLoopFactory, and then get the
+  // configuration from there for everything else.
+  // The pointer is invalidated whenever RemapLoggedChannel is called.
+  const Configuration *configuration() const;
+
+  // Returns the nodes that this log file was created on. This is a list of
+  // pointers to a node in the nodes() list inside logged_configuration().
+  std::vector<const Node *> LoggedNodes() const;
+
+  // Returns the starting timestamp for the log file.
+  monotonic_clock::time_point monotonic_start_time(
+      const Node *node = nullptr) const;
+  realtime_clock::time_point realtime_start_time(
+      const Node *node = nullptr) const;
+
+  // Causes the logger to publish the provided channel on a different name so
+  // that replayed applications can publish on the proper channel name without
+  // interference. This operates on raw channel names, without any node or
+  // application specific mappings.
+  void RemapLoggedChannel(std::string_view name, std::string_view type,
+                          std::string_view add_prefix = "/original",
+                          std::string_view new_type = "");
+  template <typename T>
+  void RemapLoggedChannel(std::string_view name,
+                          std::string_view add_prefix = "/original",
+                          std::string_view new_type = "") {
+    RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type);
+  }
+
+  // Remaps the provided channel, though this respects node mappings, and
+  // preserves them too. This makes it so if /aos -> /pi1/aos on one node,
+  // /original/aos -> /original/pi1/aos on the same node after renaming, just
+  // like you would hope. If new_type is not empty, the new channel will use
+  // the provided type instead. This allows for renaming messages.
+  //
+  // TODO(austin): If you have 2 nodes remapping something to the same channel,
+  // this doesn't handle that. No use cases exist yet for that, so it isn't
+  // being done yet.
+  void RemapLoggedChannel(std::string_view name, std::string_view type,
+                          const Node *node,
+                          std::string_view add_prefix = "/original",
+                          std::string_view new_type = "");
+  template <typename T>
+  void RemapLoggedChannel(std::string_view name, const Node *node,
+                          std::string_view add_prefix = "/original",
+                          std::string_view new_type = "") {
+    RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix,
+                       new_type);
+  }
+
+  template <typename T>
+  bool HasChannel(std::string_view name, const Node *node = nullptr) {
+    return configuration::GetChannel(logged_configuration(), name,
+                                     T::GetFullyQualifiedName(), "", node,
+                                     true) != nullptr;
+  }
+
+  // Returns true if the channel exists on the node and was logged.
+  template <typename T>
+  bool HasLoggedChannel(std::string_view name, const Node *node = nullptr) {
+    const Channel *channel =
+        configuration::GetChannel(logged_configuration(), name,
+                                  T::GetFullyQualifiedName(), "", node, true);
+    if (channel == nullptr) return false;
+    return channel->logger() != LoggerConfig::NOT_LOGGED;
+  }
+
+  SimulatedEventLoopFactory *event_loop_factory() {
+    return event_loop_factory_;
+  }
+
+  std::string_view name() const { return log_files_[0].name; }
+
+  // Set whether to exit the SimulatedEventLoopFactory when we finish reading
+  // the logfile.
+  void set_exit_on_finish(bool exit_on_finish) {
+    exit_on_finish_ = exit_on_finish;
+  }
+
+ private:
+  const Channel *RemapChannel(const EventLoop *event_loop,
+                              const Channel *channel);
+
+  // Queues up messages for replay. TODO(review): document the amount queued.
+  void QueueMessages();
+  // Handle constructing a configuration with all the additional remapped
+  // channels from calls to RemapLoggedChannel.
+  void MakeRemappedConfig();
+
+  // Returns the number of nodes in the logged config (1 if not multi-node).
+  size_t nodes_count() const {
+    return !configuration::MultiNode(logged_configuration())
+               ? 1u
+               : logged_configuration()->nodes()->size();
+  }
+
+  const std::vector<LogFile> log_files_;
+
+  // Class to manage sending RemoteMessages on the provided node after the
+  // correct delay.
+  class RemoteMessageSender {
+   public:
+    RemoteMessageSender(aos::Sender<message_bridge::RemoteMessage> sender,
+                        EventLoop *event_loop);
+    RemoteMessageSender(RemoteMessageSender const &) = delete;
+    RemoteMessageSender &operator=(RemoteMessageSender const &) = delete;
+
+    // Sends the provided message. If monotonic_timestamp_time is min_time,
+    // send it immediately.
+    void Send(
+        FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
+        monotonic_clock::time_point monotonic_timestamp_time);
+
+   private:
+    // Handles actually sending the timestamp if we were delayed.
+    void SendTimestamp();
+    // Handles scheduling the timer to send at the correct time.
+    void ScheduleTimestamp();
+
+    EventLoop *event_loop_;
+    aos::Sender<message_bridge::RemoteMessage> sender_;
+    aos::TimerHandler *timer_ = nullptr;
+
+    // Time we are scheduled for, or min_time if we aren't scheduled.
+    monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
+
+    struct Timestamp {
+      Timestamp(FlatbufferDetachedBuffer<message_bridge::RemoteMessage>
+                    new_remote_message,
+                monotonic_clock::time_point new_monotonic_timestamp_time)
+          : remote_message(std::move(new_remote_message)),
+            monotonic_timestamp_time(new_monotonic_timestamp_time) {}
+      FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message;
+      monotonic_clock::time_point monotonic_timestamp_time;
+    };
+
+    // List of messages to send. The timer works through them and then disables
+    // itself automatically.
+    std::deque<Timestamp> remote_timestamps_;
+  };
+
+  // State per node.
+  class State {
+   public:
+    State(std::unique_ptr<TimestampMapper> timestamp_mapper);
+
+    // Connects up the timestamp mappers.
+    void AddPeer(State *peer);
+
+    TimestampMapper *timestamp_mapper() { return timestamp_mapper_.get(); }
+
+    // Returns the next sorted message with all the timestamps extracted and
+    // matched.
+    TimestampedMessage PopOldest();
+
+    // Returns the monotonic time of the oldest message.
+    monotonic_clock::time_point OldestMessageTime() const;
+
+    // Primes the queues inside State. Should be called before calling
+    // OldestMessageTime.
+    void SeedSortedMessages();
+
+    // Returns the starting time for this node.
+    monotonic_clock::time_point monotonic_start_time() const {
+      return timestamp_mapper_ ? timestamp_mapper_->monotonic_start_time()
+                               : monotonic_clock::min_time;
+    }
+    realtime_clock::time_point realtime_start_time() const {
+      return timestamp_mapper_ ? timestamp_mapper_->realtime_start_time()
+                               : realtime_clock::min_time;
+    }
+
+    // Sets the node event loop factory for replaying into a
+    // SimulatedEventLoopFactory. Returns the EventLoop to use.
+    EventLoop *SetNodeEventLoopFactory(
+        NodeEventLoopFactory *node_event_loop_factory);
+
+    // Sets and gets the event loop to use.
+    void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
+    EventLoop *event_loop() { return event_loop_; }
+
+    // Sets the current realtime offset from the monotonic clock for this node
+    // (if we are on a simulated event loop).
+    void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
+                           realtime_clock::time_point realtime_time) {
+      if (node_event_loop_factory_ != nullptr) {
+        node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
+                                                    realtime_time);
+      }
+    }
+
+    // Returns the RemoteMessageSender used to send delivery timestamps for the
+    // provided remote node.
+    RemoteMessageSender *RemoteTimestampSender(const Node *delivered_node);
+
+    // Converts a timestamp from the monotonic clock on this node to the
+    // distributed clock.
+    distributed_clock::time_point ToDistributedClock(
+        monotonic_clock::time_point time) {
+      return node_event_loop_factory_->ToDistributedClock(time);
+    }
+
+    // Returns the current time on the remote node which sends messages on
+    // channel_index.
+    monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
+      return channel_source_state_[channel_index]
+          ->node_event_loop_factory_->monotonic_now();
+    }
+
+    // Returns the start time of the remote for the provided channel.
+    monotonic_clock::time_point monotonic_remote_start_time(
+        size_t channel_index) {
+      return channel_source_state_[channel_index]->monotonic_start_time();
+    }
+
+    distributed_clock::time_point RemoteToDistributedClock(
+        size_t channel_index, monotonic_clock::time_point time) {
+      return channel_source_state_[channel_index]
+          ->node_event_loop_factory_->ToDistributedClock(time);
+    }
+
+    const Node *remote_node(size_t channel_index) {
+      return channel_source_state_[channel_index]
+          ->node_event_loop_factory_->node();
+    }
+
+    monotonic_clock::time_point monotonic_now() {
+      return node_event_loop_factory_->monotonic_now();
+    }
+
+    // Sets the number of channels.
+    void SetChannelCount(size_t count);
+
+    // Sets the senders, filter, and source state for a channel.
+    void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+                    std::unique_ptr<RawSender> sender,
+                    message_bridge::NoncausalOffsetEstimator *filter,
+                    RemoteMessageSender *remote_timestamp_sender,
+                    State *source_state);
+
+    // Unregisters everything so we can destroy the event loop.
+    void Deregister();
+
+    // Sets the current TimerHandler for the replay callback.
+    void set_timer_handler(TimerHandler *timer_handler) {
+      timer_handler_ = timer_handler;
+    }
+
+    // Sets the next wakeup time on the replay callback.
+    void Setup(monotonic_clock::time_point next_time) {
+      timer_handler_->Setup(next_time);
+    }
+
+    // Sends the provided timestamped message on its channel.
+    bool Send(const TimestampedMessage &timestamped_message);
+
+    // Returns a debug string for the timestamp mapper.
+    std::string DebugString() const {
+      if (!timestamp_mapper_) {
+        return "";
+      }
+      return timestamp_mapper_->DebugString();
+    }
+
+   private:
+    // Mapper providing the sorted, timestamp-matched log data for this node.
+    std::unique_ptr<TimestampMapper> timestamp_mapper_;
+
+    // Senders.
+    std::vector<std::unique_ptr<RawSender>> channels_;
+    std::vector<RemoteMessageSender *> remote_timestamp_senders_;
+    // The mapping from logged channel index to sent channel index. Needed for
+    // sending out RemoteMessages.
+    std::vector<int> factory_channel_index_;
+
+    struct ContiguousSentTimestamp {
+      // Most timestamps make it through the network, so it saves a ton of
+      // memory and CPU to store the start and end, and search for valid ranges.
+      // For one of the logs I looked at, we had 2 ranges for 4 days.
+      //
+      // Save monotonic times as well to help if a queue index ever wraps. Odds
+      // are very low, but doesn't hurt.
+      //
+      // The starting time and matching queue index.
+      monotonic_clock::time_point starting_monotonic_event_time =
+          monotonic_clock::min_time;
+      uint32_t starting_queue_index = 0xffffffff;
+
+      // Ending time and queue index.
+      monotonic_clock::time_point ending_monotonic_event_time =
+          monotonic_clock::max_time;
+      uint32_t ending_queue_index = 0xffffffff;
+
+      // The queue index that the first message was *actually* sent with. The
+      // queue indices are assumed to be contiguous through this range.
+      uint32_t actual_queue_index = 0xffffffff;
+    };
+
+    // Stores all the timestamps that have been sent on this channel. This is
+    // only done for channels which are forwarded and on the node which
+    // initially sends the message. Compress using ranges and offsets.
+    std::vector<std::unique_ptr<std::vector<ContiguousSentTimestamp>>>
+        queue_index_map_;
+
+    // Factory (if we are in sim) that this loop was created on.
+    NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
+    std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+    // Event loop.
+    EventLoop *event_loop_ = nullptr;
+    // And timer used to send messages.
+    TimerHandler *timer_handler_ = nullptr;
+
+    // Filters (or nullptr if it isn't a forwarded channel) for each channel.
+    // This corresponds to the object which is shared among all the channels
+    // going between 2 nodes, so these pointers are not owned by this State.
+    // They are provided through SetChannel.
+    std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
+
+    // States (or nullptr if it isn't a forwarded channel) for the node which
+    // originally sends each channel.
+    std::vector<State *> channel_source_state_;
+
+    std::map<const Node *, std::unique_ptr<RemoteMessageSender>>
+        remote_timestamp_senders_map_;
+  };
+
+  // Node index -> State.
+  std::vector<std::unique_ptr<State>> states_;
+
+  // Creates the requested filter if it doesn't exist, regardless of whether
+  // these nodes can actually communicate directly. Returns the filter for the
+  // connection between node_a and node_b.
+  message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
+                                                      const Node *node_b);
+
+  // Offset estimator covering every connection between nodes. NOTE(review):
+  // presumably the filters returned by GetFilter live in here; confirm.
+  std::unique_ptr<message_bridge::MultiNodeNoncausalOffsetEstimator> filters_;
+
+  std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
+      remapped_configuration_buffer_;
+
+  std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
+  SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
+
+  // Map of channel indices to new name. The channel index will be an index into
+  // logged_configuration(), and the value holds the name of the channel to
+  // send on instead of the logged channel name (and the new type, if any).
+  struct RemappedChannel {
+    std::string remapped_name;
+    std::string new_type;
+  };
+  std::map<size_t, RemappedChannel> remapped_channels_;
+  std::vector<MapT> maps_;
+
+  // Number of nodes which still have data to send. This is used to figure out
+  // when to exit.
+  size_t live_nodes_ = 0;
+
+  const Configuration *remapped_configuration_ = nullptr;
+  const Configuration *replay_configuration_ = nullptr;
+
+  // If true, the replay timer will ignore any missing data. This is used
+  // during startup when we are bootstrapping everything and trying to get to
+  // the start of all the log files.
+  bool ignore_missing_data_ = false;
+
+  // Whether to exit the SimulatedEventLoop when we finish reading the logs.
+  bool exit_on_finish_ = true;
+};
+
+} // namespace logger
+} // namespace aos
+
+#endif // AOS_EVENTS_LOGGING_LOG_READER_H_