| #ifndef AOS_EVENTS_LOGGER_H_ |
| #define AOS_EVENTS_LOGGER_H_ |
| |
| #include <chrono> |
| #include <deque> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <string_view> |
| #include <vector> |
| |
| #include "absl/types/span.h" |
| #include "aos/events/event_loop.h" |
| #include "aos/events/logging/logfile_utils.h" |
| #include "aos/events/logging/logger_generated.h" |
| #include "aos/events/simulated_event_loop.h" |
| #include "aos/time/time.h" |
| #include "flatbuffers/flatbuffers.h" |
| |
| namespace aos { |
| namespace logger { |
| |
| // Logs all channels available in the event loop to disk once per |
| // polling_period (100 ms by default). Starts by logging one message per channel |
| // to capture state and configuration that is sent rarely but affects execution. |
| class Logger { |
| public: |
| Logger(DetachedBufferWriter *writer, EventLoop *event_loop, |
| std::chrono::milliseconds polling_period = |
| std::chrono::milliseconds(100)); |
| |
| private: |
| void DoLogData(); |
| |
| EventLoop *event_loop_; |
| DetachedBufferWriter *writer_; |
| |
| // Pairs a fetcher with a flag recording whether the data it fetched has |
| // been written yet. Writing may be delayed so that data does not land on |
| // disk too far out of order, which keeps it from becoming too hard to sort |
| // when the log is read back. |
| struct FetcherStruct { |
| std::unique_ptr<RawFetcher> fetcher; |
| bool written = false; |
| |
| LogType log_type; |
| }; |
| |
| std::vector<FetcherStruct> fetchers_; |
| TimerHandler *timer_handler_; |
| |
| // Period at which the channels are polled. |
| const std::chrono::milliseconds polling_period_; |
| |
| // Last time at which data for all channels was written to disk. |
| monotonic_clock::time_point last_synchronized_time_; |
| |
| // Maximum size that the header has consumed. This much extra space is |
| // reserved in the builder to avoid reallocating. |
| size_t max_header_size_ = 0; |
| }; |
| |
| // Replays all the channels in the logfile to the event loop. |
| class LogReader { |
| public: |
| // filename is the log file to replay. |
| // |
| // If you want to supply a new configuration that will be used for replay |
| // (e.g., to change message rates, or to populate an updated schema), then |
| // pass it in here. It must provide all the channels that the original logged |
| // config did. |
| LogReader(std::string_view filename, |
| const Configuration *replay_configuration = nullptr); |
| ~LogReader(); |
| |
| // Registers everything, but also updates the real time time in sync. Runs |
| // until the log file starts. |
| // Note that if you use any call other than the Register() call with no |
| // arguments, the user is responsible for making sure that the config of the |
| // supplied event loop (factory) provides any necessary remapped configs. |
| void Register(); |
| // Does the same as Register(), except it uses a pre-provided event loop |
| // factory. |
| void Register(SimulatedEventLoopFactory *event_loop_factory); |
| // Registers the timer and senders used to resend the messages from the log |
| // file. |
| void Register(EventLoop *event_loop); |
| // Unregisters the senders. You only need to call this if you separately |
| // supplied an event loop or event loop factory and the lifetimes are such |
| // that they need to be explicitly destroyed before the LogReader destructor |
| // gets called. |
| void Deregister(); |
| |
| // Returns the configuration from the log file. |
| const Configuration *logged_configuration() const; |
| // Returns the configuration being used for replay. |
| const Configuration *configuration() const; |
| |
| // Returns the node that this log file was created on. |
| const Node *node() const; |
| |
| // Returns the starting timestamp for the log file. |
| monotonic_clock::time_point monotonic_start_time(); |
| realtime_clock::time_point realtime_start_time(); |
| |
| // Causes the logger to publish the provided channel on a different name so |
| // that replayed applications can publish on the proper channel name without |
| // interference. This operates on raw channel names, without any node or |
| // application specific mappings. |
| void RemapLoggedChannel(std::string_view name, std::string_view type, |
| std::string_view add_prefix = "/original"); |
| // Convenience overload that takes the channel type from the message type T |
| // via T::GetFullyQualifiedName(). |
| template <typename T> |
| void RemapLoggedChannel(std::string_view name, |
| std::string_view add_prefix = "/original") { |
| RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix); |
| } |
| |
| // Returns the event loop factory pointer held by this reader; nullptr if |
| // none has been set. |
| SimulatedEventLoopFactory *event_loop_factory() { |
| return event_loop_factory_; |
| } |
| |
| // TODO(austin): Add the ability to re-publish the fetched messages. Add 2 |
| // options, one which publishes them *now*, and another which publishes them |
| // to the simulated event loop factory back in time where they actually |
| // happened. |
| |
| private: |
| // Queues at least max_out_of_order_duration_ messages into channels_. |
| void QueueMessages(); |
| // Handle constructing a configuration with all the additional remapped |
| // channels from calls to RemapLoggedChannel. |
| void MakeRemappedConfig(); |
| |
| // Log chunk reader. |
| SortedMessageReader sorted_message_reader_; |
| |
| // Owns the buffer for a remapped configuration, when one is held. |
| std::unique_ptr<FlatbufferDetachedBuffer<Configuration>> |
| remapped_configuration_buffer_; |
| |
| std::vector<std::unique_ptr<RawSender>> channels_; |
| |
| // Owning and non-owning handles to the event loop. |
| std::unique_ptr<EventLoop> event_loop_unique_ptr_; |
| EventLoop *event_loop_ = nullptr; |
| // Defaulted to nullptr like the other raw pointer members so that it never |
| // holds an indeterminate value. |
| TimerHandler *timer_handler_ = nullptr; |
| |
| // Owning and non-owning handles to the event loop factory. |
| std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_; |
| SimulatedEventLoopFactory *event_loop_factory_ = nullptr; |
| |
| // Map of channel indices to new name. The channel index will be an index into |
| // logged_configuration(), and the string value will be the name of the |
| // channel to send on instead of the logged channel name. |
| std::map<size_t, std::string> remapped_channels_; |
| |
| const Configuration *remapped_configuration_ = nullptr; |
| const Configuration *replay_configuration_ = nullptr; |
| }; |
| |
| } // namespace logger |
| } // namespace aos |
| |
| #endif // AOS_EVENTS_LOGGER_H_ |