blob: 3bdfca4c0d1f601bc60ca55a15b0607ae488243f [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#ifndef AOS_EVENTS_LOGGING_LOG_READER_H_
2#define AOS_EVENTS_LOGGING_LOG_READER_H_
Austin Schuhe309d2a2019-11-29 13:25:21 -08003
Austin Schuh8bd96322020-02-13 21:18:22 -08004#include <chrono>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <deque>
James Kuszmaula16a7912022-06-17 10:58:12 -07006#include <queue>
James Kuszmaulc3f34d12022-08-15 15:57:55 -07007#include <string_view>
Austin Schuh2f8fd752020-09-01 22:38:28 -07008#include <tuple>
Austin Schuh6f3babe2020-01-26 20:34:50 -08009#include <vector>
Austin Schuhe309d2a2019-11-29 13:25:21 -080010
Philipp Schrader790cb542023-07-05 21:06:52 -070011#include "flatbuffers/flatbuffers.h"
12
James Kuszmaulc3f34d12022-08-15 15:57:55 -070013#include "aos/condition.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080014#include "aos/events/event_loop.h"
Austin Schuhf6f9bf32020-10-11 14:37:43 -070015#include "aos/events/logging/logfile_sorting.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "aos/events/logging/logfile_utils.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080017#include "aos/events/logging/logger_generated.h"
James Kuszmaula16a7912022-06-17 10:58:12 -070018#include "aos/events/logging/replay_timing_generated.h"
James Kuszmaul09632422022-05-25 15:56:19 -070019#include "aos/events/shm_event_loop.h"
Austin Schuh92547522019-12-28 14:33:43 -080020#include "aos/events/simulated_event_loop.h"
James Kuszmaulc3f34d12022-08-15 15:57:55 -070021#include "aos/mutex/mutex.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070022#include "aos/network/message_bridge_server_generated.h"
Austin Schuh0ca1fd32020-12-18 22:53:05 -080023#include "aos/network/multinode_timestamp_filter.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080024#include "aos/network/remote_message_generated.h"
Austin Schuh8bd96322020-02-13 21:18:22 -080025#include "aos/network/timestamp_filter.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080026#include "aos/time/time.h"
James Kuszmaula16a7912022-06-17 10:58:12 -070027#include "aos/util/threaded_queue.h"
James Kuszmaulc3f34d12022-08-15 15:57:55 -070028#include "aos/uuid.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080029
30namespace aos {
31namespace logger {
32
Austin Schuhe33c08d2022-02-03 18:15:21 -080033class EventNotifier;
34
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070035// Vector of pair of name and type of the channel
Sanjay Narayanan5ec00232022-07-08 15:21:30 -070036using ReplayChannels = std::vector<std::pair<std::string, std::string>>;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070037// Vector of channel indices
Naman Guptacf6d4422023-03-01 11:41:00 -080038using ReplayChannelIndices = std::vector<size_t>;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070039
Austin Schuh6f3babe2020-01-26 20:34:50 -080040// We end up with one of the following 3 log file types.
41//
42// Single node logged as the source node.
43// -> Replayed just on the source node.
44//
45// Forwarding timestamps only logged from the perspective of the destination
46// node.
47// -> Matched with data on source node and logged.
48//
49// Forwarding timestamps with data logged as the destination node.
50// -> Replayed just as the destination
51// -> Replayed as the source (Much harder, ordering is not defined)
52//
53// Duplicate data logged. -> CHECK that it matches and explode otherwise.
54//
55// This can be boiled down to a set of constraints and tools.
56//
57// 1) Forwarding timestamps and data need to be logged separately.
58// 2) Any forwarded data logged on the destination node needs to be logged
59// separately such that it can be sorted.
60//
61// 1) Log reader needs to be able to sort a list of log files.
62// 2) Log reader needs to be able to merge sorted lists of log files.
63// 3) Log reader needs to be able to match timestamps with messages.
64//
65// We also need to be able to generate multiple views of a log file depending on
66// the target.
James Kuszmaul298b4a22023-06-28 20:01:03 -070067//
68// In general, we aim to guarantee that if you are using the LogReader
69// "normally" you should be able to observe all the messages that existed on the
70// live system between the start time and the end of the logfile, and that
71// CHECK-failures will be generated if the LogReader cannot satisfy that
72// guarantee. There are currently a few deliberate exceptions to this:
73// * Any channel marked NOT_LOGGED in the configuration is known not to
74// have been logged and thus will be silently absent in log replay.
75// * If an incomplete set of log files is provided to the reader (e.g.,
76// only logs logged on a single node on a multi-node system), then
77// any *individual* channel as observed on a given node will be
78// consistent, but similarly to a NOT_LOGGED channel, some data may
79// not be available.
80// * At the end of a log, data for some channels/nodes may end before
81// others; during this time period, you may observe silently dropped
82// messages. This will be most obvious on uncleanly terminated logs or
83// when merging logfiles across nodes (as the logs on different nodes
84// will not finish at identical times).
Austin Schuh6f3babe2020-01-26 20:34:50 -080085
Austin Schuhe309d2a2019-11-29 13:25:21 -080086// Replays all the channels in the logfile to the event loop.
87class LogReader {
88 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -080089 // If you want to supply a new configuration that will be used for replay
90 // (e.g., to change message rates, or to populate an updated schema), then
91 // pass it in here. It must provide all the channels that the original logged
92 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -080093 //
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070094 // If certain messages should not be replayed, the replay_channels param can
95 // be used as an inclusive list of channels for messages to be replayed.
96 //
Austin Schuh287d43d2020-12-04 20:19:33 -080097 // The single file constructor calls SortParts internally.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -080098 LogReader(std::string_view filename,
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070099 const Configuration *replay_configuration = nullptr,
100 const ReplayChannels *replay_channels = nullptr);
Austin Schuh287d43d2020-12-04 20:19:33 -0800101 LogReader(std::vector<LogFile> log_files,
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700102 const Configuration *replay_configuration = nullptr,
103 const ReplayChannels *replay_channels = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800104 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800105
Austin Schuh6331ef92020-01-07 18:28:09 -0800106 // Registers all the callbacks to send the log file data out on an event loop
107 // created in event_loop_factory. This also updates time to be at the start
108 // of the log file by running until the log file starts.
109 // Note: the configuration used in the factory should be configuration()
110 // below, but can be anything as long as the locations needed to send
111 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800112 void Register(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuhe33c08d2022-02-03 18:15:21 -0800113
Austin Schuh58646e22021-08-23 23:51:46 -0700114 // Registers all the callbacks to send the log file data out to an event loop
115 // factory. This does not start replaying or change the current distributed
116 // time of the factory. It does change the monotonic clocks to be right.
117 void RegisterWithoutStarting(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuhe33c08d2022-02-03 18:15:21 -0800118 // Runs the log until the last start time. Register above is defined as:
119 // Register(...) {
120 // RegisterWithoutStarting
121 // StartAfterRegister
122 // }
123 // This should generally be considered as a stepping stone to convert from
124 // Register() to RegisterWithoutStarting() incrementally.
125 void StartAfterRegister(SimulatedEventLoopFactory *event_loop_factory);
126
Austin Schuh6331ef92020-01-07 18:28:09 -0800127 // Creates a SimulatedEventLoopFactory accessible via event_loop_factory(),
128 // and then calls Register.
129 void Register();
James Kuszmaul09632422022-05-25 15:56:19 -0700130
Austin Schuh6331ef92020-01-07 18:28:09 -0800131 // Registers callbacks for all the events after the log file starts. This is
132 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800133 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800134
James Kuszmaula16a7912022-06-17 10:58:12 -0700135 // Sets a sender that should be used for tracking timing statistics. If not
136 // set, no statistics will be recorded.
137 void set_timing_accuracy_sender(
138 const Node *node, aos::Sender<timing::ReplayTiming> timing_sender) {
139 states_[configuration::GetNodeIndex(configuration(), node)]
140 ->set_timing_accuracy_sender(std::move(timing_sender));
141 }
142
Austin Schuh58646e22021-08-23 23:51:46 -0700143 // Called whenever a log file starts for a node.
James Kuszmaul82c3b512023-07-08 20:25:41 -0700144 // More precisely, this will be called on each boot at max of
145 // (realtime_start_time in the logfiles, SetStartTime()). If a given boot
146 // occurs entirely before the realtime_start_time, the OnStart handler will
147 // never get called for that boot.
148 //
149 // realtime_start_time is defined below, but essentially is the time at which
150 // message channels will start being internally consistent on a given node
151 // (i.e., when the logger started). Note: If you wish to see a watcher
152 // triggered for *every* message in a log, OnStart() will not be
153 // sufficient--messages (possibly multiple messages) may be present on
154 // channels prior to the start time. If attempting to do this, prefer to use
155 // NodeEventLoopFactory::OnStart.
Austin Schuh58646e22021-08-23 23:51:46 -0700156 void OnStart(std::function<void()> fn);
157 void OnStart(const Node *node, std::function<void()> fn);
James Kuszmaul82c3b512023-07-08 20:25:41 -0700158 // Called whenever a log file ends for a node on a given boot, or at the
159 // realtime_end_time specified by a flag or SetEndTime().
160 //
161 // A log file "ends" when there are no more messages to be replayed for that
162 // boot.
163 //
164 // If OnStart() is not called for a given boot, the OnEnd() handlers will not
165 // be called either. OnEnd() handlers will not be called if the logfile for a
166 // given boot has missing data that causes us to terminate replay early.
Austin Schuh58646e22021-08-23 23:51:46 -0700167 void OnEnd(std::function<void()> fn);
168 void OnEnd(const Node *node, std::function<void()> fn);
169
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800170 // Unregisters the senders. You only need to call this if you separately
171 // supplied an event loop or event loop factory and the lifetimes are such
172 // that they need to be explicitly destroyed before the LogReader destructor
173 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800174 void Deregister();
175
Austin Schuh0c297012020-09-16 18:41:59 -0700176 // Returns the configuration being used for replay from the log file.
177 // Note that this may be different from the configuration actually used for
178 // handling events. You should generally only use this to create a
179 // SimulatedEventLoopFactory, and then get the configuration from there for
180 // everything else.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800181 const Configuration *logged_configuration() const;
Austin Schuh11d43732020-09-21 17:28:30 -0700182 // Returns the configuration being used for replay from the log file.
183 // Note that this may be different from the configuration actually used for
184 // handling events. You should generally only use this to create a
185 // SimulatedEventLoopFactory, and then get the configuration from there for
186 // everything else.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800187 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800188 const Configuration *configuration() const;
189
Austin Schuh6f3babe2020-01-26 20:34:50 -0800190 // Returns the nodes that this log file was created on. This is a list of
Austin Schuh07676622021-01-21 18:59:17 -0800191 // pointers to a node in the nodes() list inside logged_configuration().
192 std::vector<const Node *> LoggedNodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800193
194 // Returns the starting timestamp for the log file.
James Kuszmaul298b4a22023-06-28 20:01:03 -0700195 // All logged channels for the specified node should be entirely available
196 // after the specified time (i.e., any message that was available on the node
197 // in question after the monotonic start time but before the logs end and
198 // whose channel is present in any of the provided logs will either be
199 // available in the log or will result in an internal CHECK-failure of the
200 // LogReader if it would be skipped).
Austin Schuh11d43732020-09-21 17:28:30 -0700201 monotonic_clock::time_point monotonic_start_time(
202 const Node *node = nullptr) const;
203 realtime_clock::time_point realtime_start_time(
204 const Node *node = nullptr) const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800205
Austin Schuhe33c08d2022-02-03 18:15:21 -0800206 // Sets the start and end times to replay data until for all nodes. This
207 // overrides the --start_time and --end_time flags. The default is to replay
208 // all data.
209 void SetStartTime(std::string start_time);
210 void SetStartTime(realtime_clock::time_point start_time);
211 void SetEndTime(std::string end_time);
212 void SetEndTime(realtime_clock::time_point end_time);
213
// Policy for how RemapLoggedChannel behaves when a channel with the remapped
// name already exists (e.g., as may happen when replaying a logfile that was
// itself generated from replay).
enum class RemapConflict {
  // Conflicts in remappings are fatal (LOG(FATAL)).
  kDisallow,
  // On a conflict, attempt to remap the channel that would be overridden, and
  // keep doing so for as long as remapping *that* channel also generates a
  // conflict.
  // Repeatedly replaying a log will therefore stack more and more /original's
  // on the start of the oldest version of the channels.
  kCascade
};
228
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800229 // Causes the logger to publish the provided channel on a different name so
230 // that replayed applications can publish on the proper channel name without
231 // interference. This operates on raw channel names, without any node or
232 // application specific mappings.
James Kuszmaul53da7f32022-09-11 11:11:55 -0700233 void RemapLoggedChannel(
234 std::string_view name, std::string_view type,
235 std::string_view add_prefix = "/original", std::string_view new_type = "",
236 RemapConflict conflict_handling = RemapConflict::kCascade);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800237 template <typename T>
James Kuszmaul53da7f32022-09-11 11:11:55 -0700238 void RemapLoggedChannel(
239 std::string_view name, std::string_view add_prefix = "/original",
240 std::string_view new_type = "",
241 RemapConflict conflict_handling = RemapConflict::kCascade) {
242 RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type,
243 conflict_handling);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800244 }
Austin Schuh01b4c352020-09-21 23:09:39 -0700245 // Remaps the provided channel, though this respects node mappings, and
246 // preserves them too. This makes it so if /aos -> /pi1/aos on one node,
247 // /original/aos -> /original/pi1/aos on the same node after renaming, just
Austin Schuh0de30f32020-12-06 12:44:28 -0800248 // like you would hope. If new_type is not empty, the new channel will use
249 // the provided type instead. This allows for renaming messages.
Austin Schuh01b4c352020-09-21 23:09:39 -0700250 //
251 // TODO(austin): If you have 2 nodes remapping something to the same channel,
252 // this doesn't handle that. No use cases exist yet for that, so it isn't
253 // being done yet.
James Kuszmaul53da7f32022-09-11 11:11:55 -0700254 void RemapLoggedChannel(
255 std::string_view name, std::string_view type, const Node *node,
256 std::string_view add_prefix = "/original", std::string_view new_type = "",
257 RemapConflict conflict_handling = RemapConflict::kCascade);
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700258 template <typename T>
James Kuszmaul53da7f32022-09-11 11:11:55 -0700259 void RemapLoggedChannel(
260 std::string_view name, const Node *node,
261 std::string_view add_prefix = "/original", std::string_view new_type = "",
262 RemapConflict conflict_handling = RemapConflict::kCascade) {
Austin Schuh0de30f32020-12-06 12:44:28 -0800263 RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix,
James Kuszmaul53da7f32022-09-11 11:11:55 -0700264 new_type, conflict_handling);
Austin Schuh01b4c352020-09-21 23:09:39 -0700265 }
266
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700267 // Similar to RemapLoggedChannel(), but lets you specify a name for the new
268 // channel without constraints. This is useful when an application has been
269 // updated to use new channels but you want to support replaying old logs. By
270 // default, this will not add any maps for the new channel. Use add_maps to
271 // specify any maps you'd like added.
272 void RenameLoggedChannel(std::string_view name, std::string_view type,
273 std::string_view new_name,
274 const std::vector<MapT> &add_maps = {});
275 template <typename T>
276 void RenameLoggedChannel(std::string_view name, std::string_view new_name,
277 const std::vector<MapT> &add_maps = {}) {
278 RenameLoggedChannel(name, T::GetFullyQualifiedName(), new_name, add_maps);
279 }
280 // The following overloads are more suitable for multi-node configurations,
281 // and let you rename a channel on a specific node.
282 void RenameLoggedChannel(std::string_view name, std::string_view type,
283 const Node *node, std::string_view new_name,
284 const std::vector<MapT> &add_maps = {});
285 template <typename T>
286 void RenameLoggedChannel(std::string_view name, const Node *node,
287 std::string_view new_name,
288 const std::vector<MapT> &add_maps = {}) {
289 RenameLoggedChannel(name, T::GetFullyQualifiedName(), node, new_name,
290 add_maps);
291 }
292
Austin Schuh01b4c352020-09-21 23:09:39 -0700293 template <typename T>
294 bool HasChannel(std::string_view name, const Node *node = nullptr) {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700295 return HasChannel(name, T::GetFullyQualifiedName(), node);
296 }
297 bool HasChannel(std::string_view name, std::string_view type,
298 const Node *node) {
299 return configuration::GetChannel(logged_configuration(), name, type, "",
300 node, true) != nullptr;
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700301 }
302
Austin Schuh82529062021-12-08 12:09:52 -0800303 template <typename T>
304 void MaybeRemapLoggedChannel(std::string_view name,
305 const Node *node = nullptr) {
306 if (HasChannel<T>(name, node)) {
307 RemapLoggedChannel<T>(name, node);
308 }
309 }
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700310 template <typename T>
311 void MaybeRenameLoggedChannel(std::string_view name, const Node *node,
312 std::string_view new_name,
313 const std::vector<MapT> &add_maps = {}) {
314 if (HasChannel<T>(name, node)) {
315 RenameLoggedChannel<T>(name, node, new_name, add_maps);
316 }
317 }
Austin Schuh82529062021-12-08 12:09:52 -0800318
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800319 // Returns true if the channel exists on the node and was logged.
320 template <typename T>
321 bool HasLoggedChannel(std::string_view name, const Node *node = nullptr) {
Austin Schuh5ee56872021-01-30 16:53:34 -0800322 const Channel *channel =
323 configuration::GetChannel(logged_configuration(), name,
324 T::GetFullyQualifiedName(), "", node, true);
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800325 if (channel == nullptr) return false;
326 return channel->logger() != LoggerConfig::NOT_LOGGED;
327 }
328
Austin Schuh1c227352021-09-17 12:53:54 -0700329 // Returns a list of all the original channels from remapping.
330 std::vector<const Channel *> RemappedChannels() const;
331
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800332 SimulatedEventLoopFactory *event_loop_factory() {
333 return event_loop_factory_;
334 }
335
Austin Schuh0ca51f32020-12-25 21:51:45 -0800336 std::string_view name() const { return log_files_[0].name; }
Austin Schuh84dd1332023-05-03 13:09:47 -0700337 std::string_view log_event_uuid() const {
338 return log_files_[0].log_event_uuid;
339 }
Austin Schuh0c297012020-09-16 18:41:59 -0700340
James Kuszmaul71a81932020-12-15 21:08:01 -0800341 // Set whether to exit the SimulatedEventLoopFactory when we finish reading
342 // the logfile.
343 void set_exit_on_finish(bool exit_on_finish) {
344 exit_on_finish_ = exit_on_finish;
345 }
James Kuszmaulb11a1502022-07-01 16:02:25 -0700346 bool exit_on_finish() const { return exit_on_finish_; }
James Kuszmaul71a81932020-12-15 21:08:01 -0800347
James Kuszmaulb67409b2022-06-20 16:25:03 -0700348 // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
349 // try to play events in realtime. 0.5 will run at half speed. Use infinity
350 // (the default) to run as fast as possible. This can be changed during
351 // run-time.
352 // Only applies when running against a SimulatedEventLoopFactory.
353 void SetRealtimeReplayRate(double replay_rate);
354
Austin Schuhe309d2a2019-11-29 13:25:21 -0800355 private:
Austin Schuh58646e22021-08-23 23:51:46 -0700356 void Register(EventLoop *event_loop, const Node *node);
357
358 void RegisterDuringStartup(EventLoop *event_loop, const Node *node);
359
360 const Channel *RemapChannel(const EventLoop *event_loop, const Node *node,
Austin Schuh6f3babe2020-01-26 20:34:50 -0800361 const Channel *channel);
362
Austin Schuhe309d2a2019-11-29 13:25:21 -0800363 // Queues at least max_out_of_order_duration_ messages into channels_.
364 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800365 // Handle constructing a configuration with all the additional remapped
366 // channels from calls to RemapLoggedChannel.
367 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800368
Austin Schuh2f8fd752020-09-01 22:38:28 -0700369 // Returns the number of nodes.
370 size_t nodes_count() const {
371 return !configuration::MultiNode(logged_configuration())
372 ? 1u
373 : logged_configuration()->nodes()->size();
374 }
375
James Kuszmaulb11a1502022-07-01 16:02:25 -0700376 // Handles when an individual node hits the realtime end time, exiting the
377 // entire event loop once all nodes are stopped.
378 void NoticeRealtimeEnd();
379
Austin Schuh287d43d2020-12-04 20:19:33 -0800380 const std::vector<LogFile> log_files_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800381
Austin Schuh969cd602021-01-03 00:09:45 -0800382 // Class to manage sending RemoteMessages on the provided node after the
383 // correct delay.
Austin Schuh5ee56872021-01-30 16:53:34 -0800384 class RemoteMessageSender {
Austin Schuh969cd602021-01-03 00:09:45 -0800385 public:
386 RemoteMessageSender(aos::Sender<message_bridge::RemoteMessage> sender,
387 EventLoop *event_loop);
388 RemoteMessageSender(RemoteMessageSender const &) = delete;
389 RemoteMessageSender &operator=(RemoteMessageSender const &) = delete;
390
391 // Sends the provided message. If monotonic_timestamp_time is min_time,
392 // send it immediately.
393 void Send(
394 FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
Austin Schuh58646e22021-08-23 23:51:46 -0700395 BootTimestamp monotonic_timestamp_time, size_t source_boot_count);
Austin Schuh969cd602021-01-03 00:09:45 -0800396
397 private:
398 // Handles actually sending the timestamp if we were delayed.
399 void SendTimestamp();
400 // Handles scheduling the timer to send at the correct time.
401 void ScheduleTimestamp();
402
403 EventLoop *event_loop_;
404 aos::Sender<message_bridge::RemoteMessage> sender_;
405 aos::TimerHandler *timer_;
406
407 // Time we are scheduled for, or min_time if we aren't scheduled.
408 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
409
410 struct Timestamp {
411 Timestamp(FlatbufferDetachedBuffer<message_bridge::RemoteMessage>
412 new_remote_message,
413 monotonic_clock::time_point new_monotonic_timestamp_time)
414 : remote_message(std::move(new_remote_message)),
415 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
416 FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message;
417 monotonic_clock::time_point monotonic_timestamp_time;
418 };
419
420 // List of messages to send. The timer works through them and then disables
421 // itself automatically.
422 std::deque<Timestamp> remote_timestamps_;
423 };
424
Austin Schuh6f3babe2020-01-26 20:34:50 -0800425 // State per node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700426 class State {
427 public:
James Kuszmaula16a7912022-06-17 10:58:12 -0700428 // Whether we should spin up a separate thread for buffering up messages.
429 // Only allowed in realtime replay--see comments on threading_ member for
430 // details.
431 enum class ThreadedBuffering { kYes, kNo };
James Kuszmaul09632422022-05-25 15:56:19 -0700432 State(std::unique_ptr<TimestampMapper> timestamp_mapper,
433 message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
James Kuszmaulb11a1502022-07-01 16:02:25 -0700434 std::function<void()> notice_realtime_end, const Node *node,
435 ThreadedBuffering threading,
Naman Guptacf6d4422023-03-01 11:41:00 -0800436 std::unique_ptr<const ReplayChannelIndices> replay_channel_indices);
Austin Schuh287d43d2020-12-04 20:19:33 -0800437
438 // Connects up the timestamp mappers.
439 void AddPeer(State *peer);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800440
Austin Schuhe639ea12021-01-25 13:00:22 -0800441 TimestampMapper *timestamp_mapper() { return timestamp_mapper_.get(); }
442
Austin Schuhdda74ec2021-01-03 19:30:37 -0800443 // Returns the next sorted message with all the timestamps extracted and
444 // matched.
445 TimestampedMessage PopOldest();
Austin Schuh188eabe2020-12-29 23:41:13 -0800446
Austin Schuh858c9f32020-08-31 16:56:12 -0700447 // Returns the monotonic time of the oldest message.
James Kuszmaula16a7912022-06-17 10:58:12 -0700448 BootTimestamp SingleThreadedOldestMessageTime();
449 // Returns the monotonic time of the oldest message, handling querying the
450 // separate thread if ThreadedBuffering was set.
451 BootTimestamp MultiThreadedOldestMessageTime();
Austin Schuh58646e22021-08-23 23:51:46 -0700452
453 size_t boot_count() const {
454 // If we are replaying directly into an event loop, we can't reboot. So
455 // we will stay stuck on the 0th boot.
James Kuszmaul09632422022-05-25 15:56:19 -0700456 if (!node_event_loop_factory_) {
457 if (event_loop_ == nullptr) {
458 // If boot_count is being checked after startup for any of the
459 // non-primary nodes, then returning 0 may not be accurate (since
460 // remote nodes *can* reboot even if the EventLoop being played to
461 // can't).
462 CHECK(!started_);
463 CHECK(!stopped_);
464 }
465 return 0u;
466 }
Austin Schuh58646e22021-08-23 23:51:46 -0700467 return node_event_loop_factory_->boot_count();
468 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700469
470 // Primes the queues inside State. Should be called before calling
471 // OldestMessageTime.
472 void SeedSortedMessages();
Austin Schuh8bd96322020-02-13 21:18:22 -0800473
Austin Schuh58646e22021-08-23 23:51:46 -0700474 void SetupStartupTimer() {
475 const monotonic_clock::time_point start_time =
476 monotonic_start_time(boot_count());
477 if (start_time == monotonic_clock::min_time) {
478 LOG(ERROR)
479 << "No start time, skipping, please figure out when this happens";
Austin Schuhe33c08d2022-02-03 18:15:21 -0800480 NotifyLogfileStart();
Austin Schuh58646e22021-08-23 23:51:46 -0700481 return;
482 }
James Kuszmaul09632422022-05-25 15:56:19 -0700483 if (node_event_loop_factory_) {
484 CHECK_GE(start_time + clock_offset(), event_loop_->monotonic_now());
485 }
486 startup_timer_->Setup(start_time + clock_offset());
Austin Schuh58646e22021-08-23 23:51:46 -0700487 }
488
489 void set_startup_timer(TimerHandler *timer_handler) {
490 startup_timer_ = timer_handler;
491 if (startup_timer_) {
492 if (event_loop_->node() != nullptr) {
493 startup_timer_->set_name(absl::StrCat(
494 event_loop_->node()->name()->string_view(), "_startup"));
495 } else {
496 startup_timer_->set_name("startup");
497 }
498 }
499 }
500
Austin Schuh858c9f32020-08-31 16:56:12 -0700501 // Returns the starting time for this node.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700502 monotonic_clock::time_point monotonic_start_time(size_t boot_count) const {
503 return timestamp_mapper_
504 ? timestamp_mapper_->monotonic_start_time(boot_count)
505 : monotonic_clock::min_time;
Austin Schuh858c9f32020-08-31 16:56:12 -0700506 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700507 realtime_clock::time_point realtime_start_time(size_t boot_count) const {
508 return timestamp_mapper_
509 ? timestamp_mapper_->realtime_start_time(boot_count)
510 : realtime_clock::min_time;
Austin Schuh858c9f32020-08-31 16:56:12 -0700511 }
512
513 // Sets the node event loop factory for replaying into a
514 // SimulatedEventLoopFactory.
Austin Schuh60e77942022-05-16 17:48:24 -0700515 void SetNodeEventLoopFactory(NodeEventLoopFactory *node_event_loop_factory,
516 SimulatedEventLoopFactory *event_loop_factory);
Austin Schuh858c9f32020-08-31 16:56:12 -0700517
518 // Sets and gets the event loop to use.
519 void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
520 EventLoop *event_loop() { return event_loop_; }
521
Austin Schuh58646e22021-08-23 23:51:46 -0700522 const Node *node() const { return node_; }
523
524 void Register(EventLoop *event_loop);
525
526 void OnStart(std::function<void()> fn);
527 void OnEnd(std::function<void()> fn);
528
Austin Schuh858c9f32020-08-31 16:56:12 -0700529 // Sets the current realtime offset from the monotonic clock for this node
530 // (if we are on a simulated event loop).
531 void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
532 realtime_clock::time_point realtime_time) {
533 if (node_event_loop_factory_ != nullptr) {
534 node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
535 realtime_time);
536 }
537 }
538
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700539 // Returns the RemoteMessageSender to log delivery timestamps to for the
540 // provided remote node.
Austin Schuh61e973f2021-02-21 21:43:56 -0800541 RemoteMessageSender *RemoteTimestampSender(const Channel *channel,
542 const Connection *connection);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700543
Austin Schuh858c9f32020-08-31 16:56:12 -0700544 // Converts a timestamp from the monotonic clock on this node to the
545 // distributed clock.
546 distributed_clock::time_point ToDistributedClock(
547 monotonic_clock::time_point time) {
James Kuszmaul09632422022-05-25 15:56:19 -0700548 CHECK(node_event_loop_factory_);
Austin Schuh858c9f32020-08-31 16:56:12 -0700549 return node_event_loop_factory_->ToDistributedClock(time);
550 }
551
Austin Schuh858c9f32020-08-31 16:56:12 -0700552 // Returns the current time on the remote node which sends messages on
553 // channel_index.
Austin Schuh58646e22021-08-23 23:51:46 -0700554 BootTimestamp monotonic_remote_now(size_t channel_index) {
555 State *s = channel_source_state_[channel_index];
556 return BootTimestamp{
557 .boot = s->boot_count(),
558 .time = s->node_event_loop_factory_->monotonic_now()};
Austin Schuh858c9f32020-08-31 16:56:12 -0700559 }
560
Austin Schuh5ee56872021-01-30 16:53:34 -0800561 // Returns the start time of the remote for the provided channel.
562 monotonic_clock::time_point monotonic_remote_start_time(
Austin Schuh58646e22021-08-23 23:51:46 -0700563 size_t boot_count, size_t channel_index) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700564 return channel_source_state_[channel_index]->monotonic_start_time(
565 boot_count);
Austin Schuh5ee56872021-01-30 16:53:34 -0800566 }
567
Austin Schuh58646e22021-08-23 23:51:46 -0700568 void DestroyEventLoop() { event_loop_unique_ptr_.reset(); }
569
570 EventLoop *MakeEventLoop() {
571 CHECK(!event_loop_unique_ptr_);
James Kuszmaul890c2492022-04-06 14:59:31 -0700572 // TODO(james): Enable exclusive senders on LogReader to allow us to
573 // ensure we are remapping channels correctly.
574 event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
575 "log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -0700576 NodeEventLoopFactory::ExclusiveSenders::kYes,
577 NonExclusiveChannels()});
Austin Schuh58646e22021-08-23 23:51:46 -0700578 return event_loop_unique_ptr_.get();
579 }
580
Austin Schuh2f8fd752020-09-01 22:38:28 -0700581 distributed_clock::time_point RemoteToDistributedClock(
582 size_t channel_index, monotonic_clock::time_point time) {
James Kuszmaul09632422022-05-25 15:56:19 -0700583 CHECK(node_event_loop_factory_);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700584 return channel_source_state_[channel_index]
585 ->node_event_loop_factory_->ToDistributedClock(time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700586 }
587
588 const Node *remote_node(size_t channel_index) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700589 return channel_source_state_[channel_index]
590 ->node_event_loop_factory_->node();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700591 }
592
Stephan Pleines559fa6c2022-01-06 17:23:51 -0800593 monotonic_clock::time_point monotonic_now() const {
Alexei Strotsb8c3a702023-04-19 21:38:25 -0700594 CHECK_NOTNULL(event_loop_);
James Kuszmaul09632422022-05-25 15:56:19 -0700595 return event_loop_->monotonic_now();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700596 }
597
Austin Schuh858c9f32020-08-31 16:56:12 -0700598 // Sets the number of channels.
599 void SetChannelCount(size_t count);
600
601 // Sets the sender, filter, and target factory for a channel.
Austin Schuh969cd602021-01-03 00:09:45 -0800602 void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
603 std::unique_ptr<RawSender> sender,
604 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh58646e22021-08-23 23:51:46 -0700605 bool is_forwarded, State *source_state);
606
607 void SetRemoteTimestampSender(size_t logged_channel_index,
608 RemoteMessageSender *remote_timestamp_sender);
609
610 void RunOnStart();
611 void RunOnEnd();
Austin Schuh858c9f32020-08-31 16:56:12 -0700612
Austin Schuhe33c08d2022-02-03 18:15:21 -0800613 // Handles a logfile start event to potentially call the OnStart callbacks.
614 void NotifyLogfileStart();
615 // Handles a start time flag start event to potentially call the OnStart
616 // callbacks.
617 void NotifyFlagStart();
618
619 // Handles a logfile end event to potentially call the OnEnd callbacks.
620 void NotifyLogfileEnd();
621 // Handles an end time flag end event to potentially call the OnEnd
622 // callbacks.
623 void NotifyFlagEnd();
624
Austin Schuh858c9f32020-08-31 16:56:12 -0700625 // Unregisters everything so we can destroy the event loop.
Austin Schuh58646e22021-08-23 23:51:46 -0700626 // TODO(austin): Is this needed? OnShutdown should be able to serve this
627 // need.
Austin Schuh858c9f32020-08-31 16:56:12 -0700628 void Deregister();
629
630 // Sets the current TimerHandle for the replay callback.
631 void set_timer_handler(TimerHandler *timer_handler) {
632 timer_handler_ = timer_handler;
Austin Schuh58646e22021-08-23 23:51:46 -0700633 if (timer_handler_) {
634 if (event_loop_->node() != nullptr) {
635 timer_handler_->set_name(absl::StrCat(
636 event_loop_->node()->name()->string_view(), "_main"));
637 } else {
638 timer_handler_->set_name("main");
639 }
640 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700641 }
642
Austin Schuhe33c08d2022-02-03 18:15:21 -0800643 // Creates and registers the --start_time and --end_time event callbacks.
644 void SetStartTimeFlag(realtime_clock::time_point start_time);
645 void SetEndTimeFlag(realtime_clock::time_point end_time);
646
647 // Notices the next message to update the start/end time callbacks.
648 void ObserveNextMessage(monotonic_clock::time_point monotonic_event,
649 realtime_clock::time_point realtime_event);
650
651 // Clears the start and end time flag handlers so we can delete the event
652 // loop.
653 void ClearTimeFlags();
654
Austin Schuh858c9f32020-08-31 16:56:12 -0700655 // Sets the next wakeup time on the replay callback.
656 void Setup(monotonic_clock::time_point next_time) {
James Kuszmaul8866e642022-06-10 16:00:36 -0700657 timer_handler_->Setup(
658 std::max(monotonic_now(), next_time + clock_offset()));
Austin Schuh858c9f32020-08-31 16:56:12 -0700659 }
660
661 // Sends a buffer on the provided channel index.
Austin Schuh287d43d2020-12-04 20:19:33 -0800662 bool Send(const TimestampedMessage &timestamped_message);
Austin Schuh858c9f32020-08-31 16:56:12 -0700663
James Kuszmaulc3f34d12022-08-15 15:57:55 -0700664 void MaybeSetClockOffset();
James Kuszmaul09632422022-05-25 15:56:19 -0700665 std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
666
Austin Schuh858c9f32020-08-31 16:56:12 -0700667 // Returns a debug string for the channel merger.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700668 std::string DebugString() const {
Austin Schuh287d43d2020-12-04 20:19:33 -0800669 if (!timestamp_mapper_) {
Austin Schuhe639ea12021-01-25 13:00:22 -0800670 return "";
Austin Schuh287d43d2020-12-04 20:19:33 -0800671 }
Austin Schuhe639ea12021-01-25 13:00:22 -0800672 return timestamp_mapper_->DebugString();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700673 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700674
Austin Schuh58646e22021-08-23 23:51:46 -0700675 void ClearRemoteTimestampSenders() {
676 channel_timestamp_loggers_.clear();
677 timestamp_loggers_.clear();
678 }
679
Austin Schuhbd5f74a2021-11-11 20:55:38 -0800680 void SetFoundLastMessage(bool val) {
681 found_last_message_ = val;
682 last_message_.resize(factory_channel_index_.size(), false);
683 }
684 bool found_last_message() const { return found_last_message_; }
685
686 void set_last_message(size_t channel_index) {
687 CHECK_LT(channel_index, last_message_.size());
688 last_message_[channel_index] = true;
689 }
690
691 bool last_message(size_t channel_index) {
692 CHECK_LT(channel_index, last_message_.size());
693 return last_message_[channel_index];
694 }
695
James Kuszmaula16a7912022-06-17 10:58:12 -0700696 void set_timing_accuracy_sender(
697 aos::Sender<timing::ReplayTiming> timing_sender) {
698 timing_statistics_sender_ = std::move(timing_sender);
699 OnEnd([this]() { SendMessageTimings(); });
700 }
701
702 // If running with ThreadedBuffering::kYes, will start the processing thread
703 // and queue up messages until the specified time. No-op if
704 // ThreadedBuffering::kNo is set. Should only be called once.
705 void QueueThreadUntil(BootTimestamp time);
706
Austin Schuh858c9f32020-08-31 16:56:12 -0700707 private:
James Kuszmaulc3f34d12022-08-15 15:57:55 -0700708 void TrackMessageSendTiming(const RawSender &sender,
709 monotonic_clock::time_point expected_send_time);
James Kuszmaula16a7912022-06-17 10:58:12 -0700710 void SendMessageTimings();
Austin Schuh858c9f32020-08-31 16:56:12 -0700711 // Log file.
Austin Schuh287d43d2020-12-04 20:19:33 -0800712 std::unique_ptr<TimestampMapper> timestamp_mapper_;
Austin Schuh858c9f32020-08-31 16:56:12 -0700713
Austin Schuh858c9f32020-08-31 16:56:12 -0700714 // Senders.
715 std::vector<std::unique_ptr<RawSender>> channels_;
Austin Schuh969cd602021-01-03 00:09:45 -0800716 std::vector<RemoteMessageSender *> remote_timestamp_senders_;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700717 // The mapping from logged channel index to sent channel index. Needed for
718 // sending out MessageHeaders.
719 std::vector<int> factory_channel_index_;
720
Austin Schuh9942bae2021-01-07 22:06:44 -0800721 struct ContiguousSentTimestamp {
722 // Most timestamps make it through the network, so it saves a ton of
723 // memory and CPU to store the start and end, and search for valid ranges.
724 // For one of the logs I looked at, we had 2 ranges for 4 days.
725 //
726 // Save monotonic times as well to help if a queue index ever wraps. Odds
727 // are very low, but doesn't hurt.
728 //
729 // The starting time and matching queue index.
730 monotonic_clock::time_point starting_monotonic_event_time =
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700731 monotonic_clock::min_time;
Austin Schuh9942bae2021-01-07 22:06:44 -0800732 uint32_t starting_queue_index = 0xffffffff;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700733
Austin Schuh9942bae2021-01-07 22:06:44 -0800734 // Ending time and queue index.
735 monotonic_clock::time_point ending_monotonic_event_time =
736 monotonic_clock::max_time;
737 uint32_t ending_queue_index = 0xffffffff;
738
739 // The queue index that the first message was *actually* sent with. The
740 // queue indices are assumed to be contiguous through this range.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700741 uint32_t actual_queue_index = 0xffffffff;
742 };
743
James Kuszmaul94ca5132022-07-19 09:11:08 -0700744 // Returns a list of channels which LogReader will send on but which may
745 // *also* get sent on by other applications in replay.
746 std::vector<
747 std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
748 NonExclusiveChannels();
749
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700750 // Stores all the timestamps that have been sent on this channel. This is
751 // only done for channels which are forwarded and on the node which
Austin Schuh9942bae2021-01-07 22:06:44 -0800752 // initially sends the message. Compress using ranges and offsets.
753 std::vector<std::unique_ptr<std::vector<ContiguousSentTimestamp>>>
754 queue_index_map_;
Austin Schuh858c9f32020-08-31 16:56:12 -0700755
756 // Factory (if we are in sim) that this loop was created on.
757 NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800758 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
759
James Kuszmaulb11a1502022-07-01 16:02:25 -0700760 // Callback for when this node hits its realtime end time.
761 std::function<void()> notice_realtime_end_;
762
Austin Schuh858c9f32020-08-31 16:56:12 -0700763 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
764 // Event loop.
Austin Schuh58646e22021-08-23 23:51:46 -0700765 const Node *node_ = nullptr;
Austin Schuh858c9f32020-08-31 16:56:12 -0700766 EventLoop *event_loop_ = nullptr;
767 // And timer used to send messages.
Austin Schuh58646e22021-08-23 23:51:46 -0700768 TimerHandler *timer_handler_ = nullptr;
769 TimerHandler *startup_timer_ = nullptr;
Austin Schuh858c9f32020-08-31 16:56:12 -0700770
Austin Schuhe33c08d2022-02-03 18:15:21 -0800771 std::unique_ptr<EventNotifier> start_event_notifier_;
772 std::unique_ptr<EventNotifier> end_event_notifier_;
773
Austin Schuh8bd96322020-02-13 21:18:22 -0800774 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
775 // This corresponds to the object which is shared among all the channels
776 // going between 2 nodes. Each entry is a plain pointer; no primary-direction
777 // flag is stored alongside it.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700778 std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
James Kuszmaul09632422022-05-25 15:56:19 -0700779 message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800780
Austin Schuh84dd1332023-05-03 13:09:47 -0700781 // List of States (or nullptr if it isn't a forwarded channel) which
782 // correspond to the originating node.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700783 std::vector<State *> channel_source_state_;
784
Austin Schuh61e973f2021-02-21 21:43:56 -0800785 // This is a cache for channel, connection mapping to the corresponding
786 // sender.
787 absl::btree_map<std::pair<const Channel *, const Connection *>,
788 std::shared_ptr<RemoteMessageSender>>
789 channel_timestamp_loggers_;
790
791 // Mapping from resolved RemoteMessage channel to RemoteMessage sender. This
792 // is the channel that timestamps are published to.
793 absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
794 timestamp_loggers_;
Austin Schuh58646e22021-08-23 23:51:46 -0700795
James Kuszmaul09632422022-05-25 15:56:19 -0700796 // Time offset between the log's monotonic clock and the current event
797 // loop's monotonic clock. Useful when replaying logs with non-simulated
798 // event loops.
799 std::chrono::nanoseconds clock_offset_{0};
800
Austin Schuh58646e22021-08-23 23:51:46 -0700801 std::vector<std::function<void()>> on_starts_;
802 std::vector<std::function<void()>> on_ends_;
803
James Kuszmaula16a7912022-06-17 10:58:12 -0700804 std::atomic<bool> stopped_ = false;
805 std::atomic<bool> started_ = false;
Austin Schuhbd5f74a2021-11-11 20:55:38 -0800806
807 bool found_last_message_ = false;
808 std::vector<bool> last_message_;
James Kuszmaula16a7912022-06-17 10:58:12 -0700809
810 std::vector<timing::MessageTimingT> send_timings_;
811 aos::Sender<timing::ReplayTiming> timing_statistics_sender_;
812
813 // Protects access to any internal state after Run() is called. Designed
814 // assuming that only one node is actually executing in replay.
815 // Threading design:
816 // * The worker passed to message_queuer_ has full ownership over all
817 // the log-reading code, timestamp filters, last_queued_message_, etc.
818 // * The main thread should only have exclusive access to the replay
819 // event loop and associated features (mainly senders).
820 // It will pop an item out of the queue (which does maintain a shared_ptr
821 // reference which may also be being used by the message_queuer_ thread,
822 // but having shared_ptr's accessing the same memory from
823 // separate threads is permissible).
824 // Enabling this in simulation is currently infeasible due to a lack of
825 // synchronization in the MultiNodeNoncausalOffsetEstimator. Essentially,
826 // when the message_queuer_ thread attempts to read/pop messages from the
827 // timestamp_mapper_, it will end up calling callbacks that update the
828 // internal state of the MultiNodeNoncausalOffsetEstimator. Simultaneously,
829 // the event scheduler that is running in the main thread to orchestrate the
830 // simulation will be querying the estimator to know what the clocks on the
831 // various nodes are at, leading to potential issues.
832 ThreadedBuffering threading_;
833 std::optional<BootTimestamp> last_queued_message_;
834 std::optional<util::ThreadedQueue<TimestampedMessage, BootTimestamp>>
835 message_queuer_;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700836
837 // If a ReplayChannels was passed to LogReader, this will hold the
838 // indices of the channels to replay for the Node represented by
839 // the instance of LogReader::State.
Naman Guptacf6d4422023-03-01 11:41:00 -0800840 std::unique_ptr<const ReplayChannelIndices> replay_channel_indices_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800841 };
842
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700843 // If a ReplayChannels was passed to LogReader then creates a
Naman Guptacf6d4422023-03-01 11:41:00 -0800844 // ReplayChannelIndices for the given node. Otherwise, returns a nullptr.
845 std::unique_ptr<const ReplayChannelIndices> MaybeMakeReplayChannelIndices(
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700846 const Node *node);
847
Austin Schuh8bd96322020-02-13 21:18:22 -0800848 // Node index -> State.
849 std::vector<std::unique_ptr<State>> states_;
850
851 // Creates the requested filter if it doesn't exist, regardless of whether
852 // these nodes can actually communicate directly. Returns a pointer to the
853 // filter for the (node_a, node_b) pair.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700854 message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
855 const Node *node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800856
Austin Schuh8bd96322020-02-13 21:18:22 -0800857 // List of filters for a connection. The pointer to the first node will be
858 // less than the second node.
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800859 std::unique_ptr<message_bridge::MultiNodeNoncausalOffsetEstimator> filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800860
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800861 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
862 remapped_configuration_buffer_;
863
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800864 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
865 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800866
867 // Map of channel indices to new name. The channel index will be an index into
868 // logged_configuration(), and the mapped RemappedChannel will hold the name of
869 // the channel to send on instead of the logged channel name.
// Describes where a logged channel gets sent instead of its logged name.
struct RemappedChannel {
  // Name of the channel to send on in place of the logged channel name.
  std::string remapped_name;
  // New type for the remapped channel.
  // NOTE(review): presumably empty when the type is left unchanged; confirm
  // against the code which fills this in.
  std::string new_type;
};
874 std::map<size_t, RemappedChannel> remapped_channels_;
Austin Schuh01b4c352020-09-21 23:09:39 -0700875 std::vector<MapT> maps_;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800876
Austin Schuh6f3babe2020-01-26 20:34:50 -0800877 // Number of nodes which still have data to send. This is used to figure out
878 // when to exit.
879 size_t live_nodes_ = 0;
880
James Kuszmaulb11a1502022-07-01 16:02:25 -0700881 // Similar counter to live_nodes_, but for tracking which individual nodes are
882 // running and have yet to hit the realtime end time, if any.
883 size_t live_nodes_with_realtime_time_end_ = 0;
884
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800885 const Configuration *remapped_configuration_ = nullptr;
886 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800887
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700888 // If a ReplayChannels was passed to LogReader, this will hold the
889 // name and type of channels to replay which is used when creating States.
890 const ReplayChannels *replay_channels_ = nullptr;
891
Austin Schuhcde938c2020-02-02 17:30:07 -0800892 // If true, the replay timer will ignore any missing data. This is used
893 // during startup when we are bootstrapping everything and trying to get to
894 // the start of all the log files.
895 bool ignore_missing_data_ = false;
James Kuszmaul71a81932020-12-15 21:08:01 -0800896
897 // Whether to exit the SimulatedEventLoop when we finish reading the logs.
898 bool exit_on_finish_ = true;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800899
900 realtime_clock::time_point start_time_ = realtime_clock::min_time;
901 realtime_clock::time_point end_time_ = realtime_clock::max_time;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800902};
903
904} // namespace logger
905} // namespace aos
906
Austin Schuhb06f03b2021-02-17 22:00:37 -0800907#endif // AOS_EVENTS_LOGGING_LOG_READER_H_