blob: 07cca4866530d621df72dd84322685f832316dda [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#ifndef AOS_EVENTS_LOGGING_LOG_READER_H_
2#define AOS_EVENTS_LOGGING_LOG_READER_H_
Austin Schuhe309d2a2019-11-29 13:25:21 -08003
Austin Schuh8bd96322020-02-13 21:18:22 -08004#include <chrono>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <deque>
James Kuszmaula16a7912022-06-17 10:58:12 -07006#include <queue>
James Kuszmaulc3f34d12022-08-15 15:57:55 -07007#include <string_view>
Austin Schuh2f8fd752020-09-01 22:38:28 -07008#include <tuple>
Austin Schuh6f3babe2020-01-26 20:34:50 -08009#include <vector>
Austin Schuhe309d2a2019-11-29 13:25:21 -080010
James Kuszmaulc3f34d12022-08-15 15:57:55 -070011#include "aos/condition.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "aos/events/event_loop.h"
Austin Schuhf6f9bf32020-10-11 14:37:43 -070013#include "aos/events/logging/logfile_sorting.h"
Austin Schuha36c8902019-12-30 18:07:15 -080014#include "aos/events/logging/logfile_utils.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080015#include "aos/events/logging/logger_generated.h"
James Kuszmaula16a7912022-06-17 10:58:12 -070016#include "aos/events/logging/replay_timing_generated.h"
James Kuszmaul09632422022-05-25 15:56:19 -070017#include "aos/events/shm_event_loop.h"
Austin Schuh92547522019-12-28 14:33:43 -080018#include "aos/events/simulated_event_loop.h"
James Kuszmaulc3f34d12022-08-15 15:57:55 -070019#include "aos/mutex/mutex.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070020#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0ca1fd32020-12-18 22:53:05 -080021#include "aos/network/multinode_timestamp_filter.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080022#include "aos/network/remote_message_generated.h"
Austin Schuh8bd96322020-02-13 21:18:22 -080023#include "aos/network/timestamp_filter.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080024#include "aos/time/time.h"
James Kuszmaula16a7912022-06-17 10:58:12 -070025#include "aos/util/threaded_queue.h"
James Kuszmaulc3f34d12022-08-15 15:57:55 -070026#include "aos/uuid.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080027#include "flatbuffers/flatbuffers.h"
28
29namespace aos {
30namespace logger {
31
Austin Schuhe33c08d2022-02-03 18:15:21 -080032class EventNotifier;
33
Austin Schuh6f3babe2020-01-26 20:34:50 -080034// We end up with one of the following 3 log file types.
35//
36// Single node logged as the source node.
37// -> Replayed just on the source node.
38//
39// Forwarding timestamps only logged from the perspective of the destination
40// node.
41// -> Matched with data on source node and logged.
42//
43// Forwarding timestamps with data logged as the destination node.
44// -> Replayed just as the destination
45// -> Replayed as the source (Much harder, ordering is not defined)
46//
47// Duplicate data logged. -> CHECK that it matches and explode otherwise.
48//
49// This can be boiled down to a set of constraints and tools.
50//
51// 1) Forwarding timestamps and data need to be logged separately.
52// 2) Any forwarded data logged on the destination node needs to be logged
53// separately such that it can be sorted.
54//
55// 1) Log reader needs to be able to sort a list of log files.
56// 2) Log reader needs to be able to merge sorted lists of log files.
57// 3) Log reader needs to be able to match timestamps with messages.
58//
59// We also need to be able to generate multiple views of a log file depending on
60// the target.
61
Austin Schuhe309d2a2019-11-29 13:25:21 -080062// Replays all the channels in the logfile to the event loop.
63class LogReader {
64 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -080065 // If you want to supply a new configuration that will be used for replay
66 // (e.g., to change message rates, or to populate an updated schema), then
67 // pass it in here. It must provide all the channels that the original logged
68 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -080069 //
Austin Schuh287d43d2020-12-04 20:19:33 -080070 // The single file constructor calls SortParts internally.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -080071 LogReader(std::string_view filename,
72 const Configuration *replay_configuration = nullptr);
Austin Schuh287d43d2020-12-04 20:19:33 -080073 LogReader(std::vector<LogFile> log_files,
Austin Schuh11d43732020-09-21 17:28:30 -070074 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -080075 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -080076
Austin Schuh6331ef92020-01-07 18:28:09 -080077 // Registers all the callbacks to send the log file data out on an event loop
78 // created in event_loop_factory. This also updates time to be at the start
79 // of the log file by running until the log file starts.
80 // Note: the configuration used in the factory should be configuration()
81 // below, but can be anything as long as the locations needed to send
82 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -080083 void Register(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuhe33c08d2022-02-03 18:15:21 -080084
Austin Schuh58646e22021-08-23 23:51:46 -070085 // Registers all the callbacks to send the log file data out to an event loop
86 // factory. This does not start replaying or change the current distributed
87 // time of the factory. It does change the monotonic clocks to be right.
88 void RegisterWithoutStarting(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuhe33c08d2022-02-03 18:15:21 -080089 // Runs the log until the last start time. Register above is defined as:
90 // Register(...) {
91 // RegisterWithoutStarting
92 // StartAfterRegister
93 // }
94 // This should generally be considered as a stepping stone to convert from
95 // Register() to RegisterWithoutStarting() incrementally.
96 void StartAfterRegister(SimulatedEventLoopFactory *event_loop_factory);
97
// Creates a SimulatedEventLoopFactory accessible via event_loop_factory(),
// and then calls Register.
100 void Register();
James Kuszmaul09632422022-05-25 15:56:19 -0700101
Austin Schuh6331ef92020-01-07 18:28:09 -0800102 // Registers callbacks for all the events after the log file starts. This is
103 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800104 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800105
James Kuszmaula16a7912022-06-17 10:58:12 -0700106 // Sets a sender that should be used for tracking timing statistics. If not
107 // set, no statistics will be recorded.
108 void set_timing_accuracy_sender(
109 const Node *node, aos::Sender<timing::ReplayTiming> timing_sender) {
110 states_[configuration::GetNodeIndex(configuration(), node)]
111 ->set_timing_accuracy_sender(std::move(timing_sender));
112 }
113
Austin Schuh58646e22021-08-23 23:51:46 -0700114 // Called whenever a log file starts for a node.
115 void OnStart(std::function<void()> fn);
116 void OnStart(const Node *node, std::function<void()> fn);
117 // Called whenever a log file ends for a node.
118 void OnEnd(std::function<void()> fn);
119 void OnEnd(const Node *node, std::function<void()> fn);
120
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800121 // Unregisters the senders. You only need to call this if you separately
122 // supplied an event loop or event loop factory and the lifetimes are such
123 // that they need to be explicitly destroyed before the LogReader destructor
124 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800125 void Deregister();
126
Austin Schuh0c297012020-09-16 18:41:59 -0700127 // Returns the configuration being used for replay from the log file.
128 // Note that this may be different from the configuration actually used for
129 // handling events. You should generally only use this to create a
130 // SimulatedEventLoopFactory, and then get the configuration from there for
131 // everything else.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800132 const Configuration *logged_configuration() const;
Austin Schuh11d43732020-09-21 17:28:30 -0700133 // Returns the configuration being used for replay from the log file.
134 // Note that this may be different from the configuration actually used for
135 // handling events. You should generally only use this to create a
136 // SimulatedEventLoopFactory, and then get the configuration from there for
137 // everything else.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800138 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800139 const Configuration *configuration() const;
140
Austin Schuh6f3babe2020-01-26 20:34:50 -0800141 // Returns the nodes that this log file was created on. This is a list of
Austin Schuh07676622021-01-21 18:59:17 -0800142 // pointers to a node in the nodes() list inside logged_configuration().
143 std::vector<const Node *> LoggedNodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800144
145 // Returns the starting timestamp for the log file.
Austin Schuh11d43732020-09-21 17:28:30 -0700146 monotonic_clock::time_point monotonic_start_time(
147 const Node *node = nullptr) const;
148 realtime_clock::time_point realtime_start_time(
149 const Node *node = nullptr) const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800150
Austin Schuhe33c08d2022-02-03 18:15:21 -0800151 // Sets the start and end times to replay data until for all nodes. This
152 // overrides the --start_time and --end_time flags. The default is to replay
153 // all data.
154 void SetStartTime(std::string start_time);
155 void SetStartTime(realtime_clock::time_point start_time);
156 void SetEndTime(std::string end_time);
157 void SetEndTime(realtime_clock::time_point end_time);
158
// Selects what RemapLoggedChannel does when a channel with the remapped name
// already exists (which can happen, for example, when replaying a logfile
// that was itself produced by a replay).
enum class RemapConflict {
  // Treat any conflict between remappings as fatal (LOG(FATAL)).
  kDisallow,
  // On a conflict, try to remap the channel that would have been overridden,
  // repeating for as long as remapping *that* channel conflicts as well.
  // Repeatedly replaying a log therefore stacks more and more /original
  // prefixes onto the oldest version of the channels.
  kCascade
};
173
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800174 // Causes the logger to publish the provided channel on a different name so
175 // that replayed applications can publish on the proper channel name without
176 // interference. This operates on raw channel names, without any node or
177 // application specific mappings.
James Kuszmaul53da7f32022-09-11 11:11:55 -0700178 void RemapLoggedChannel(
179 std::string_view name, std::string_view type,
180 std::string_view add_prefix = "/original", std::string_view new_type = "",
181 RemapConflict conflict_handling = RemapConflict::kCascade);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800182 template <typename T>
James Kuszmaul53da7f32022-09-11 11:11:55 -0700183 void RemapLoggedChannel(
184 std::string_view name, std::string_view add_prefix = "/original",
185 std::string_view new_type = "",
186 RemapConflict conflict_handling = RemapConflict::kCascade) {
187 RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type,
188 conflict_handling);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800189 }
190
Austin Schuh01b4c352020-09-21 23:09:39 -0700191 // Remaps the provided channel, though this respects node mappings, and
192 // preserves them too. This makes it so if /aos -> /pi1/aos on one node,
193 // /original/aos -> /original/pi1/aos on the same node after renaming, just
Austin Schuh0de30f32020-12-06 12:44:28 -0800194 // like you would hope. If new_type is not empty, the new channel will use
195 // the provided type instead. This allows for renaming messages.
Austin Schuh01b4c352020-09-21 23:09:39 -0700196 //
197 // TODO(austin): If you have 2 nodes remapping something to the same channel,
198 // this doesn't handle that. No use cases exist yet for that, so it isn't
199 // being done yet.
James Kuszmaul53da7f32022-09-11 11:11:55 -0700200 void RemapLoggedChannel(
201 std::string_view name, std::string_view type, const Node *node,
202 std::string_view add_prefix = "/original", std::string_view new_type = "",
203 RemapConflict conflict_handling = RemapConflict::kCascade);
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700204 template <typename T>
James Kuszmaul53da7f32022-09-11 11:11:55 -0700205 void RemapLoggedChannel(
206 std::string_view name, const Node *node,
207 std::string_view add_prefix = "/original", std::string_view new_type = "",
208 RemapConflict conflict_handling = RemapConflict::kCascade) {
Austin Schuh0de30f32020-12-06 12:44:28 -0800209 RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix,
James Kuszmaul53da7f32022-09-11 11:11:55 -0700210 new_type, conflict_handling);
Austin Schuh01b4c352020-09-21 23:09:39 -0700211 }
212
213 template <typename T>
214 bool HasChannel(std::string_view name, const Node *node = nullptr) {
Austin Schuh0ca51f32020-12-25 21:51:45 -0800215 return configuration::GetChannel(logged_configuration(), name,
Austin Schuh0de30f32020-12-06 12:44:28 -0800216 T::GetFullyQualifiedName(), "", node,
217 true) != nullptr;
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700218 }
219
Austin Schuh82529062021-12-08 12:09:52 -0800220 template <typename T>
221 void MaybeRemapLoggedChannel(std::string_view name,
222 const Node *node = nullptr) {
223 if (HasChannel<T>(name, node)) {
224 RemapLoggedChannel<T>(name, node);
225 }
226 }
227
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800228 // Returns true if the channel exists on the node and was logged.
229 template <typename T>
230 bool HasLoggedChannel(std::string_view name, const Node *node = nullptr) {
Austin Schuh5ee56872021-01-30 16:53:34 -0800231 const Channel *channel =
232 configuration::GetChannel(logged_configuration(), name,
233 T::GetFullyQualifiedName(), "", node, true);
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800234 if (channel == nullptr) return false;
235 return channel->logger() != LoggerConfig::NOT_LOGGED;
236 }
237
Austin Schuh1c227352021-09-17 12:53:54 -0700238 // Returns a list of all the original channels from remapping.
239 std::vector<const Channel *> RemappedChannels() const;
240
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800241 SimulatedEventLoopFactory *event_loop_factory() {
242 return event_loop_factory_;
243 }
244
Austin Schuh0ca51f32020-12-25 21:51:45 -0800245 std::string_view name() const { return log_files_[0].name; }
Austin Schuh0c297012020-09-16 18:41:59 -0700246
James Kuszmaul71a81932020-12-15 21:08:01 -0800247 // Set whether to exit the SimulatedEventLoopFactory when we finish reading
248 // the logfile.
249 void set_exit_on_finish(bool exit_on_finish) {
250 exit_on_finish_ = exit_on_finish;
251 }
252
James Kuszmaulb67409b2022-06-20 16:25:03 -0700253 // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
254 // try to play events in realtime. 0.5 will run at half speed. Use infinity
255 // (the default) to run as fast as possible. This can be changed during
256 // run-time.
257 // Only applies when running against a SimulatedEventLoopFactory.
258 void SetRealtimeReplayRate(double replay_rate);
259
Austin Schuhe309d2a2019-11-29 13:25:21 -0800260 private:
Austin Schuh58646e22021-08-23 23:51:46 -0700261 void Register(EventLoop *event_loop, const Node *node);
262
263 void RegisterDuringStartup(EventLoop *event_loop, const Node *node);
264
265 const Channel *RemapChannel(const EventLoop *event_loop, const Node *node,
Austin Schuh6f3babe2020-01-26 20:34:50 -0800266 const Channel *channel);
267
Austin Schuhe309d2a2019-11-29 13:25:21 -0800268 // Queues at least max_out_of_order_duration_ messages into channels_.
269 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800270 // Handle constructing a configuration with all the additional remapped
271 // channels from calls to RemapLoggedChannel.
272 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800273
Austin Schuh2f8fd752020-09-01 22:38:28 -0700274 // Returns the number of nodes.
275 size_t nodes_count() const {
276 return !configuration::MultiNode(logged_configuration())
277 ? 1u
278 : logged_configuration()->nodes()->size();
279 }
280
Austin Schuh287d43d2020-12-04 20:19:33 -0800281 const std::vector<LogFile> log_files_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800282
Austin Schuh969cd602021-01-03 00:09:45 -0800283 // Class to manage sending RemoteMessages on the provided node after the
284 // correct delay.
Austin Schuh5ee56872021-01-30 16:53:34 -0800285 class RemoteMessageSender {
Austin Schuh969cd602021-01-03 00:09:45 -0800286 public:
287 RemoteMessageSender(aos::Sender<message_bridge::RemoteMessage> sender,
288 EventLoop *event_loop);
289 RemoteMessageSender(RemoteMessageSender const &) = delete;
290 RemoteMessageSender &operator=(RemoteMessageSender const &) = delete;
291
292 // Sends the provided message. If monotonic_timestamp_time is min_time,
293 // send it immediately.
294 void Send(
295 FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
Austin Schuh58646e22021-08-23 23:51:46 -0700296 BootTimestamp monotonic_timestamp_time, size_t source_boot_count);
Austin Schuh969cd602021-01-03 00:09:45 -0800297
298 private:
299 // Handles actually sending the timestamp if we were delayed.
300 void SendTimestamp();
301 // Handles scheduling the timer to send at the correct time.
302 void ScheduleTimestamp();
303
304 EventLoop *event_loop_;
305 aos::Sender<message_bridge::RemoteMessage> sender_;
306 aos::TimerHandler *timer_;
307
308 // Time we are scheduled for, or min_time if we aren't scheduled.
309 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
310
311 struct Timestamp {
312 Timestamp(FlatbufferDetachedBuffer<message_bridge::RemoteMessage>
313 new_remote_message,
314 monotonic_clock::time_point new_monotonic_timestamp_time)
315 : remote_message(std::move(new_remote_message)),
316 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
317 FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message;
318 monotonic_clock::time_point monotonic_timestamp_time;
319 };
320
321 // List of messages to send. The timer works through them and then disables
322 // itself automatically.
323 std::deque<Timestamp> remote_timestamps_;
324 };
325
Austin Schuh6f3babe2020-01-26 20:34:50 -0800326 // State per node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700327 class State {
328 public:
James Kuszmaula16a7912022-06-17 10:58:12 -0700329 // Whether we should spin up a separate thread for buffering up messages.
330 // Only allowed in realtime replay--see comments on threading_ member for
331 // details.
332 enum class ThreadedBuffering { kYes, kNo };
James Kuszmaul09632422022-05-25 15:56:19 -0700333 State(std::unique_ptr<TimestampMapper> timestamp_mapper,
334 message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
James Kuszmaula16a7912022-06-17 10:58:12 -0700335 const Node *node, ThreadedBuffering threading);
Austin Schuh287d43d2020-12-04 20:19:33 -0800336
337 // Connects up the timestamp mappers.
338 void AddPeer(State *peer);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800339
Austin Schuhe639ea12021-01-25 13:00:22 -0800340 TimestampMapper *timestamp_mapper() { return timestamp_mapper_.get(); }
341
Austin Schuhdda74ec2021-01-03 19:30:37 -0800342 // Returns the next sorted message with all the timestamps extracted and
343 // matched.
344 TimestampedMessage PopOldest();
Austin Schuh188eabe2020-12-29 23:41:13 -0800345
Austin Schuh858c9f32020-08-31 16:56:12 -0700346 // Returns the monotonic time of the oldest message.
James Kuszmaula16a7912022-06-17 10:58:12 -0700347 BootTimestamp SingleThreadedOldestMessageTime();
// Returns the monotonic time of the oldest message, handling querying the
// separate thread if ThreadedBuffering was set.
350 BootTimestamp MultiThreadedOldestMessageTime();
Austin Schuh58646e22021-08-23 23:51:46 -0700351
352 size_t boot_count() const {
353 // If we are replaying directly into an event loop, we can't reboot. So
354 // we will stay stuck on the 0th boot.
James Kuszmaul09632422022-05-25 15:56:19 -0700355 if (!node_event_loop_factory_) {
356 if (event_loop_ == nullptr) {
357 // If boot_count is being checked after startup for any of the
358 // non-primary nodes, then returning 0 may not be accurate (since
359 // remote nodes *can* reboot even if the EventLoop being played to
360 // can't).
361 CHECK(!started_);
362 CHECK(!stopped_);
363 }
364 return 0u;
365 }
Austin Schuh58646e22021-08-23 23:51:46 -0700366 return node_event_loop_factory_->boot_count();
367 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700368
369 // Primes the queues inside State. Should be called before calling
370 // OldestMessageTime.
371 void SeedSortedMessages();
Austin Schuh8bd96322020-02-13 21:18:22 -0800372
Austin Schuh58646e22021-08-23 23:51:46 -0700373 void SetupStartupTimer() {
374 const monotonic_clock::time_point start_time =
375 monotonic_start_time(boot_count());
376 if (start_time == monotonic_clock::min_time) {
377 LOG(ERROR)
378 << "No start time, skipping, please figure out when this happens";
Austin Schuhe33c08d2022-02-03 18:15:21 -0800379 NotifyLogfileStart();
Austin Schuh58646e22021-08-23 23:51:46 -0700380 return;
381 }
James Kuszmaul09632422022-05-25 15:56:19 -0700382 if (node_event_loop_factory_) {
383 CHECK_GE(start_time + clock_offset(), event_loop_->monotonic_now());
384 }
385 startup_timer_->Setup(start_time + clock_offset());
Austin Schuh58646e22021-08-23 23:51:46 -0700386 }
387
388 void set_startup_timer(TimerHandler *timer_handler) {
389 startup_timer_ = timer_handler;
390 if (startup_timer_) {
391 if (event_loop_->node() != nullptr) {
392 startup_timer_->set_name(absl::StrCat(
393 event_loop_->node()->name()->string_view(), "_startup"));
394 } else {
395 startup_timer_->set_name("startup");
396 }
397 }
398 }
399
Austin Schuh858c9f32020-08-31 16:56:12 -0700400 // Returns the starting time for this node.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700401 monotonic_clock::time_point monotonic_start_time(size_t boot_count) const {
402 return timestamp_mapper_
403 ? timestamp_mapper_->monotonic_start_time(boot_count)
404 : monotonic_clock::min_time;
Austin Schuh858c9f32020-08-31 16:56:12 -0700405 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700406 realtime_clock::time_point realtime_start_time(size_t boot_count) const {
407 return timestamp_mapper_
408 ? timestamp_mapper_->realtime_start_time(boot_count)
409 : realtime_clock::min_time;
Austin Schuh858c9f32020-08-31 16:56:12 -0700410 }
411
// Sets the node event loop factory for replaying into a
// SimulatedEventLoopFactory.
Austin Schuh60e77942022-05-16 17:48:24 -0700414 void SetNodeEventLoopFactory(NodeEventLoopFactory *node_event_loop_factory,
415 SimulatedEventLoopFactory *event_loop_factory);
Austin Schuh858c9f32020-08-31 16:56:12 -0700416
417 // Sets and gets the event loop to use.
418 void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
419 EventLoop *event_loop() { return event_loop_; }
420
Austin Schuh58646e22021-08-23 23:51:46 -0700421 const Node *node() const { return node_; }
422
423 void Register(EventLoop *event_loop);
424
425 void OnStart(std::function<void()> fn);
426 void OnEnd(std::function<void()> fn);
427
Austin Schuh858c9f32020-08-31 16:56:12 -0700428 // Sets the current realtime offset from the monotonic clock for this node
429 // (if we are on a simulated event loop).
430 void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
431 realtime_clock::time_point realtime_time) {
432 if (node_event_loop_factory_ != nullptr) {
433 node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
434 realtime_time);
435 }
436 }
437
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700438 // Returns the MessageHeader sender to log delivery timestamps to for the
439 // provided remote node.
Austin Schuh61e973f2021-02-21 21:43:56 -0800440 RemoteMessageSender *RemoteTimestampSender(const Channel *channel,
441 const Connection *connection);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700442
Austin Schuh858c9f32020-08-31 16:56:12 -0700443 // Converts a timestamp from the monotonic clock on this node to the
444 // distributed clock.
445 distributed_clock::time_point ToDistributedClock(
446 monotonic_clock::time_point time) {
James Kuszmaul09632422022-05-25 15:56:19 -0700447 CHECK(node_event_loop_factory_);
Austin Schuh858c9f32020-08-31 16:56:12 -0700448 return node_event_loop_factory_->ToDistributedClock(time);
449 }
450
Austin Schuh858c9f32020-08-31 16:56:12 -0700451 // Returns the current time on the remote node which sends messages on
452 // channel_index.
Austin Schuh58646e22021-08-23 23:51:46 -0700453 BootTimestamp monotonic_remote_now(size_t channel_index) {
454 State *s = channel_source_state_[channel_index];
455 return BootTimestamp{
456 .boot = s->boot_count(),
457 .time = s->node_event_loop_factory_->monotonic_now()};
Austin Schuh858c9f32020-08-31 16:56:12 -0700458 }
459
Austin Schuh5ee56872021-01-30 16:53:34 -0800460 // Returns the start time of the remote for the provided channel.
461 monotonic_clock::time_point monotonic_remote_start_time(
Austin Schuh58646e22021-08-23 23:51:46 -0700462 size_t boot_count, size_t channel_index) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700463 return channel_source_state_[channel_index]->monotonic_start_time(
464 boot_count);
Austin Schuh5ee56872021-01-30 16:53:34 -0800465 }
466
Austin Schuh58646e22021-08-23 23:51:46 -0700467 void DestroyEventLoop() { event_loop_unique_ptr_.reset(); }
468
469 EventLoop *MakeEventLoop() {
470 CHECK(!event_loop_unique_ptr_);
James Kuszmaul890c2492022-04-06 14:59:31 -0700471 // TODO(james): Enable exclusive senders on LogReader to allow us to
472 // ensure we are remapping channels correctly.
473 event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
474 "log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700475 NodeEventLoopFactory::ExclusiveSenders::kYes,
476 NonExclusiveChannels()});
Austin Schuh58646e22021-08-23 23:51:46 -0700477 return event_loop_unique_ptr_.get();
478 }
479
Austin Schuh2f8fd752020-09-01 22:38:28 -0700480 distributed_clock::time_point RemoteToDistributedClock(
481 size_t channel_index, monotonic_clock::time_point time) {
James Kuszmaul09632422022-05-25 15:56:19 -0700482 CHECK(node_event_loop_factory_);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700483 return channel_source_state_[channel_index]
484 ->node_event_loop_factory_->ToDistributedClock(time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700485 }
486
487 const Node *remote_node(size_t channel_index) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700488 return channel_source_state_[channel_index]
489 ->node_event_loop_factory_->node();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700490 }
491
Stephan Pleines559fa6c2022-01-06 17:23:51 -0800492 monotonic_clock::time_point monotonic_now() const {
James Kuszmaul09632422022-05-25 15:56:19 -0700493 return event_loop_->monotonic_now();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700494 }
495
Austin Schuh858c9f32020-08-31 16:56:12 -0700496 // Sets the number of channels.
497 void SetChannelCount(size_t count);
498
499 // Sets the sender, filter, and target factory for a channel.
Austin Schuh969cd602021-01-03 00:09:45 -0800500 void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
501 std::unique_ptr<RawSender> sender,
502 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh58646e22021-08-23 23:51:46 -0700503 bool is_forwarded, State *source_state);
504
505 void SetRemoteTimestampSender(size_t logged_channel_index,
506 RemoteMessageSender *remote_timestamp_sender);
507
508 void RunOnStart();
509 void RunOnEnd();
Austin Schuh858c9f32020-08-31 16:56:12 -0700510
Austin Schuhe33c08d2022-02-03 18:15:21 -0800511 // Handles a logfile start event to potentially call the OnStart callbacks.
512 void NotifyLogfileStart();
513 // Handles a start time flag start event to potentially call the OnStart
514 // callbacks.
515 void NotifyFlagStart();
516
517 // Handles a logfile end event to potentially call the OnEnd callbacks.
518 void NotifyLogfileEnd();
// Handles an end time flag event to potentially call the OnEnd
// callbacks.
521 void NotifyFlagEnd();
522
// Unregisters everything so we can destroy the event loop.
Austin Schuh58646e22021-08-23 23:51:46 -0700524 // TODO(austin): Is this needed? OnShutdown should be able to serve this
525 // need.
Austin Schuh858c9f32020-08-31 16:56:12 -0700526 void Deregister();
527
528 // Sets the current TimerHandle for the replay callback.
529 void set_timer_handler(TimerHandler *timer_handler) {
530 timer_handler_ = timer_handler;
Austin Schuh58646e22021-08-23 23:51:46 -0700531 if (timer_handler_) {
532 if (event_loop_->node() != nullptr) {
533 timer_handler_->set_name(absl::StrCat(
534 event_loop_->node()->name()->string_view(), "_main"));
535 } else {
536 timer_handler_->set_name("main");
537 }
538 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700539 }
540
Austin Schuhe33c08d2022-02-03 18:15:21 -0800541 // Creates and registers the --start_time and --end_time event callbacks.
542 void SetStartTimeFlag(realtime_clock::time_point start_time);
543 void SetEndTimeFlag(realtime_clock::time_point end_time);
544
545 // Notices the next message to update the start/end time callbacks.
546 void ObserveNextMessage(monotonic_clock::time_point monotonic_event,
547 realtime_clock::time_point realtime_event);
548
549 // Clears the start and end time flag handlers so we can delete the event
550 // loop.
551 void ClearTimeFlags();
552
Austin Schuh858c9f32020-08-31 16:56:12 -0700553 // Sets the next wakeup time on the replay callback.
554 void Setup(monotonic_clock::time_point next_time) {
James Kuszmaul8866e642022-06-10 16:00:36 -0700555 timer_handler_->Setup(
556 std::max(monotonic_now(), next_time + clock_offset()));
Austin Schuh858c9f32020-08-31 16:56:12 -0700557 }
558
559 // Sends a buffer on the provided channel index.
Austin Schuh287d43d2020-12-04 20:19:33 -0800560 bool Send(const TimestampedMessage &timestamped_message);
Austin Schuh858c9f32020-08-31 16:56:12 -0700561
James Kuszmaulc3f34d12022-08-15 15:57:55 -0700562 void MaybeSetClockOffset();
James Kuszmaul09632422022-05-25 15:56:19 -0700563 std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
564
Austin Schuh858c9f32020-08-31 16:56:12 -0700565 // Returns a debug string for the channel merger.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700566 std::string DebugString() const {
Austin Schuh287d43d2020-12-04 20:19:33 -0800567 if (!timestamp_mapper_) {
Austin Schuhe639ea12021-01-25 13:00:22 -0800568 return "";
Austin Schuh287d43d2020-12-04 20:19:33 -0800569 }
Austin Schuhe639ea12021-01-25 13:00:22 -0800570 return timestamp_mapper_->DebugString();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700571 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700572
Austin Schuh58646e22021-08-23 23:51:46 -0700573 void ClearRemoteTimestampSenders() {
574 channel_timestamp_loggers_.clear();
575 timestamp_loggers_.clear();
576 }
577
Austin Schuhbd5f74a2021-11-11 20:55:38 -0800578 void SetFoundLastMessage(bool val) {
579 found_last_message_ = val;
580 last_message_.resize(factory_channel_index_.size(), false);
581 }
582 bool found_last_message() const { return found_last_message_; }
583
584 void set_last_message(size_t channel_index) {
585 CHECK_LT(channel_index, last_message_.size());
586 last_message_[channel_index] = true;
587 }
588
589 bool last_message(size_t channel_index) {
590 CHECK_LT(channel_index, last_message_.size());
591 return last_message_[channel_index];
592 }
593
// Moves |timing_sender| into timing_statistics_sender_ and registers an
// OnEnd() callback which calls SendMessageTimings().  The callback captures
// |this|, so it must not run after this object is destroyed.
James Kuszmaula16a7912022-06-17 10:58:12 -0700594 void set_timing_accuracy_sender(
595 aos::Sender<timing::ReplayTiming> timing_sender) {
596 timing_statistics_sender_ = std::move(timing_sender);
597 OnEnd([this]() { SendMessageTimings(); });
598 }
599
600 // If running with ThreadedBuffering::kYes, will start the processing thread
601 // and queue up messages until the specified time. No-op if
602 // ThreadedBuffering::kNo is set. Should only be called once.
603 void QueueThreadUntil(BootTimestamp time);
604
Austin Schuh858c9f32020-08-31 16:56:12 -0700605 private:
// Replay timing helpers; their definitions are not in this header.
// NOTE(review): presumably TrackMessageSendTiming() records an entry in
// send_timings_ comparing the actual send on |sender| with
// |expected_send_time| -- confirm against the definition.
James Kuszmaulc3f34d12022-08-15 15:57:55 -0700606 void TrackMessageSendTiming(const RawSender &sender,
607 monotonic_clock::time_point expected_send_time);
// Registered as an OnEnd() callback by set_timing_accuracy_sender().
// NOTE(review): presumably publishes send_timings_ on
// timing_statistics_sender_ -- confirm.
James Kuszmaula16a7912022-06-17 10:58:12 -0700608 void SendMessageTimings();
Austin Schuh858c9f32020-08-31 16:56:12 -0700609 // Log file. May be null; DebugString() returns "" in that case.
Austin Schuh287d43d2020-12-04 20:19:33 -0800610 std::unique_ptr<TimestampMapper> timestamp_mapper_;
Austin Schuh858c9f32020-08-31 16:56:12 -0700611
Austin Schuh858c9f32020-08-31 16:56:12 -0700612 // Senders.
// NOTE(review): both vectors are presumably indexed by logged channel index
// -- confirm.  channels_ owns its senders; remote_timestamp_senders_ holds
// non-owning pointers.
613 std::vector<std::unique_ptr<RawSender>> channels_;
Austin Schuh969cd602021-01-03 00:09:45 -0800614 std::vector<RemoteMessageSender *> remote_timestamp_senders_;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700615 // The mapping from logged channel index to sent channel index. Needed for
616 // sending out MessageHeaders. Its size also determines the size of last_message_.
617 std::vector<int> factory_channel_index_;
618
Austin Schuh9942bae2021-01-07 22:06:44 -0800619 struct ContiguousSentTimestamp {
620 // Most timestamps make it through the network, so it saves a ton of
621 // memory and CPU to store the start and end, and search for valid ranges.
622 // For one of the logs I looked at, we had 2 ranges for 4 days.
623 //
624 // Save monotonic times as well to help if a queue index ever wraps. Odds
625 // are very low, but doesn't hurt.
626 //
627 // The starting time and matching queue index.
628 monotonic_clock::time_point starting_monotonic_event_time =
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700629 monotonic_clock::min_time;
Austin Schuh9942bae2021-01-07 22:06:44 -0800630 uint32_t starting_queue_index = 0xffffffff;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700631
Austin Schuh9942bae2021-01-07 22:06:44 -0800632 // Ending time and queue index.
633 monotonic_clock::time_point ending_monotonic_event_time =
634 monotonic_clock::max_time;
635 uint32_t ending_queue_index = 0xffffffff;
636
637 // The queue index that the first message was *actually* sent with. The
638 // queue indices are assumed to be contiguous through this range.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700639 uint32_t actual_queue_index = 0xffffffff;
640 };
641
James Kuszmaul94ca5132022-07-19 09:11:08 -0700642 // Returns a list of channels which LogReader will send on but which may
643 // *also* get sent on by other applications in replay. Each channel is paired with a NodeEventLoopFactory::ExclusiveSenders value.
644 std::vector<
645 std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
646 NonExclusiveChannels();
647
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700648 // Stores, per channel, all the timestamps that have been sent. This is
649 // only done for channels which are forwarded and on the node which
Austin Schuh9942bae2021-01-07 22:06:44 -0800650 // initially sends the message. Compressed using ranges and offsets (see ContiguousSentTimestamp).
// NOTE(review): entries for channels which are not tracked are presumably
// null -- confirm.
651 std::vector<std::unique_ptr<std::vector<ContiguousSentTimestamp>>>
652 queue_index_map_;
Austin Schuh858c9f32020-08-31 16:56:12 -0700653
654 // Factories (if we are in sim) that this loop was created on. Both are non-owning and default to nullptr.
655 NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800656 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
657
// NOTE(review): presumably owns the event loop when this State created it,
// with event_loop_ below being the non-owning pointer that is actually used
// -- confirm.
Austin Schuh858c9f32020-08-31 16:56:12 -0700658 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
659 // Node and event loop.
Austin Schuh58646e22021-08-23 23:51:46 -0700660 const Node *node_ = nullptr;
Austin Schuh858c9f32020-08-31 16:56:12 -0700661 EventLoop *event_loop_ = nullptr;
662 // And timer used to send messages; Setup() schedules it.
Austin Schuh58646e22021-08-23 23:51:46 -0700663 TimerHandler *timer_handler_ = nullptr;
// NOTE(review): purpose not visible in this section -- presumably fires the
// startup handling; confirm.
664 TimerHandler *startup_timer_ = nullptr;
Austin Schuh858c9f32020-08-31 16:56:12 -0700665
// NOTE(review): presumably notify at the configured replay start and end
// times respectively -- confirm against where they are created.
Austin Schuhe33c08d2022-02-03 18:15:21 -0800666 std::unique_ptr<EventNotifier> start_event_notifier_;
667 std::unique_ptr<EventNotifier> end_event_notifier_;
668
Austin Schuh8bd96322020-02-13 21:18:22 -0800669 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
670 // This corresponds to the object which is shared among all the channels
671 // going between 2 nodes. The second element in the tuple indicates if this
672 // is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700673 std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
James Kuszmaul09632422022-05-25 15:56:19 -0700674 message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800675
676 // List of States (or nullptr if it isn't a forwarded
677 // channel) which correspond to the originating node. Non-owning.
// NOTE(review): presumably indexed by channel, like filters_ -- confirm.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700678 std::vector<State *> channel_source_state_;
679
Austin Schuh61e973f2021-02-21 21:43:56 -0800680 // This is a cache for channel, connection mapping to the corresponding
681 // sender. Emptied by ClearRemoteTimestampSenders().
682 absl::btree_map<std::pair<const Channel *, const Connection *>,
683 std::shared_ptr<RemoteMessageSender>>
684 channel_timestamp_loggers_;
685
686 // Mapping from resolved RemoteMessage channel to RemoteMessage sender. This
687 // is the channel that timestamps are published to. Also emptied by ClearRemoteTimestampSenders().
688 absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
689 timestamp_loggers_;
Austin Schuh58646e22021-08-23 23:51:46 -0700690
James Kuszmaul09632422022-05-25 15:56:19 -0700691 // Time offset between the log's monotonic clock and the current event
692 // loop's monotonic clock. Useful when replaying logs with non-simulated
693 // event loops. Setup() adds it to the requested wakeup time. Defaults to 0.
694 std::chrono::nanoseconds clock_offset_{0};
695
// Callbacks to run on start and on end.
// NOTE(review): on_ends_ is presumably what OnEnd() appends to (see
// set_timing_accuracy_sender()); the registration functions are not visible
// in this section -- confirm.
Austin Schuh58646e22021-08-23 23:51:46 -0700696 std::vector<std::function<void()>> on_starts_;
697 std::vector<std::function<void()>> on_ends_;
698
// Atomic flags, both initially false.
// NOTE(review): presumably record whether the start/end callbacks have
// already run -- confirm.
James Kuszmaula16a7912022-06-17 10:58:12 -0700699 std::atomic<bool> stopped_ = false;
700 std::atomic<bool> started_ = false;
Austin Schuhbd5f74a2021-11-11 20:55:38 -0800701
// Assigned by SetFoundLastMessage(), which also sizes last_message_.
702 bool found_last_message_ = false;
// One flag per factory_channel_index_ entry; set by set_last_message().
703 std::vector<bool> last_message_;
James Kuszmaula16a7912022-06-17 10:58:12 -0700704
// Replay timing statistics and the sender installed by
// set_timing_accuracy_sender().
// NOTE(review): send_timings_ is presumably filled by
// TrackMessageSendTiming() -- confirm.
705 std::vector<timing::MessageTimingT> send_timings_;
706 aos::Sender<timing::ReplayTiming> timing_statistics_sender_;
707
// NOTE(review): the first sentence below reads as if it described a lock,
// but the members which follow are the threading mode, the last queued
// timestamp and the threaded queue -- confirm whether it is stale.
708 // Protects access to any internal state after Run() is called. Designed
709 // assuming that only one node is actually executing in replay.
710 // Threading design:
711 // * The worker passed to message_queuer_ has full ownership over all
712 // the log-reading code, timestamp filters, last_queued_message_, etc.
713 // * The main thread should only have exclusive access to the replay
714 // event loop and associated features (mainly senders).
715 // It will pop an item out of the queue (which does maintain a shared_ptr
716 // reference which may also be being used by the message_queuer_ thread,
717 // but having shared_ptr's accessing the same memory from
718 // separate threads is permissible).
719 // Enabling this in simulation is currently infeasible due to a lack of
720 // synchronization in the MultiNodeNoncausalOffsetEstimator. Essentially,
721 // when the message_queuer_ thread attempts to read/pop messages from the
722 // timestamp_mapper_, it will end up calling callbacks that update the
723 // internal state of the MultiNodeNoncausalOffsetEstimator. Simultaneously,
724 // the event scheduler that is running in the main thread to orchestrate the
725 // simulation will be querying the estimator to know what the clocks on the
726 // various nodes are at, leading to potential issues.
727 ThreadedBuffering threading_;
// NOTE(review): presumably the timestamp of the newest message handed to
// message_queuer_; empty until something has been queued -- confirm.
728 std::optional<BootTimestamp> last_queued_message_;
// Threaded queue of messages; empty optional until constructed.
// NOTE(review): presumably only populated by QueueThreadUntil() when running
// with ThreadedBuffering::kYes -- confirm.
729 std::optional<util::ThreadedQueue<TimestampedMessage, BootTimestamp>>
730 message_queuer_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800731 };
732
Austin Schuh8bd96322020-02-13 21:18:22 -0800733 // Node index -> State. Owns the State objects.
734 std::vector<std::unique_ptr<State>> states_;
735
736 // Creates the requested filter if it doesn't exist, regardless of whether
737 // these nodes can actually communicate directly. Returns a pointer to the
738 // filter for the (node_a, node_b) pair.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700739 message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
740 const Node *node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800741
Austin Schuh8bd96322020-02-13 21:18:22 -0800742 // Owns the multi-node offset estimator.
743 // NOTE(review): presumably the object GetFilter() obtains its filters from -- confirm.
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800744 std::unique_ptr<message_bridge::MultiNodeNoncausalOffsetEstimator> filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800745
// Owns the buffer for a Configuration flatbuffer.
// NOTE(review): presumably the storage which remapped_configuration_ points
// into -- confirm.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800746 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
747 remapped_configuration_buffer_;
748
// NOTE(review): presumably event_loop_factory_unique_ptr_ is only set when
// the reader creates the factory itself, while event_loop_factory_ is the
// non-owning pointer that is always used -- confirm.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800749 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
750 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800751
752 // Map of channel indices to remapping info. The channel index (the key) will be an index into
753 // logged_configuration(), and remapped_name will be the name of the channel
754 // to send on instead of the logged channel name.
// NOTE(review): new_type is presumably the type to use for the remapped
// channel -- confirm.
Austin Schuh0de30f32020-12-06 12:44:28 -0800755 struct RemappedChannel {
756 std::string remapped_name;
757 std::string new_type;
758 };
759 std::map<size_t, RemappedChannel> remapped_channels_;
// NOTE(review): presumably the channel maps to add to the remapped
// configuration -- confirm.
Austin Schuh01b4c352020-09-21 23:09:39 -0700760 std::vector<MapT> maps_;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800761
Austin Schuh6f3babe2020-01-26 20:34:50 -0800762 // Number of nodes which still have data to send. This is used to figure out
763 // when to exit. Starts at 0.
764 size_t live_nodes_ = 0;
765
// Non-owning configuration pointers, nullptr until set.
// NOTE(review): presumably remapped_configuration_ is the logged
// configuration with remapped_channels_ applied, and replay_configuration_ is
// an optional user-supplied configuration to replay into -- confirm.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800766 const Configuration *remapped_configuration_ = nullptr;
767 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800768
769 // If true, the replay timer will ignore any missing data. This is used
770 // during startup when we are bootstrapping everything and trying to get to
771 // the start of all the log files. Defaults to false.
772 bool ignore_missing_data_ = false;
James Kuszmaul71a81932020-12-15 21:08:01 -0800773
774 // Whether to exit the SimulatedEventLoop when we finish reading the logs. Defaults to true.
775 bool exit_on_finish_ = true;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800776
// Realtime bounds, defaulting to the widest possible range (min_time to
// max_time).
// NOTE(review): presumably the window of the log which gets replayed --
// confirm against the code which sets and reads them.
777 realtime_clock::time_point start_time_ = realtime_clock::min_time;
778 realtime_clock::time_point end_time_ = realtime_clock::max_time;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800779};
780
781} // namespace logger
782} // namespace aos
783
Austin Schuhb06f03b2021-02-17 22:00:37 -0800784#endif // AOS_EVENTS_LOGGING_LOG_READER_H_