#include "aos/events/logging/logger.h"

#include <fcntl.h>
#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <vector>

#include "Eigen/Dense"
#include "absl/strings/escaping.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
#include "third_party/gmp/gmpxx.h"

DEFINE_bool(skip_missing_forwarding_entries, false,
            "If true, drop any forwarding entries with missing data. If "
            "false, CHECK.");

DEFINE_bool(timestamps_to_csv, false,
            "If true, write all the time synchronization information to a set "
            "of CSV files in /tmp/. This should only be needed when debugging "
            "time synchronization.");

DEFINE_bool(skip_order_validation, false,
            "If true, ignore any out of orderness in replay");

namespace aos {
namespace logger {
namespace chrono = std::chrono;

void MultiNodeLogNamer::WriteHeader(
    const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
    const Node *node) {
  if (node == this->node()) {
    data_writer_->WriteSizedFlatbuffer(header.full_span());
  } else {
    for (std::pair<const Channel *const, DataWriter> &data_writer :
         data_writers_) {
      if (node == data_writer.second.node) {
        data_writer.second.writer->WriteSizedFlatbuffer(header.full_span());
      }
    }
  }
}

void MultiNodeLogNamer::Rotate(
    const Node *node,
    const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header) {
  if (node == this->node()) {
    ++part_number_;
    *data_writer_ = std::move(*OpenDataWriter());
    data_writer_->WriteSizedFlatbuffer(header.full_span());
  } else {
    for (std::pair<const Channel *const, DataWriter> &data_writer :
         data_writers_) {
      if (node == data_writer.second.node) {
        ++data_writer.second.part_number;
        data_writer.second.rotate(data_writer.first, &data_writer.second);
        data_writer.second.writer->WriteSizedFlatbuffer(header.full_span());
      }
    }
  }
}

Logger::Logger(std::string_view base_name, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
             event_loop, polling_period) {}
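
// A minimal usage sketch (not part of this file's logic; the event loop setup
// is an assumption): construct a Logger on an EventLoop and it begins logging
// once the loop runs.
//
//   aos::ShmEventLoop event_loop(&config.message());
//   aos::logger::Logger logger("/tmp/log_", &event_loop,
//                              std::chrono::milliseconds(100));
//   event_loop.Run();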

Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : event_loop_(event_loop),
      log_namer_(std::move(log_namer)),
      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
      polling_period_(polling_period),
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
  int channel_index = 0;

  // Find all the nodes which are logging timestamps on our node.
  std::set<const Node *> timestamp_logger_nodes;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
        !channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      const Node *other_node = configuration::GetNode(
          event_loop_->configuration(), connection->name()->string_view());

      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_nodes.insert(other_node);
      }
    }
  }

  std::map<const Channel *, const Node *> timestamp_logger_channels;

  // Now that we have all the nodes accumulated, make remote timestamp loggers
  // for them.
  for (const Node *node : timestamp_logger_nodes) {
    const Channel *channel = configuration::GetChannel(
        event_loop_->configuration(),
        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
        logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
        event_loop_->node());

    CHECK(channel != nullptr)
        << ": Remote timestamps are logged on "
        << event_loop_->node()->name()->string_view()
        << " but can't find channel /aos/remote_timestamps/"
        << node->name()->string_view();
    timestamp_logger_channels.insert(std::make_pair(channel, node));
  }

  const size_t our_node_index = configuration::GetNodeIndex(
      event_loop_->configuration(), event_loop_->node());

  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    FetcherStruct fs;
    fs.node_index = our_node_index;
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
                                 channel, event_loop_->node()) &&
                             is_readable;

    const bool log_delivery_times =
        (event_loop_->node() == nullptr)
            ? false
            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                  channel, event_loop_->node(), event_loop_->node());

    // Now, detect a MessageHeader timestamp logger where we should just log
    // the contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();
    const Node *timestamp_node =
        log_contents ? timestamp_logger_channels.find(channel)->second
                     : nullptr;

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.writer = log_namer_->MakeWriter(channel);
        if (!is_local) {
          fs.log_type = LogType::kLogRemoteMessage;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.contents_writer =
            log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
        fs.node_index = configuration::GetNodeIndex(
            event_loop_->configuration(), timestamp_node);
      }
      fs.channel_index = channel_index;
      fs.written = false;
      fetchers_.emplace_back(std::move(fs));
    }
    ++channel_index;
  }

  node_state_.resize(configuration::MultiNode(event_loop_->configuration())
                         ? event_loop_->configuration()->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(event_loop_->configuration(), node);

    node_state_[node_index].log_file_header = MakeHeader(node);
  }

  // When things start, we want to log the header, then the most recent
  // messages available on each fetcher to capture the previous state, then
  // start polling.
  event_loop_->OnRun([this]() { StartLogging(); });
}

void Logger::StartLogging() {
  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel. This lets
  // non-periodic messages (like configuration) get logged too.
  for (FetcherStruct &f : fetchers_) {
    f.written = !f.fetcher->Fetch();
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < node_state_.size(); ++i) {
    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
  }

  WriteHeader();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
            << " start_time " << last_synchronized_time_;

  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}

void Logger::WriteHeader() {
  if (configuration::MultiNode(event_loop_->configuration())) {
    server_statistics_fetcher_.Fetch();
  }

  aos::monotonic_clock::time_point monotonic_start_time =
      event_loop_->monotonic_now();
  aos::realtime_clock::time_point realtime_start_time =
      event_loop_->realtime_now();

  // We need to pick a point in time to declare the log file "started". This
  // starts here. It needs to be after everything is fetched so that the
  // fetchers are all pointed at the most recent message before the start
  // time.
  last_synchronized_time_ = monotonic_start_time;

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(event_loop_->configuration(), node);
    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                         realtime_start_time);
    log_namer_->WriteHeader(node_state_[node_index].log_file_header, node);
  }
}

void Logger::WriteMissingTimestamps() {
  if (configuration::MultiNode(event_loop_->configuration())) {
    server_statistics_fetcher_.Fetch();
  } else {
    return;
  }

  if (server_statistics_fetcher_.get() == nullptr) {
    return;
  }

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(event_loop_->configuration(), node);
    if (MaybeUpdateTimestamp(
            node, node_index,
            server_statistics_fetcher_.context().monotonic_event_time,
            server_statistics_fetcher_.context().realtime_event_time)) {
      log_namer_->Rotate(node, node_state_[node_index].log_file_header);
    }
  }
}

void Logger::SetStartTime(size_t node_index,
                          aos::monotonic_clock::time_point monotonic_start_time,
                          aos::realtime_clock::time_point realtime_start_time) {
  node_state_[node_index].monotonic_start_time = monotonic_start_time;
  node_state_[node_index].realtime_start_time = realtime_start_time;
  node_state_[node_index]
      .log_file_header.mutable_message()
      ->mutate_monotonic_start_time(
          std::chrono::duration_cast<std::chrono::nanoseconds>(
              monotonic_start_time.time_since_epoch())
              .count());
  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_realtime_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_realtime_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                realtime_start_time.time_since_epoch())
                .count());
  }
}

bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (configuration::MultiNode(event_loop_->configuration())) {
    if (event_loop_->node() == node) {
      // There are no offsets to compute for ourself, so always succeed.
      SetStartTime(node_index, monotonic_start_time, realtime_start_time);
      return true;
    } else if (server_statistics_fetcher_.get() != nullptr) {
      // We must be a remote node now. Look for the connection and see if it is
      // connected.

      for (const message_bridge::ServerConnection *connection :
           *server_statistics_fetcher_->connections()) {
        if (connection->node()->name()->string_view() !=
            node->name()->string_view()) {
          continue;
        }

        if (connection->state() != message_bridge::State::CONNECTED) {
          VLOG(1) << node->name()->string_view()
                  << " is not connected, can't start it yet.";
          break;
        }

        if (!connection->has_monotonic_offset()) {
          VLOG(1) << "Missing monotonic offset for setting start time for node "
                  << aos::FlatbufferToJson(node);
          break;
        }

        VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);

        // Found it and it is connected. Compensate and go.
        monotonic_start_time +=
            std::chrono::nanoseconds(connection->monotonic_offset());

        SetStartTime(node_index, monotonic_start_time, realtime_start_time);
        return true;
      }
    }
  } else {
    SetStartTime(node_index, monotonic_start_time, realtime_start_time);
    return true;
  }
  return false;
}

aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  // TODO(austin): Compress this much more efficiently. There are a bunch of
  // duplicated schemas.
  flatbuffers::Offset<aos::Configuration> configuration_offset =
      CopyFlatBuffer(event_loop_->configuration(), &fbb);

  flatbuffers::Offset<flatbuffers::String> string_offset =
      fbb.CreateString(network::GetHostname());

  flatbuffers::Offset<Node> node_offset;

  if (configuration::MultiNode(event_loop_->configuration())) {
    node_offset = CopyFlatBuffer(node, &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(string_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
  }

  log_file_header_builder.add_configuration(configuration_offset);
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary. And the reverse could happen for another
  // message. Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
          .count());
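
  // For a concrete feel of the bound above (made-up number, not a default):
  // with polling_period_ = 100ms, two messages straddling adjacent sync
  // boundaries can land up to ~200ms apart from their true order in the file,
  // so the header advertises 3 * 100ms = 300ms as max_out_of_order_duration.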

  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  return fbb.Release();
}

void Logger::Rotate() {
  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(event_loop_->configuration(), node);
    log_namer_->Rotate(node, node_state_[node_index].log_file_header);
  }
}

void Logger::LogUntil(monotonic_clock::time_point t) {
  WriteMissingTimestamps();

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      if (f.written) {
        if (!f.fetcher->FetchNext()) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        } else {
          f.written = false;
        }
      }

      CHECK(!f.written);

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time < t) {
        if (f.writer != nullptr) {
          // Write!
          flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                             max_header_size_);
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index, f.log_type));

          VLOG(2) << "Writing data as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.writer->filename() << " data "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          max_header_size_ = std::max(
              max_header_size_, fbb.GetSize() - f.fetcher->context().size);
          f.writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.timestamp_writer != nullptr) {
          // And now handle timestamps.
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index,
                                             LogType::kLogDeliveryTimeOnly));

          VLOG(2) << "Writing timestamps as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.timestamp_writer->filename() << " timestamp "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.contents_writer != nullptr) {
          // And now handle the special message contents channel. Copy the
          // message into a FlatBufferBuilder and save it to disk.
          // TODO(austin): We can be more efficient here when we start to
          // care...
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          const MessageHeader *msg =
              flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);

          logger::MessageHeader::Builder message_header_builder(fbb);

          // Note: this must match the same order as MessageBridgeServer and
          // PackMessage. We want identical headers to have identical
          // on-the-wire formats to make comparing them easier.
          message_header_builder.add_channel_index(msg->channel_index());

          message_header_builder.add_queue_index(msg->queue_index());
          message_header_builder.add_monotonic_sent_time(
              msg->monotonic_sent_time());
          message_header_builder.add_realtime_sent_time(
              msg->realtime_sent_time());

          message_header_builder.add_monotonic_remote_time(
              msg->monotonic_remote_time());
          message_header_builder.add_realtime_remote_time(
              msg->realtime_remote_time());
          message_header_builder.add_remote_queue_index(
              msg->remote_queue_index());

          fbb.FinishSizePrefixed(message_header_builder.Finish());

          f.contents_writer->QueueSizedFlatbuffer(&fbb);
        }

        f.written = true;
      } else {
        break;
      }
    }
  }
  last_synchronized_time_ = t;
}

void Logger::DoLogData() {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration. To do this, we need sync points. Every write
  // cycle should be a sync point.
  const monotonic_clock::time_point monotonic_now =
      event_loop_->monotonic_now();

  do {
    // Move the sync point up by at most polling_period. This forces one sync
    // per iteration, even if it is small.
    LogUntil(
        std::min(last_synchronized_time_ + polling_period_, monotonic_now));

    // If we missed cycles, we could be pretty far behind. Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
}
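
// Worked example of the catch-up loop above (made-up numbers): with
// polling_period_ = 100ms, if the timer fires 350ms after
// last_synchronized_time_, LogUntil runs three times, advancing the sync point
// by 100ms each time; the remaining 50ms is picked up on the next timer tick,
// so no single sync interval exceeds one polling period.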

LogReader::LogReader(std::string_view filename,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::string>{std::string(filename)},
                replay_configuration) {}

LogReader::LogReader(const std::vector<std::string> &filenames,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::vector<std::string>>{filenames},
                replay_configuration) {}

LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                     const Configuration *replay_configuration)
    : filenames_(filenames),
      log_file_header_(ReadHeader(filenames[0][0])),
      replay_configuration_(replay_configuration) {
  MakeRemappedConfig();

  if (replay_configuration) {
    CHECK_EQ(configuration::MultiNode(configuration()),
             configuration::MultiNode(replay_configuration))
        << ": Log file and replay config need to both be multi or single "
           "node.";
  }

  if (!configuration::MultiNode(configuration())) {
    states_.emplace_back(
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
  } else {
    if (replay_configuration) {
      CHECK_EQ(logged_configuration()->nodes()->size(),
               replay_configuration->nodes()->size())
          << ": Log file and replay config need to have matching nodes "
             "lists.";
      for (const Node *node : *logged_configuration()->nodes()) {
        if (configuration::GetNode(replay_configuration, node) == nullptr) {
          LOG(FATAL) << "Found node " << FlatbufferToJson(node)
                     << " in logged config that is not present in the replay "
                        "config.";
        }
      }
    }
    states_.resize(configuration()->nodes()->size());
  }
}

LogReader::~LogReader() {
  if (event_loop_factory_unique_ptr_) {
    Deregister();
  } else if (event_loop_factory_ != nullptr) {
    LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
                  "is destroyed";
  }
  if (offset_fp_ != nullptr) {
    fclose(offset_fp_);
  }
  // Zero out some buffers. It's easy to do use-after-frees on these, so make
  // it more obvious.
  if (remapped_configuration_buffer_) {
    remapped_configuration_buffer_->Wipe();
  }
  log_file_header_.Wipe();
}

const Configuration *LogReader::logged_configuration() const {
  return log_file_header_.message().configuration();
}

const Configuration *LogReader::configuration() const {
  return remapped_configuration_;
}

std::vector<const Node *> LogReader::Nodes() const {
  // Because the Node pointer will only be valid if it actually points to
  // memory owned by remapped_configuration_, we need to wait for the
  // remapped_configuration_ to be populated before accessing it.
  //
  // Also note that whenever a map is changed, the nodes in here are
  // invalidated.
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register before the node() pointer will be valid.";
  return configuration::GetNodes(remapped_configuration_);
}

monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
  State *state =
      states_[configuration::GetNodeIndex(configuration(), node)].get();
  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);

  return state->monotonic_start_time();
}

realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
  State *state =
      states_[configuration::GetNodeIndex(configuration(), node)].get();
  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);

  return state->realtime_start_time();
}

void LogReader::Register() {
  event_loop_factory_unique_ptr_ =
      std::make_unique<SimulatedEventLoopFactory>(configuration());
  Register(event_loop_factory_unique_ptr_.get());
}

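// A rough replay sketch (illustrative only; the constructor, Register() and
// Deregister() appear in this file, while event_loop_factory() and the log
// path are assumptions):
//
//   aos::logger::LogReader reader("/tmp/mylog.bfbs");
//   reader.Register();
//   reader.event_loop_factory()->Run();
//   reader.Deregister();
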
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;

  for (const Node *node : configuration::GetNodes(configuration())) {
    const size_t node_index =
        configuration::GetNodeIndex(configuration(), node);
    states_[node_index] =
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
    State *state = states_[node_index].get();

    Register(state->SetNodeEventLoopFactory(
        event_loop_factory_->GetNodeEventLoopFactory(node)));
  }
  if (live_nodes_ == 0) {
    LOG(FATAL)
        << "Don't have logs from any of the nodes in the replay config--are "
           "you sure that the replay config matches the original config?";
  }

  // We need to now seed our per-node time offsets and get everything set up
  // to run.
  const size_t num_nodes = nodes_count();

  // It is easiest to solve for per node offsets with a matrix rather than
  // trying to solve the equations by hand. So let's get after it.
  //
  // Now, build up the map matrix.
  //
  // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
  map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
      filters_.size() + 1, num_nodes);
  slope_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
          filters_.size() + 1, num_nodes);

  offset_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  last_valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);

  time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
  time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);

  // All times should average out to the distributed clock.
  for (int i = 0; i < map_matrix_.cols(); ++i) {
    // 1/num_nodes.
    map_matrix_(0, i) = mpq_class(1, num_nodes);
  }
  valid_matrix_(0) = true;
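
  // Rough shape of the system being assembled, sketched for a hypothetical
  // two-node config (nodes A and B joined by one filter); illustrative only:
  //
  //   [ 1/2  1/2 ]              [ 0         ]
  //   [  -1    1 ] * [ta; tb] = [ sample_ab ]
  //
  // Row 0 is the averaging row filled in above (node times average out to the
  // distributed clock); each later row is one filter's a -> b sample, with the
  // filter's slope estimate folded in through slope_matrix_. SolveOffsets()
  // solves this system for each node's offset and slope.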

  {
    // Now, add the a - b -> sample elements.
    size_t i = 1;
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
             &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      const size_t node_a_index =
          configuration::GetNodeIndex(configuration(), node_a);
      const size_t node_b_index =
          configuration::GetNodeIndex(configuration(), node_b);

      // -a
      map_matrix_(i, node_a_index) = mpq_class(-1);
      // +b
      map_matrix_(i, node_b_index) = mpq_class(1);

      // -> sample
      std::get<0>(filter.second)
          .set_slope_pointer(&slope_matrix_(i, node_a_index));
      std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));

      valid_matrix_(i) = false;
      std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));

      ++i;
    }
  }

  for (std::unique_ptr<State> &state : states_) {
    state->SeedSortedMessages();
  }

  // Rank of the map matrix tells you if all the nodes are in communication
  // with each other, which tells you if the offsets are observable.
  const size_t connected_nodes =
      Eigen::FullPivLU<
          Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
          .rank();

  // We don't need to support isolated nodes until someone has a real use
  // case.
  CHECK_EQ(connected_nodes, num_nodes)
      << ": There is a node which isn't communicating with the rest.";
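
  // Illustration of the observability check above with a hypothetical
  // three-node config: if nodes A and B exchange timestamps but node C never
  // talks to either, map_matrix_ is [1/3 1/3 1/3; -1 1 0], whose rank is
  // 2 < 3, so C's offset is unobservable and we give up here.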
744
745 // And solve.
Austin Schuh8bd96322020-02-13 21:18:22 -0800746 UpdateOffsets();
747
Austin Schuh2f8fd752020-09-01 22:38:28 -0700748 // We want to start the log file at the last start time of the log files
749 // from all the nodes. Compute how long each node's simulation needs to run
750 // to move time to this point.
Austin Schuh8bd96322020-02-13 21:18:22 -0800751 distributed_clock::time_point start_time = distributed_clock::min_time;
Austin Schuhcde938c2020-02-02 17:30:07 -0800752
Austin Schuh2f8fd752020-09-01 22:38:28 -0700753 // TODO(austin): We want an "OnStart" callback for each node rather than
754 // running until the last node.
755
Austin Schuh8bd96322020-02-13 21:18:22 -0800756 for (std::unique_ptr<State> &state : states_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700757 VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
758 << MaybeNodeName(state->event_loop()->node()) << "now "
759 << state->monotonic_now();
760 // And start computing the start time on the distributed clock now that
761 // that works.
Austin Schuh858c9f32020-08-31 16:56:12 -0700762 start_time = std::max(
763 start_time, state->ToDistributedClock(state->monotonic_start_time()));
Austin Schuhcde938c2020-02-02 17:30:07 -0800764 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700765
766 CHECK_GE(start_time, distributed_clock::epoch())
767 << ": Hmm, we have a node starting before the start of time. Offset "
768 "everything.";
Austin Schuhcde938c2020-02-02 17:30:07 -0800769
Austin Schuh6f3babe2020-01-26 20:34:50 -0800770 // Forwarding is tracked per channel. If it is enabled, we want to turn it
771 // off. Otherwise messages replayed will get forwarded across to the other
Austin Schuh2f8fd752020-09-01 22:38:28 -0700772 // nodes, and also replayed on the other nodes. This may not satisfy all
773 // our users, but it'll start the discussion.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800774 if (configuration::MultiNode(event_loop_factory_->configuration())) {
775 for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
776 const Channel *channel = logged_configuration()->channels()->Get(i);
777 const Node *node = configuration::GetNode(
778 configuration(), channel->source_node()->string_view());
779
Austin Schuh8bd96322020-02-13 21:18:22 -0800780 State *state =
781 states_[configuration::GetNodeIndex(configuration(), node)].get();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800782
783 const Channel *remapped_channel =
Austin Schuh858c9f32020-08-31 16:56:12 -0700784 RemapChannel(state->event_loop(), channel);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800785
786 event_loop_factory_->DisableForwarding(remapped_channel);
787 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700788
789 // If we are replaying a log, we don't want a bunch of redundant messages
790 // from both the real message bridge and simulated message bridge.
791 event_loop_factory_->DisableStatistics();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800792 }
793
Austin Schuhcde938c2020-02-02 17:30:07 -0800794 // While we are starting the system up, we might be relying on matching data
795 // to timestamps on log files where the timestamp log file starts before the
796 // data. In this case, it is reasonable to expect missing data.
797 ignore_missing_data_ = true;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700798 VLOG(1) << "Running until " << start_time << " in Register";
Austin Schuh8bd96322020-02-13 21:18:22 -0800799 event_loop_factory_->RunFor(start_time.time_since_epoch());
Brian Silverman8a32ce62020-08-12 12:02:38 -0700800 VLOG(1) << "At start time";
Austin Schuhcde938c2020-02-02 17:30:07 -0800801 // Now that we are running for real, missing data means that the log file is
802 // corrupted or went wrong.
803 ignore_missing_data_ = false;
Austin Schuh92547522019-12-28 14:33:43 -0800804
Austin Schuh8bd96322020-02-13 21:18:22 -0800805 for (std::unique_ptr<State> &state : states_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700806 // Make the RT clock be correct before handing it to the user.
807 if (state->realtime_start_time() != realtime_clock::min_time) {
808 state->SetRealtimeOffset(state->monotonic_start_time(),
809 state->realtime_start_time());
810 }
811 VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
812 << MaybeNodeName(state->event_loop()->node()) << "now "
813 << state->monotonic_now();
814 }
815
816 if (FLAGS_timestamps_to_csv) {
817 for (std::pair<const std::tuple<const Node *, const Node *>,
818 std::tuple<message_bridge::NoncausalOffsetEstimator>>
819 &filter : filters_) {
820 const Node *const node_a = std::get<0>(filter.first);
821 const Node *const node_b = std::get<1>(filter.first);
822
823 std::get<0>(filter.second)
824 .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
825 ->monotonic_now());
826 std::get<0>(filter.second)
827 .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
828 ->monotonic_now());
829 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800830 }
831}
832
Austin Schuh2f8fd752020-09-01 22:38:28 -0700833void LogReader::UpdateOffsets() {
834 VLOG(2) << "Samples are " << offset_matrix_;
835 VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
836 std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
837 Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
838 "]");
839 VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
840 << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
841
842 size_t node_index = 0;
843 for (std::unique_ptr<State> &state : states_) {
844 state->SetDistributedOffset(offset(node_index), slope(node_index));
845 VLOG(1) << "Offset for node " << node_index << " "
846 << MaybeNodeName(state->event_loop()->node()) << "is "
847 << aos::distributed_clock::time_point(offset(node_index))
848 << " slope " << std::setprecision(9) << std::fixed
849 << slope(node_index);
850 ++node_index;
851 }
852
853 if (VLOG_IS_ON(1)) {
854 LogFit("Offset is");
855 }
856}
857
858void LogReader::LogFit(std::string_view prefix) {
859 for (std::unique_ptr<State> &state : states_) {
860 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
861 << state->monotonic_now() << " distributed "
862 << event_loop_factory_->distributed_now();
863 }
864
865 for (std::pair<const std::tuple<const Node *, const Node *>,
866 std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
867 filters_) {
868 message_bridge::NoncausalOffsetEstimator *estimator =
869 &std::get<0>(filter.second);
870
871 if (estimator->a_timestamps().size() == 0 &&
872 estimator->b_timestamps().size() == 0) {
873 continue;
874 }
875
876 if (VLOG_IS_ON(1)) {
877 estimator->LogFit(prefix);
878 }
879
880 const Node *const node_a = std::get<0>(filter.first);
881 const Node *const node_b = std::get<1>(filter.first);
882
883 const size_t node_a_index =
884 configuration::GetNodeIndex(configuration(), node_a);
885 const size_t node_b_index =
886 configuration::GetNodeIndex(configuration(), node_b);
887
888 const double recovered_slope =
889 slope(node_b_index) / slope(node_a_index) - 1.0;
890 const int64_t recovered_offset =
891 offset(node_b_index).count() - offset(node_a_index).count() *
892 slope(node_b_index) /
893 slope(node_a_index);
894
895 VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
896 << " (error " << recovered_slope - estimator->fit().slope() << ") "
897 << " offset " << std::setprecision(20) << recovered_offset
898 << " (error "
899 << recovered_offset - estimator->fit().offset().count() << ")";
900
901 const aos::distributed_clock::time_point a0 =
902 states_[node_a_index]->ToDistributedClock(
903 std::get<0>(estimator->a_timestamps()[0]));
904 const aos::distributed_clock::time_point a1 =
905 states_[node_a_index]->ToDistributedClock(
906 std::get<0>(estimator->a_timestamps()[1]));
907
908 VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
909 << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
910 << " distributed -> " << node_b->name()->string_view() << " "
911 << states_[node_b_index]->FromDistributedClock(a0) << " should be "
912 << aos::monotonic_clock::time_point(
913 std::chrono::nanoseconds(static_cast<int64_t>(
914 std::get<0>(estimator->a_timestamps()[0])
915 .time_since_epoch()
916 .count() *
917 (1.0 + estimator->fit().slope()))) +
918 estimator->fit().offset())
919 << ((a0 <= event_loop_factory_->distributed_now())
920 ? ""
921 : " After now, investigate");
922 VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
923 << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
924 << " distributed -> " << node_b->name()->string_view() << " "
925 << states_[node_b_index]->FromDistributedClock(a1) << " should be "
926 << aos::monotonic_clock::time_point(
927 std::chrono::nanoseconds(static_cast<int64_t>(
928 std::get<0>(estimator->a_timestamps()[1])
929 .time_since_epoch()
930 .count() *
931 (1.0 + estimator->fit().slope()))) +
932 estimator->fit().offset())
933 << ((event_loop_factory_->distributed_now() <= a1)
934 ? ""
935 : " Before now, investigate");
936
937 const aos::distributed_clock::time_point b0 =
938 states_[node_b_index]->ToDistributedClock(
939 std::get<0>(estimator->b_timestamps()[0]));
940 const aos::distributed_clock::time_point b1 =
941 states_[node_b_index]->ToDistributedClock(
942 std::get<0>(estimator->b_timestamps()[1]));
943
944 VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
945 << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
946 << " distributed -> " << node_a->name()->string_view() << " "
947 << states_[node_a_index]->FromDistributedClock(b0)
948 << ((b0 <= event_loop_factory_->distributed_now())
949 ? ""
950 : " After now, investigate");
951 VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
952 << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
953 << " distributed -> " << node_a->name()->string_view() << " "
954 << states_[node_a_index]->FromDistributedClock(b1)
955 << ((event_loop_factory_->distributed_now() <= b1)
956 ? ""
957 : " Before now, investigate");
958 }
959}
960
961message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
Austin Schuh8bd96322020-02-13 21:18:22 -0800962 const Node *node_a, const Node *node_b) {
963 CHECK_NE(node_a, node_b);
964 CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
965 CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
966
967 if (node_a > node_b) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700968 return GetFilter(node_b, node_a);
Austin Schuh8bd96322020-02-13 21:18:22 -0800969 }
970
971 auto tuple = std::make_tuple(node_a, node_b);
972
973 auto it = filters_.find(tuple);
974
975 if (it == filters_.end()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700976 auto &x =
977 filters_
978 .insert(std::make_pair(
979 tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
980 node_a, node_b))))
981 .first->second;
Austin Schuh8bd96322020-02-13 21:18:22 -0800982 if (FLAGS_timestamps_to_csv) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700983 std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
984 "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
985 node_b->name()->string_view()));
986 std::get<0>(x).SetRevCsvFileName(absl::StrCat(
987 "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
988 node_a->name()->string_view()));
Austin Schuh8bd96322020-02-13 21:18:22 -0800989 }
990
Austin Schuh2f8fd752020-09-01 22:38:28 -0700991 return &std::get<0>(x);
Austin Schuh8bd96322020-02-13 21:18:22 -0800992 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700993 return &std::get<0>(it->second);
Austin Schuh8bd96322020-02-13 21:18:22 -0800994 }
995}
996
Austin Schuh8bd96322020-02-13 21:18:22 -0800997
Austin Schuhe309d2a2019-11-29 13:25:21 -0800998void LogReader::Register(EventLoop *event_loop) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800999 State *state =
1000 states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
1001 .get();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001002
Austin Schuh858c9f32020-08-31 16:56:12 -07001003 state->set_event_loop(event_loop);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001004
Tyler Chatow67ddb032020-01-12 14:30:04 -08001005 // We don't run timing reports when trying to print out logged data, because
1006 // otherwise we would end up printing out the timing reports themselves...
1007 // This is only really relevant when we are replaying into a simulation.
Austin Schuh6f3babe2020-01-26 20:34:50 -08001008 event_loop->SkipTimingReport();
1009 event_loop->SkipAosLog();
Austin Schuh39788ff2019-12-01 18:22:57 -08001010
Austin Schuh858c9f32020-08-31 16:56:12 -07001011 const bool has_data = state->SetNode();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001012
Austin Schuh858c9f32020-08-31 16:56:12 -07001013 state->SetChannelCount(logged_configuration()->channels()->size());
Austin Schuh8bd96322020-02-13 21:18:22 -08001014
Austin Schuh858c9f32020-08-31 16:56:12 -07001015 for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001016 const Channel *channel =
1017 RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
Austin Schuh6331ef92020-01-07 18:28:09 -08001018
Austin Schuh858c9f32020-08-31 16:56:12 -07001019 NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001020 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
Austin Schuh8bd96322020-02-13 21:18:22 -08001021
1022 if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
1023 configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
1024 const Node *target_node = configuration::GetNode(
1025 event_loop->configuration(), channel->source_node()->string_view());
Austin Schuh858c9f32020-08-31 16:56:12 -07001026 filter = GetFilter(event_loop->node(), target_node);
Austin Schuh8bd96322020-02-13 21:18:22 -08001027
1028 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001029 channel_target_event_loop_factory =
Austin Schuh8bd96322020-02-13 21:18:22 -08001030 event_loop_factory_->GetNodeEventLoopFactory(target_node);
1031 }
1032 }
Austin Schuh858c9f32020-08-31 16:56:12 -07001033
1034 state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
1035 channel_target_event_loop_factory);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001036 }
1037
Austin Schuh6aa77be2020-02-22 21:06:40 -08001038 // If we didn't find any log files with data in them, we won't ever get a
1039 // callback or be live. So skip the rest of the setup.
1040 if (!has_data) {
1041 return;
1042 }
1043
Austin Schuh858c9f32020-08-31 16:56:12 -07001044 state->set_timer_handler(event_loop->AddTimer([this, state]() {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001045 VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
1046 << "at " << state->event_loop()->context().monotonic_event_time
1047 << " now " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001048 if (state->OldestMessageTime() == monotonic_clock::max_time) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001049 --live_nodes_;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001050 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001051 if (live_nodes_ == 0) {
1052 event_loop_factory_->Exit();
1053 }
James Kuszmaul314f1672020-01-03 20:02:08 -08001054 return;
1055 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001056 TimestampMerger::DeliveryTimestamp channel_timestamp;
Austin Schuh05b70472020-01-01 17:11:17 -08001057 int channel_index;
1058 FlatbufferVector<MessageHeader> channel_data =
1059 FlatbufferVector<MessageHeader>::Empty();
1060
Austin Schuh2f8fd752020-09-01 22:38:28 -07001061 if (VLOG_IS_ON(1)) {
1062 LogFit("Offset was");
1063 }
1064
1065 bool update_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001066 std::tie(channel_timestamp, channel_index, channel_data) =
Austin Schuh2f8fd752020-09-01 22:38:28 -07001067 state->PopOldest(&update_time);
Austin Schuh05b70472020-01-01 17:11:17 -08001068
Austin Schuhe309d2a2019-11-29 13:25:21 -08001069 const monotonic_clock::time_point monotonic_now =
Austin Schuh858c9f32020-08-31 16:56:12 -07001070 state->event_loop()->context().monotonic_event_time;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001071 if (!FLAGS_skip_order_validation) {
1072 CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
1073 << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
1074 << monotonic_now << " trying to send "
1075 << channel_timestamp.monotonic_event_time << " failure "
1076 << state->DebugString();
1077 } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
1078 LOG(WARNING) << "Check failed: monotonic_now == "
1079 "channel_timestamp.monotonic_event_time) ("
1080 << monotonic_now << " vs. "
1081 << channel_timestamp.monotonic_event_time
1082 << "): " << FlatbufferToJson(state->event_loop()->node())
1083 << " Now " << monotonic_now << " trying to send "
1084 << channel_timestamp.monotonic_event_time << " failure "
1085 << state->DebugString();
1086 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001087
Austin Schuh6f3babe2020-01-26 20:34:50 -08001088 if (channel_timestamp.monotonic_event_time >
Austin Schuh858c9f32020-08-31 16:56:12 -07001089 state->monotonic_start_time() ||
Austin Schuh15649d62019-12-28 16:36:38 -08001090 event_loop_factory_ != nullptr) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001091 if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
Austin Schuh858c9f32020-08-31 16:56:12 -07001092 !state->at_end()) ||
Austin Schuh05b70472020-01-01 17:11:17 -08001093 channel_data.message().data() != nullptr) {
1094 CHECK(channel_data.message().data() != nullptr)
1095 << ": Got a message without data. Forwarding entry which was "
Austin Schuh2f8fd752020-09-01 22:38:28 -07001096 "not matched? Use --skip_missing_forwarding_entries to "
1097 "ignore "
Austin Schuh15649d62019-12-28 16:36:38 -08001098 "this.";
Austin Schuh92547522019-12-28 14:33:43 -08001099
Austin Schuh2f8fd752020-09-01 22:38:28 -07001100 if (update_time) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001101 // Confirm that the message was sent on the sending node before the
1102 // destination node (this node). As a proxy, do this by making sure
1103 // that time on the source node is past when the message was sent.
Austin Schuh2f8fd752020-09-01 22:38:28 -07001104 if (!FLAGS_skip_order_validation) {
1105 CHECK_LT(channel_timestamp.monotonic_remote_time,
1106 state->monotonic_remote_now(channel_index))
1107 << state->event_loop()->node()->name()->string_view() << " to "
1108 << state->remote_node(channel_index)->name()->string_view()
1109 << " " << state->DebugString();
1110 } else if (channel_timestamp.monotonic_remote_time >=
1111 state->monotonic_remote_now(channel_index)) {
1112 LOG(WARNING)
1113 << "Check failed: channel_timestamp.monotonic_remote_time < "
1114 "state->monotonic_remote_now(channel_index) ("
1115 << channel_timestamp.monotonic_remote_time << " vs. "
1116 << state->monotonic_remote_now(channel_index) << ") "
1117 << state->event_loop()->node()->name()->string_view() << " to "
1118 << state->remote_node(channel_index)->name()->string_view()
1119 << " currently " << channel_timestamp.monotonic_event_time
1120 << " ("
1121 << state->ToDistributedClock(
1122 channel_timestamp.monotonic_event_time)
1123 << ") remote event time "
1124 << channel_timestamp.monotonic_remote_time << " ("
1125 << state->RemoteToDistributedClock(
1126 channel_index, channel_timestamp.monotonic_remote_time)
1127 << ") " << state->DebugString();
1128 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001129
1130 if (FLAGS_timestamps_to_csv) {
1131 if (offset_fp_ == nullptr) {
1132 offset_fp_ = fopen("/tmp/offsets.csv", "w");
1133 fprintf(
1134 offset_fp_,
1135 "# time_since_start, offset node 0, offset node 1, ...\n");
1136 first_time_ = channel_timestamp.realtime_event_time;
1137 }
1138
1139 fprintf(offset_fp_, "%.9f",
1140 std::chrono::duration_cast<std::chrono::duration<double>>(
1141 channel_timestamp.realtime_event_time - first_time_)
1142 .count());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001143 for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
1144 fprintf(offset_fp_, ", %.9f",
1145 time_offset_matrix_(i, 0) +
1146 time_slope_matrix_(i, 0) *
1147 chrono::duration<double>(
1148 event_loop_factory_->distributed_now()
1149 .time_since_epoch())
1150 .count());
Austin Schuh8bd96322020-02-13 21:18:22 -08001151 }
1152 fprintf(offset_fp_, "\n");
1153 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001154 }
1155
Austin Schuh15649d62019-12-28 16:36:38 -08001156 // If we have access to the factory, use it to fix the realtime time.
Austin Schuh858c9f32020-08-31 16:56:12 -07001157 state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
1158 channel_timestamp.realtime_event_time);
Austin Schuh15649d62019-12-28 16:36:38 -08001159
Austin Schuh2f8fd752020-09-01 22:38:28 -07001160 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
1161 << channel_timestamp.monotonic_event_time;
1162 // TODO(austin): std::move channel_data in and make that efficient in
1163 // simulation.
Austin Schuh858c9f32020-08-31 16:56:12 -07001164 state->Send(channel_index, channel_data.message().data()->Data(),
1165 channel_data.message().data()->size(),
1166 channel_timestamp.monotonic_remote_time,
1167 channel_timestamp.realtime_remote_time,
1168 channel_timestamp.remote_queue_index);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001169 } else if (state->at_end() && !ignore_missing_data_) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001170 // We are at the end of the log file and found missing data. Finish
Austin Schuh2f8fd752020-09-01 22:38:28 -07001171 // reading the rest of the log file and call it quits. We don't want
1172 // to replay partial data.
Austin Schuh858c9f32020-08-31 16:56:12 -07001173 while (state->OldestMessageTime() != monotonic_clock::max_time) {
1174 bool update_time_dummy;
1175 state->PopOldest(&update_time_dummy);
Austin Schuh8bd96322020-02-13 21:18:22 -08001176 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001177 } else {
1178 CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
Austin Schuh92547522019-12-28 14:33:43 -08001179 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001180 } else {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001181 LOG(WARNING)
1182 << "Not sending data from before the start of the log file. "
1183 << channel_timestamp.monotonic_event_time.time_since_epoch().count()
1184 << " start " << monotonic_start_time().time_since_epoch().count()
1185 << " " << FlatbufferToJson(channel_data);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001186 }
1187
Austin Schuh858c9f32020-08-31 16:56:12 -07001188 const monotonic_clock::time_point next_time = state->OldestMessageTime();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001189 if (next_time != monotonic_clock::max_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001190 VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
1191 << "wakeup for " << next_time << "("
1192 << state->ToDistributedClock(next_time)
1193 << " distributed), now is " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001194 state->Setup(next_time);
James Kuszmaul314f1672020-01-03 20:02:08 -08001195 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001196 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1197 << "No next message, scheduling shutdown";
1198 // Set up a timer to fire immediately after now so this node can shut
1199 // down. If we don't do this, the senders waiting on the message we just
1200 // read will never get called.
Austin Schuheecb9282020-01-08 17:43:30 -08001201 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001202 state->Setup(monotonic_now + event_loop_factory_->send_delay() +
1203 std::chrono::nanoseconds(1));
Austin Schuheecb9282020-01-08 17:43:30 -08001204 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001205 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001206
Austin Schuh2f8fd752020-09-01 22:38:28 -07001207 // Once we make this call, the current time changes. So do everything
1208 // which involves time before changing it. That especially includes
1209 // sending the message.
1210 if (update_time) {
1211 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1212 << "updating offsets";
1213
1214 std::vector<aos::monotonic_clock::time_point> before_times;
1215 before_times.resize(states_.size());
1216 std::transform(states_.begin(), states_.end(), before_times.begin(),
1217 [](const std::unique_ptr<State> &state) {
1218 return state->monotonic_now();
1219 });
1220
1221 for (size_t i = 0; i < states_.size(); ++i) {
1222 VLOG(1) << MaybeNodeName(
1223 states_[i]->event_loop()->node())
1224 << "before " << states_[i]->monotonic_now();
1225 }
1226
Austin Schuh8bd96322020-02-13 21:18:22 -08001227 UpdateOffsets();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001228 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
1229 << state->monotonic_now();
1230
1231 for (size_t i = 0; i < states_.size(); ++i) {
1232 VLOG(1) << MaybeNodeName(
1233 states_[i]->event_loop()->node())
1234 << "after " << states_[i]->monotonic_now();
1235 }
1236
1237 // TODO(austin): We should be perfect.
1238 const std::chrono::nanoseconds kTolerance{3};
1239 if (!FLAGS_skip_order_validation) {
1240 CHECK_GE(next_time, state->monotonic_now())
1241 << ": Time skipped the next event.";
1242
1243 for (size_t i = 0; i < states_.size(); ++i) {
1244 CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
1245 << ": Time changed too much on node "
1246 << MaybeNodeName(states_[i]->event_loop()->node());
1247 CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
1248 << ": Time changed too much on node "
1249 << states_[i]->event_loop()->node()->name()->string_view();
1250 }
1251 } else {
1252 if (next_time < state->monotonic_now()) {
1253 LOG(WARNING) << "Check failed: next_time >= "
1254 "state->monotonic_now() ("
1255 << next_time << " vs. " << state->monotonic_now()
1256 << "): Time skipped the next event.";
1257 }
1258 for (size_t i = 0; i < states_.size(); ++i) {
1259 if (states_[i]->monotonic_now() < before_times[i] - kTolerance) {
1260 LOG(WARNING) << "Check failed: "
1261 "states_[i]->monotonic_now() "
1262 ">= before_times[i] - kTolerance ("
1263 << states_[i]->monotonic_now() << " vs. "
1264 << before_times[i] - kTolerance
1265 << ") : Time changed too much on node "
1266 << MaybeNodeName(states_[i]->event_loop()->node());
1267 }
1268 if (states_[i]->monotonic_now() > before_times[i] + kTolerance) {
1269 LOG(WARNING) << "Check failed: "
1270 "states_[i]->monotonic_now() "
1271 "<= before_times[i] + kTolerance ("
1272 << states_[i]->monotonic_now() << " vs. "
1273 << before_times[i] + kTolerance
1274 << ") : Time changed too much on node "
1275 << MaybeNodeName(states_[i]->event_loop()->node());
1276 }
1277 }
1278 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001279 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001280
1281 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
1282 << state->event_loop()->context().monotonic_event_time << " now "
1283 << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001284 }));
Austin Schuhe309d2a2019-11-29 13:25:21 -08001285
Austin Schuh6f3babe2020-01-26 20:34:50 -08001286 ++live_nodes_;
1287
Austin Schuh858c9f32020-08-31 16:56:12 -07001288 if (state->OldestMessageTime() != monotonic_clock::max_time) {
1289 event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
Austin Schuhe309d2a2019-11-29 13:25:21 -08001290 }
1291}
1292
1293void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001294 // Make sure that things get destroyed in the correct order, rather than
1295 // relying on getting the order correct in the class definition.
Austin Schuh8bd96322020-02-13 21:18:22 -08001296 for (std::unique_ptr<State> &state : states_) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001297 state->Deregister();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001298 }
Austin Schuh92547522019-12-28 14:33:43 -08001299
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001300 event_loop_factory_unique_ptr_.reset();
1301 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -08001302}
1303
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001304void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
1305 std::string_view add_prefix) {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001306 for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
1307 const Channel *const channel = logged_configuration()->channels()->Get(ii);
1308 if (channel->name()->str() == name &&
1309 channel->type()->string_view() == type) {
1310 CHECK_EQ(0u, remapped_channels_.count(ii))
1311 << "Already remapped channel "
1312 << configuration::CleanedChannelToString(channel);
1313 remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
1314 VLOG(1) << "Remapping channel "
1315 << configuration::CleanedChannelToString(channel)
1316 << " to have name " << remapped_channels_[ii];
Austin Schuh6331ef92020-01-07 18:28:09 -08001317 MakeRemappedConfig();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001318 return;
1319 }
1320 }
1321 LOG(FATAL) << "Unable to locate channel with name " << name << " and type "
1322 << type;
1323}
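// A minimal usage sketch (log path, channel name, and message type are
// hypothetical; assumes the default add_prefix declared in the header is
// "/original").  Remapping has to happen before Register(), since
// MakeRemappedConfig() refuses to run once the per-node event loops exist:
//
//   LogReader reader("/tmp/some_log/");
//   reader.RemapLoggedChannel("/camera", "example.CameraImage");
//   reader.Register();
//   // Replayed messages now come out on "/original/camera", leaving
//   // "/camera" free for a live process to publish on during replay.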
1324
1325void LogReader::MakeRemappedConfig() {
Austin Schuh8bd96322020-02-13 21:18:22 -08001326 for (std::unique_ptr<State> &state : states_) {
Austin Schuh6aa77be2020-02-22 21:06:40 -08001327 if (state) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001328 CHECK(!state->event_loop())
Austin Schuh6aa77be2020-02-22 21:06:40 -08001329 << ": Can't change the mapping after the events are scheduled.";
1330 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001331 }
Austin Schuhac0771c2020-01-07 18:36:30 -08001332
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001333 // If no remapping occurred and we are using the original config, then there
1334 // is nothing interesting to do here.
1335 if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001336 remapped_configuration_ = logged_configuration();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001337 return;
1338 }
1339 // Config to copy Channel definitions from. Use the specified
1340 // replay_configuration_ if it has been provided.
1341 const Configuration *const base_config = replay_configuration_ == nullptr
1342 ? logged_configuration()
1343 : replay_configuration_;
1344 // The remapped config will be identical to the base_config, except that it
1345 // will have a bunch of extra channels in the channel list, which are exact
1346 // copies of the remapped channels, but with different names.
1347 // Because the flatbuffers API is a pain to work with, this requires a bit of
1348 // a song-and-dance to get copied over.
1349 // The order of operations is to:
1350 // 1) Make a flatbuffer builder for a config that will just contain a list of
1351 // the new channels that we want to add.
1352 // 2) For each channel that we are remapping:
1353 // a) Make a buffer/builder and construct into it a Channel table that only
1354 // contains the new name for the channel.
1355 // b) Merge the new channel with just the name into the channel that we are
1356 // trying to copy, built in the flatbuffer builder made in 1. This gives
1357 // us the new channel definition that we need.
1358 // 3) Using this list of offsets, build the Configuration of just new
1359 // Channels.
1360 // 4) Merge the Configuration with the new Channels into the base_config.
1361 // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
1362 // chance to sanitize the config.
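//
// As a concrete (hypothetical) example of the end result: remapping
// {"name": "/camera", "type": "example.CameraImage"} with an add_prefix of
// "/original" leaves the original channel untouched in base_config and adds
// an otherwise-identical copy named "/original/camera", which is the channel
// the remapped senders end up using.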
1363
1364 // This is the builder that we use for the config containing all the new
1365 // channels.
1366 flatbuffers::FlatBufferBuilder new_config_fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -08001367 new_config_fbb.ForceDefaults(true);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001368 std::vector<flatbuffers::Offset<Channel>> channel_offsets;
1369 for (auto &pair : remapped_channels_) {
1370 // This is the builder that we use for creating the Channel with just the
1371 // new name.
1372 flatbuffers::FlatBufferBuilder new_name_fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -08001373 new_name_fbb.ForceDefaults(true);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001374 const flatbuffers::Offset<flatbuffers::String> name_offset =
1375 new_name_fbb.CreateString(pair.second);
1376 ChannelBuilder new_name_builder(new_name_fbb);
1377 new_name_builder.add_name(name_offset);
1378 new_name_fbb.Finish(new_name_builder.Finish());
1379 const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001380 // Retrieve the channel that we want to copy, confirming that it is
1381 // actually present in base_config.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001382 const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
1383 base_config, logged_configuration()->channels()->Get(pair.first), "",
1384 nullptr));
1385 // Actually create the new channel and put it into the vector of Offsets
1386 // that we will use to create the new Configuration.
1387 channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
1388 reinterpret_cast<const flatbuffers::Table *>(base_channel),
1389 reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
1390 &new_config_fbb));
1391 }
1392 // Create the Configuration containing the new channels that we want to add.
Austin Schuhfa895892020-01-07 20:07:41 -08001393 const auto new_name_vector_offsets =
1394 new_config_fbb.CreateVector(channel_offsets);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001395 ConfigurationBuilder new_config_builder(new_config_fbb);
1396 new_config_builder.add_channels(new_name_vector_offsets);
1397 new_config_fbb.Finish(new_config_builder.Finish());
1398 const FlatbufferDetachedBuffer<Configuration> new_name_config =
1399 new_config_fbb.Release();
1400 // Merge the new channels configuration into the base_config, giving us the
1401 // remapped configuration.
1402 remapped_configuration_buffer_ =
1403 std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
1404 MergeFlatBuffers<Configuration>(base_config,
1405 &new_name_config.message()));
1406 // Call MergeConfiguration to deal with sanitizing the config.
1407 remapped_configuration_buffer_ =
1408 std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
1409 configuration::MergeConfiguration(*remapped_configuration_buffer_));
1410
1411 remapped_configuration_ = &remapped_configuration_buffer_->message();
1412}
1413
Austin Schuh6f3babe2020-01-26 20:34:50 -08001414const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
1415 const Channel *channel) {
1416 std::string_view channel_name = channel->name()->string_view();
1417 std::string_view channel_type = channel->type()->string_view();
1418 const int channel_index =
1419 configuration::ChannelIndex(logged_configuration(), channel);
1420 // If the channel is remapped, find the correct channel name to use.
1421 if (remapped_channels_.count(channel_index) > 0) {
Austin Schuhee711052020-08-24 16:06:09 -07001422 VLOG(3) << "Got remapped channel on "
Austin Schuh6f3babe2020-01-26 20:34:50 -08001423 << configuration::CleanedChannelToString(channel);
1424 channel_name = remapped_channels_[channel_index];
1425 }
1426
Austin Schuhee711052020-08-24 16:06:09 -07001427 VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001428 const Channel *remapped_channel = configuration::GetChannel(
1429 event_loop->configuration(), channel_name, channel_type,
1430 event_loop->name(), event_loop->node());
1431
1432 CHECK(remapped_channel != nullptr)
1433 << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
1434 << channel_type << "\"} because it is not in the provided configuration.";
1435
1436 return remapped_channel;
1437}
1438
Austin Schuh858c9f32020-08-31 16:56:12 -07001439LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
1440 : channel_merger_(std::move(channel_merger)) {}
1441
1442EventLoop *LogReader::State::SetNodeEventLoopFactory(
1443 NodeEventLoopFactory *node_event_loop_factory) {
1444 node_event_loop_factory_ = node_event_loop_factory;
1445 event_loop_unique_ptr_ =
1446 node_event_loop_factory_->MakeEventLoop("log_reader");
1447 return event_loop_unique_ptr_.get();
1448}
1449
1450void LogReader::State::SetChannelCount(size_t count) {
1451 channels_.resize(count);
1452 filters_.resize(count);
1453 channel_target_event_loop_factory_.resize(count);
1454}
1455
1456void LogReader::State::SetChannel(
1457 size_t channel, std::unique_ptr<RawSender> sender,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001458 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh858c9f32020-08-31 16:56:12 -07001459 NodeEventLoopFactory *channel_target_event_loop_factory) {
1460 channels_[channel] = std::move(sender);
1461 filters_[channel] = filter;
1462 channel_target_event_loop_factory_[channel] =
1463 channel_target_event_loop_factory;
1464}
1465
1466std::tuple<TimestampMerger::DeliveryTimestamp, int,
1467 FlatbufferVector<MessageHeader>>
1468LogReader::State::PopOldest(bool *update_time) {
1469 CHECK_GT(sorted_messages_.size(), 0u);
1470
1471 std::tuple<TimestampMerger::DeliveryTimestamp, int,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001472 FlatbufferVector<MessageHeader>,
1473 message_bridge::NoncausalOffsetEstimator *>
Austin Schuh858c9f32020-08-31 16:56:12 -07001474 result = std::move(sorted_messages_.front());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001475 VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
Austin Schuh858c9f32020-08-31 16:56:12 -07001476 << std::get<0>(result).monotonic_event_time;
1477 sorted_messages_.pop_front();
1478 SeedSortedMessages();
1479
Austin Schuh2f8fd752020-09-01 22:38:28 -07001480 if (std::get<3>(result) != nullptr) {
1481 *update_time = std::get<3>(result)->Pop(
1482 event_loop_->node(), std::get<0>(result).monotonic_event_time);
1483 } else {
1484 *update_time = false;
1485 }
Austin Schuh858c9f32020-08-31 16:56:12 -07001486 return std::make_tuple(std::get<0>(result), std::get<1>(result),
1487 std::move(std::get<2>(result)));
1488}
1489
1490monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
1491 if (sorted_messages_.size() > 0) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001492 VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
Austin Schuh858c9f32020-08-31 16:56:12 -07001493 << std::get<0>(sorted_messages_.front()).monotonic_event_time;
1494 return std::get<0>(sorted_messages_.front()).monotonic_event_time;
1495 }
1496
1497 return channel_merger_->OldestMessageTime();
1498}
1499
1500void LogReader::State::SeedSortedMessages() {
1501 const aos::monotonic_clock::time_point end_queue_time =
1502 (sorted_messages_.size() > 0
1503 ? std::get<0>(sorted_messages_.front()).monotonic_event_time
1504 : channel_merger_->monotonic_start_time()) +
1505 std::chrono::seconds(2);
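// For example (hypothetical times): if the oldest already-queued message is
// 10s into the log, the loop below keeps popping from channel_merger_ until
// the newest queued message lands past the 12s mark (or the merger runs out
// of messages).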
1506
1507 while (true) {
1508 if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
1509 return;
1510 }
1511 if (sorted_messages_.size() > 0) {
1512 // Stop placing sorted messages on the list once we have 2 seconds
1513 // queued up (but always queue at least until the log starts).
1514 if (end_queue_time <
1515 std::get<0>(sorted_messages_.back()).monotonic_event_time) {
1516 return;
1517 }
1518 }
1519
1520 TimestampMerger::DeliveryTimestamp channel_timestamp;
1521 int channel_index;
1522 FlatbufferVector<MessageHeader> channel_data =
1523 FlatbufferVector<MessageHeader>::Empty();
1524
Austin Schuh2f8fd752020-09-01 22:38:28 -07001525 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
1526
Austin Schuh858c9f32020-08-31 16:56:12 -07001527 std::tie(channel_timestamp, channel_index, channel_data) =
1528 channel_merger_->PopOldest();
1529
Austin Schuh2f8fd752020-09-01 22:38:28 -07001530 // Skip any messages without forwarding information.
1531 if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
1532 // Got a forwarding timestamp!
1533 filter = filters_[channel_index];
1534
1535 CHECK(filter != nullptr);
1536
1537 // Record the sample in the correct direction (forward or reverse)
1538 // depending on which node we are.
1539 filter->Sample(event_loop_->node(),
1540 channel_timestamp.monotonic_event_time,
1541 channel_timestamp.monotonic_remote_time);
1542 }
Austin Schuh858c9f32020-08-31 16:56:12 -07001543 sorted_messages_.emplace_back(channel_timestamp, channel_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001544 std::move(channel_data), filter);
Austin Schuh858c9f32020-08-31 16:56:12 -07001545 }
1546}
1547
1548void LogReader::State::Deregister() {
1549 for (size_t i = 0; i < channels_.size(); ++i) {
1550 channels_[i].reset();
1551 }
1552 event_loop_unique_ptr_.reset();
1553 event_loop_ = nullptr;
1554 timer_handler_ = nullptr;
1555 node_event_loop_factory_ = nullptr;
1556}
1557
Austin Schuhe309d2a2019-11-29 13:25:21 -08001558} // namespace logger
1559} // namespace aos