blob: e3dd904f3883b7535acccd1202acaa5a1b93e2fe [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_reader.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -08002
Austin Schuhaf8a0d32023-05-03 09:53:06 -07003#include <dirent.h>
Austin Schuhe309d2a2019-11-29 13:25:21 -08004#include <fcntl.h>
5#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
Brian Silverman8ff74aa2021-02-05 16:37:15 -08008
Tyler Chatowbf0609c2021-07-31 16:13:27 -07009#include <climits>
Eric Schmiedebergae00e732023-04-12 15:53:17 -060010#include <utility>
Austin Schuhe309d2a2019-11-29 13:25:21 -080011#include <vector>
12
Austin Schuh2f8fd752020-09-01 22:38:28 -070013#include "absl/strings/escaping.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080014#include "absl/types/span.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070015#include "flatbuffers/flatbuffers.h"
16#include "openssl/sha.h"
17
Austin Schuhe309d2a2019-11-29 13:25:21 -080018#include "aos/events/event_loop.h"
Austin Schuh2dc8c7d2021-07-01 17:41:28 -070019#include "aos/events/logging/boot_timestamp.h"
Austin Schuhf6f9bf32020-10-11 14:37:43 -070020#include "aos/events/logging/logfile_sorting.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080021#include "aos/events/logging/logger_generated.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080022#include "aos/flatbuffer_merge.h"
James Kuszmaul09632422022-05-25 15:56:19 -070023#include "aos/json_to_flatbuffer.h"
Austin Schuh0ca1fd32020-12-18 22:53:05 -080024#include "aos/network/multinode_timestamp_filter.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080025#include "aos/network/remote_message_generated.h"
26#include "aos/network/remote_message_schema.h"
Austin Schuh288479d2019-12-18 19:47:52 -080027#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080028#include "aos/network/timestamp_channel.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080029#include "aos/time/time.h"
Brian Silvermanae7c0332020-09-30 16:58:23 -070030#include "aos/util/file.h"
Austin Schuh4385b142021-03-14 21:31:13 -070031#include "aos/uuid.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080032
Austin Schuh15649d62019-12-28 16:36:38 -080033DEFINE_bool(skip_missing_forwarding_entries, false,
34 "If true, drop any forwarding entries with missing data. If "
35 "false, CHECK.");
Austin Schuhe309d2a2019-11-29 13:25:21 -080036
Austin Schuh0ca1fd32020-12-18 22:53:05 -080037DECLARE_bool(timestamps_to_csv);
Austin Schuh8bd96322020-02-13 21:18:22 -080038
Austin Schuh2f8fd752020-09-01 22:38:28 -070039DEFINE_bool(skip_order_validation, false,
40 "If true, ignore any out of orderness in replay");
41
Austin Schuhf0688662020-12-19 15:37:45 -080042DEFINE_double(
43 time_estimation_buffer_seconds, 2.0,
44 "The time to buffer ahead in the log file to accurately reconstruct time.");
45
Austin Schuhe33c08d2022-02-03 18:15:21 -080046DEFINE_string(
47 start_time, "",
48 "If set, start at this point in time in the log on the realtime clock.");
49DEFINE_string(
50 end_time, "",
51 "If set, end at this point in time in the log on the realtime clock.");
52
James Kuszmaul09632422022-05-25 15:56:19 -070053DEFINE_bool(drop_realtime_messages_before_start, false,
54 "If set, will drop any messages sent before the start of the "
55 "logfile in realtime replay. Setting this guarantees consistency "
56 "in timing with the original logfile, but means that you lose "
57 "access to fetched low-frequency messages.");
58
James Kuszmaula16a7912022-06-17 10:58:12 -070059DEFINE_double(
60 threaded_look_ahead_seconds, 2.0,
61 "Time, in seconds, to add to look-ahead when using multi-threaded replay. "
62 "Can validly be zero, but higher values are encouraged for realtime replay "
63 "in order to prevent the replay from ever having to block on waiting for "
64 "the reader to find the next message.");
65
Austin Schuhe309d2a2019-11-29 13:25:21 -080066namespace aos {
Austin Schuh006a9f52021-04-07 16:24:18 -070067namespace configuration {
68// We don't really want to expose this publicly, but log reader doesn't really
69// want to re-implement it.
70void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps,
71 std::string *name, std::string_view type, const Node *node);
Tyler Chatowbf0609c2021-07-31 16:13:27 -070072} // namespace configuration
Austin Schuhe309d2a2019-11-29 13:25:21 -080073namespace logger {
Austin Schuh0afc4d12020-10-19 11:42:04 -070074namespace {
Austin Schuh8c399962020-12-25 21:51:45 -080075
Austin Schuh1c227352021-09-17 12:53:54 -070076bool CompareChannels(const Channel *c,
77 ::std::pair<std::string_view, std::string_view> p) {
78 int name_compare = c->name()->string_view().compare(p.first);
79 if (name_compare == 0) {
80 return c->type()->string_view() < p.second;
81 } else if (name_compare < 0) {
82 return true;
83 } else {
84 return false;
85 }
86}
87
88bool EqualsChannels(const Channel *c,
89 ::std::pair<std::string_view, std::string_view> p) {
90 return c->name()->string_view() == p.first &&
91 c->type()->string_view() == p.second;
92}
93
Austin Schuh0de30f32020-12-06 12:44:28 -080094// Copies the channel, removing the schema as we go. If new_name is provided,
95// it is used instead of the name inside the channel. If new_type is provided,
96// it is used instead of the type in the channel.
97flatbuffers::Offset<Channel> CopyChannel(const Channel *c,
98 std::string_view new_name,
99 std::string_view new_type,
100 flatbuffers::FlatBufferBuilder *fbb) {
Austin Schuhdda6db72023-06-21 17:02:34 -0700101 CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
102 << ": Merging logic needs to be updated when the number of channel "
103 "fields changes.";
104
Austin Schuh0de30f32020-12-06 12:44:28 -0800105 flatbuffers::Offset<flatbuffers::String> name_offset =
106 fbb->CreateSharedString(new_name.empty() ? c->name()->string_view()
107 : new_name);
108 flatbuffers::Offset<flatbuffers::String> type_offset =
109 fbb->CreateSharedString(new_type.empty() ? c->type()->str() : new_type);
110 flatbuffers::Offset<flatbuffers::String> source_node_offset =
111 c->has_source_node() ? fbb->CreateSharedString(c->source_node()->str())
112 : 0;
113
114 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
115 destination_nodes_offset =
116 aos::RecursiveCopyVectorTable(c->destination_nodes(), fbb);
117
118 flatbuffers::Offset<
119 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
120 logger_nodes_offset = aos::CopyVectorSharedString(c->logger_nodes(), fbb);
121
122 Channel::Builder channel_builder(*fbb);
123 channel_builder.add_name(name_offset);
124 channel_builder.add_type(type_offset);
125 if (c->has_frequency()) {
126 channel_builder.add_frequency(c->frequency());
127 }
128 if (c->has_max_size()) {
129 channel_builder.add_max_size(c->max_size());
130 }
131 if (c->has_num_senders()) {
132 channel_builder.add_num_senders(c->num_senders());
133 }
134 if (c->has_num_watchers()) {
135 channel_builder.add_num_watchers(c->num_watchers());
136 }
137 if (!source_node_offset.IsNull()) {
138 channel_builder.add_source_node(source_node_offset);
139 }
140 if (!destination_nodes_offset.IsNull()) {
141 channel_builder.add_destination_nodes(destination_nodes_offset);
142 }
143 if (c->has_logger()) {
144 channel_builder.add_logger(c->logger());
145 }
146 if (!logger_nodes_offset.IsNull()) {
147 channel_builder.add_logger_nodes(logger_nodes_offset);
148 }
149 if (c->has_read_method()) {
150 channel_builder.add_read_method(c->read_method());
151 }
152 if (c->has_num_readers()) {
153 channel_builder.add_num_readers(c->num_readers());
154 }
Austin Schuhdda6db72023-06-21 17:02:34 -0700155 if (c->has_channel_storage_duration()) {
156 channel_builder.add_channel_storage_duration(c->channel_storage_duration());
157 }
Austin Schuh0de30f32020-12-06 12:44:28 -0800158 return channel_builder.Finish();
159}
160
Austin Schuhe309d2a2019-11-29 13:25:21 -0800161namespace chrono = std::chrono;
Austin Schuh0de30f32020-12-06 12:44:28 -0800162using message_bridge::RemoteMessage;
Austin Schuh0afc4d12020-10-19 11:42:04 -0700163} // namespace
Austin Schuhe309d2a2019-11-29 13:25:21 -0800164
Austin Schuhe33c08d2022-02-03 18:15:21 -0800165// Class to manage triggering events on the RT clock while replaying logs. Since
166// the RT clock can only change when we get a message, we only need to update
167// our timers when new messages are read.
168class EventNotifier {
169 public:
170 EventNotifier(EventLoop *event_loop, std::function<void()> fn,
171 std::string_view name,
172 realtime_clock::time_point realtime_event_time)
173 : event_loop_(event_loop),
174 fn_(std::move(fn)),
175 realtime_event_time_(realtime_event_time) {
176 CHECK(event_loop_);
177 event_timer_ = event_loop->AddTimer([this]() { HandleTime(); });
178
179 if (event_loop_->node() != nullptr) {
180 event_timer_->set_name(
181 absl::StrCat(event_loop_->node()->name()->string_view(), "_", name));
182 } else {
183 event_timer_->set_name(name);
184 }
185 }
186
187 ~EventNotifier() { event_timer_->Disable(); }
188
James Kuszmaul09632422022-05-25 15:56:19 -0700189 // Sets the clock offset for realtime playback.
190 void SetClockOffset(std::chrono::nanoseconds clock_offset) {
191 clock_offset_ = clock_offset;
192 }
193
Austin Schuhe33c08d2022-02-03 18:15:21 -0800194 // Returns the event trigger time.
195 realtime_clock::time_point realtime_event_time() const {
196 return realtime_event_time_;
197 }
198
199 // Observes the next message and potentially calls the callback or updates the
200 // timer.
201 void ObserveNextMessage(monotonic_clock::time_point monotonic_message_time,
202 realtime_clock::time_point realtime_message_time) {
203 if (realtime_message_time < realtime_event_time_) {
204 return;
205 }
206 if (called_) {
207 return;
208 }
209
210 // Move the callback wakeup time to the correct time (or make it now if
211 // there's a gap in time) now that we know it is before the next
212 // message.
213 const monotonic_clock::time_point candidate_monotonic =
214 (realtime_event_time_ - realtime_message_time) + monotonic_message_time;
215 const monotonic_clock::time_point monotonic_now =
216 event_loop_->monotonic_now();
217 if (candidate_monotonic < monotonic_now) {
218 // Whops, time went backwards. Just do it now.
219 HandleTime();
220 } else {
Philipp Schradera6712522023-07-05 20:25:11 -0700221 event_timer_->Schedule(candidate_monotonic + clock_offset_);
Austin Schuhe33c08d2022-02-03 18:15:21 -0800222 }
223 }
224
225 private:
226 void HandleTime() {
227 if (!called_) {
228 called_ = true;
229 fn_();
230 }
231 }
232
233 EventLoop *event_loop_ = nullptr;
234 TimerHandler *event_timer_ = nullptr;
235 std::function<void()> fn_;
236
237 const realtime_clock::time_point realtime_event_time_ =
238 realtime_clock::min_time;
239
James Kuszmaul09632422022-05-25 15:56:19 -0700240 std::chrono::nanoseconds clock_offset_{0};
241
Austin Schuhe33c08d2022-02-03 18:15:21 -0800242 bool called_ = false;
243};
244
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800245LogReader::LogReader(std::string_view filename,
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700246 const Configuration *replay_configuration,
247 const ReplayChannels *replay_channels)
Alexei Strots1f51ac72023-05-15 10:14:54 -0700248 : LogReader(LogFilesContainer(SortParts({std::string(filename)})),
249 replay_configuration, replay_channels) {}
Austin Schuhfa895892020-01-07 20:07:41 -0800250
Austin Schuh287d43d2020-12-04 20:19:33 -0800251LogReader::LogReader(std::vector<LogFile> log_files,
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700252 const Configuration *replay_configuration,
253 const ReplayChannels *replay_channels)
Alexei Strots1f51ac72023-05-15 10:14:54 -0700254 : LogReader(LogFilesContainer(std::move(log_files)), replay_configuration,
255 replay_channels) {}
256
257LogReader::LogReader(LogFilesContainer log_files,
258 const Configuration *replay_configuration,
259 const ReplayChannels *replay_channels)
Austin Schuh287d43d2020-12-04 20:19:33 -0800260 : log_files_(std::move(log_files)),
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700261 replay_configuration_(replay_configuration),
262 replay_channels_(replay_channels) {
Austin Schuhe33c08d2022-02-03 18:15:21 -0800263 SetStartTime(FLAGS_start_time);
264 SetEndTime(FLAGS_end_time);
265
Austin Schuh0ca51f32020-12-25 21:51:45 -0800266 {
Alexei Strots1f51ac72023-05-15 10:14:54 -0700267 // Log files container validates that log files shared the same config.
268 const Configuration *config = log_files_.config();
269 CHECK_NOTNULL(config);
Austin Schuh0ca51f32020-12-25 21:51:45 -0800270 }
Austin Schuhdda74ec2021-01-03 19:30:37 -0800271
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700272 if (replay_channels_ != nullptr) {
273 CHECK(!replay_channels_->empty()) << "replay_channels is empty which means "
274 "no messages will get replayed.";
275 }
276
Austin Schuh6331ef92020-01-07 18:28:09 -0800277 MakeRemappedConfig();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800278
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700279 // Remap all existing remote timestamp channels. They will be recreated, and
280 // the data logged isn't relevant anymore.
Austin Schuh3c5dae52020-10-06 18:55:18 -0700281 for (const Node *node : configuration::GetNodes(logged_configuration())) {
Austin Schuh61e973f2021-02-21 21:43:56 -0800282 message_bridge::ChannelTimestampFinder finder(logged_configuration(),
283 "log_reader", node);
284
285 absl::btree_set<std::string_view> remote_nodes;
286
287 for (const Channel *channel : *logged_configuration()->channels()) {
288 if (!configuration::ChannelIsSendableOnNode(channel, node)) {
289 continue;
290 }
291 if (!channel->has_destination_nodes()) {
292 continue;
293 }
294 for (const Connection *connection : *channel->destination_nodes()) {
295 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
296 node)) {
297 // Start by seeing if the split timestamp channels are being used for
298 // this message. If so, remap them.
299 const Channel *timestamp_channel = configuration::GetChannel(
300 logged_configuration(),
301 finder.SplitChannelName(channel, connection),
302 RemoteMessage::GetFullyQualifiedName(), "", node, true);
303
304 if (timestamp_channel != nullptr) {
James Kuszmaul53da7f32022-09-11 11:11:55 -0700305 // If for some reason a timestamp channel is not NOT_LOGGED (which
306 // is unusual), then remap the channel so that the replayed channel
307 // doesn't overlap with the special separate replay we do for
308 // timestamps.
Austin Schuh61e973f2021-02-21 21:43:56 -0800309 if (timestamp_channel->logger() != LoggerConfig::NOT_LOGGED) {
310 RemapLoggedChannel<RemoteMessage>(
311 timestamp_channel->name()->string_view(), node);
312 }
313 continue;
314 }
315
316 // Otherwise collect this one up as a node to look for a combined
317 // channel from. It is more efficient to compare nodes than channels.
Austin Schuh349e7ad2022-04-02 21:12:26 -0700318 LOG(WARNING) << "Failed to find channel "
319 << finder.SplitChannelName(channel, connection)
320 << " on node " << aos::FlatbufferToJson(node);
Austin Schuh61e973f2021-02-21 21:43:56 -0800321 remote_nodes.insert(connection->name()->string_view());
322 }
323 }
324 }
325
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700326 std::vector<const Node *> timestamp_logger_nodes =
327 configuration::TimestampNodes(logged_configuration(), node);
Austin Schuh61e973f2021-02-21 21:43:56 -0800328 for (const std::string_view remote_node : remote_nodes) {
329 const std::string channel = finder.CombinedChannelName(remote_node);
330
Austin Schuh0de30f32020-12-06 12:44:28 -0800331 // See if the log file is an old log with MessageHeader channels in it, or
332 // a newer log with RemoteMessage. If we find an older log, rename the
333 // type too along with the name.
334 if (HasChannel<MessageHeader>(channel, node)) {
335 CHECK(!HasChannel<RemoteMessage>(channel, node))
336 << ": Can't have both a MessageHeader and RemoteMessage remote "
337 "timestamp channel.";
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800338 // In theory, we should check NOT_LOGGED like RemoteMessage and be more
339 // careful about updating the config, but there are fewer and fewer logs
340 // with MessageHeader remote messages, so it isn't worth the effort.
Austin Schuh0de30f32020-12-06 12:44:28 -0800341 RemapLoggedChannel<MessageHeader>(channel, node, "/original",
342 "aos.message_bridge.RemoteMessage");
343 } else {
344 CHECK(HasChannel<RemoteMessage>(channel, node))
345 << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
346 << RemoteMessage::GetFullyQualifiedName() << "\"} for node "
347 << node->name()->string_view();
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800348 // Only bother to remap if there's something on the channel. We can
349 // tell if the channel was marked NOT_LOGGED or not. This makes the
350 // config not change un-necesarily when we replay a log with NOT_LOGGED
351 // messages.
352 if (HasLoggedChannel<RemoteMessage>(channel, node)) {
353 RemapLoggedChannel<RemoteMessage>(channel, node);
354 }
Austin Schuh0de30f32020-12-06 12:44:28 -0800355 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700356 }
357 }
358
Austin Schuh6aa77be2020-02-22 21:06:40 -0800359 if (replay_configuration) {
360 CHECK_EQ(configuration::MultiNode(configuration()),
361 configuration::MultiNode(replay_configuration))
Austin Schuh2f8fd752020-09-01 22:38:28 -0700362 << ": Log file and replay config need to both be multi or single "
363 "node.";
Austin Schuh6aa77be2020-02-22 21:06:40 -0800364 }
365
Austin Schuh6f3babe2020-01-26 20:34:50 -0800366 if (!configuration::MultiNode(configuration())) {
James Kuszmaul09632422022-05-25 15:56:19 -0700367 states_.resize(1);
Austin Schuh8bd96322020-02-13 21:18:22 -0800368 } else {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800369 if (replay_configuration) {
James Kuszmaul46d82582020-05-09 19:50:09 -0700370 CHECK_EQ(logged_configuration()->nodes()->size(),
Austin Schuh6aa77be2020-02-22 21:06:40 -0800371 replay_configuration->nodes()->size())
Austin Schuh2f8fd752020-09-01 22:38:28 -0700372 << ": Log file and replay config need to have matching nodes "
373 "lists.";
James Kuszmaul46d82582020-05-09 19:50:09 -0700374 for (const Node *node : *logged_configuration()->nodes()) {
375 if (configuration::GetNode(replay_configuration, node) == nullptr) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700376 LOG(FATAL) << "Found node " << FlatbufferToJson(node)
377 << " in logged config that is not present in the replay "
378 "config.";
James Kuszmaul46d82582020-05-09 19:50:09 -0700379 }
380 }
Austin Schuh6aa77be2020-02-22 21:06:40 -0800381 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800382 states_.resize(configuration()->nodes()->size());
Austin Schuh6f3babe2020-01-26 20:34:50 -0800383 }
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600384
385 before_send_callbacks_.resize(configuration()->channels()->size());
Austin Schuhe309d2a2019-11-29 13:25:21 -0800386}
387
Austin Schuh6aa77be2020-02-22 21:06:40 -0800388LogReader::~LogReader() {
Austin Schuh39580f12020-08-01 14:44:08 -0700389 if (event_loop_factory_unique_ptr_) {
390 Deregister();
391 } else if (event_loop_factory_ != nullptr) {
392 LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
393 "is destroyed";
394 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700395 // Zero out some buffers. It's easy to do use-after-frees on these, so make
396 // it more obvious.
Austin Schuh39580f12020-08-01 14:44:08 -0700397 if (remapped_configuration_buffer_) {
398 remapped_configuration_buffer_->Wipe();
399 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800400}
Austin Schuhe309d2a2019-11-29 13:25:21 -0800401
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800402const Configuration *LogReader::logged_configuration() const {
Alexei Strots1f51ac72023-05-15 10:14:54 -0700403 return log_files_.config();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800404}
405
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800406const Configuration *LogReader::configuration() const {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800407 return remapped_configuration_;
408}
409
Austin Schuh07676622021-01-21 18:59:17 -0800410std::vector<const Node *> LogReader::LoggedNodes() const {
411 return configuration::GetNodes(logged_configuration());
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800412}
Austin Schuh15649d62019-12-28 16:36:38 -0800413
Austin Schuh11d43732020-09-21 17:28:30 -0700414monotonic_clock::time_point LogReader::monotonic_start_time(
415 const Node *node) const {
Austin Schuh8bd96322020-02-13 21:18:22 -0800416 State *state =
417 states_[configuration::GetNodeIndex(configuration(), node)].get();
418 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
419
Austin Schuhf665eb42022-02-03 18:26:25 -0800420 return state->monotonic_start_time(state->boot_count());
Austin Schuhe309d2a2019-11-29 13:25:21 -0800421}
422
Austin Schuh11d43732020-09-21 17:28:30 -0700423realtime_clock::time_point LogReader::realtime_start_time(
424 const Node *node) const {
Austin Schuh8bd96322020-02-13 21:18:22 -0800425 State *state =
426 states_[configuration::GetNodeIndex(configuration(), node)].get();
427 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
428
Austin Schuhf665eb42022-02-03 18:26:25 -0800429 return state->realtime_start_time(state->boot_count());
Austin Schuhe309d2a2019-11-29 13:25:21 -0800430}
431
Austin Schuh58646e22021-08-23 23:51:46 -0700432void LogReader::OnStart(std::function<void()> fn) {
433 CHECK(!configuration::MultiNode(configuration()));
434 OnStart(nullptr, std::move(fn));
435}
436
437void LogReader::OnStart(const Node *node, std::function<void()> fn) {
438 const int node_index = configuration::GetNodeIndex(configuration(), node);
439 CHECK_GE(node_index, 0);
440 CHECK_LT(node_index, static_cast<int>(states_.size()));
441 State *state = states_[node_index].get();
442 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
443
444 state->OnStart(std::move(fn));
445}
446
James Kuszmaula16a7912022-06-17 10:58:12 -0700447void LogReader::State::QueueThreadUntil(BootTimestamp time) {
448 if (threading_ == ThreadedBuffering::kYes) {
449 CHECK(!message_queuer_.has_value()) << "Can't start thread twice.";
450 message_queuer_.emplace(
451 [this](const BootTimestamp queue_until) {
452 // This will be called whenever anything prompts us for any state
453 // change; there may be wakeups that result in us not having any new
454 // data to push (even if we aren't done), in which case we will return
455 // nullopt but not done().
456 if (last_queued_message_.has_value() &&
457 queue_until < last_queued_message_) {
458 return util::ThreadedQueue<TimestampedMessage,
459 BootTimestamp>::PushResult{
460 std::nullopt, false,
461 last_queued_message_ == BootTimestamp::max_time()};
462 }
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700463
James Kuszmaula16a7912022-06-17 10:58:12 -0700464 TimestampedMessage *message = timestamp_mapper_->Front();
465 // Upon reaching the end of the log, exit.
466 if (message == nullptr) {
467 last_queued_message_ = BootTimestamp::max_time();
468 return util::ThreadedQueue<TimestampedMessage,
469 BootTimestamp>::PushResult{std::nullopt,
470 false, true};
471 }
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700472
James Kuszmaula16a7912022-06-17 10:58:12 -0700473 last_queued_message_ = message->monotonic_event_time;
474 const util::ThreadedQueue<TimestampedMessage,
475 BootTimestamp>::PushResult result{
476 *message, queue_until >= last_queued_message_, false};
477 timestamp_mapper_->PopFront();
478 SeedSortedMessages();
479 return result;
480 },
481 time);
482 // Spin until the first few seconds of messages are queued up so that we
483 // don't end up with delays/inconsistent timing during the first few seconds
484 // of replay.
485 message_queuer_->WaitForNoMoreWork();
486 }
487}
488
Austin Schuh58646e22021-08-23 23:51:46 -0700489void LogReader::State::OnStart(std::function<void()> fn) {
490 on_starts_.emplace_back(std::move(fn));
491}
492
493void LogReader::State::RunOnStart() {
494 SetRealtimeOffset(monotonic_start_time(boot_count()),
495 realtime_start_time(boot_count()));
496
Alexei Strots036d84e2023-05-03 16:05:12 -0700497 VLOG(1) << "Starting for node '" << MaybeNodeName(node()) << "' at time "
Austin Schuh58646e22021-08-23 23:51:46 -0700498 << monotonic_start_time(boot_count());
Austin Schuhe33c08d2022-02-03 18:15:21 -0800499 auto fn = [this]() {
500 for (size_t i = 0; i < on_starts_.size(); ++i) {
501 on_starts_[i]();
502 }
503 };
504 if (event_loop_factory_) {
505 event_loop_factory_->AllowApplicationCreationDuring(std::move(fn));
506 } else {
507 fn();
Austin Schuh58646e22021-08-23 23:51:46 -0700508 }
509 stopped_ = false;
510 started_ = true;
511}
512
513void LogReader::OnEnd(std::function<void()> fn) {
514 CHECK(!configuration::MultiNode(configuration()));
515 OnEnd(nullptr, std::move(fn));
516}
517
518void LogReader::OnEnd(const Node *node, std::function<void()> fn) {
519 const int node_index = configuration::GetNodeIndex(configuration(), node);
520 CHECK_GE(node_index, 0);
521 CHECK_LT(node_index, static_cast<int>(states_.size()));
522 State *state = states_[node_index].get();
523 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
524
525 state->OnEnd(std::move(fn));
526}
527
528void LogReader::State::OnEnd(std::function<void()> fn) {
529 on_ends_.emplace_back(std::move(fn));
530}
531
532void LogReader::State::RunOnEnd() {
Alexei Strots036d84e2023-05-03 16:05:12 -0700533 VLOG(1) << "Ending for node '" << MaybeNodeName(node()) << "' at time "
Austin Schuh58646e22021-08-23 23:51:46 -0700534 << monotonic_start_time(boot_count());
Austin Schuhe33c08d2022-02-03 18:15:21 -0800535 auto fn = [this]() {
536 for (size_t i = 0; i < on_ends_.size(); ++i) {
537 on_ends_[i]();
538 }
539 };
540 if (event_loop_factory_) {
541 event_loop_factory_->AllowApplicationCreationDuring(std::move(fn));
542 } else {
543 fn();
Austin Schuh58646e22021-08-23 23:51:46 -0700544 }
545
546 stopped_ = true;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800547 started_ = true;
James Kuszmaula16a7912022-06-17 10:58:12 -0700548 if (message_queuer_.has_value()) {
549 message_queuer_->StopPushing();
550 }
Austin Schuh58646e22021-08-23 23:51:46 -0700551}
552
James Kuszmaul94ca5132022-07-19 09:11:08 -0700553std::vector<
554 std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
555LogReader::State::NonExclusiveChannels() {
556 CHECK_NOTNULL(node_event_loop_factory_);
557 const aos::Configuration *config = node_event_loop_factory_->configuration();
558 std::vector<
559 std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
560 result{// Timing reports can be sent by logged and replayed applications.
561 {aos::configuration::GetChannel(config, "/aos",
562 "aos.timing.Report", "", node_),
563 NodeEventLoopFactory::ExclusiveSenders::kNo},
564 // AOS_LOG may be used in the log and in replay.
565 {aos::configuration::GetChannel(
566 config, "/aos", "aos.logging.LogMessageFbs", "", node_),
567 NodeEventLoopFactory::ExclusiveSenders::kNo}};
568 for (const Node *const node : configuration::GetNodes(config)) {
569 if (node == nullptr) {
570 break;
571 }
572 const Channel *const old_timestamp_channel = aos::configuration::GetChannel(
573 config,
574 absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
James Kuszmaula90f3242022-08-03 13:39:59 -0700575 "aos.message_bridge.RemoteMessage", "", node_, /*quiet=*/true);
James Kuszmaul94ca5132022-07-19 09:11:08 -0700576 // The old-style remote timestamp channel can be populated from any
577 // channel, simulated or replayed.
578 if (old_timestamp_channel != nullptr) {
579 result.push_back(std::make_pair(
580 old_timestamp_channel, NodeEventLoopFactory::ExclusiveSenders::kNo));
581 }
582 }
583 // Remove any channels that weren't found due to not existing in the
584 // config.
585 for (size_t ii = 0; ii < result.size();) {
586 if (result[ii].first == nullptr) {
587 result.erase(result.begin() + ii);
588 } else {
589 ++ii;
590 }
591 }
592 return result;
593}
594
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800595void LogReader::Register() {
596 event_loop_factory_unique_ptr_ =
Austin Schuhac0771c2020-01-07 18:36:30 -0800597 std::make_unique<SimulatedEventLoopFactory>(configuration());
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800598 Register(event_loop_factory_unique_ptr_.get());
599}
600
Austin Schuh58646e22021-08-23 23:51:46 -0700601void LogReader::RegisterWithoutStarting(
602 SimulatedEventLoopFactory *event_loop_factory) {
Austin Schuh92547522019-12-28 14:33:43 -0800603 event_loop_factory_ = event_loop_factory;
Austin Schuhe5bbd9e2020-09-21 17:29:20 -0700604 remapped_configuration_ = event_loop_factory_->configuration();
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800605 filters_ =
606 std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
Austin Schuhba20ea72021-01-21 16:47:01 -0800607 event_loop_factory_->configuration(), logged_configuration(),
Alexei Strots58017402023-05-03 22:05:06 -0700608 log_files_.boots(), FLAGS_skip_order_validation,
Austin Schuhfe3fb342021-01-16 18:50:37 -0800609 chrono::duration_cast<chrono::nanoseconds>(
610 chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
Austin Schuh92547522019-12-28 14:33:43 -0800611
Austin Schuhe639ea12021-01-25 13:00:22 -0800612 std::vector<TimestampMapper *> timestamp_mappers;
Brian Silvermand90905f2020-09-23 14:42:56 -0700613 for (const Node *node : configuration::GetNodes(configuration())) {
Alexei Strots1f51ac72023-05-15 10:14:54 -0700614 size_t node_index = configuration::GetNodeIndex(configuration(), node);
615 std::string_view node_name = MaybeNodeName(node);
Austin Schuh315b96b2020-12-11 21:21:12 -0800616
James Kuszmaula16a7912022-06-17 10:58:12 -0700617 // We don't run with threading on the buffering for simulated event loops
618 // because we haven't attempted to validate how the interactions beteen the
619 // buffering and the timestamp mapper works when running multiple nodes
620 // concurrently.
Austin Schuh287d43d2020-12-04 20:19:33 -0800621 states_[node_index] = std::make_unique<State>(
Alexei Strots1f51ac72023-05-15 10:14:54 -0700622 !log_files_.ContainsPartsForNode(node_name)
Austin Schuh287d43d2020-12-04 20:19:33 -0800623 ? nullptr
Alexei Strots1f51ac72023-05-15 10:14:54 -0700624 : std::make_unique<TimestampMapper>(node_name, log_files_),
James Kuszmaulb11a1502022-07-01 16:02:25 -0700625 filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600626 State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndices(node),
627 before_send_callbacks_);
Austin Schuh8bd96322020-02-13 21:18:22 -0800628 State *state = states_[node_index].get();
Austin Schuh58646e22021-08-23 23:51:46 -0700629 state->SetNodeEventLoopFactory(
Austin Schuhe33c08d2022-02-03 18:15:21 -0800630 event_loop_factory_->GetNodeEventLoopFactory(node),
631 event_loop_factory_);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700632
633 state->SetChannelCount(logged_configuration()->channels()->size());
Austin Schuhe639ea12021-01-25 13:00:22 -0800634 timestamp_mappers.emplace_back(state->timestamp_mapper());
Austin Schuhcde938c2020-02-02 17:30:07 -0800635 }
Austin Schuhe639ea12021-01-25 13:00:22 -0800636 filters_->SetTimestampMappers(std::move(timestamp_mappers));
637
638 // Note: this needs to be set before any times are pulled, or we won't observe
639 // the timestamps.
Austin Schuh87dd3832021-01-01 23:07:31 -0800640 event_loop_factory_->SetTimeConverter(filters_.get());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700641
Austin Schuh287d43d2020-12-04 20:19:33 -0800642 for (const Node *node : configuration::GetNodes(configuration())) {
643 const size_t node_index =
644 configuration::GetNodeIndex(configuration(), node);
645 State *state = states_[node_index].get();
646 for (const Node *other_node : configuration::GetNodes(configuration())) {
647 const size_t other_node_index =
648 configuration::GetNodeIndex(configuration(), other_node);
649 State *other_state = states_[other_node_index].get();
650 if (other_state != state) {
651 state->AddPeer(other_state);
652 }
653 }
654 }
655
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700656 // Register after making all the State objects so we can build references
657 // between them.
658 for (const Node *node : configuration::GetNodes(configuration())) {
659 const size_t node_index =
660 configuration::GetNodeIndex(configuration(), node);
661 State *state = states_[node_index].get();
662
Austin Schuh58646e22021-08-23 23:51:46 -0700663 // If we didn't find any log files with data in them, we won't ever get a
664 // callback or be live. So skip the rest of the setup.
James Kuszmaula16a7912022-06-17 10:58:12 -0700665 if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700666 continue;
667 }
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700668
Austin Schuh58646e22021-08-23 23:51:46 -0700669 ++live_nodes_;
670
671 NodeEventLoopFactory *node_factory =
672 event_loop_factory_->GetNodeEventLoopFactory(node);
673 node_factory->OnStartup([this, state, node]() {
674 RegisterDuringStartup(state->MakeEventLoop(), node);
675 });
676 node_factory->OnShutdown([this, state, node]() {
677 RegisterDuringStartup(nullptr, node);
678 state->DestroyEventLoop();
679 });
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700680 }
681
James Kuszmaul46d82582020-05-09 19:50:09 -0700682 if (live_nodes_ == 0) {
683 LOG(FATAL)
684 << "Don't have logs from any of the nodes in the replay config--are "
685 "you sure that the replay config matches the original config?";
686 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800687
Austin Schuh87dd3832021-01-01 23:07:31 -0800688 filters_->CheckGraph();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800689
Austin Schuh858c9f32020-08-31 16:56:12 -0700690 for (std::unique_ptr<State> &state : states_) {
691 state->SeedSortedMessages();
692 }
693
Austin Schuh6f3babe2020-01-26 20:34:50 -0800694 // Forwarding is tracked per channel. If it is enabled, we want to turn it
695 // off. Otherwise messages replayed will get forwarded across to the other
Austin Schuh2f8fd752020-09-01 22:38:28 -0700696 // nodes, and also replayed on the other nodes. This may not satisfy all
697 // our users, but it'll start the discussion.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800698 if (configuration::MultiNode(event_loop_factory_->configuration())) {
699 for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
700 const Channel *channel = logged_configuration()->channels()->Get(i);
701 const Node *node = configuration::GetNode(
702 configuration(), channel->source_node()->string_view());
703
Austin Schuh8bd96322020-02-13 21:18:22 -0800704 State *state =
705 states_[configuration::GetNodeIndex(configuration(), node)].get();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800706
707 const Channel *remapped_channel =
Austin Schuh58646e22021-08-23 23:51:46 -0700708 RemapChannel(state->event_loop(), node, channel);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800709
710 event_loop_factory_->DisableForwarding(remapped_channel);
711 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700712
713 // If we are replaying a log, we don't want a bunch of redundant messages
714 // from both the real message bridge and simulated message bridge.
James Kuszmaul94ca5132022-07-19 09:11:08 -0700715 event_loop_factory_->PermanentlyDisableStatistics();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800716 }
Austin Schuh891214d2021-11-11 20:35:02 -0800717
718 // Write pseudo start times out to file now that we are all setup.
719 filters_->Start(event_loop_factory_);
Austin Schuh58646e22021-08-23 23:51:46 -0700720}
721
722void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
723 RegisterWithoutStarting(event_loop_factory);
Austin Schuhe33c08d2022-02-03 18:15:21 -0800724 StartAfterRegister(event_loop_factory);
725}
726
727void LogReader::StartAfterRegister(
728 SimulatedEventLoopFactory *event_loop_factory) {
Austin Schuh58646e22021-08-23 23:51:46 -0700729 // We want to start the log file at the last start time of the log files
730 // from all the nodes. Compute how long each node's simulation needs to run
731 // to move time to this point.
732 distributed_clock::time_point start_time = distributed_clock::min_time;
733
734 // TODO(austin): We want an "OnStart" callback for each node rather than
735 // running until the last node.
736
737 for (std::unique_ptr<State> &state : states_) {
Alexei Strotsb8c3a702023-04-19 21:38:25 -0700738 CHECK(state);
Austin Schuh58646e22021-08-23 23:51:46 -0700739 VLOG(1) << "Start time is " << state->monotonic_start_time(0)
Alexei Strots036d84e2023-05-03 16:05:12 -0700740 << " for node '" << MaybeNodeName(state->node()) << "' now "
Austin Schuh58646e22021-08-23 23:51:46 -0700741 << state->monotonic_now();
742 if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
743 continue;
744 }
745 // And start computing the start time on the distributed clock now that
746 // that works.
747 start_time = std::max(
748 start_time, state->ToDistributedClock(state->monotonic_start_time(0)));
749 }
750
751 // TODO(austin): If a node doesn't have a start time, we might not queue
752 // enough. If this happens, we'll explode with a frozen error eventually.
753
754 CHECK_GE(start_time, distributed_clock::epoch())
755 << ": Hmm, we have a node starting before the start of time. Offset "
756 "everything.";
Austin Schuh6f3babe2020-01-26 20:34:50 -0800757
Austin Schuhdda74ec2021-01-03 19:30:37 -0800758 {
Austin Schuhdda74ec2021-01-03 19:30:37 -0800759 VLOG(1) << "Running until " << start_time << " in Register";
760 event_loop_factory_->RunFor(start_time.time_since_epoch());
761 VLOG(1) << "At start time";
Austin Schuhdda74ec2021-01-03 19:30:37 -0800762 }
Austin Schuh92547522019-12-28 14:33:43 -0800763
Austin Schuh8bd96322020-02-13 21:18:22 -0800764 for (std::unique_ptr<State> &state : states_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700765 // Make the RT clock be correct before handing it to the user.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700766 if (state->realtime_start_time(0) != realtime_clock::min_time) {
767 state->SetRealtimeOffset(state->monotonic_start_time(0),
768 state->realtime_start_time(0));
Austin Schuh2f8fd752020-09-01 22:38:28 -0700769 }
Tyler Chatowbf0609c2021-07-31 16:13:27 -0700770 VLOG(1) << "Start time is " << state->monotonic_start_time(0)
Alexei Strots036d84e2023-05-03 16:05:12 -0700771 << " for node '" << MaybeNodeName(state->event_loop()->node())
772 << "' now " << state->monotonic_now();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700773 }
774
775 if (FLAGS_timestamps_to_csv) {
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800776 filters_->Start(event_loop_factory);
Austin Schuh8bd96322020-02-13 21:18:22 -0800777 }
778}
779
Austin Schuh2f8fd752020-09-01 22:38:28 -0700780message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
Austin Schuh8bd96322020-02-13 21:18:22 -0800781 const Node *node_a, const Node *node_b) {
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800782 if (filters_) {
783 return filters_->GetFilter(node_a, node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800784 }
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800785 return nullptr;
Austin Schuh8bd96322020-02-13 21:18:22 -0800786}
787
James Kuszmaul09632422022-05-25 15:56:19 -0700788// TODO(jkuszmaul): Make in-line modifications to
789// ServerStatistics/ClientStatistics messages for ShmEventLoop-based replay to
790// avoid messing up anything that depends on them having valid offsets.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800791void LogReader::Register(EventLoop *event_loop) {
James Kuszmaul09632422022-05-25 15:56:19 -0700792 filters_ =
793 std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
794 event_loop->configuration(), logged_configuration(),
Alexei Strots58017402023-05-03 22:05:06 -0700795 log_files_.boots(), FLAGS_skip_order_validation,
James Kuszmaul09632422022-05-25 15:56:19 -0700796 chrono::duration_cast<chrono::nanoseconds>(
797 chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
798
799 std::vector<TimestampMapper *> timestamp_mappers;
800 for (const Node *node : configuration::GetNodes(configuration())) {
Alexei Strots1f51ac72023-05-15 10:14:54 -0700801 auto node_name = MaybeNodeName(node);
James Kuszmaul09632422022-05-25 15:56:19 -0700802 const size_t node_index =
803 configuration::GetNodeIndex(configuration(), node);
James Kuszmaul09632422022-05-25 15:56:19 -0700804
805 states_[node_index] = std::make_unique<State>(
Alexei Strots1f51ac72023-05-15 10:14:54 -0700806 !log_files_.ContainsPartsForNode(node_name)
James Kuszmaul09632422022-05-25 15:56:19 -0700807 ? nullptr
Alexei Strots1f51ac72023-05-15 10:14:54 -0700808 : std::make_unique<TimestampMapper>(node_name, log_files_),
James Kuszmaulb11a1502022-07-01 16:02:25 -0700809 filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600810 State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndices(node),
811 before_send_callbacks_);
James Kuszmaul09632422022-05-25 15:56:19 -0700812 State *state = states_[node_index].get();
813
814 state->SetChannelCount(logged_configuration()->channels()->size());
815 timestamp_mappers.emplace_back(state->timestamp_mapper());
816 }
817
818 filters_->SetTimestampMappers(std::move(timestamp_mappers));
819
820 for (const Node *node : configuration::GetNodes(configuration())) {
821 const size_t node_index =
822 configuration::GetNodeIndex(configuration(), node);
823 State *state = states_[node_index].get();
824 for (const Node *other_node : configuration::GetNodes(configuration())) {
825 const size_t other_node_index =
826 configuration::GetNodeIndex(configuration(), other_node);
827 State *other_state = states_[other_node_index].get();
828 if (other_state != state) {
829 state->AddPeer(other_state);
830 }
831 }
832 }
833 for (const Node *node : configuration::GetNodes(configuration())) {
834 if (node == nullptr || node->name()->string_view() ==
835 event_loop->node()->name()->string_view()) {
836 Register(event_loop, event_loop->node());
837 } else {
838 Register(nullptr, node);
839 }
840 }
Austin Schuh58646e22021-08-23 23:51:46 -0700841}
842
843void LogReader::Register(EventLoop *event_loop, const Node *node) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800844 State *state =
Austin Schuh58646e22021-08-23 23:51:46 -0700845 states_[configuration::GetNodeIndex(configuration(), node)].get();
846
847 // If we didn't find any log files with data in them, we won't ever get a
848 // callback or be live. So skip the rest of the setup.
James Kuszmaula16a7912022-06-17 10:58:12 -0700849 if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700850 return;
851 }
James Kuszmaul09632422022-05-25 15:56:19 -0700852
853 if (event_loop != nullptr) {
854 ++live_nodes_;
855 }
Austin Schuh58646e22021-08-23 23:51:46 -0700856
857 if (event_loop_factory_ != nullptr) {
858 event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
859 [this, event_loop, node]() {
860 RegisterDuringStartup(event_loop, node);
861 });
862 } else {
863 RegisterDuringStartup(event_loop, node);
864 }
865}
866
// Attaches the replay State for `node` to `event_loop`, or detaches it when
// `event_loop` is nullptr.
//
// With a non-null event loop this calls SkipTimingReport() and SkipAosLog()
// on it, creates a raw sender for every logged channel that is readable on
// `node`, hands remote timestamp senders to the destination states of
// forwarded channels, and installs the timer whose callback pops one logged
// message per wakeup, sends it, and schedules the next wakeup.  It also
// installs the startup timer and an OnRun hook when the state has messages.
//
// With a null event loop (passed from the OnShutdown hook and for nodes other
// than the one owning the EventLoop in single-EventLoop replay) this clears
// the time flags, sets every channel up with a null sender, clears the remote
// timestamp senders and both timers, and returns early.
867void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
James Kuszmaul09632422022-05-25 15:56:19 -0700868 if (event_loop != nullptr) {
Austin Schuh58646e22021-08-23 23:51:46 -0700869 CHECK(event_loop->configuration() == configuration());
870 }
871
872 State *state =
873 states_[configuration::GetNodeIndex(configuration(), node)].get();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800874
 // Detaching: drop any time flags before the event loop pointer is replaced.
James Kuszmaul09632422022-05-25 15:56:19 -0700875 if (event_loop == nullptr) {
Austin Schuhe33c08d2022-02-03 18:15:21 -0800876 state->ClearTimeFlags();
877 }
878
Austin Schuh858c9f32020-08-31 16:56:12 -0700879 state->set_event_loop(event_loop);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800880
Tyler Chatow67ddb032020-01-12 14:30:04 -0800881 // We don't run timing reports when trying to print out logged data, because
882 // otherwise we would end up printing out the timing reports themselves...
883 // This is only really relevant when we are replaying into a simulation.
James Kuszmaul09632422022-05-25 15:56:19 -0700884 if (event_loop != nullptr) {
Austin Schuh58646e22021-08-23 23:51:46 -0700885 event_loop->SkipTimingReport();
886 event_loop->SkipAosLog();
887 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800888
 // Walk every logged channel, remap it into the replay configuration, and
 // tell the state how (and whether) to send on it.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700889 for (size_t logged_channel_index = 0;
890 logged_channel_index < logged_configuration()->channels()->size();
891 ++logged_channel_index) {
892 const Channel *channel = RemapChannel(
Austin Schuh58646e22021-08-23 23:51:46 -0700893 event_loop, node,
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700894 logged_configuration()->channels()->Get(logged_channel_index));
Austin Schuh8bd96322020-02-13 21:18:22 -0800895
Austin Schuhc0b6c4f2021-10-11 18:28:38 -0700896 const bool logged = channel->logger() != LoggerConfig::NOT_LOGGED;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700897 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700898
899 State *source_state = nullptr;
James Kuszmaul09632422022-05-25 15:56:19 -0700900
 // Readable here but not sendable here: the channel originates on
 // channel->source_node(), so look up the filter and state for that node.
Austin Schuh58646e22021-08-23 23:51:46 -0700901 if (!configuration::ChannelIsSendableOnNode(channel, node) &&
902 configuration::ChannelIsReadableOnNode(channel, node)) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700903 const Node *source_node = configuration::GetNode(
Austin Schuh58646e22021-08-23 23:51:46 -0700904 configuration(), channel->source_node()->string_view());
Austin Schuh8bd96322020-02-13 21:18:22 -0800905
Austin Schuh58646e22021-08-23 23:51:46 -0700906 // We've got a message which is being forwarded to this node.
907 filter = GetFilter(node, source_node);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700908
909 source_state =
910 states_[configuration::GetNodeIndex(configuration(), source_node)]
911 .get();
Austin Schuh8bd96322020-02-13 21:18:22 -0800912 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700913
Austin Schuh58646e22021-08-23 23:51:46 -0700914 // We are the source, and it is forwarded.
915 const bool is_forwarded =
916 configuration::ChannelIsSendableOnNode(channel, node) &&
917 configuration::ConnectionCount(channel);
918
 // A sender is only created when there is an event loop, the channel is
 // logged, and it is readable on this node; otherwise nullptr is stored.
Austin Schuhc0b6c4f2021-10-11 18:28:38 -0700919 state->SetChannel(
920 logged_channel_index,
921 configuration::ChannelIndex(configuration(), channel),
James Kuszmaul09632422022-05-25 15:56:19 -0700922 event_loop && logged &&
923 configuration::ChannelIsReadableOnNode(channel, node)
924 ? event_loop->MakeRawSender(channel)
925 : nullptr,
Austin Schuhc0b6c4f2021-10-11 18:28:38 -0700926 filter, is_forwarded, source_state);
Austin Schuh58646e22021-08-23 23:51:46 -0700927
 // For forwarded, logged channels, give each destination whose delivery
 // time is logged on the source node a remote timestamp sender (or nullptr
 // when detaching).
Austin Schuhc0b6c4f2021-10-11 18:28:38 -0700928 if (is_forwarded && logged) {
Austin Schuh58646e22021-08-23 23:51:46 -0700929 const Node *source_node = configuration::GetNode(
930 configuration(), channel->source_node()->string_view());
931
932 for (const Connection *connection : *channel->destination_nodes()) {
933 const bool delivery_time_is_logged =
934 configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
935 source_node);
936
937 if (delivery_time_is_logged) {
938 State *destination_state =
939 states_[configuration::GetNodeIndex(
940 configuration(), connection->name()->string_view())]
941 .get();
James Kuszmaul09632422022-05-25 15:56:19 -0700942 if (destination_state) {
943 destination_state->SetRemoteTimestampSender(
944 logged_channel_index,
945 event_loop ? state->RemoteTimestampSender(channel, connection)
946 : nullptr);
947 }
Austin Schuh58646e22021-08-23 23:51:46 -0700948 }
949 }
950 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800951 }
952
 // Detaching: nothing below applies without an event loop, so clear the
 // remaining per-loop handles and stop.
Austin Schuh58646e22021-08-23 23:51:46 -0700953 if (!event_loop) {
954 state->ClearRemoteTimestampSenders();
955 state->set_timer_handler(nullptr);
956 state->set_startup_timer(nullptr);
Austin Schuh6aa77be2020-02-22 21:06:40 -0800957 return;
958 }
959
 // Replay timer: each firing sends (or drops) the oldest queued message and
 // then schedules itself for the next one.
Austin Schuh858c9f32020-08-31 16:56:12 -0700960 state->set_timer_handler(event_loop->AddTimer([this, state]() {
 // No messages left: count this node as down and, once no live nodes
 // remain, exit the simulated factory if exit_on_finish_ is set.
James Kuszmaula16a7912022-06-17 10:58:12 -0700961 if (state->MultiThreadedOldestMessageTime() == BootTimestamp::max_time()) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800962 --live_nodes_;
Alexei Strots036d84e2023-05-03 16:05:12 -0700963 VLOG(1) << "Node '" << MaybeNodeName(state->event_loop()->node())
964 << "' down!";
James Kuszmaula16a7912022-06-17 10:58:12 -0700965 if (exit_on_finish_ && live_nodes_ == 0 &&
966 event_loop_factory_ != nullptr) {
James Kuszmaulb11a1502022-07-01 16:02:25 -0700967 event_loop_factory_->Exit();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800968 }
James Kuszmaul314f1672020-01-03 20:02:08 -0800969 return;
970 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700971
Austin Schuhdda74ec2021-01-03 19:30:37 -0800972 TimestampedMessage timestamped_message = state->PopOldest();
Austin Schuh58646e22021-08-23 23:51:46 -0700973
974 CHECK_EQ(timestamped_message.monotonic_event_time.boot,
975 state->boot_count());
Austin Schuh05b70472020-01-01 17:11:17 -0800976
Austin Schuhe309d2a2019-11-29 13:25:21 -0800977 const monotonic_clock::time_point monotonic_now =
Austin Schuh858c9f32020-08-31 16:56:12 -0700978 state->event_loop()->context().monotonic_event_time;
James Kuszmaul09632422022-05-25 15:56:19 -0700979 if (event_loop_factory_ != nullptr) {
980 // Only enforce exact timing in simulation.
981 if (!FLAGS_skip_order_validation) {
982 CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
983 << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
984 << monotonic_now << " trying to send "
985 << timestamped_message.monotonic_event_time << " failure "
986 << state->DebugString();
987 } else if (BootTimestamp{.boot = state->boot_count(),
988 .time = monotonic_now} !=
989 timestamped_message.monotonic_event_time) {
990 LOG(WARNING) << "Check failed: monotonic_now == "
991 "timestamped_message.monotonic_event_time) ("
992 << monotonic_now << " vs. "
993 << timestamped_message.monotonic_event_time
994 << "): " << FlatbufferToJson(state->event_loop()->node())
995 << " Now " << monotonic_now << " trying to send "
996 << timestamped_message.monotonic_event_time << " failure "
997 << state->DebugString();
998 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700999 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001000
 // Handle the message when it is after the start time for its boot, when a
 // simulated factory is in use, or when
 // --drop_realtime_messages_before_start is off.  Otherwise it is only
 // reported by the warning in the final else branch.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001001 if (timestamped_message.monotonic_event_time.time >
1002 state->monotonic_start_time(
1003 timestamped_message.monotonic_event_time.boot) ||
James Kuszmaul09632422022-05-25 15:56:19 -07001004 event_loop_factory_ != nullptr ||
1005 !FLAGS_drop_realtime_messages_before_start) {
Austin Schuhbd5f74a2021-11-11 20:55:38 -08001006 if (timestamped_message.data != nullptr && !state->found_last_message()) {
Austin Schuhdda74ec2021-01-03 19:30:37 -08001007 if (timestamped_message.monotonic_remote_time !=
James Kuszmaul09632422022-05-25 15:56:19 -07001008 BootTimestamp::min_time() &&
1009 !FLAGS_skip_order_validation && event_loop_factory_ != nullptr) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001010 // Confirm that the message was sent on the sending node before the
1011 // destination node (this node). As a proxy, do this by making sure
1012 // that time on the source node is past when the message was sent.
Austin Schuh87dd3832021-01-01 23:07:31 -08001013 //
1014 // TODO(austin): <= means that the cause message (which we know) could
1015 // happen after the effect even though we know they are at the same
1016 // time. I doubt anyone will notice for a bit, but we should really
1017 // fix that.
Austin Schuh58646e22021-08-23 23:51:46 -07001018 BootTimestamp monotonic_remote_now =
1019 state->monotonic_remote_now(timestamped_message.channel_index);
 // NOTE(review): the enclosing condition already requires
 // !FLAGS_skip_order_validation, so the two else-if warning branches
 // below look unreachable -- confirm the intent.
Austin Schuh2f8fd752020-09-01 22:38:28 -07001020 if (!FLAGS_skip_order_validation) {
Austin Schuh58646e22021-08-23 23:51:46 -07001021 CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
Austin Schuh3e20c692021-11-16 20:43:16 -08001022 monotonic_remote_now.boot)
1023 << state->event_loop()->node()->name()->string_view() << " to "
1024 << state->remote_node(timestamped_message.channel_index)
1025 ->name()
1026 ->string_view()
1027 << " while trying to send a message on "
1028 << configuration::CleanedChannelToString(
1029 logged_configuration()->channels()->Get(
1030 timestamped_message.channel_index))
1031 << " " << timestamped_message << " " << state->DebugString();
Austin Schuh58646e22021-08-23 23:51:46 -07001032 CHECK_LE(timestamped_message.monotonic_remote_time,
1033 monotonic_remote_now)
Austin Schuh2f8fd752020-09-01 22:38:28 -07001034 << state->event_loop()->node()->name()->string_view() << " to "
Austin Schuh287d43d2020-12-04 20:19:33 -08001035 << state->remote_node(timestamped_message.channel_index)
1036 ->name()
1037 ->string_view()
Austin Schuh315b96b2020-12-11 21:21:12 -08001038 << " while trying to send a message on "
1039 << configuration::CleanedChannelToString(
1040 logged_configuration()->channels()->Get(
1041 timestamped_message.channel_index))
Austin Schuh2f8fd752020-09-01 22:38:28 -07001042 << " " << state->DebugString();
Austin Schuh58646e22021-08-23 23:51:46 -07001043 } else if (monotonic_remote_now.boot !=
1044 timestamped_message.monotonic_remote_time.boot) {
1045 LOG(WARNING) << "Missmatched boots, " << monotonic_remote_now.boot
1046 << " vs "
1047 << timestamped_message.monotonic_remote_time.boot;
1048 } else if (timestamped_message.monotonic_remote_time >
1049 monotonic_remote_now) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001050 LOG(WARNING)
Austin Schuh287d43d2020-12-04 20:19:33 -08001051 << "Check failed: timestamped_message.monotonic_remote_time < "
1052 "state->monotonic_remote_now(timestamped_message.channel_"
1053 "index) ("
1054 << timestamped_message.monotonic_remote_time << " vs. "
1055 << state->monotonic_remote_now(
1056 timestamped_message.channel_index)
1057 << ") " << state->event_loop()->node()->name()->string_view()
1058 << " to "
1059 << state->remote_node(timestamped_message.channel_index)
1060 ->name()
1061 ->string_view()
1062 << " currently " << timestamped_message.monotonic_event_time
Austin Schuh2f8fd752020-09-01 22:38:28 -07001063 << " ("
1064 << state->ToDistributedClock(
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001065 timestamped_message.monotonic_event_time.time)
Austin Schuh2f8fd752020-09-01 22:38:28 -07001066 << ") remote event time "
Austin Schuh287d43d2020-12-04 20:19:33 -08001067 << timestamped_message.monotonic_remote_time << " ("
Austin Schuh2f8fd752020-09-01 22:38:28 -07001068 << state->RemoteToDistributedClock(
Austin Schuh287d43d2020-12-04 20:19:33 -08001069 timestamped_message.channel_index,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001070 timestamped_message.monotonic_remote_time.time)
Austin Schuh2f8fd752020-09-01 22:38:28 -07001071 << ") " << state->DebugString();
1072 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001073 }
1074
Austin Schuh15649d62019-12-28 16:36:38 -08001075 // If we have access to the factory, use it to fix the realtime time.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001076 state->SetRealtimeOffset(timestamped_message.monotonic_event_time.time,
Austin Schuh287d43d2020-12-04 20:19:33 -08001077 timestamped_message.realtime_event_time);
Austin Schuh15649d62019-12-28 16:36:38 -08001078
Alexei Strots036d84e2023-05-03 16:05:12 -07001079 VLOG(1) << "For node '" << MaybeNodeName(state->event_loop()->node())
1080 << "' sending at " << timestamped_message.monotonic_event_time
1081 << " : " << state->DebugString();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001082 // TODO(austin): std::move channel_data in and make that efficient in
1083 // simulation.
Austin Schuh287d43d2020-12-04 20:19:33 -08001084 state->Send(std::move(timestamped_message));
 // Reached only when the data is missing or the last message was already
 // found.  The body tracks end-of-log per channel; when neither half of
 // this condition holds the message is silently skipped.
Austin Schuhbd5f74a2021-11-11 20:55:38 -08001085 } else if (state->found_last_message() ||
1086 (!ignore_missing_data_ &&
1087 // When starting up, we can have data which was sent before
1088 // the log starts, but the timestamp was after the log
1089 // starts. This is unreasonable to avoid, so ignore the
1090 // missing data.
1091 timestamped_message.monotonic_remote_time.time >=
1092 state->monotonic_remote_start_time(
1093 timestamped_message.monotonic_remote_time.boot,
1094 timestamped_message.channel_index) &&
1095 !FLAGS_skip_missing_forwarding_entries)) {
1096 if (!state->found_last_message()) {
1097 // We've found a timestamp without data that we expect to have data
1098 // for. This likely means that we are at the end of the log file.
1099 // Record it and CHECK that in the rest of the log file, we don't find
1100 // any more data on that channel. Not all channels will end at the
1101 // same point in time since they can be in different files.
1102 VLOG(1) << "Found the last message on channel "
1103 << timestamped_message.channel_index << ", "
1104 << configuration::CleanedChannelToString(
1105 logged_configuration()->channels()->Get(
1106 timestamped_message.channel_index))
Alexei Strots036d84e2023-05-03 16:05:12 -07001107 << " on node '" << MaybeNodeName(state->event_loop()->node())
1108 << "' at " << timestamped_message;
Austin Schuhdda74ec2021-01-03 19:30:37 -08001109
Austin Schuhbd5f74a2021-11-11 20:55:38 -08001110 // The user might be working with log files from 1 node but forgot to
1111 // configure the infrastructure to log data for a remote channel on
1112 // that node. That can be very hard to debug, even though the log
1113 // reader is doing the right thing. At least log a warning in that
1114 // case and tell the user what is happening so they can either update
1115 // their config to log the channel or can find a log with the data.
Austin Schuh2bb80e02021-03-20 21:46:17 -07001116 const std::vector<std::string> logger_nodes =
Alexei Strots1f51ac72023-05-15 10:14:54 -07001117 log_files_.logger_nodes();
1118 if (!logger_nodes.empty()) {
Austin Schuh2bb80e02021-03-20 21:46:17 -07001119 // We have old logs which don't have the logger nodes logged. In
1120 // that case, we can't be helpful :(
1121 bool data_logged = false;
1122 const Channel *channel = logged_configuration()->channels()->Get(
1123 timestamped_message.channel_index);
1124 for (const std::string &node : logger_nodes) {
1125 data_logged |=
1126 configuration::ChannelMessageIsLoggedOnNode(channel, node);
1127 }
1128 if (!data_logged) {
1129 LOG(WARNING) << "Got a timestamp without any logfiles which "
1130 "could contain data for channel "
1131 << configuration::CleanedChannelToString(channel);
1132 LOG(WARNING) << "Only have logs logged on ["
1133 << absl::StrJoin(logger_nodes, ", ") << "]";
1134 LOG(WARNING)
1135 << "Dropping the rest of the data on "
1136 << state->event_loop()->node()->name()->string_view();
1137 LOG(WARNING)
1138 << "Consider using --skip_missing_forwarding_entries to "
1139 "bypass this, update your config to log it, or add data "
1140 "from one of the nodes it is logged on.";
1141 }
1142 }
Austin Schuhbd5f74a2021-11-11 20:55:38 -08001143 // Now that we found the end of one channel, artificially stop the
1144 // rest by setting the found_last_message bit. It is confusing when
1145 // part of your data gets replayed but not all. The rest of them will
1146 // get dropped as they are replayed to keep memory usage down.
1147 state->SetFoundLastMessage(true);
1148
1149 // Vector storing if we've seen a nullptr message or not per channel.
1150 state->set_last_message(timestamped_message.channel_index);
Austin Schuh2bb80e02021-03-20 21:46:17 -07001151 }
1152
Austin Schuhbd5f74a2021-11-11 20:55:38 -08001153 // Make sure that once we have seen the last message on a channel,
1154 // data doesn't start back up again. If the user wants to play
1155 // through events like this, they can set
1156 // --skip_missing_forwarding_entries or ignore_missing_data_.
1157 if (timestamped_message.data == nullptr) {
1158 state->set_last_message(timestamped_message.channel_index);
1159 } else {
1160 if (state->last_message(timestamped_message.channel_index)) {
1161 LOG(FATAL) << "Found missing data in the middle of the log file on "
1162 "channel "
1163 << timestamped_message.channel_index << " "
1164 << configuration::StrippedChannelToString(
1165 logged_configuration()->channels()->Get(
1166 timestamped_message.channel_index))
1167 << " " << timestamped_message << " "
1168 << state->DebugString();
Austin Schuhdda74ec2021-01-03 19:30:37 -08001169 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001170 }
Austin Schuh92547522019-12-28 14:33:43 -08001171 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001172 } else {
 // NOTE(review): this branch is selected purely by the start-time check
 // above, yet the message text ends by stating that the data is null.
 // Nothing here checks timestamped_message.data -- confirm the wording.
James Kuszmaul09632422022-05-25 15:56:19 -07001173 LOG(WARNING)
1174 << "Not sending data from before the start of the log file. "
1175 << timestamped_message.monotonic_event_time.time.time_since_epoch()
1176 .count()
1177 << " start "
1178 << monotonic_start_time(state->node()).time_since_epoch().count()
1179 << " timestamped_message.data is null";
Austin Schuhe309d2a2019-11-29 13:25:21 -08001180 }
1181
 // Decide what happens next: schedule the following message, stop if it
 // belongs to a different boot, or wind down when nothing is left.
James Kuszmaula16a7912022-06-17 10:58:12 -07001182 const BootTimestamp next_time = state->MultiThreadedOldestMessageTime();
Austin Schuh58646e22021-08-23 23:51:46 -07001183 if (next_time != BootTimestamp::max_time()) {
1184 if (next_time.boot != state->boot_count()) {
Alexei Strots036d84e2023-05-03 16:05:12 -07001185 VLOG(1) << "Next message for node '"
Austin Schuh58646e22021-08-23 23:51:46 -07001186 << MaybeNodeName(state->event_loop()->node())
Alexei Strots036d84e2023-05-03 16:05:12 -07001187 << "' is on the next boot, " << next_time << " now is "
Austin Schuh58646e22021-08-23 23:51:46 -07001188 << state->monotonic_now();
1189 CHECK(event_loop_factory_);
Austin Schuhe33c08d2022-02-03 18:15:21 -08001190 state->NotifyLogfileEnd();
Austin Schuh58646e22021-08-23 23:51:46 -07001191 return;
1192 }
James Kuszmaul09632422022-05-25 15:56:19 -07001193 if (event_loop_factory_ != nullptr) {
Alexei Strots036d84e2023-05-03 16:05:12 -07001194 VLOG(1) << "Scheduling for node '"
1195 << MaybeNodeName(state->event_loop()->node()) << "' wakeup for "
1196 << next_time.time << "("
James Kuszmaul09632422022-05-25 15:56:19 -07001197 << state->ToDistributedClock(next_time.time)
1198 << " distributed), now is " << state->monotonic_now();
1199 } else {
Alexei Strots036d84e2023-05-03 16:05:12 -07001200 VLOG(1) << "Scheduling for node '"
1201 << MaybeNodeName(state->event_loop()->node()) << "' wakeup for "
1202 << next_time.time << ", now is " << state->monotonic_now();
James Kuszmaul09632422022-05-25 15:56:19 -07001203 }
James Kuszmaula16a7912022-06-17 10:58:12 -07001204 // TODO(james): This can result in negative times getting passed-through
1205 // in realtime replay.
Philipp Schradera6712522023-07-05 20:25:11 -07001206 state->Schedule(next_time.time);
James Kuszmaul314f1672020-01-03 20:02:08 -08001207 } else {
Alexei Strots036d84e2023-05-03 16:05:12 -07001208 VLOG(1) << "Node '" << MaybeNodeName(state->event_loop()->node())
1209 << "': No next message, scheduling shutdown";
Austin Schuhe33c08d2022-02-03 18:15:21 -08001210 state->NotifyLogfileEnd();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001211 // Set a timer up immediately after now to die. If we don't do this,
James Kuszmaul09632422022-05-25 15:56:19 -07001212 // then the watchers waiting on the message we just read will never get
Austin Schuh2f8fd752020-09-01 22:38:28 -07001213 // called.
James Kuszmaul09632422022-05-25 15:56:19 -07001214 // Doesn't apply to single-EventLoop replay since the watchers in question
1215 // are not under our control.
Austin Schuheecb9282020-01-08 17:43:30 -08001216 if (event_loop_factory_ != nullptr) {
Philipp Schradera6712522023-07-05 20:25:11 -07001217 state->Schedule(monotonic_now + event_loop_factory_->send_delay() +
1218 std::chrono::nanoseconds(1));
Austin Schuheecb9282020-01-08 17:43:30 -08001219 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001220 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001221
Alexei Strots036d84e2023-05-03 16:05:12 -07001222 VLOG(1) << "Node '" << MaybeNodeName(state->event_loop()->node())
1223 << "': Done sending at "
Austin Schuh2f8fd752020-09-01 22:38:28 -07001224 << state->event_loop()->context().monotonic_event_time << " now "
1225 << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001226 }));
Austin Schuhe309d2a2019-11-29 13:25:21 -08001227
James Kuszmaula16a7912022-06-17 10:58:12 -07001228 state->SeedSortedMessages();
1229
 // Only set up the startup timer, the start/end time flags, and the OnRun
 // hook when the state actually has a message to replay.
1230 if (state->SingleThreadedOldestMessageTime() != BootTimestamp::max_time()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001231 state->set_startup_timer(
Austin Schuhe33c08d2022-02-03 18:15:21 -08001232 event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
1233 if (start_time_ != realtime_clock::min_time) {
1234 state->SetStartTimeFlag(start_time_);
1235 }
1236 if (end_time_ != realtime_clock::max_time) {
1237 state->SetEndTimeFlag(end_time_);
James Kuszmaulb11a1502022-07-01 16:02:25 -07001238 ++live_nodes_with_realtime_time_end_;
Austin Schuhe33c08d2022-02-03 18:15:21 -08001239 }
Austin Schuh58646e22021-08-23 23:51:46 -07001240 event_loop->OnRun([state]() {
James Kuszmaula16a7912022-06-17 10:58:12 -07001241 BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
Austin Schuh58646e22021-08-23 23:51:46 -07001242 CHECK_EQ(next_time.boot, state->boot_count());
James Kuszmaula16a7912022-06-17 10:58:12 -07001243 // Queue up messages and then set clock offsets (we don't want to set
1244 // clock offsets before we've done the work of getting the first messages
1245 // primed).
1246 state->QueueThreadUntil(
1247 next_time + std::chrono::duration_cast<std::chrono::nanoseconds>(
1248 std::chrono::duration<double>(
1249 FLAGS_threaded_look_ahead_seconds)));
James Kuszmaulc3f34d12022-08-15 15:57:55 -07001250 state->MaybeSetClockOffset();
Philipp Schradera6712522023-07-05 20:25:11 -07001251 state->Schedule(next_time.time);
1252 state->SetUpStartupTimer();
Austin Schuh58646e22021-08-23 23:51:46 -07001253 });
Austin Schuhe309d2a2019-11-29 13:25:21 -08001254 }
1255}
1256
Austin Schuhe33c08d2022-02-03 18:15:21 -08001257void LogReader::SetEndTime(std::string end_time) {
1258 if (end_time.empty()) {
1259 SetEndTime(realtime_clock::max_time);
1260 } else {
1261 std::optional<aos::realtime_clock::time_point> parsed_end_time =
1262 aos::realtime_clock::FromString(end_time);
1263 CHECK(parsed_end_time) << ": Failed to parse end time '" << end_time
1264 << "'. Expected a date in the format of "
1265 "2021-01-15_15-30-35.000000000.";
1266 SetEndTime(*parsed_end_time);
1267 }
1268}
1269
1270void LogReader::SetEndTime(realtime_clock::time_point end_time) {
1271 end_time_ = end_time;
1272}
1273
1274void LogReader::SetStartTime(std::string start_time) {
1275 if (start_time.empty()) {
1276 SetStartTime(realtime_clock::min_time);
1277 } else {
1278 std::optional<aos::realtime_clock::time_point> parsed_start_time =
1279 aos::realtime_clock::FromString(start_time);
1280 CHECK(parsed_start_time) << ": Failed to parse start time '" << start_time
1281 << "'. Expected a date in the format of "
1282 "2021-01-15_15-30-35.000000000.";
1283 SetStartTime(*parsed_start_time);
1284 }
1285}
1286
1287void LogReader::SetStartTime(realtime_clock::time_point start_time) {
1288 start_time_ = start_time;
1289}
1290
Austin Schuhe309d2a2019-11-29 13:25:21 -08001291void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001292 // Make sure that things get destroyed in the correct order, rather than
1293 // relying on getting the order correct in the class definition.
Austin Schuh8bd96322020-02-13 21:18:22 -08001294 for (std::unique_ptr<State> &state : states_) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001295 state->Deregister();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001296 }
Austin Schuh92547522019-12-28 14:33:43 -08001297
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001298 event_loop_factory_unique_ptr_.reset();
1299 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -08001300}
1301
James Kuszmaul53da7f32022-09-11 11:11:55 -07001302namespace {
1303// Checks if the specified channel name/type exists in the config and, depending
1304// on the value of conflict_handling, calls conflict_handler or just dies.
1305template <typename F>
1306void CheckAndHandleRemapConflict(std::string_view new_name,
1307 std::string_view new_type,
1308 const Configuration *config,
1309 LogReader::RemapConflict conflict_handling,
1310 F conflict_handler) {
1311 const Channel *existing_channel =
1312 configuration::GetChannel(config, new_name, new_type, "", nullptr, true);
1313 if (existing_channel != nullptr) {
1314 switch (conflict_handling) {
1315 case LogReader::RemapConflict::kDisallow:
1316 LOG(FATAL)
1317 << "Channel "
1318 << configuration::StrippedChannelToString(existing_channel)
1319 << " is already used--you can't remap a logged channel to it.";
1320 break;
1321 case LogReader::RemapConflict::kCascade:
1322 LOG(INFO) << "Automatically remapping "
1323 << configuration::StrippedChannelToString(existing_channel)
1324 << " to avoid conflicts.";
1325 conflict_handler();
1326 break;
1327 }
1328 }
1329}
1330} // namespace
1331
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001332void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
Austin Schuh0de30f32020-12-06 12:44:28 -08001333 std::string_view add_prefix,
James Kuszmaul53da7f32022-09-11 11:11:55 -07001334 std::string_view new_type,
1335 RemapConflict conflict_handling) {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001336 RemapLoggedChannel(name, type, nullptr, add_prefix, new_type,
1337 conflict_handling);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001338}
1339
Austin Schuh01b4c352020-09-21 23:09:39 -07001340void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
1341 const Node *node,
Austin Schuh0de30f32020-12-06 12:44:28 -08001342 std::string_view add_prefix,
James Kuszmaul53da7f32022-09-11 11:11:55 -07001343 std::string_view new_type,
1344 RemapConflict conflict_handling) {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001345 if (node != nullptr) {
1346 VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
1347 }
1348 if (replay_channels_ != nullptr) {
1349 CHECK(std::find(replay_channels_->begin(), replay_channels_->end(),
1350 std::make_pair(std::string{name}, std::string{type})) !=
1351 replay_channels_->end())
1352 << "Attempted to remap channel " << name << " " << type
1353 << " which is not included in the replay channels passed to LogReader.";
1354 }
Austin Schuh01b4c352020-09-21 23:09:39 -07001355 const Channel *remapped_channel =
1356 configuration::GetChannel(logged_configuration(), name, type, "", node);
1357 CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
1358 << "\", \"type\": \"" << type << "\"}";
1359 VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
1360 << "\"}";
1361 VLOG(1) << "Remapped "
1362 << aos::configuration::StrippedChannelToString(remapped_channel);
1363
1364 // We want to make /spray on node 0 go to /0/spray by snooping the maps. And
1365 // we want it to degrade if the heuristics fail to just work.
1366 //
1367 // The easiest way to do this is going to be incredibly specific and verbose.
1368 // Look up /spray, to /0/spray. Then, prefix the result with /original to get
1369 // /original/0/spray. Then, create a map from /original/spray to
1370 // /original/0/spray for just the type we were asked for.
1371 if (name != remapped_channel->name()->string_view()) {
1372 MapT new_map;
1373 new_map.match = std::make_unique<ChannelT>();
1374 new_map.match->name = absl::StrCat(add_prefix, name);
1375 new_map.match->type = type;
1376 if (node != nullptr) {
1377 new_map.match->source_node = node->name()->str();
1378 }
1379 new_map.rename = std::make_unique<ChannelT>();
1380 new_map.rename->name =
1381 absl::StrCat(add_prefix, remapped_channel->name()->string_view());
1382 maps_.emplace_back(std::move(new_map));
1383 }
1384
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001385 // Then remap the logged channel to the prefixed channel.
Austin Schuh01b4c352020-09-21 23:09:39 -07001386 const size_t channel_index =
1387 configuration::ChannelIndex(logged_configuration(), remapped_channel);
1388 CHECK_EQ(0u, remapped_channels_.count(channel_index))
1389 << "Already remapped channel "
1390 << configuration::CleanedChannelToString(remapped_channel);
Austin Schuh0de30f32020-12-06 12:44:28 -08001391
1392 RemappedChannel remapped_channel_struct;
1393 remapped_channel_struct.remapped_name =
1394 std::string(add_prefix) +
1395 std::string(remapped_channel->name()->string_view());
1396 remapped_channel_struct.new_type = new_type;
James Kuszmaul53da7f32022-09-11 11:11:55 -07001397 const std::string_view remapped_type = new_type.empty() ? type : new_type;
1398 CheckAndHandleRemapConflict(
1399 remapped_channel_struct.remapped_name, remapped_type,
1400 remapped_configuration_, conflict_handling,
1401 [this, &remapped_channel_struct, remapped_type, node, add_prefix,
1402 conflict_handling]() {
1403 RemapLoggedChannel(remapped_channel_struct.remapped_name, remapped_type,
1404 node, add_prefix, "", conflict_handling);
1405 });
Austin Schuh0de30f32020-12-06 12:44:28 -08001406 remapped_channels_[channel_index] = std::move(remapped_channel_struct);
Austin Schuh01b4c352020-09-21 23:09:39 -07001407 MakeRemappedConfig();
1408}
1409
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001410void LogReader::RenameLoggedChannel(const std::string_view name,
1411 const std::string_view type,
1412 const std::string_view new_name,
1413 const std::vector<MapT> &add_maps) {
1414 RenameLoggedChannel(name, type, nullptr, new_name, add_maps);
1415}
1416
1417void LogReader::RenameLoggedChannel(const std::string_view name,
1418 const std::string_view type,
1419 const Node *const node,
1420 const std::string_view new_name,
1421 const std::vector<MapT> &add_maps) {
1422 if (node != nullptr) {
1423 VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
1424 }
1425 // First find the channel and rename it.
1426 const Channel *remapped_channel =
1427 configuration::GetChannel(logged_configuration(), name, type, "", node);
1428 CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
1429 << "\", \"type\": \"" << type << "\"}";
1430 VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
1431 << "\"}";
1432 VLOG(1) << "Remapped "
1433 << aos::configuration::StrippedChannelToString(remapped_channel);
1434
1435 const size_t channel_index =
1436 configuration::ChannelIndex(logged_configuration(), remapped_channel);
1437 CHECK_EQ(0u, remapped_channels_.count(channel_index))
1438 << "Already remapped channel "
1439 << configuration::CleanedChannelToString(remapped_channel);
1440
1441 RemappedChannel remapped_channel_struct;
1442 remapped_channel_struct.remapped_name = new_name;
1443 remapped_channel_struct.new_type.clear();
1444 remapped_channels_[channel_index] = std::move(remapped_channel_struct);
1445
1446 // Then add any provided maps.
1447 for (const MapT &map : add_maps) {
1448 maps_.push_back(map);
1449 }
1450
1451 // Finally rewrite the config.
1452 MakeRemappedConfig();
1453}
1454
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001455void LogReader::MakeRemappedConfig() {
Austin Schuh8bd96322020-02-13 21:18:22 -08001456 for (std::unique_ptr<State> &state : states_) {
Austin Schuh6aa77be2020-02-22 21:06:40 -08001457 if (state) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001458 CHECK(!state->event_loop())
Austin Schuh6aa77be2020-02-22 21:06:40 -08001459 << ": Can't change the mapping after the events are scheduled.";
1460 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001461 }
Austin Schuhac0771c2020-01-07 18:36:30 -08001462
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001463 // If no remapping occurred and we are using the original config, then there
1464 // is nothing interesting to do here.
1465 if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001466 remapped_configuration_ = logged_configuration();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001467 return;
1468 }
1469 // Config to copy Channel definitions from. Use the specified
1470 // replay_configuration_ if it has been provided.
1471 const Configuration *const base_config = replay_configuration_ == nullptr
1472 ? logged_configuration()
1473 : replay_configuration_;
Austin Schuh0de30f32020-12-06 12:44:28 -08001474
1475 // Create a config with all the channels, but un-sorted/merged. Collect up
1476 // the schemas while we do this. Call MergeConfiguration to sort everything,
1477 // and then merge it all in together.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001478
1479 // This is the builder that we use for the config containing all the new
1480 // channels.
Austin Schuh0de30f32020-12-06 12:44:28 -08001481 flatbuffers::FlatBufferBuilder fbb;
1482 fbb.ForceDefaults(true);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001483 std::vector<flatbuffers::Offset<Channel>> channel_offsets;
Austin Schuh0de30f32020-12-06 12:44:28 -08001484
Austin Schuhdda6db72023-06-21 17:02:34 -07001485 CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
Austin Schuh0de30f32020-12-06 12:44:28 -08001486 << ": Merging logic needs to be updated when the number of channel "
1487 "fields changes.";
1488
1489 // List of schemas.
1490 std::map<std::string_view, FlatbufferVector<reflection::Schema>> schema_map;
1491 // Make sure our new RemoteMessage schema is in there for old logs without it.
1492 schema_map.insert(std::make_pair(
1493 RemoteMessage::GetFullyQualifiedName(),
1494 FlatbufferVector<reflection::Schema>(FlatbufferSpan<reflection::Schema>(
1495 message_bridge::RemoteMessageSchema()))));
1496
1497 // Reconstruct the remapped channels.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001498 for (auto &pair : remapped_channels_) {
Austin Schuh0de30f32020-12-06 12:44:28 -08001499 const Channel *const c = CHECK_NOTNULL(configuration::GetChannel(
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001500 base_config, logged_configuration()->channels()->Get(pair.first), "",
1501 nullptr));
Austin Schuh0de30f32020-12-06 12:44:28 -08001502 channel_offsets.emplace_back(
1503 CopyChannel(c, pair.second.remapped_name, "", &fbb));
Austin Schuh006a9f52021-04-07 16:24:18 -07001504
1505 if (c->has_destination_nodes()) {
1506 for (const Connection *connection : *c->destination_nodes()) {
1507 switch (connection->timestamp_logger()) {
1508 case LoggerConfig::LOCAL_LOGGER:
1509 case LoggerConfig::NOT_LOGGED:
1510 // There is no timestamp channel associated with this, so ignore it.
1511 break;
1512
1513 case LoggerConfig::REMOTE_LOGGER:
1514 case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
1515 // We want to make a split timestamp channel regardless of what type
1516 // of log this used to be. No sense propagating the single
1517 // timestamp channel.
1518
1519 CHECK(connection->has_timestamp_logger_nodes());
1520 for (const flatbuffers::String *timestamp_logger_node :
1521 *connection->timestamp_logger_nodes()) {
1522 const Node *node = configuration::GetNode(
1523 logged_configuration(), timestamp_logger_node->string_view());
1524 message_bridge::ChannelTimestampFinder finder(
1525 logged_configuration(), "log_reader", node);
1526
1527 // We are assuming here that all the maps are setup correctly to
1528 // handle arbitrary timestamps. Apply the maps for this node to
1529 // see what name this ends up with.
1530 std::string name = finder.SplitChannelName(
1531 pair.second.remapped_name, c->type()->str(), connection);
1532 std::string unmapped_name = name;
1533 configuration::HandleMaps(logged_configuration()->maps(), &name,
1534 "aos.message_bridge.RemoteMessage",
1535 node);
1536 CHECK_NE(name, unmapped_name)
1537 << ": Remote timestamp channel was not remapped, this is "
1538 "very fishy";
1539 flatbuffers::Offset<flatbuffers::String> channel_name_offset =
1540 fbb.CreateString(name);
1541 flatbuffers::Offset<flatbuffers::String> channel_type_offset =
1542 fbb.CreateString("aos.message_bridge.RemoteMessage");
1543 flatbuffers::Offset<flatbuffers::String> source_node_offset =
1544 fbb.CreateString(timestamp_logger_node->string_view());
1545
1546 // Now, build a channel. Don't log it, 2 senders, and match the
1547 // source frequency.
1548 Channel::Builder channel_builder(fbb);
1549 channel_builder.add_name(channel_name_offset);
1550 channel_builder.add_type(channel_type_offset);
1551 channel_builder.add_source_node(source_node_offset);
1552 channel_builder.add_logger(LoggerConfig::NOT_LOGGED);
1553 channel_builder.add_num_senders(2);
1554 if (c->has_frequency()) {
1555 channel_builder.add_frequency(c->frequency());
1556 }
Austin Schuhdda6db72023-06-21 17:02:34 -07001557 if (c->has_channel_storage_duration()) {
1558 channel_builder.add_channel_storage_duration(
1559 c->channel_storage_duration());
1560 }
Austin Schuh006a9f52021-04-07 16:24:18 -07001561 channel_offsets.emplace_back(channel_builder.Finish());
1562 }
1563 break;
1564 }
1565 }
1566 }
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001567 }
Austin Schuh01b4c352020-09-21 23:09:39 -07001568
Austin Schuh0de30f32020-12-06 12:44:28 -08001569 // Now reconstruct the original channels, translating types as needed
1570 for (const Channel *c : *base_config->channels()) {
1571 // Search for a mapping channel.
1572 std::string_view new_type = "";
1573 for (auto &pair : remapped_channels_) {
1574 const Channel *const remapped_channel =
1575 logged_configuration()->channels()->Get(pair.first);
1576 if (remapped_channel->name()->string_view() == c->name()->string_view() &&
1577 remapped_channel->type()->string_view() == c->type()->string_view()) {
1578 new_type = pair.second.new_type;
1579 break;
1580 }
1581 }
1582
1583 // Copy everything over.
1584 channel_offsets.emplace_back(CopyChannel(c, "", new_type, &fbb));
1585
1586 // Add the schema if it doesn't exist.
1587 if (schema_map.find(c->type()->string_view()) == schema_map.end()) {
1588 CHECK(c->has_schema());
1589 schema_map.insert(std::make_pair(c->type()->string_view(),
1590 RecursiveCopyFlatBuffer(c->schema())));
1591 }
1592 }
1593
1594 // The MergeConfiguration API takes a vector, not a map. Convert.
1595 std::vector<FlatbufferVector<reflection::Schema>> schemas;
1596 while (!schema_map.empty()) {
1597 schemas.emplace_back(std::move(schema_map.begin()->second));
1598 schema_map.erase(schema_map.begin());
1599 }
1600
1601 // Create the Configuration containing the new channels that we want to add.
1602 const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
1603 channels_offset =
1604 channel_offsets.empty() ? 0 : fbb.CreateVector(channel_offsets);
1605
1606 // Copy over the old maps.
Austin Schuh01b4c352020-09-21 23:09:39 -07001607 std::vector<flatbuffers::Offset<Map>> map_offsets;
Austin Schuh0de30f32020-12-06 12:44:28 -08001608 if (base_config->maps()) {
1609 for (const Map *map : *base_config->maps()) {
1610 map_offsets.emplace_back(aos::RecursiveCopyFlatBuffer(map, &fbb));
1611 }
1612 }
1613
1614 // Now create the new maps. These are second so they take effect first.
Austin Schuh01b4c352020-09-21 23:09:39 -07001615 for (const MapT &map : maps_) {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001616 CHECK(!map.match->name.empty());
Austin Schuh01b4c352020-09-21 23:09:39 -07001617 const flatbuffers::Offset<flatbuffers::String> match_name_offset =
Austin Schuh0de30f32020-12-06 12:44:28 -08001618 fbb.CreateString(map.match->name);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001619 flatbuffers::Offset<flatbuffers::String> match_type_offset;
1620 if (!map.match->type.empty()) {
1621 match_type_offset = fbb.CreateString(map.match->type);
1622 }
Austin Schuh01b4c352020-09-21 23:09:39 -07001623 flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
1624 if (!map.match->source_node.empty()) {
Austin Schuh0de30f32020-12-06 12:44:28 -08001625 match_source_node_offset = fbb.CreateString(map.match->source_node);
Austin Schuh01b4c352020-09-21 23:09:39 -07001626 }
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001627 CHECK(!map.rename->name.empty());
1628 const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
1629 fbb.CreateString(map.rename->name);
Austin Schuh0de30f32020-12-06 12:44:28 -08001630 Channel::Builder match_builder(fbb);
Austin Schuh01b4c352020-09-21 23:09:39 -07001631 match_builder.add_name(match_name_offset);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001632 if (!match_type_offset.IsNull()) {
1633 match_builder.add_type(match_type_offset);
1634 }
1635 if (!match_source_node_offset.IsNull()) {
Austin Schuh01b4c352020-09-21 23:09:39 -07001636 match_builder.add_source_node(match_source_node_offset);
1637 }
1638 const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
1639
Austin Schuh0de30f32020-12-06 12:44:28 -08001640 Channel::Builder rename_builder(fbb);
Austin Schuh01b4c352020-09-21 23:09:39 -07001641 rename_builder.add_name(rename_name_offset);
1642 const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
1643
Austin Schuh0de30f32020-12-06 12:44:28 -08001644 Map::Builder map_builder(fbb);
Austin Schuh01b4c352020-09-21 23:09:39 -07001645 map_builder.add_match(match_offset);
1646 map_builder.add_rename(rename_offset);
1647 map_offsets.emplace_back(map_builder.Finish());
1648 }
1649
Austin Schuh0de30f32020-12-06 12:44:28 -08001650 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
1651 maps_offsets = map_offsets.empty() ? 0 : fbb.CreateVector(map_offsets);
Austin Schuh01b4c352020-09-21 23:09:39 -07001652
Austin Schuh0de30f32020-12-06 12:44:28 -08001653 // And copy everything else over.
1654 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
1655 nodes_offset = aos::RecursiveCopyVectorTable(base_config->nodes(), &fbb);
1656
1657 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
1658 applications_offset =
1659 aos::RecursiveCopyVectorTable(base_config->applications(), &fbb);
1660
1661 // Now insert everything else in unmodified.
1662 ConfigurationBuilder configuration_builder(fbb);
1663 if (!channels_offset.IsNull()) {
1664 configuration_builder.add_channels(channels_offset);
1665 }
1666 if (!maps_offsets.IsNull()) {
1667 configuration_builder.add_maps(maps_offsets);
1668 }
1669 if (!nodes_offset.IsNull()) {
1670 configuration_builder.add_nodes(nodes_offset);
1671 }
1672 if (!applications_offset.IsNull()) {
1673 configuration_builder.add_applications(applications_offset);
1674 }
1675
1676 if (base_config->has_channel_storage_duration()) {
1677 configuration_builder.add_channel_storage_duration(
1678 base_config->channel_storage_duration());
1679 }
1680
1681 CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
1682 << ": Merging logic needs to be updated when the number of configuration "
1683 "fields changes.";
1684
1685 fbb.Finish(configuration_builder.Finish());
1686
1687 // Clean it up and return it! By using MergeConfiguration here, we'll
1688 // actually get a deduplicated config for free too.
1689 FlatbufferDetachedBuffer<Configuration> new_merged_config =
1690 configuration::MergeConfiguration(
1691 FlatbufferDetachedBuffer<Configuration>(fbb.Release()));
1692
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001693 remapped_configuration_buffer_ =
1694 std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
Austin Schuh0de30f32020-12-06 12:44:28 -08001695 configuration::MergeConfiguration(new_merged_config, schemas));
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001696
1697 remapped_configuration_ = &remapped_configuration_buffer_->message();
Austin Schuh0de30f32020-12-06 12:44:28 -08001698
1699 // TODO(austin): Lazily re-build to save CPU?
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001700}
1701
Naman Guptacf6d4422023-03-01 11:41:00 -08001702std::unique_ptr<const ReplayChannelIndices>
1703LogReader::MaybeMakeReplayChannelIndices(const Node *node) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001704 if (replay_channels_ == nullptr) {
1705 return nullptr;
1706 } else {
Naman Guptacf6d4422023-03-01 11:41:00 -08001707 std::unique_ptr<ReplayChannelIndices> replay_channel_indices =
1708 std::make_unique<ReplayChannelIndices>();
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001709 for (auto const &channel : *replay_channels_) {
1710 const Channel *ch = configuration::GetChannel(
1711 logged_configuration(), channel.first, channel.second, "", node);
1712 if (ch == nullptr) {
1713 LOG(WARNING) << "Channel: " << channel.first << " " << channel.second
1714 << " not found in configuration for node: "
1715 << node->name()->string_view() << " Skipping ...";
1716 continue;
1717 }
1718 const size_t channel_index =
1719 configuration::ChannelIndex(logged_configuration(), ch);
Naman Guptacf6d4422023-03-01 11:41:00 -08001720 replay_channel_indices->emplace_back(channel_index);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001721 }
Naman Guptacf6d4422023-03-01 11:41:00 -08001722 std::sort(replay_channel_indices->begin(), replay_channel_indices->end());
1723 return replay_channel_indices;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001724 }
1725}
1726
Austin Schuh1c227352021-09-17 12:53:54 -07001727std::vector<const Channel *> LogReader::RemappedChannels() const {
1728 std::vector<const Channel *> result;
1729 result.reserve(remapped_channels_.size());
1730 for (auto &pair : remapped_channels_) {
1731 const Channel *const logged_channel =
1732 CHECK_NOTNULL(logged_configuration()->channels()->Get(pair.first));
1733
1734 auto channel_iterator = std::lower_bound(
1735 remapped_configuration_->channels()->cbegin(),
1736 remapped_configuration_->channels()->cend(),
1737 std::make_pair(std::string_view(pair.second.remapped_name),
1738 logged_channel->type()->string_view()),
1739 CompareChannels);
1740
1741 CHECK(channel_iterator != remapped_configuration_->channels()->cend());
1742 CHECK(EqualsChannels(
1743 *channel_iterator,
1744 std::make_pair(std::string_view(pair.second.remapped_name),
1745 logged_channel->type()->string_view())));
1746 result.push_back(*channel_iterator);
1747 }
1748 return result;
1749}
1750
Austin Schuh6f3babe2020-01-26 20:34:50 -08001751const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
Austin Schuh58646e22021-08-23 23:51:46 -07001752 const Node *node,
Austin Schuh6f3babe2020-01-26 20:34:50 -08001753 const Channel *channel) {
1754 std::string_view channel_name = channel->name()->string_view();
1755 std::string_view channel_type = channel->type()->string_view();
1756 const int channel_index =
1757 configuration::ChannelIndex(logged_configuration(), channel);
1758 // If the channel is remapped, find the correct channel name to use.
1759 if (remapped_channels_.count(channel_index) > 0) {
Austin Schuhee711052020-08-24 16:06:09 -07001760 VLOG(3) << "Got remapped channel on "
Austin Schuh6f3babe2020-01-26 20:34:50 -08001761 << configuration::CleanedChannelToString(channel);
Austin Schuh0de30f32020-12-06 12:44:28 -08001762 channel_name = remapped_channels_[channel_index].remapped_name;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001763 }
1764
Austin Schuhee711052020-08-24 16:06:09 -07001765 VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001766 const Channel *remapped_channel = configuration::GetChannel(
Austin Schuh58646e22021-08-23 23:51:46 -07001767 configuration(), channel_name, channel_type,
1768 event_loop ? event_loop->name() : "log_reader", node);
Austin Schuh6f3babe2020-01-26 20:34:50 -08001769
1770 CHECK(remapped_channel != nullptr)
1771 << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
1772 << channel_type << "\"} because it is not in the provided configuration.";
1773
1774 return remapped_channel;
1775}
1776
James Kuszmaul09632422022-05-25 15:56:19 -07001777LogReader::State::State(
1778 std::unique_ptr<TimestampMapper> timestamp_mapper,
1779 message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
James Kuszmaulb11a1502022-07-01 16:02:25 -07001780 std::function<void()> notice_realtime_end, const Node *node,
1781 LogReader::State::ThreadedBuffering threading,
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001782 std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
1783 const std::vector<std::function<void(void *message)>>
1784 &before_send_callbacks)
James Kuszmaul09632422022-05-25 15:56:19 -07001785 : timestamp_mapper_(std::move(timestamp_mapper)),
James Kuszmaulb11a1502022-07-01 16:02:25 -07001786 notice_realtime_end_(notice_realtime_end),
James Kuszmaul09632422022-05-25 15:56:19 -07001787 node_(node),
James Kuszmaula16a7912022-06-17 10:58:12 -07001788 multinode_filters_(multinode_filters),
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001789 threading_(threading),
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001790 replay_channel_indices_(std::move(replay_channel_indices)),
1791 before_send_callbacks_(before_send_callbacks) {
Naman Guptaa68401c2022-12-08 14:34:06 -08001792 // If timestamp_mapper_ is nullptr, then there are no log parts associated
1793 // with this node. If there are no log parts for the node, there will be no
1794 // log data, and so we do not need to worry about the replay channel filters.
1795 if (replay_channel_indices_ != nullptr && timestamp_mapper_ != nullptr) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001796 timestamp_mapper_->set_replay_channels_callback(
Naman Guptacf6d4422023-03-01 11:41:00 -08001797 [filter = replay_channel_indices_.get()](
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001798 const TimestampedMessage &message) -> bool {
1799 auto const begin = filter->cbegin();
1800 auto const end = filter->cend();
1801 // TODO: benchmark strategies for channel_index matching
1802 return std::binary_search(begin, end, message.channel_index);
1803 });
1804 }
1805}
Austin Schuh287d43d2020-12-04 20:19:33 -08001806
1807void LogReader::State::AddPeer(State *peer) {
1808 if (timestamp_mapper_ && peer->timestamp_mapper_) {
1809 timestamp_mapper_->AddPeer(peer->timestamp_mapper_.get());
1810 }
1811}
Austin Schuh858c9f32020-08-31 16:56:12 -07001812
Austin Schuh58646e22021-08-23 23:51:46 -07001813void LogReader::State::SetNodeEventLoopFactory(
Austin Schuhe33c08d2022-02-03 18:15:21 -08001814 NodeEventLoopFactory *node_event_loop_factory,
1815 SimulatedEventLoopFactory *event_loop_factory) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001816 node_event_loop_factory_ = node_event_loop_factory;
Austin Schuhe33c08d2022-02-03 18:15:21 -08001817 event_loop_factory_ = event_loop_factory;
Austin Schuh858c9f32020-08-31 16:56:12 -07001818}
1819
1820void LogReader::State::SetChannelCount(size_t count) {
1821 channels_.resize(count);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001822 remote_timestamp_senders_.resize(count);
Austin Schuh858c9f32020-08-31 16:56:12 -07001823 filters_.resize(count);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001824 channel_source_state_.resize(count);
1825 factory_channel_index_.resize(count);
1826 queue_index_map_.resize(count);
Austin Schuh858c9f32020-08-31 16:56:12 -07001827}
1828
Austin Schuh58646e22021-08-23 23:51:46 -07001829void LogReader::State::SetRemoteTimestampSender(
1830 size_t logged_channel_index, RemoteMessageSender *remote_timestamp_sender) {
1831 remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
1832}
1833
Austin Schuh858c9f32020-08-31 16:56:12 -07001834void LogReader::State::SetChannel(
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001835 size_t logged_channel_index, size_t factory_channel_index,
1836 std::unique_ptr<RawSender> sender,
Austin Schuh58646e22021-08-23 23:51:46 -07001837 message_bridge::NoncausalOffsetEstimator *filter, bool is_forwarded,
1838 State *source_state) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001839 channels_[logged_channel_index] = std::move(sender);
1840 filters_[logged_channel_index] = filter;
Austin Schuh58646e22021-08-23 23:51:46 -07001841 channel_source_state_[logged_channel_index] = source_state;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001842
Austin Schuh58646e22021-08-23 23:51:46 -07001843 if (is_forwarded) {
1844 queue_index_map_[logged_channel_index] =
1845 std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001846 }
1847
1848 factory_channel_index_[logged_channel_index] = factory_channel_index;
1849}
1850
James Kuszmaula16a7912022-06-17 10:58:12 -07001851void LogReader::State::TrackMessageSendTiming(
1852 const RawSender &sender, monotonic_clock::time_point expected_send_time) {
1853 if (event_loop_ == nullptr || !timing_statistics_sender_.valid()) {
1854 return;
1855 }
1856
1857 timing::MessageTimingT sample;
1858 sample.channel = configuration::ChannelIndex(event_loop_->configuration(),
1859 sender.channel());
1860 sample.expected_send_time = expected_send_time.time_since_epoch().count();
1861 sample.actual_send_time =
1862 sender.monotonic_sent_time().time_since_epoch().count();
1863 sample.send_time_error = aos::time::DurationInSeconds(
1864 expected_send_time - sender.monotonic_sent_time());
1865 send_timings_.push_back(sample);
1866
1867 // Somewhat arbitrarily send out timing information in batches of 100. No need
1868 // to create excessive overhead in regenerated logfiles.
1869 // TODO(james): The overhead may be fine.
1870 constexpr size_t kMaxTimesPerStatisticsMessage = 100;
1871 CHECK(timing_statistics_sender_.valid());
1872 if (send_timings_.size() == kMaxTimesPerStatisticsMessage) {
1873 SendMessageTimings();
1874 }
1875}
1876
1877void LogReader::State::SendMessageTimings() {
1878 if (send_timings_.empty() || !timing_statistics_sender_.valid()) {
1879 return;
1880 }
1881 auto builder = timing_statistics_sender_.MakeBuilder();
1882 std::vector<flatbuffers::Offset<timing::MessageTiming>> timing_offsets;
1883 for (const auto &timing : send_timings_) {
1884 timing_offsets.push_back(
1885 timing::MessageTiming::Pack(*builder.fbb(), &timing));
1886 }
1887 send_timings_.clear();
1888 flatbuffers::Offset<
1889 flatbuffers::Vector<flatbuffers::Offset<timing::MessageTiming>>>
1890 timings_offset = builder.fbb()->CreateVector(timing_offsets);
1891 timing::ReplayTiming::Builder timing_builder =
1892 builder.MakeBuilder<timing::ReplayTiming>();
1893 timing_builder.add_messages(timings_offset);
1894 timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
1895}
1896
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001897bool LogReader::State::Send(const TimestampedMessage &&timestamped_message) {
Austin Schuh287d43d2020-12-04 20:19:33 -08001898 aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
Austin Schuh58646e22021-08-23 23:51:46 -07001899 CHECK(sender);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001900 uint32_t remote_queue_index = 0xffffffff;
1901
Austin Schuh287d43d2020-12-04 20:19:33 -08001902 if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
Austin Schuh58646e22021-08-23 23:51:46 -07001903 State *source_state =
1904 CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
Austin Schuh9942bae2021-01-07 22:06:44 -08001905 std::vector<ContiguousSentTimestamp> *queue_index_map = CHECK_NOTNULL(
Austin Schuh58646e22021-08-23 23:51:46 -07001906 source_state->queue_index_map_[timestamped_message.channel_index]
Austin Schuh287d43d2020-12-04 20:19:33 -08001907 .get());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001908
Austin Schuh9942bae2021-01-07 22:06:44 -08001909 struct SentTimestamp {
1910 monotonic_clock::time_point monotonic_event_time;
1911 uint32_t queue_index;
1912 } search;
1913
Austin Schuh58646e22021-08-23 23:51:46 -07001914 CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
1915 source_state->boot_count());
Tyler Chatowbf0609c2021-07-31 16:13:27 -07001916 search.monotonic_event_time =
1917 timestamped_message.monotonic_remote_time.time;
Austin Schuh58646e22021-08-23 23:51:46 -07001918 search.queue_index = timestamped_message.remote_queue_index.index;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001919
1920 // Find the sent time if available.
1921 auto element = std::lower_bound(
1922 queue_index_map->begin(), queue_index_map->end(), search,
Austin Schuh9942bae2021-01-07 22:06:44 -08001923 [](ContiguousSentTimestamp a, SentTimestamp b) {
1924 if (a.ending_monotonic_event_time < b.monotonic_event_time) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001925 return true;
1926 }
Austin Schuh9942bae2021-01-07 22:06:44 -08001927 if (a.starting_monotonic_event_time > b.monotonic_event_time) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001928 return false;
1929 }
Austin Schuh9942bae2021-01-07 22:06:44 -08001930
1931 if (a.ending_queue_index < b.queue_index) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001932 return true;
1933 }
Austin Schuh9942bae2021-01-07 22:06:44 -08001934 if (a.starting_queue_index >= b.queue_index) {
1935 return false;
1936 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001937
Austin Schuh9942bae2021-01-07 22:06:44 -08001938 // If it isn't clearly below or above, it is below. Since we return
1939 // the last element <, this will return a match.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001940 return false;
1941 });
1942
1943 // TODO(austin): Be a bit more principled here, but we will want to do that
1944 // after the logger rewrite. We hit this when one node finishes, but the
1945 // other node isn't done yet. So there is no send time, but there is a
1946 // receive time.
1947 if (element != queue_index_map->end()) {
Austin Schuh58646e22021-08-23 23:51:46 -07001948 CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
1949 source_state->boot_count());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001950
1951 CHECK_GE(timestamped_message.monotonic_remote_time.time,
Austin Schuh9942bae2021-01-07 22:06:44 -08001952 element->starting_monotonic_event_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001953 CHECK_LE(timestamped_message.monotonic_remote_time.time,
Austin Schuh9942bae2021-01-07 22:06:44 -08001954 element->ending_monotonic_event_time);
Austin Schuh58646e22021-08-23 23:51:46 -07001955 CHECK_GE(timestamped_message.remote_queue_index.index,
Austin Schuh9942bae2021-01-07 22:06:44 -08001956 element->starting_queue_index);
Austin Schuh58646e22021-08-23 23:51:46 -07001957 CHECK_LE(timestamped_message.remote_queue_index.index,
Austin Schuh9942bae2021-01-07 22:06:44 -08001958 element->ending_queue_index);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001959
Austin Schuh58646e22021-08-23 23:51:46 -07001960 remote_queue_index = timestamped_message.remote_queue_index.index +
Austin Schuh9942bae2021-01-07 22:06:44 -08001961 element->actual_queue_index -
1962 element->starting_queue_index;
1963 } else {
1964 VLOG(1) << "No timestamp match in the map.";
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001965 }
Austin Schuh58646e22021-08-23 23:51:46 -07001966 CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
1967 source_state->boot_count());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001968 }
1969
James Kuszmaul09632422022-05-25 15:56:19 -07001970 if (event_loop_factory_ != nullptr &&
1971 channel_source_state_[timestamped_message.channel_index] != nullptr &&
1972 multinode_filters_ != nullptr) {
1973 // Sanity check that we are using consistent boot uuids.
1974 State *source_state =
1975 channel_source_state_[timestamped_message.channel_index];
1976 CHECK_EQ(multinode_filters_->boot_uuid(
1977 configuration::GetNodeIndex(event_loop_->configuration(),
1978 source_state->node()),
1979 timestamped_message.monotonic_remote_time.boot),
1980 CHECK_NOTNULL(
1981 CHECK_NOTNULL(
1982 channel_source_state_[timestamped_message.channel_index])
1983 ->event_loop_)
1984 ->boot_uuid());
1985 }
1986
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001987 // Right before sending allow the user to process the message.
1988 if (before_send_callbacks_[timestamped_message.channel_index]) {
1989 // Only channels that are forwarded and sent from this State's node will be
1990 // in the queue_index_map_
1991 if (queue_index_map_[timestamped_message.channel_index]) {
1992 before_send_callbacks_[timestamped_message.channel_index](
1993 timestamped_message.data->mutable_data());
1994 }
1995 }
1996
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001997 // Send! Use the replayed queue index here instead of the logged queue index
1998 // for the remote queue index. This makes re-logging work.
Austin Schuhaf8a0d32023-05-03 09:53:06 -07001999 const RawSender::Error err = sender->Send(
Austin Schuhe0ab4de2023-05-03 08:05:08 -07002000 SharedSpan(timestamped_message.data, &timestamped_message.data->span),
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002001 timestamped_message.monotonic_remote_time.time,
Austin Schuh8902fa52021-03-14 22:39:24 -07002002 timestamped_message.realtime_remote_time, remote_queue_index,
2003 (channel_source_state_[timestamped_message.channel_index] != nullptr
James Kuszmaul09632422022-05-25 15:56:19 -07002004 ? CHECK_NOTNULL(multinode_filters_)
2005 ->boot_uuid(configuration::GetNodeIndex(
2006 event_loop_->configuration(),
2007 channel_source_state_[timestamped_message
2008 .channel_index]
2009 ->node()),
2010 timestamped_message.monotonic_remote_time.boot)
Austin Schuh8902fa52021-03-14 22:39:24 -07002011 : event_loop_->boot_uuid()));
milind1f1dca32021-07-03 13:50:07 -07002012 if (err != RawSender::Error::kOk) return false;
James Kuszmaula16a7912022-06-17 10:58:12 -07002013 if (monotonic_start_time(timestamped_message.monotonic_event_time.boot) <=
2014 timestamped_message.monotonic_event_time.time) {
2015 // Only track errors for non-fetched messages.
2016 TrackMessageSendTiming(
2017 *sender,
2018 timestamped_message.monotonic_event_time.time + clock_offset());
2019 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002020
Austin Schuh287d43d2020-12-04 20:19:33 -08002021 if (queue_index_map_[timestamped_message.channel_index]) {
Austin Schuh58646e22021-08-23 23:51:46 -07002022 CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
Austin Schuh9942bae2021-01-07 22:06:44 -08002023 if (queue_index_map_[timestamped_message.channel_index]->empty()) {
2024 // Nothing here, start a range with 0 length.
2025 ContiguousSentTimestamp timestamp;
2026 timestamp.starting_monotonic_event_time =
2027 timestamp.ending_monotonic_event_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002028 timestamped_message.monotonic_event_time.time;
Austin Schuh9942bae2021-01-07 22:06:44 -08002029 timestamp.starting_queue_index = timestamp.ending_queue_index =
Austin Schuh58646e22021-08-23 23:51:46 -07002030 timestamped_message.queue_index.index;
Austin Schuh9942bae2021-01-07 22:06:44 -08002031 timestamp.actual_queue_index = sender->sent_queue_index();
2032 queue_index_map_[timestamped_message.channel_index]->emplace_back(
2033 timestamp);
2034 } else {
2035 // We've got something. See if the next timestamp is still contiguous. If
2036 // so, grow it.
2037 ContiguousSentTimestamp *back =
2038 &queue_index_map_[timestamped_message.channel_index]->back();
2039 if ((back->starting_queue_index - back->actual_queue_index) ==
milind1f1dca32021-07-03 13:50:07 -07002040 (timestamped_message.queue_index.index -
2041 sender->sent_queue_index())) {
Austin Schuh58646e22021-08-23 23:51:46 -07002042 back->ending_queue_index = timestamped_message.queue_index.index;
Austin Schuh9942bae2021-01-07 22:06:44 -08002043 back->ending_monotonic_event_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002044 timestamped_message.monotonic_event_time.time;
Austin Schuh9942bae2021-01-07 22:06:44 -08002045 } else {
2046 // Otherwise, make a new one.
2047 ContiguousSentTimestamp timestamp;
2048 timestamp.starting_monotonic_event_time =
2049 timestamp.ending_monotonic_event_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002050 timestamped_message.monotonic_event_time.time;
Austin Schuh9942bae2021-01-07 22:06:44 -08002051 timestamp.starting_queue_index = timestamp.ending_queue_index =
Austin Schuh58646e22021-08-23 23:51:46 -07002052 timestamped_message.queue_index.index;
Austin Schuh9942bae2021-01-07 22:06:44 -08002053 timestamp.actual_queue_index = sender->sent_queue_index();
2054 queue_index_map_[timestamped_message.channel_index]->emplace_back(
2055 timestamp);
2056 }
2057 }
2058
2059 // TODO(austin): Should we prune the map? On a many day log, I only saw the
2060 // queue index diverge a couple of elements, which would be a very small
2061 // map.
Austin Schuh287d43d2020-12-04 20:19:33 -08002062 } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
2063 nullptr) {
James Kuszmaul09632422022-05-25 15:56:19 -07002064 // TODO(james): Currently, If running replay against a single event loop,
2065 // remote timestamps will not get replayed because this code-path only
2066 // gets triggered on the event loop that receives the forwarded message
2067 // that the timestamps correspond to. This code, as written, also doesn't
2068 // correctly handle a non-zero clock_offset for the *_remote_time fields.
Austin Schuh58646e22021-08-23 23:51:46 -07002069 State *source_state =
2070 CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
2071
Austin Schuh969cd602021-01-03 00:09:45 -08002072 flatbuffers::FlatBufferBuilder fbb;
2073 fbb.ForceDefaults(true);
Austin Schuhcdd90272021-03-15 12:46:16 -07002074 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
2075 event_loop_->boot_uuid().PackVector(&fbb);
Austin Schuh315b96b2020-12-11 21:21:12 -08002076
Austin Schuh969cd602021-01-03 00:09:45 -08002077 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002078
2079 message_header_builder.add_channel_index(
Austin Schuh287d43d2020-12-04 20:19:33 -08002080 factory_channel_index_[timestamped_message.channel_index]);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002081
2082 // Swap the remote and sent metrics. They are from the sender's
2083 // perspective, not the receiver's perspective.
2084 message_header_builder.add_monotonic_sent_time(
2085 sender->monotonic_sent_time().time_since_epoch().count());
2086 message_header_builder.add_realtime_sent_time(
2087 sender->realtime_sent_time().time_since_epoch().count());
2088 message_header_builder.add_queue_index(sender->sent_queue_index());
2089
Austin Schuh58646e22021-08-23 23:51:46 -07002090 CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
2091 source_state->boot_count());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002092 message_header_builder.add_monotonic_remote_time(
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002093 timestamped_message.monotonic_remote_time.time.time_since_epoch()
2094 .count());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002095 message_header_builder.add_realtime_remote_time(
Austin Schuh287d43d2020-12-04 20:19:33 -08002096 timestamped_message.realtime_remote_time.time_since_epoch().count());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002097
2098 message_header_builder.add_remote_queue_index(remote_queue_index);
Austin Schuh315b96b2020-12-11 21:21:12 -08002099 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002100
Austin Schuh969cd602021-01-03 00:09:45 -08002101 fbb.Finish(message_header_builder.Finish());
2102
2103 remote_timestamp_senders_[timestamped_message.channel_index]->Send(
2104 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
Austin Schuh58646e22021-08-23 23:51:46 -07002105 timestamped_message.monotonic_timestamp_time,
2106 source_state->boot_count());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002107 }
2108
2109 return true;
2110}
2111
Austin Schuh969cd602021-01-03 00:09:45 -08002112LogReader::RemoteMessageSender::RemoteMessageSender(
2113 aos::Sender<message_bridge::RemoteMessage> sender, EventLoop *event_loop)
2114 : event_loop_(event_loop),
2115 sender_(std::move(sender)),
2116 timer_(event_loop->AddTimer([this]() { SendTimestamp(); })) {}
2117
2118void LogReader::RemoteMessageSender::ScheduleTimestamp() {
2119 if (remote_timestamps_.empty()) {
2120 CHECK_NOTNULL(timer_);
2121 timer_->Disable();
2122 scheduled_time_ = monotonic_clock::min_time;
2123 return;
2124 }
2125
2126 if (scheduled_time_ != remote_timestamps_.front().monotonic_timestamp_time) {
2127 CHECK_NOTNULL(timer_);
Philipp Schradera6712522023-07-05 20:25:11 -07002128 timer_->Schedule(remote_timestamps_.front().monotonic_timestamp_time);
Austin Schuh969cd602021-01-03 00:09:45 -08002129 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
Austin Schuh3d94be02021-02-12 23:15:20 -08002130 CHECK_GE(scheduled_time_, event_loop_->monotonic_now())
2131 << event_loop_->node()->name()->string_view();
Austin Schuh969cd602021-01-03 00:09:45 -08002132 }
2133}
2134
2135void LogReader::RemoteMessageSender::Send(
2136 FlatbufferDetachedBuffer<RemoteMessage> remote_message,
Austin Schuh58646e22021-08-23 23:51:46 -07002137 BootTimestamp monotonic_timestamp_time, size_t source_boot_count) {
Austin Schuhc41d6a82021-07-16 14:49:23 -07002138 // There are 2 variants of logs.
2139 // 1) Logs without monotonic_timestamp_time
2140 // 2) Logs with monotonic_timestamp_time
2141 //
2142 // As of Jan 2021, we shouldn't have any more logs without
2143 // monotonic_timestamp_time. We don't have data locked up in those logs worth
2144 // the effort of saving.
2145 //
2146 // This gives us 3 cases, 2 of which are undistinguishable.
2147 // 1) Old log without monotonic_timestamp_time.
2148 // 2) New log with monotonic_timestamp_time where the timestamp was logged
2149 // remotely so we actually have monotonic_timestamp_time.
2150 // 3) New log, but the timestamp was logged on the node receiving the message
2151 // so there is no monotonic_timestamp_time.
2152 //
2153 // Our goal when replaying is to accurately reproduce the state of the world
2154 // present when logging. If a timestamp wasn't sent back across the network,
2155 // we shouldn't replay one back across the network.
2156 //
2157 // Given that we don't really care about 1, we can use the presence of the
2158 // timestamp to distinguish 2 and 3, and ignore 1. If we don't have a
2159 // monotonic_timestamp_time, this means the message was logged locally and
2160 // remote timestamps can be ignored.
Austin Schuh58646e22021-08-23 23:51:46 -07002161 if (monotonic_timestamp_time == BootTimestamp::min_time()) {
Austin Schuhc41d6a82021-07-16 14:49:23 -07002162 return;
Austin Schuh969cd602021-01-03 00:09:45 -08002163 }
Austin Schuhc41d6a82021-07-16 14:49:23 -07002164
Austin Schuh58646e22021-08-23 23:51:46 -07002165 CHECK_EQ(monotonic_timestamp_time.boot, source_boot_count);
2166
Austin Schuhc41d6a82021-07-16 14:49:23 -07002167 remote_timestamps_.emplace(
2168 std::upper_bound(
2169 remote_timestamps_.begin(), remote_timestamps_.end(),
Austin Schuh58646e22021-08-23 23:51:46 -07002170 monotonic_timestamp_time.time,
Austin Schuhc41d6a82021-07-16 14:49:23 -07002171 [](const aos::monotonic_clock::time_point monotonic_timestamp_time,
2172 const Timestamp &timestamp) {
2173 return monotonic_timestamp_time <
2174 timestamp.monotonic_timestamp_time;
2175 }),
Austin Schuh58646e22021-08-23 23:51:46 -07002176 std::move(remote_message), monotonic_timestamp_time.time);
Austin Schuhc41d6a82021-07-16 14:49:23 -07002177 ScheduleTimestamp();
Austin Schuh969cd602021-01-03 00:09:45 -08002178}
2179
2180void LogReader::RemoteMessageSender::SendTimestamp() {
Austin Schuh3d94be02021-02-12 23:15:20 -08002181 CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_)
2182 << event_loop_->node()->name()->string_view();
Austin Schuh969cd602021-01-03 00:09:45 -08002183 CHECK(!remote_timestamps_.empty());
2184
2185 // Send out all timestamps at the currently scheduled time.
2186 while (remote_timestamps_.front().monotonic_timestamp_time ==
2187 scheduled_time_) {
milind1f1dca32021-07-03 13:50:07 -07002188 CHECK_EQ(sender_.Send(std::move(remote_timestamps_.front().remote_message)),
2189 RawSender::Error::kOk);
Austin Schuh969cd602021-01-03 00:09:45 -08002190 remote_timestamps_.pop_front();
2191 if (remote_timestamps_.empty()) {
2192 break;
2193 }
2194 }
2195 scheduled_time_ = monotonic_clock::min_time;
2196
2197 ScheduleTimestamp();
2198}
2199
2200LogReader::RemoteMessageSender *LogReader::State::RemoteTimestampSender(
Austin Schuh61e973f2021-02-21 21:43:56 -08002201 const Channel *channel, const Connection *connection) {
2202 message_bridge::ChannelTimestampFinder finder(event_loop_);
2203 // Look at any pre-created channel/connection pairs.
2204 {
2205 auto it =
2206 channel_timestamp_loggers_.find(std::make_pair(channel, connection));
2207 if (it != channel_timestamp_loggers_.end()) {
2208 return it->second.get();
2209 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002210 }
2211
Austin Schuh61e973f2021-02-21 21:43:56 -08002212 // That failed, so resolve the RemoteMessage channel timestamps will be logged
2213 // to.
2214 const Channel *timestamp_channel = finder.ForChannel(channel, connection);
2215
2216 {
2217 // See if that has been created before. If so, cache it in
2218 // channel_timestamp_loggers_ and return.
2219 auto it = timestamp_loggers_.find(timestamp_channel);
2220 if (it != timestamp_loggers_.end()) {
2221 CHECK(channel_timestamp_loggers_
2222 .try_emplace(std::make_pair(channel, connection), it->second)
2223 .second);
2224 return it->second.get();
2225 }
2226 }
2227
2228 // Otherwise, make a sender, save it, and cache it.
2229 auto result = channel_timestamp_loggers_.try_emplace(
2230 std::make_pair(channel, connection),
2231 std::make_shared<RemoteMessageSender>(
2232 event_loop()->MakeSender<RemoteMessage>(
2233 timestamp_channel->name()->string_view()),
2234 event_loop()));
2235
2236 CHECK(timestamp_loggers_.try_emplace(timestamp_channel, result.first->second)
2237 .second);
2238 return result.first->second.get();
Austin Schuh858c9f32020-08-31 16:56:12 -07002239}
2240
Austin Schuhdda74ec2021-01-03 19:30:37 -08002241TimestampedMessage LogReader::State::PopOldest() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002242 // multithreaded
James Kuszmaula16a7912022-06-17 10:58:12 -07002243 if (message_queuer_.has_value()) {
2244 std::optional<TimestampedMessage> message = message_queuer_->Pop();
2245 CHECK(message.has_value()) << ": Unexpectedly ran out of messages.";
2246 message_queuer_->SetState(
2247 message.value().monotonic_event_time +
2248 std::chrono::duration_cast<std::chrono::nanoseconds>(
2249 std::chrono::duration<double>(FLAGS_threaded_look_ahead_seconds)));
2250 return message.value();
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002251 } else { // single threaded
James Kuszmaula16a7912022-06-17 10:58:12 -07002252 CHECK(timestamp_mapper_ != nullptr);
2253 TimestampedMessage *result_ptr = timestamp_mapper_->Front();
2254 CHECK(result_ptr != nullptr);
Austin Schuh858c9f32020-08-31 16:56:12 -07002255
James Kuszmaula16a7912022-06-17 10:58:12 -07002256 TimestampedMessage result = std::move(*result_ptr);
Austin Schuhe639ea12021-01-25 13:00:22 -08002257
Alexei Strots036d84e2023-05-03 16:05:12 -07002258 VLOG(2) << "Node '" << MaybeNodeName(event_loop_->node())
2259 << "': PopOldest Popping " << result.monotonic_event_time;
James Kuszmaula16a7912022-06-17 10:58:12 -07002260 timestamp_mapper_->PopFront();
2261 SeedSortedMessages();
Austin Schuh858c9f32020-08-31 16:56:12 -07002262
James Kuszmaula16a7912022-06-17 10:58:12 -07002263 CHECK_EQ(result.monotonic_event_time.boot, boot_count());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002264
James Kuszmaula16a7912022-06-17 10:58:12 -07002265 VLOG(1) << "Popped " << result
2266 << configuration::CleanedChannelToString(
2267 event_loop_->configuration()->channels()->Get(
2268 factory_channel_index_[result.channel_index]));
2269 return result;
2270 }
Austin Schuh858c9f32020-08-31 16:56:12 -07002271}
2272
James Kuszmaula16a7912022-06-17 10:58:12 -07002273BootTimestamp LogReader::State::MultiThreadedOldestMessageTime() {
2274 if (!message_queuer_.has_value()) {
2275 return SingleThreadedOldestMessageTime();
2276 }
2277 std::optional<TimestampedMessage> message = message_queuer_->Peek();
2278 if (!message.has_value()) {
2279 return BootTimestamp::max_time();
2280 }
2281 if (message.value().monotonic_event_time.boot == boot_count()) {
2282 ObserveNextMessage(message.value().monotonic_event_time.time,
2283 message.value().realtime_event_time);
2284 }
2285 return message.value().monotonic_event_time;
2286}
2287
2288BootTimestamp LogReader::State::SingleThreadedOldestMessageTime() {
2289 CHECK(!message_queuer_.has_value())
2290 << "Cannot use SingleThreadedOldestMessageTime() once the queuer thread "
2291 "is created.";
Austin Schuhe639ea12021-01-25 13:00:22 -08002292 if (timestamp_mapper_ == nullptr) {
Austin Schuh58646e22021-08-23 23:51:46 -07002293 return BootTimestamp::max_time();
Austin Schuh287d43d2020-12-04 20:19:33 -08002294 }
Austin Schuhe639ea12021-01-25 13:00:22 -08002295 TimestampedMessage *result_ptr = timestamp_mapper_->Front();
2296 if (result_ptr == nullptr) {
Austin Schuh58646e22021-08-23 23:51:46 -07002297 return BootTimestamp::max_time();
Austin Schuhe639ea12021-01-25 13:00:22 -08002298 }
Alexei Strots036d84e2023-05-03 16:05:12 -07002299 VLOG(2) << "Node '" << MaybeNodeName(node()) << "': oldest message at "
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002300 << result_ptr->monotonic_event_time.time;
Austin Schuhe33c08d2022-02-03 18:15:21 -08002301 if (result_ptr->monotonic_event_time.boot == boot_count()) {
2302 ObserveNextMessage(result_ptr->monotonic_event_time.time,
2303 result_ptr->realtime_event_time);
2304 }
Austin Schuh58646e22021-08-23 23:51:46 -07002305 return result_ptr->monotonic_event_time;
Austin Schuh858c9f32020-08-31 16:56:12 -07002306}
2307
2308void LogReader::State::SeedSortedMessages() {
Austin Schuh287d43d2020-12-04 20:19:33 -08002309 if (!timestamp_mapper_) return;
Austin Schuh858c9f32020-08-31 16:56:12 -07002310
Austin Schuhe639ea12021-01-25 13:00:22 -08002311 timestamp_mapper_->QueueFor(chrono::duration_cast<chrono::seconds>(
2312 chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
Austin Schuh858c9f32020-08-31 16:56:12 -07002313}
2314
2315void LogReader::State::Deregister() {
Austin Schuh58646e22021-08-23 23:51:46 -07002316 if (started_ && !stopped_) {
Austin Schuhe33c08d2022-02-03 18:15:21 -08002317 NotifyLogfileEnd();
Austin Schuh58646e22021-08-23 23:51:46 -07002318 }
Austin Schuh858c9f32020-08-31 16:56:12 -07002319 for (size_t i = 0; i < channels_.size(); ++i) {
2320 channels_[i].reset();
2321 }
Austin Schuhe33c08d2022-02-03 18:15:21 -08002322 ClearTimeFlags();
Austin Schuh61e973f2021-02-21 21:43:56 -08002323 channel_timestamp_loggers_.clear();
2324 timestamp_loggers_.clear();
Austin Schuh858c9f32020-08-31 16:56:12 -07002325 event_loop_unique_ptr_.reset();
2326 event_loop_ = nullptr;
2327 timer_handler_ = nullptr;
2328 node_event_loop_factory_ = nullptr;
James Kuszmaula16a7912022-06-17 10:58:12 -07002329 timing_statistics_sender_ = Sender<timing::ReplayTiming>();
Austin Schuh858c9f32020-08-31 16:56:12 -07002330}
2331
Austin Schuhe33c08d2022-02-03 18:15:21 -08002332void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {
2333 if (start_time != realtime_clock::min_time) {
2334 start_event_notifier_ = std::make_unique<EventNotifier>(
2335 event_loop_, [this]() { NotifyFlagStart(); }, "flag_start", start_time);
2336 }
2337}
2338
2339void LogReader::State::SetEndTimeFlag(realtime_clock::time_point end_time) {
2340 if (end_time != realtime_clock::max_time) {
2341 end_event_notifier_ = std::make_unique<EventNotifier>(
2342 event_loop_, [this]() { NotifyFlagEnd(); }, "flag_end", end_time);
2343 }
2344}
2345
2346void LogReader::State::ObserveNextMessage(
2347 monotonic_clock::time_point monotonic_event,
2348 realtime_clock::time_point realtime_event) {
2349 if (start_event_notifier_) {
2350 start_event_notifier_->ObserveNextMessage(monotonic_event, realtime_event);
2351 }
2352 if (end_event_notifier_) {
2353 end_event_notifier_->ObserveNextMessage(monotonic_event, realtime_event);
2354 }
2355}
2356
2357void LogReader::State::ClearTimeFlags() {
2358 start_event_notifier_.reset();
2359 end_event_notifier_.reset();
2360}
2361
2362void LogReader::State::NotifyLogfileStart() {
James Kuszmaul82c3b512023-07-08 20:25:41 -07002363 // If the start_event_notifier_ is set, that means that a realtime start time
2364 // was set manually; when the override is set, we want to delay any startup
2365 // handlers that would've happened before requested start time until that
2366 // start time.
Austin Schuhe33c08d2022-02-03 18:15:21 -08002367 if (start_event_notifier_) {
Philipp Schrader790cb542023-07-05 21:06:52 -07002368 // Only call OnStart() if the start time for this node
2369 // (realtime_start_time())
Austin Schuhe33c08d2022-02-03 18:15:21 -08002370 if (start_event_notifier_->realtime_event_time() >
2371 realtime_start_time(boot_count())) {
2372 VLOG(1) << "Skipping, " << start_event_notifier_->realtime_event_time()
2373 << " > " << realtime_start_time(boot_count());
2374 return;
2375 }
2376 }
2377 if (found_last_message_) {
2378 VLOG(1) << "Last message already found, bailing";
2379 return;
2380 }
2381 RunOnStart();
2382}
2383
2384void LogReader::State::NotifyFlagStart() {
James Kuszmaul82c3b512023-07-08 20:25:41 -07002385 // Should only be called if start_event_notifier_ has been set (which happens
2386 // as part of setting an explicit start time); only call the startup functions
2387 // that occurred *before* the start flag value.
Austin Schuhe33c08d2022-02-03 18:15:21 -08002388 if (start_event_notifier_->realtime_event_time() >=
2389 realtime_start_time(boot_count())) {
2390 RunOnStart();
2391 }
2392}
2393
2394void LogReader::State::NotifyLogfileEnd() {
James Kuszmaul82c3b512023-07-08 20:25:41 -07002395 // Don't execute the OnEnd handlers if the logfile was ended artifically
2396 // early.
Austin Schuhe33c08d2022-02-03 18:15:21 -08002397 if (found_last_message_) {
2398 return;
2399 }
2400
James Kuszmaul82c3b512023-07-08 20:25:41 -07002401 // Ensure that we only call OnEnd() if OnStart() was already called for this
2402 // boot (and don't call OnEnd() twice).
Austin Schuhe33c08d2022-02-03 18:15:21 -08002403 if (!stopped_ && started_) {
2404 RunOnEnd();
2405 }
2406}
2407
2408void LogReader::State::NotifyFlagEnd() {
James Kuszmaul82c3b512023-07-08 20:25:41 -07002409 // Ensure that we only call OnEnd() if OnStart() was already called for this
2410 // boot (and don't call OnEnd() twice).
Austin Schuhe33c08d2022-02-03 18:15:21 -08002411 if (!stopped_ && started_) {
2412 RunOnEnd();
2413 SetFoundLastMessage(true);
James Kuszmaulb11a1502022-07-01 16:02:25 -07002414 CHECK(notice_realtime_end_);
2415 notice_realtime_end_();
Austin Schuhe33c08d2022-02-03 18:15:21 -08002416 }
2417}
2418
James Kuszmaulc3f34d12022-08-15 15:57:55 -07002419void LogReader::State::MaybeSetClockOffset() {
James Kuszmaul09632422022-05-25 15:56:19 -07002420 if (node_event_loop_factory_ == nullptr) {
2421 // If not running with simulated event loop, set the monotonic clock
2422 // offset.
2423 clock_offset_ = event_loop()->monotonic_now() - monotonic_start_time(0);
2424
2425 if (start_event_notifier_) {
2426 start_event_notifier_->SetClockOffset(clock_offset_);
2427 }
2428 if (end_event_notifier_) {
2429 end_event_notifier_->SetClockOffset(clock_offset_);
2430 }
2431 }
2432}
2433
James Kuszmaulb67409b2022-06-20 16:25:03 -07002434void LogReader::SetRealtimeReplayRate(double replay_rate) {
2435 CHECK(event_loop_factory_ != nullptr)
2436 << ": Can't set replay rate without an event loop factory (have you "
2437 "called Register()?).";
2438 event_loop_factory_->SetRealtimeReplayRate(replay_rate);
2439}
2440
James Kuszmaulb11a1502022-07-01 16:02:25 -07002441void LogReader::NoticeRealtimeEnd() {
2442 CHECK_GE(live_nodes_with_realtime_time_end_, 1u);
2443 --live_nodes_with_realtime_time_end_;
2444 if (live_nodes_with_realtime_time_end_ == 0 && exit_on_finish() &&
2445 event_loop_factory_ != nullptr) {
2446 event_loop_factory_->Exit();
2447 }
2448}
2449
Eric Schmiedebergae00e732023-04-12 15:53:17 -06002450bool LogReader::AreStatesInitialized() const {
2451 for (const auto &state : states_) {
2452 if (state) {
2453 return true;
2454 }
2455 }
2456 return false;
2457}
2458
Austin Schuhe309d2a2019-11-29 13:25:21 -08002459} // namespace logger
2460} // namespace aos