blob: 5a6bb8f5f86c5e84fdfcaa7991bde9deb1407b9e [file] [log] [blame]
James Kuszmaul38735e82019-12-07 16:42:06 -08001#include "aos/events/logging/logger.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -08002
3#include <fcntl.h>
Austin Schuh4c4e0092019-12-22 16:18:03 -08004#include <limits.h>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8#include <vector>
9
Austin Schuh8bd96322020-02-13 21:18:22 -080010#include "Eigen/Dense"
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "absl/strings/escaping.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "absl/types/span.h"
13#include "aos/events/event_loop.h"
Austin Schuhf6f9bf32020-10-11 14:37:43 -070014#include "aos/events/logging/logfile_sorting.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080015#include "aos/events/logging/logger_generated.h"
Austin Schuh64fab802020-09-09 22:47:47 -070016#include "aos/events/logging/uuid.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080017#include "aos/flatbuffer_merge.h"
Austin Schuh288479d2019-12-18 19:47:52 -080018#include "aos/network/team_number.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080019#include "aos/time/time.h"
Brian Silvermanae7c0332020-09-30 16:58:23 -070020#include "aos/util/file.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080021#include "flatbuffers/flatbuffers.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070022#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080023
Austin Schuh15649d62019-12-28 16:36:38 -080024DEFINE_bool(skip_missing_forwarding_entries, false,
25 "If true, drop any forwarding entries with missing data. If "
26 "false, CHECK.");
Austin Schuhe309d2a2019-11-29 13:25:21 -080027
Austin Schuh8bd96322020-02-13 21:18:22 -080028DEFINE_bool(timestamps_to_csv, false,
29 "If true, write all the time synchronization information to a set "
30 "of CSV files in /tmp/. This should only be needed when debugging "
31 "time synchronization.");
32
Austin Schuh2f8fd752020-09-01 22:38:28 -070033DEFINE_bool(skip_order_validation, false,
34 "If true, ignore any out of orderness in replay");
35
Austin Schuhe309d2a2019-11-29 13:25:21 -080036namespace aos {
37namespace logger {
Austin Schuh0afc4d12020-10-19 11:42:04 -070038namespace {
39// Helper to safely read a header, or CHECK.
Austin Schuhadd6eb32020-11-09 21:24:26 -080040SizePrefixedFlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
Austin Schuh287d43d2020-12-04 20:19:33 -080041 const std::vector<LogFile> &log_files) {
42 CHECK_GE(log_files.size(), 1u) << ": Empty filenames list";
43 CHECK_GE(log_files[0].parts.size(), 1u) << ": Empty filenames list";
44 CHECK_GE(log_files[0].parts[0].parts.size(), 1u) << ": Empty filenames list";
Austin Schuhadd6eb32020-11-09 21:24:26 -080045 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> result =
Austin Schuh287d43d2020-12-04 20:19:33 -080046 ReadHeader(log_files[0].parts[0].parts[0]);
Austin Schuh3bd4c402020-11-06 18:19:06 -080047 CHECK(result);
48 return result.value();
Austin Schuh0afc4d12020-10-19 11:42:04 -070049}
Austin Schuhe309d2a2019-11-29 13:25:21 -080050namespace chrono = std::chrono;
Austin Schuh0afc4d12020-10-19 11:42:04 -070051} // namespace
Austin Schuhe309d2a2019-11-29 13:25:21 -080052
Brian Silverman1f345222020-09-24 21:14:48 -070053Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
54 std::function<bool(const Channel *)> should_log)
Austin Schuhe309d2a2019-11-29 13:25:21 -080055 : event_loop_(event_loop),
Austin Schuh0c297012020-09-16 18:41:59 -070056 configuration_(configuration),
Brian Silvermanae7c0332020-09-30 16:58:23 -070057 boot_uuid_(
58 util::ReadFileToStringOrDie("/proc/sys/kernel/random/boot_id")),
Austin Schuh0c297012020-09-16 18:41:59 -070059 name_(network::GetHostname()),
Brian Silverman1f345222020-09-24 21:14:48 -070060 timer_handler_(event_loop_->AddTimer(
61 [this]() { DoLogData(event_loop_->monotonic_now()); })),
Austin Schuh2f8fd752020-09-01 22:38:28 -070062 server_statistics_fetcher_(
63 configuration::MultiNode(event_loop_->configuration())
64 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
65 "/aos")
66 : aos::Fetcher<message_bridge::ServerStatistics>()) {
Brian Silverman1f345222020-09-24 21:14:48 -070067 VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
Austin Schuh2f8fd752020-09-01 22:38:28 -070068
Austin Schuh8d7e0bb2020-10-02 17:57:00 -070069 // Find all the nodes which are logging timestamps on our node. This may
70 // over-estimate if should_log is specified.
71 std::vector<const Node *> timestamp_logger_nodes =
72 configuration::TimestampNodes(configuration_, event_loop_->node());
Austin Schuh2f8fd752020-09-01 22:38:28 -070073
74 std::map<const Channel *, const Node *> timestamp_logger_channels;
75
76 // Now that we have all the nodes accumulated, make remote timestamp loggers
77 // for them.
78 for (const Node *node : timestamp_logger_nodes) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -070079 // Note: since we are doing a find using the event loop channel, we need to
80 // make sure this channel pointer is part of the event loop configuration,
81 // not configuration_. This only matters when configuration_ !=
82 // event_loop->configuration();
Austin Schuh2f8fd752020-09-01 22:38:28 -070083 const Channel *channel = configuration::GetChannel(
Austin Schuh8d7e0bb2020-10-02 17:57:00 -070084 event_loop->configuration(),
Austin Schuh2f8fd752020-09-01 22:38:28 -070085 absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
86 logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
87 event_loop_->node());
88
89 CHECK(channel != nullptr)
90 << ": Remote timestamps are logged on "
91 << event_loop_->node()->name()->string_view()
92 << " but can't find channel /aos/remote_timestamps/"
93 << node->name()->string_view();
Brian Silverman1f345222020-09-24 21:14:48 -070094 if (!should_log(channel)) {
95 continue;
96 }
Austin Schuh2f8fd752020-09-01 22:38:28 -070097 timestamp_logger_channels.insert(std::make_pair(channel, node));
98 }
99
Brian Silvermand90905f2020-09-23 14:42:56 -0700100 const size_t our_node_index =
101 configuration::GetNodeIndex(configuration_, event_loop_->node());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700102
Brian Silverman1f345222020-09-24 21:14:48 -0700103 for (size_t channel_index = 0;
104 channel_index < configuration_->channels()->size(); ++channel_index) {
105 const Channel *const config_channel =
106 configuration_->channels()->Get(channel_index);
Austin Schuh0c297012020-09-16 18:41:59 -0700107 // The MakeRawFetcher method needs a channel which is in the event loop
108 // configuration() object, not the configuration_ object. Go look that up
109 // from the config.
110 const Channel *channel = aos::configuration::GetChannel(
111 event_loop_->configuration(), config_channel->name()->string_view(),
112 config_channel->type()->string_view(), "", event_loop_->node());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700113 CHECK(channel != nullptr)
114 << ": Failed to look up channel "
115 << aos::configuration::CleanedChannelToString(config_channel);
Brian Silverman1f345222020-09-24 21:14:48 -0700116 if (!should_log(channel)) {
117 continue;
118 }
Austin Schuh0c297012020-09-16 18:41:59 -0700119
Austin Schuhe309d2a2019-11-29 13:25:21 -0800120 FetcherStruct fs;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700121 fs.node_index = our_node_index;
Brian Silverman1f345222020-09-24 21:14:48 -0700122 fs.channel_index = channel_index;
123 fs.channel = channel;
124
Austin Schuh6f3babe2020-01-26 20:34:50 -0800125 const bool is_local =
126 configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
127
Austin Schuh15649d62019-12-28 16:36:38 -0800128 const bool is_readable =
129 configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
Brian Silverman1f345222020-09-24 21:14:48 -0700130 const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
131 channel, event_loop_->node());
132 const bool log_message = is_logged && is_readable;
Austin Schuh15649d62019-12-28 16:36:38 -0800133
Brian Silverman1f345222020-09-24 21:14:48 -0700134 bool log_delivery_times = false;
135 if (event_loop_->node() != nullptr) {
136 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
137 channel, event_loop_->node(), event_loop_->node());
138 }
Austin Schuh15649d62019-12-28 16:36:38 -0800139
Austin Schuh2f8fd752020-09-01 22:38:28 -0700140 // Now, detect a MessageHeader timestamp logger where we should just log the
141 // contents to a file directly.
142 const bool log_contents = timestamp_logger_channels.find(channel) !=
143 timestamp_logger_channels.end();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700144
145 if (log_message || log_delivery_times || log_contents) {
Austin Schuh15649d62019-12-28 16:36:38 -0800146 fs.fetcher = event_loop->MakeRawFetcher(channel);
147 VLOG(1) << "Logging channel "
148 << configuration::CleanedChannelToString(channel);
149
150 if (log_delivery_times) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800151 VLOG(1) << " Delivery times";
Brian Silverman1f345222020-09-24 21:14:48 -0700152 fs.wants_timestamp_writer = true;
Austin Schuh15649d62019-12-28 16:36:38 -0800153 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800154 if (log_message) {
155 VLOG(1) << " Data";
Brian Silverman1f345222020-09-24 21:14:48 -0700156 fs.wants_writer = true;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800157 if (!is_local) {
158 fs.log_type = LogType::kLogRemoteMessage;
159 }
160 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700161 if (log_contents) {
162 VLOG(1) << "Timestamp logger channel "
163 << configuration::CleanedChannelToString(channel);
Brian Silverman1f345222020-09-24 21:14:48 -0700164 fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
165 fs.wants_contents_writer = true;
Austin Schuh0c297012020-09-16 18:41:59 -0700166 fs.node_index =
Brian Silverman1f345222020-09-24 21:14:48 -0700167 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700168 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800169 fetchers_.emplace_back(std::move(fs));
Austin Schuh15649d62019-12-28 16:36:38 -0800170 }
Brian Silverman1f345222020-09-24 21:14:48 -0700171 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700172
173 // When we are logging remote timestamps, we need to be able to translate from
174 // the channel index that the event loop uses to the channel index in the
175 // config in the log file.
176 event_loop_to_logged_channel_index_.resize(
177 event_loop->configuration()->channels()->size(), -1);
178 for (size_t event_loop_channel_index = 0;
179 event_loop_channel_index <
180 event_loop->configuration()->channels()->size();
181 ++event_loop_channel_index) {
182 const Channel *event_loop_channel =
183 event_loop->configuration()->channels()->Get(event_loop_channel_index);
184
185 const Channel *logged_channel = aos::configuration::GetChannel(
186 configuration_, event_loop_channel->name()->string_view(),
187 event_loop_channel->type()->string_view(), "",
188 configuration::GetNode(configuration_, event_loop_->node()));
189
190 if (logged_channel != nullptr) {
191 event_loop_to_logged_channel_index_[event_loop_channel_index] =
192 configuration::ChannelIndex(configuration_, logged_channel);
193 }
194 }
Brian Silverman1f345222020-09-24 21:14:48 -0700195}
196
197Logger::~Logger() {
198 if (log_namer_) {
199 // If we are replaying a log file, or in simulation, we want to force the
200 // last bit of data to be logged. The easiest way to deal with this is to
201 // poll everything as we go to destroy the class, ie, shut down the logger,
202 // and write it to disk.
203 StopLogging(event_loop_->monotonic_now());
204 }
205}
206
Brian Silvermanae7c0332020-09-30 16:58:23 -0700207void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
208 std::string_view log_start_uuid) {
Brian Silverman1f345222020-09-24 21:14:48 -0700209 CHECK(!log_namer_) << ": Already logging";
210 log_namer_ = std::move(log_namer);
Brian Silvermanae7c0332020-09-30 16:58:23 -0700211 log_event_uuid_ = UUID::Random();
212 log_start_uuid_ = log_start_uuid;
Brian Silverman1f345222020-09-24 21:14:48 -0700213 VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
214
215 // We want to do as much work as possible before the initial Fetch. Time
216 // between that and actually starting to log opens up the possibility of
217 // falling off the end of the queue during that time.
218
219 for (FetcherStruct &f : fetchers_) {
220 if (f.wants_writer) {
221 f.writer = log_namer_->MakeWriter(f.channel);
222 }
223 if (f.wants_timestamp_writer) {
224 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
225 }
226 if (f.wants_contents_writer) {
227 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
228 f.channel, CHECK_NOTNULL(f.timestamp_node));
229 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800230 }
231
Brian Silverman1f345222020-09-24 21:14:48 -0700232 CHECK(node_state_.empty());
Austin Schuh0c297012020-09-16 18:41:59 -0700233 node_state_.resize(configuration::MultiNode(configuration_)
234 ? configuration_->nodes()->size()
Austin Schuh2f8fd752020-09-01 22:38:28 -0700235 : 1u);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800236
Austin Schuh2f8fd752020-09-01 22:38:28 -0700237 for (const Node *node : log_namer_->nodes()) {
Brian Silvermand90905f2020-09-23 14:42:56 -0700238 const int node_index = configuration::GetNodeIndex(configuration_, node);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800239
Austin Schuh2f8fd752020-09-01 22:38:28 -0700240 node_state_[node_index].log_file_header = MakeHeader(node);
241 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800242
Austin Schuh2f8fd752020-09-01 22:38:28 -0700243 // Grab data from each channel right before we declare the log file started
244 // so we can capture the latest message on each channel. This lets us have
245 // non periodic messages with configuration that now get logged.
246 for (FetcherStruct &f : fetchers_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700247 const auto start = event_loop_->monotonic_now();
248 const bool got_new = f.fetcher->Fetch();
249 const auto end = event_loop_->monotonic_now();
250 RecordFetchResult(start, end, got_new, &f);
251
252 // If there is a message, we want to write it.
253 f.written = f.fetcher->context().data == nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700254 }
255
256 // Clear out any old timestamps in case we are re-starting logging.
257 for (size_t i = 0; i < node_state_.size(); ++i) {
258 SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
259 }
260
261 WriteHeader();
262
263 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
264 << " start_time " << last_synchronized_time_;
265
266 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
267 polling_period_);
268}
269
Brian Silverman1f345222020-09-24 21:14:48 -0700270std::unique_ptr<LogNamer> Logger::StopLogging(
271 aos::monotonic_clock::time_point end_time) {
272 CHECK(log_namer_) << ": Not logging right now";
273
274 if (end_time != aos::monotonic_clock::min_time) {
275 LogUntil(end_time);
276 }
277 timer_handler_->Disable();
278
279 for (FetcherStruct &f : fetchers_) {
280 f.writer = nullptr;
281 f.timestamp_writer = nullptr;
282 f.contents_writer = nullptr;
283 }
284 node_state_.clear();
285
Brian Silvermanae7c0332020-09-30 16:58:23 -0700286 log_event_uuid_ = UUID::Zero();
287 log_start_uuid_ = std::string();
288
Brian Silverman1f345222020-09-24 21:14:48 -0700289 return std::move(log_namer_);
290}
291
Austin Schuhfa895892020-01-07 20:07:41 -0800292void Logger::WriteHeader() {
Austin Schuh0c297012020-09-16 18:41:59 -0700293 if (configuration::MultiNode(configuration_)) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700294 server_statistics_fetcher_.Fetch();
295 }
296
297 aos::monotonic_clock::time_point monotonic_start_time =
298 event_loop_->monotonic_now();
299 aos::realtime_clock::time_point realtime_start_time =
300 event_loop_->realtime_now();
301
302 // We need to pick a point in time to declare the log file "started". This
303 // starts here. It needs to be after everything is fetched so that the
304 // fetchers are all pointed at the most recent message before the start
305 // time.
306 last_synchronized_time_ = monotonic_start_time;
307
Austin Schuh6f3babe2020-01-26 20:34:50 -0800308 for (const Node *node : log_namer_->nodes()) {
Brian Silvermand90905f2020-09-23 14:42:56 -0700309 const int node_index = configuration::GetNodeIndex(configuration_, node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700310 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
311 realtime_start_time);
Austin Schuh64fab802020-09-09 22:47:47 -0700312 log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800313 }
314}
Austin Schuh8bd96322020-02-13 21:18:22 -0800315
Austin Schuh2f8fd752020-09-01 22:38:28 -0700316void Logger::WriteMissingTimestamps() {
Austin Schuh0c297012020-09-16 18:41:59 -0700317 if (configuration::MultiNode(configuration_)) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700318 server_statistics_fetcher_.Fetch();
319 } else {
320 return;
321 }
322
323 if (server_statistics_fetcher_.get() == nullptr) {
324 return;
325 }
326
327 for (const Node *node : log_namer_->nodes()) {
Brian Silvermand90905f2020-09-23 14:42:56 -0700328 const int node_index = configuration::GetNodeIndex(configuration_, node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700329 if (MaybeUpdateTimestamp(
330 node, node_index,
331 server_statistics_fetcher_.context().monotonic_event_time,
332 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh64fab802020-09-09 22:47:47 -0700333 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700334 }
335 }
336}
337
338void Logger::SetStartTime(size_t node_index,
339 aos::monotonic_clock::time_point monotonic_start_time,
340 aos::realtime_clock::time_point realtime_start_time) {
341 node_state_[node_index].monotonic_start_time = monotonic_start_time;
342 node_state_[node_index].realtime_start_time = realtime_start_time;
343 node_state_[node_index]
344 .log_file_header.mutable_message()
345 ->mutate_monotonic_start_time(
346 std::chrono::duration_cast<std::chrono::nanoseconds>(
347 monotonic_start_time.time_since_epoch())
348 .count());
349 if (node_state_[node_index]
350 .log_file_header.mutable_message()
351 ->has_realtime_start_time()) {
352 node_state_[node_index]
353 .log_file_header.mutable_message()
354 ->mutate_realtime_start_time(
355 std::chrono::duration_cast<std::chrono::nanoseconds>(
356 realtime_start_time.time_since_epoch())
357 .count());
358 }
359}
360
361bool Logger::MaybeUpdateTimestamp(
362 const Node *node, int node_index,
363 aos::monotonic_clock::time_point monotonic_start_time,
364 aos::realtime_clock::time_point realtime_start_time) {
Brian Silverman87ac0402020-09-17 14:47:01 -0700365 // Bail early if the start times are already set.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700366 if (node_state_[node_index].monotonic_start_time !=
367 monotonic_clock::min_time) {
368 return false;
369 }
Austin Schuh0c297012020-09-16 18:41:59 -0700370 if (configuration::MultiNode(configuration_)) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700371 if (event_loop_->node() == node) {
372 // There are no offsets to compute for ourself, so always succeed.
373 SetStartTime(node_index, monotonic_start_time, realtime_start_time);
374 return true;
375 } else if (server_statistics_fetcher_.get() != nullptr) {
376 // We must be a remote node now. Look for the connection and see if it is
377 // connected.
378
379 for (const message_bridge::ServerConnection *connection :
380 *server_statistics_fetcher_->connections()) {
381 if (connection->node()->name()->string_view() !=
382 node->name()->string_view()) {
383 continue;
384 }
385
386 if (connection->state() != message_bridge::State::CONNECTED) {
387 VLOG(1) << node->name()->string_view()
388 << " is not connected, can't start it yet.";
389 break;
390 }
391
392 if (!connection->has_monotonic_offset()) {
393 VLOG(1) << "Missing monotonic offset for setting start time for node "
394 << aos::FlatbufferToJson(node);
395 break;
396 }
397
398 VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);
399
400 // Found it and it is connected. Compensate and go.
401 monotonic_start_time +=
402 std::chrono::nanoseconds(connection->monotonic_offset());
403
404 SetStartTime(node_index, monotonic_start_time, realtime_start_time);
405 return true;
406 }
407 }
408 } else {
409 SetStartTime(node_index, monotonic_start_time, realtime_start_time);
410 return true;
411 }
412 return false;
413}
414
415aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
416 const Node *node) {
Austin Schuhfa895892020-01-07 20:07:41 -0800417 // Now write the header with this timestamp in it.
418 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800419 fbb.ForceDefaults(true);
Austin Schuhfa895892020-01-07 20:07:41 -0800420
Austin Schuh2f8fd752020-09-01 22:38:28 -0700421 // TODO(austin): Compress this much more efficiently. There are a bunch of
422 // duplicated schemas.
Brian Silvermanae7c0332020-09-30 16:58:23 -0700423 const flatbuffers::Offset<aos::Configuration> configuration_offset =
Austin Schuh0c297012020-09-16 18:41:59 -0700424 CopyFlatBuffer(configuration_, &fbb);
Austin Schuhfa895892020-01-07 20:07:41 -0800425
Brian Silvermanae7c0332020-09-30 16:58:23 -0700426 const flatbuffers::Offset<flatbuffers::String> name_offset =
Austin Schuh0c297012020-09-16 18:41:59 -0700427 fbb.CreateString(name_);
Austin Schuhfa895892020-01-07 20:07:41 -0800428
Brian Silvermanae7c0332020-09-30 16:58:23 -0700429 CHECK(log_event_uuid_ != UUID::Zero());
430 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
431 fbb.CreateString(log_event_uuid_.string_view());
Austin Schuh64fab802020-09-09 22:47:47 -0700432
Brian Silvermanae7c0332020-09-30 16:58:23 -0700433 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
434 fbb.CreateString(logger_instance_uuid_.string_view());
435
436 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
437 if (!log_start_uuid_.empty()) {
438 log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
439 }
440
441 const flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
442 fbb.CreateString(boot_uuid_);
443
444 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
Austin Schuh64fab802020-09-09 22:47:47 -0700445 fbb.CreateString("00000000-0000-4000-8000-000000000000");
446
Austin Schuhfa895892020-01-07 20:07:41 -0800447 flatbuffers::Offset<Node> node_offset;
Brian Silverman80993c22020-10-01 15:05:19 -0700448 flatbuffers::Offset<Node> logger_node_offset;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700449
Austin Schuh0c297012020-09-16 18:41:59 -0700450 if (configuration::MultiNode(configuration_)) {
Austin Schuha4fc60f2020-11-01 23:06:47 -0800451 // TODO(austin): Reuse the node we just copied in above.
452 node_offset = RecursiveCopyFlatBuffer(node, &fbb);
453 logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
Austin Schuhfa895892020-01-07 20:07:41 -0800454 }
455
456 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
457
Austin Schuh64fab802020-09-09 22:47:47 -0700458 log_file_header_builder.add_name(name_offset);
Austin Schuhfa895892020-01-07 20:07:41 -0800459
460 // Only add the node if we are running in a multinode configuration.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800461 if (node != nullptr) {
Austin Schuhfa895892020-01-07 20:07:41 -0800462 log_file_header_builder.add_node(node_offset);
Brian Silverman80993c22020-10-01 15:05:19 -0700463 log_file_header_builder.add_logger_node(logger_node_offset);
Austin Schuhfa895892020-01-07 20:07:41 -0800464 }
465
466 log_file_header_builder.add_configuration(configuration_offset);
467 // The worst case theoretical out of order is the polling period times 2.
468 // One message could get logged right after the boundary, but be for right
469 // before the next boundary. And the reverse could happen for another
470 // message. Report back 3x to be extra safe, and because the cost isn't
471 // huge on the read side.
472 log_file_header_builder.add_max_out_of_order_duration(
Brian Silverman1f345222020-09-24 21:14:48 -0700473 std::chrono::nanoseconds(3 * polling_period_).count());
Austin Schuhfa895892020-01-07 20:07:41 -0800474
475 log_file_header_builder.add_monotonic_start_time(
476 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700477 monotonic_clock::min_time.time_since_epoch())
Austin Schuhfa895892020-01-07 20:07:41 -0800478 .count());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700479 if (node == event_loop_->node()) {
480 log_file_header_builder.add_realtime_start_time(
481 std::chrono::duration_cast<std::chrono::nanoseconds>(
482 realtime_clock::min_time.time_since_epoch())
483 .count());
Austin Schuh6f3babe2020-01-26 20:34:50 -0800484 }
485
Brian Silvermanae7c0332020-09-30 16:58:23 -0700486 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
487 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
488 if (!log_start_uuid_offset.IsNull()) {
489 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
490 }
491 log_file_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh64fab802020-09-09 22:47:47 -0700492
493 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
494 log_file_header_builder.add_parts_index(0);
495
Austin Schuh2f8fd752020-09-01 22:38:28 -0700496 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
Austin Schuha4fc60f2020-11-01 23:06:47 -0800497 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
498 fbb.Release());
499
500 CHECK(result.Verify()) << ": Built a corrupted header.";
501
502 return result;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700503}
504
Brian Silvermancb805822020-10-06 17:43:35 -0700505void Logger::ResetStatisics() {
506 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
507 max_message_fetch_time_channel_ = -1;
508 max_message_fetch_time_size_ = -1;
509 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
510 total_message_fetch_count_ = 0;
511 total_message_fetch_bytes_ = 0;
512 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
513 total_nop_fetch_count_ = 0;
514 max_copy_time_ = std::chrono::nanoseconds::zero();
515 max_copy_time_channel_ = -1;
516 max_copy_time_size_ = -1;
517 total_copy_time_ = std::chrono::nanoseconds::zero();
518 total_copy_count_ = 0;
519 total_copy_bytes_ = 0;
520}
521
Austin Schuh2f8fd752020-09-01 22:38:28 -0700522void Logger::Rotate() {
523 for (const Node *node : log_namer_->nodes()) {
Brian Silvermand90905f2020-09-23 14:42:56 -0700524 const int node_index = configuration::GetNodeIndex(configuration_, node);
Austin Schuh64fab802020-09-09 22:47:47 -0700525 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700526 }
527}
528
529void Logger::LogUntil(monotonic_clock::time_point t) {
530 WriteMissingTimestamps();
531
532 // Write each channel to disk, one at a time.
533 for (FetcherStruct &f : fetchers_) {
534 while (true) {
535 if (f.written) {
Brian Silvermancb805822020-10-06 17:43:35 -0700536 const auto start = event_loop_->monotonic_now();
537 const bool got_new = f.fetcher->FetchNext();
538 const auto end = event_loop_->monotonic_now();
539 RecordFetchResult(start, end, got_new, &f);
540 if (!got_new) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700541 VLOG(2) << "No new data on "
542 << configuration::CleanedChannelToString(
543 f.fetcher->channel());
544 break;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700545 }
Brian Silvermancb805822020-10-06 17:43:35 -0700546 f.written = false;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700547 }
548
Austin Schuh2f8fd752020-09-01 22:38:28 -0700549 // TODO(james): Write tests to exercise this logic.
Brian Silvermancb805822020-10-06 17:43:35 -0700550 if (f.fetcher->context().monotonic_event_time >= t) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700551 break;
552 }
Brian Silvermancb805822020-10-06 17:43:35 -0700553 if (f.writer != nullptr) {
554 // Write!
555 const auto start = event_loop_->monotonic_now();
556 flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
557 max_header_size_);
558 fbb.ForceDefaults(true);
559
560 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
561 f.channel_index, f.log_type));
562 const auto end = event_loop_->monotonic_now();
563 RecordCreateMessageTime(start, end, &f);
564
565 VLOG(2) << "Writing data as node "
566 << FlatbufferToJson(event_loop_->node()) << " for channel "
567 << configuration::CleanedChannelToString(f.fetcher->channel())
568 << " to " << f.writer->filename() << " data "
569 << FlatbufferToJson(
570 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
571 fbb.GetBufferPointer()));
572
573 max_header_size_ = std::max(max_header_size_,
574 fbb.GetSize() - f.fetcher->context().size);
575 f.writer->QueueSizedFlatbuffer(&fbb);
576 }
577
578 if (f.timestamp_writer != nullptr) {
579 // And now handle timestamps.
580 const auto start = event_loop_->monotonic_now();
581 flatbuffers::FlatBufferBuilder fbb;
582 fbb.ForceDefaults(true);
583
584 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
585 f.channel_index,
586 LogType::kLogDeliveryTimeOnly));
587 const auto end = event_loop_->monotonic_now();
588 RecordCreateMessageTime(start, end, &f);
589
590 VLOG(2) << "Writing timestamps as node "
591 << FlatbufferToJson(event_loop_->node()) << " for channel "
592 << configuration::CleanedChannelToString(f.fetcher->channel())
593 << " to " << f.timestamp_writer->filename() << " timestamp "
594 << FlatbufferToJson(
595 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
596 fbb.GetBufferPointer()));
597
598 f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
599 }
600
601 if (f.contents_writer != nullptr) {
602 const auto start = event_loop_->monotonic_now();
603 // And now handle the special message contents channel. Copy the
604 // message into a FlatBufferBuilder and save it to disk.
605 // TODO(austin): We can be more efficient here when we start to
606 // care...
607 flatbuffers::FlatBufferBuilder fbb;
608 fbb.ForceDefaults(true);
609
610 const MessageHeader *msg =
611 flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);
612
613 logger::MessageHeader::Builder message_header_builder(fbb);
614
615 // TODO(austin): This needs to check the channel_index and confirm
616 // that it should be logged before squirreling away the timestamp to
617 // disk. We don't want to log irrelevant timestamps.
618
619 // Note: this must match the same order as MessageBridgeServer and
620 // PackMessage. We want identical headers to have identical
621 // on-the-wire formats to make comparing them easier.
622
623 // Translate from the channel index that the event loop uses to the
624 // channel index in the log file.
625 message_header_builder.add_channel_index(
626 event_loop_to_logged_channel_index_[msg->channel_index()]);
627
628 message_header_builder.add_queue_index(msg->queue_index());
629 message_header_builder.add_monotonic_sent_time(
630 msg->monotonic_sent_time());
631 message_header_builder.add_realtime_sent_time(
632 msg->realtime_sent_time());
633
634 message_header_builder.add_monotonic_remote_time(
635 msg->monotonic_remote_time());
636 message_header_builder.add_realtime_remote_time(
637 msg->realtime_remote_time());
638 message_header_builder.add_remote_queue_index(
639 msg->remote_queue_index());
640
641 fbb.FinishSizePrefixed(message_header_builder.Finish());
642 const auto end = event_loop_->monotonic_now();
643 RecordCreateMessageTime(start, end, &f);
644
645 f.contents_writer->QueueSizedFlatbuffer(&fbb);
646 }
647
648 f.written = true;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700649 }
650 }
651 last_synchronized_time_ = t;
Austin Schuhfa895892020-01-07 20:07:41 -0800652}
653
Brian Silverman1f345222020-09-24 21:14:48 -0700654void Logger::DoLogData(const monotonic_clock::time_point end_time) {
655 // We want to guarantee that messages aren't out of order by more than
Austin Schuhe309d2a2019-11-29 13:25:21 -0800656 // max_out_of_order_duration. To do this, we need sync points. Every write
657 // cycle should be a sync point.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800658
659 do {
660 // Move the sync point up by at most polling_period. This forces one sync
661 // per iteration, even if it is small.
Brian Silverman1f345222020-09-24 21:14:48 -0700662 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
663
664 on_logged_period_();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800665
Austin Schuhe309d2a2019-11-29 13:25:21 -0800666 // If we missed cycles, we could be pretty far behind. Spin until we are
667 // caught up.
Brian Silverman1f345222020-09-24 21:14:48 -0700668 } while (last_synchronized_time_ + polling_period_ < end_time);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800669}
670
Brian Silvermancb805822020-10-06 17:43:35 -0700671void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
672 aos::monotonic_clock::time_point end,
673 bool got_new, FetcherStruct *fetcher) {
674 const auto duration = end - start;
675 if (!got_new) {
676 ++total_nop_fetch_count_;
677 total_nop_fetch_time_ += duration;
678 return;
679 }
680 ++total_message_fetch_count_;
681 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
682 total_message_fetch_time_ += duration;
683 if (duration > max_message_fetch_time_) {
684 max_message_fetch_time_ = duration;
685 max_message_fetch_time_channel_ = fetcher->channel_index;
686 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
687 }
688}
689
690void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
691 aos::monotonic_clock::time_point end,
692 FetcherStruct *fetcher) {
693 const auto duration = end - start;
694 total_copy_time_ += duration;
695 ++total_copy_count_;
696 total_copy_bytes_ += fetcher->fetcher->context().size;
697 if (duration > max_copy_time_) {
698 max_copy_time_ = duration;
699 max_copy_time_channel_ = fetcher->channel_index;
700 max_copy_time_size_ = fetcher->fetcher->context().size;
701 }
702}
703
Austin Schuh11d43732020-09-21 17:28:30 -0700704std::vector<std::vector<std::string>> ToLogReaderVector(
705 const std::vector<LogFile> &log_files) {
706 std::vector<std::vector<std::string>> result;
707 for (const LogFile &log_file : log_files) {
708 for (const LogParts &log_parts : log_file.parts) {
709 std::vector<std::string> parts;
710 for (const std::string &part : log_parts.parts) {
711 parts.emplace_back(part);
712 }
713 result.emplace_back(std::move(parts));
714 }
Austin Schuh5212cad2020-09-09 23:12:09 -0700715 }
716 return result;
717}
718
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800719LogReader::LogReader(std::string_view filename,
720 const Configuration *replay_configuration)
Austin Schuh287d43d2020-12-04 20:19:33 -0800721 : LogReader(SortParts({std::string(filename)}), replay_configuration) {}
Austin Schuhfa895892020-01-07 20:07:41 -0800722
Austin Schuh287d43d2020-12-04 20:19:33 -0800723LogReader::LogReader(std::vector<LogFile> log_files,
Austin Schuhfa895892020-01-07 20:07:41 -0800724 const Configuration *replay_configuration)
Austin Schuh287d43d2020-12-04 20:19:33 -0800725 : log_files_(std::move(log_files)),
726 log_file_header_(MaybeReadHeaderOrDie(log_files_)),
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800727 replay_configuration_(replay_configuration) {
Austin Schuh6331ef92020-01-07 18:28:09 -0800728 MakeRemappedConfig();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800729
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700730 // Remap all existing remote timestamp channels. They will be recreated, and
731 // the data logged isn't relevant anymore.
Austin Schuh3c5dae52020-10-06 18:55:18 -0700732 for (const Node *node : configuration::GetNodes(logged_configuration())) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700733 std::vector<const Node *> timestamp_logger_nodes =
734 configuration::TimestampNodes(logged_configuration(), node);
735 for (const Node *remote_node : timestamp_logger_nodes) {
736 const std::string channel = absl::StrCat(
737 "/aos/remote_timestamps/", remote_node->name()->string_view());
738 CHECK(HasChannel<logger::MessageHeader>(channel, node))
739 << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
740 << logger::MessageHeader::GetFullyQualifiedName() << "\"} for node "
741 << node->name()->string_view();
742 RemapLoggedChannel<logger::MessageHeader>(channel, node);
743 }
744 }
745
Austin Schuh6aa77be2020-02-22 21:06:40 -0800746 if (replay_configuration) {
747 CHECK_EQ(configuration::MultiNode(configuration()),
748 configuration::MultiNode(replay_configuration))
Austin Schuh2f8fd752020-09-01 22:38:28 -0700749 << ": Log file and replay config need to both be multi or single "
750 "node.";
Austin Schuh6aa77be2020-02-22 21:06:40 -0800751 }
752
Austin Schuh6f3babe2020-01-26 20:34:50 -0800753 if (!configuration::MultiNode(configuration())) {
Austin Schuh287d43d2020-12-04 20:19:33 -0800754 states_.emplace_back(std::make_unique<State>(
755 std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, ""))));
Austin Schuh8bd96322020-02-13 21:18:22 -0800756 } else {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800757 if (replay_configuration) {
James Kuszmaul46d82582020-05-09 19:50:09 -0700758 CHECK_EQ(logged_configuration()->nodes()->size(),
Austin Schuh6aa77be2020-02-22 21:06:40 -0800759 replay_configuration->nodes()->size())
Austin Schuh2f8fd752020-09-01 22:38:28 -0700760 << ": Log file and replay config need to have matching nodes "
761 "lists.";
James Kuszmaul46d82582020-05-09 19:50:09 -0700762 for (const Node *node : *logged_configuration()->nodes()) {
763 if (configuration::GetNode(replay_configuration, node) == nullptr) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700764 LOG(FATAL) << "Found node " << FlatbufferToJson(node)
765 << " in logged config that is not present in the replay "
766 "config.";
James Kuszmaul46d82582020-05-09 19:50:09 -0700767 }
768 }
Austin Schuh6aa77be2020-02-22 21:06:40 -0800769 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800770 states_.resize(configuration()->nodes()->size());
Austin Schuh6f3babe2020-01-26 20:34:50 -0800771 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800772}
773
Austin Schuh6aa77be2020-02-22 21:06:40 -0800774LogReader::~LogReader() {
Austin Schuh39580f12020-08-01 14:44:08 -0700775 if (event_loop_factory_unique_ptr_) {
776 Deregister();
777 } else if (event_loop_factory_ != nullptr) {
778 LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
779 "is destroyed";
780 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800781 if (offset_fp_ != nullptr) {
782 fclose(offset_fp_);
783 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700784 // Zero out some buffers. It's easy to do use-after-frees on these, so make
785 // it more obvious.
Austin Schuh39580f12020-08-01 14:44:08 -0700786 if (remapped_configuration_buffer_) {
787 remapped_configuration_buffer_->Wipe();
788 }
789 log_file_header_.Wipe();
Austin Schuh8bd96322020-02-13 21:18:22 -0800790}
Austin Schuhe309d2a2019-11-29 13:25:21 -0800791
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800792const Configuration *LogReader::logged_configuration() const {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800793 return log_file_header_.message().configuration();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800794}
795
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800796const Configuration *LogReader::configuration() const {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800797 return remapped_configuration_;
798}
799
Austin Schuh6f3babe2020-01-26 20:34:50 -0800800std::vector<const Node *> LogReader::Nodes() const {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700801 // Because the Node pointer will only be valid if it actually points to
802 // memory owned by remapped_configuration_, we need to wait for the
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800803 // remapped_configuration_ to be populated before accessing it.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800804 //
805 // Also, note, that when ever a map is changed, the nodes in here are
806 // invalidated.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800807 CHECK(remapped_configuration_ != nullptr)
808 << ": Need to call Register before the node() pointer will be valid.";
Austin Schuh6f3babe2020-01-26 20:34:50 -0800809 return configuration::GetNodes(remapped_configuration_);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800810}
Austin Schuh15649d62019-12-28 16:36:38 -0800811
Austin Schuh11d43732020-09-21 17:28:30 -0700812monotonic_clock::time_point LogReader::monotonic_start_time(
813 const Node *node) const {
Austin Schuh8bd96322020-02-13 21:18:22 -0800814 State *state =
815 states_[configuration::GetNodeIndex(configuration(), node)].get();
816 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
817
Austin Schuh858c9f32020-08-31 16:56:12 -0700818 return state->monotonic_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800819}
820
Austin Schuh11d43732020-09-21 17:28:30 -0700821realtime_clock::time_point LogReader::realtime_start_time(
822 const Node *node) const {
Austin Schuh8bd96322020-02-13 21:18:22 -0800823 State *state =
824 states_[configuration::GetNodeIndex(configuration(), node)].get();
825 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
826
Austin Schuh858c9f32020-08-31 16:56:12 -0700827 return state->realtime_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800828}
829
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800830void LogReader::Register() {
831 event_loop_factory_unique_ptr_ =
Austin Schuhac0771c2020-01-07 18:36:30 -0800832 std::make_unique<SimulatedEventLoopFactory>(configuration());
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800833 Register(event_loop_factory_unique_ptr_.get());
834}
835
Austin Schuh92547522019-12-28 14:33:43 -0800836void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
Austin Schuh92547522019-12-28 14:33:43 -0800837 event_loop_factory_ = event_loop_factory;
Austin Schuhe5bbd9e2020-09-21 17:29:20 -0700838 remapped_configuration_ = event_loop_factory_->configuration();
Austin Schuh92547522019-12-28 14:33:43 -0800839
Brian Silvermand90905f2020-09-23 14:42:56 -0700840 for (const Node *node : configuration::GetNodes(configuration())) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800841 const size_t node_index =
842 configuration::GetNodeIndex(configuration(), node);
Austin Schuh287d43d2020-12-04 20:19:33 -0800843 std::vector<LogParts> filtered_parts = FilterPartsForNode(
844 log_files_, node != nullptr ? node->name()->string_view() : "");
845 states_[node_index] = std::make_unique<State>(
846 filtered_parts.size() == 0u
847 ? nullptr
848 : std::make_unique<TimestampMapper>(std::move(filtered_parts)));
Austin Schuh8bd96322020-02-13 21:18:22 -0800849 State *state = states_[node_index].get();
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700850 state->set_event_loop(state->SetNodeEventLoopFactory(
Austin Schuh858c9f32020-08-31 16:56:12 -0700851 event_loop_factory_->GetNodeEventLoopFactory(node)));
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700852
853 state->SetChannelCount(logged_configuration()->channels()->size());
Austin Schuhcde938c2020-02-02 17:30:07 -0800854 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700855
Austin Schuh287d43d2020-12-04 20:19:33 -0800856 for (const Node *node : configuration::GetNodes(configuration())) {
857 const size_t node_index =
858 configuration::GetNodeIndex(configuration(), node);
859 State *state = states_[node_index].get();
860 for (const Node *other_node : configuration::GetNodes(configuration())) {
861 const size_t other_node_index =
862 configuration::GetNodeIndex(configuration(), other_node);
863 State *other_state = states_[other_node_index].get();
864 if (other_state != state) {
865 state->AddPeer(other_state);
866 }
867 }
868 }
869
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700870 // Register after making all the State objects so we can build references
871 // between them.
872 for (const Node *node : configuration::GetNodes(configuration())) {
873 const size_t node_index =
874 configuration::GetNodeIndex(configuration(), node);
875 State *state = states_[node_index].get();
876
877 Register(state->event_loop());
878 }
879
James Kuszmaul46d82582020-05-09 19:50:09 -0700880 if (live_nodes_ == 0) {
881 LOG(FATAL)
882 << "Don't have logs from any of the nodes in the replay config--are "
883 "you sure that the replay config matches the original config?";
884 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800885
Austin Schuh2f8fd752020-09-01 22:38:28 -0700886 // We need to now seed our per-node time offsets and get everything set up
887 // to run.
888 const size_t num_nodes = nodes_count();
Austin Schuhcde938c2020-02-02 17:30:07 -0800889
Austin Schuh8bd96322020-02-13 21:18:22 -0800890 // It is easiest to solve for per node offsets with a matrix rather than
891 // trying to solve the equations by hand. So let's get after it.
892 //
893 // Now, build up the map matrix.
894 //
Austin Schuh2f8fd752020-09-01 22:38:28 -0700895 // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
896 map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
897 filters_.size() + 1, num_nodes);
898 slope_matrix_ =
899 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
900 filters_.size() + 1, num_nodes);
Austin Schuhcde938c2020-02-02 17:30:07 -0800901
Austin Schuh2f8fd752020-09-01 22:38:28 -0700902 offset_matrix_ =
903 Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
904 valid_matrix_ =
905 Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
906 last_valid_matrix_ =
907 Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
Austin Schuhcde938c2020-02-02 17:30:07 -0800908
Austin Schuh2f8fd752020-09-01 22:38:28 -0700909 time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
910 time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);
Austin Schuh8bd96322020-02-13 21:18:22 -0800911
Austin Schuh2f8fd752020-09-01 22:38:28 -0700912 // All times should average out to the distributed clock.
913 for (int i = 0; i < map_matrix_.cols(); ++i) {
914 // 1/num_nodes.
915 map_matrix_(0, i) = mpq_class(1, num_nodes);
916 }
917 valid_matrix_(0) = true;
Austin Schuh8bd96322020-02-13 21:18:22 -0800918
919 {
920 // Now, add the a - b -> sample elements.
921 size_t i = 1;
922 for (std::pair<const std::tuple<const Node *, const Node *>,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700923 std::tuple<message_bridge::NoncausalOffsetEstimator>>
924 &filter : filters_) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800925 const Node *const node_a = std::get<0>(filter.first);
926 const Node *const node_b = std::get<1>(filter.first);
927
928 const size_t node_a_index =
929 configuration::GetNodeIndex(configuration(), node_a);
930 const size_t node_b_index =
931 configuration::GetNodeIndex(configuration(), node_b);
932
Austin Schuh2f8fd752020-09-01 22:38:28 -0700933 // -a
934 map_matrix_(i, node_a_index) = mpq_class(-1);
935 // +b
936 map_matrix_(i, node_b_index) = mpq_class(1);
Austin Schuh8bd96322020-02-13 21:18:22 -0800937
938 // -> sample
Austin Schuh2f8fd752020-09-01 22:38:28 -0700939 std::get<0>(filter.second)
940 .set_slope_pointer(&slope_matrix_(i, node_a_index));
941 std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));
942
943 valid_matrix_(i) = false;
944 std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));
Austin Schuh8bd96322020-02-13 21:18:22 -0800945
946 ++i;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800947 }
948 }
949
Austin Schuh858c9f32020-08-31 16:56:12 -0700950 for (std::unique_ptr<State> &state : states_) {
951 state->SeedSortedMessages();
952 }
953
Austin Schuh2f8fd752020-09-01 22:38:28 -0700954 // Rank of the map matrix tells you if all the nodes are in communication
955 // with each other, which tells you if the offsets are observable.
956 const size_t connected_nodes =
957 Eigen::FullPivLU<
958 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
959 .rank();
960
961 // We don't need to support isolated nodes until someone has a real use
962 // case.
963 CHECK_EQ(connected_nodes, num_nodes)
964 << ": There is a node which isn't communicating with the rest.";
965
966 // And solve.
Austin Schuh8bd96322020-02-13 21:18:22 -0800967 UpdateOffsets();
968
Austin Schuh2f8fd752020-09-01 22:38:28 -0700969 // We want to start the log file at the last start time of the log files
970 // from all the nodes. Compute how long each node's simulation needs to run
971 // to move time to this point.
Austin Schuh8bd96322020-02-13 21:18:22 -0800972 distributed_clock::time_point start_time = distributed_clock::min_time;
Austin Schuhcde938c2020-02-02 17:30:07 -0800973
Austin Schuh2f8fd752020-09-01 22:38:28 -0700974 // TODO(austin): We want an "OnStart" callback for each node rather than
975 // running until the last node.
976
Austin Schuh8bd96322020-02-13 21:18:22 -0800977 for (std::unique_ptr<State> &state : states_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700978 VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
979 << MaybeNodeName(state->event_loop()->node()) << "now "
980 << state->monotonic_now();
Austin Schuh287d43d2020-12-04 20:19:33 -0800981 if (state->monotonic_start_time() == monotonic_clock::min_time) {
982 continue;
983 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700984 // And start computing the start time on the distributed clock now that
985 // that works.
Austin Schuh858c9f32020-08-31 16:56:12 -0700986 start_time = std::max(
987 start_time, state->ToDistributedClock(state->monotonic_start_time()));
Austin Schuhcde938c2020-02-02 17:30:07 -0800988 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700989
990 CHECK_GE(start_time, distributed_clock::epoch())
991 << ": Hmm, we have a node starting before the start of time. Offset "
992 "everything.";
Austin Schuhcde938c2020-02-02 17:30:07 -0800993
Austin Schuh6f3babe2020-01-26 20:34:50 -0800994 // Forwarding is tracked per channel. If it is enabled, we want to turn it
995 // off. Otherwise messages replayed will get forwarded across to the other
Austin Schuh2f8fd752020-09-01 22:38:28 -0700996 // nodes, and also replayed on the other nodes. This may not satisfy all
997 // our users, but it'll start the discussion.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800998 if (configuration::MultiNode(event_loop_factory_->configuration())) {
999 for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
1000 const Channel *channel = logged_configuration()->channels()->Get(i);
1001 const Node *node = configuration::GetNode(
1002 configuration(), channel->source_node()->string_view());
1003
Austin Schuh8bd96322020-02-13 21:18:22 -08001004 State *state =
1005 states_[configuration::GetNodeIndex(configuration(), node)].get();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001006
1007 const Channel *remapped_channel =
Austin Schuh858c9f32020-08-31 16:56:12 -07001008 RemapChannel(state->event_loop(), channel);
Austin Schuh6f3babe2020-01-26 20:34:50 -08001009
1010 event_loop_factory_->DisableForwarding(remapped_channel);
1011 }
Austin Schuh4c3b9702020-08-30 11:34:55 -07001012
1013 // If we are replaying a log, we don't want a bunch of redundant messages
1014 // from both the real message bridge and simulated message bridge.
1015 event_loop_factory_->DisableStatistics();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001016 }
1017
Austin Schuhcde938c2020-02-02 17:30:07 -08001018 // While we are starting the system up, we might be relying on matching data
1019 // to timestamps on log files where the timestamp log file starts before the
1020 // data. In this case, it is reasonable to expect missing data.
1021 ignore_missing_data_ = true;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001022 VLOG(1) << "Running until " << start_time << " in Register";
Austin Schuh8bd96322020-02-13 21:18:22 -08001023 event_loop_factory_->RunFor(start_time.time_since_epoch());
Brian Silverman8a32ce62020-08-12 12:02:38 -07001024 VLOG(1) << "At start time";
Austin Schuhcde938c2020-02-02 17:30:07 -08001025 // Now that we are running for real, missing data means that the log file is
1026 // corrupted or went wrong.
1027 ignore_missing_data_ = false;
Austin Schuh92547522019-12-28 14:33:43 -08001028
Austin Schuh8bd96322020-02-13 21:18:22 -08001029 for (std::unique_ptr<State> &state : states_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001030 // Make the RT clock be correct before handing it to the user.
1031 if (state->realtime_start_time() != realtime_clock::min_time) {
1032 state->SetRealtimeOffset(state->monotonic_start_time(),
1033 state->realtime_start_time());
1034 }
1035 VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
1036 << MaybeNodeName(state->event_loop()->node()) << "now "
1037 << state->monotonic_now();
1038 }
1039
1040 if (FLAGS_timestamps_to_csv) {
1041 for (std::pair<const std::tuple<const Node *, const Node *>,
1042 std::tuple<message_bridge::NoncausalOffsetEstimator>>
1043 &filter : filters_) {
1044 const Node *const node_a = std::get<0>(filter.first);
1045 const Node *const node_b = std::get<1>(filter.first);
1046
1047 std::get<0>(filter.second)
1048 .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
1049 ->monotonic_now());
1050 std::get<0>(filter.second)
1051 .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
1052 ->monotonic_now());
1053 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001054 }
1055}
1056
Austin Schuh2f8fd752020-09-01 22:38:28 -07001057void LogReader::UpdateOffsets() {
1058 VLOG(2) << "Samples are " << offset_matrix_;
1059 VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
1060 std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
1061 Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
1062 "]");
1063 VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
1064 << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
1065
1066 size_t node_index = 0;
1067 for (std::unique_ptr<State> &state : states_) {
1068 state->SetDistributedOffset(offset(node_index), slope(node_index));
1069 VLOG(1) << "Offset for node " << node_index << " "
1070 << MaybeNodeName(state->event_loop()->node()) << "is "
1071 << aos::distributed_clock::time_point(offset(node_index))
1072 << " slope " << std::setprecision(9) << std::fixed
1073 << slope(node_index);
1074 ++node_index;
1075 }
1076
1077 if (VLOG_IS_ON(1)) {
1078 LogFit("Offset is");
1079 }
1080}
1081
1082void LogReader::LogFit(std::string_view prefix) {
1083 for (std::unique_ptr<State> &state : states_) {
1084 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
1085 << state->monotonic_now() << " distributed "
1086 << event_loop_factory_->distributed_now();
1087 }
1088
1089 for (std::pair<const std::tuple<const Node *, const Node *>,
1090 std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
1091 filters_) {
1092 message_bridge::NoncausalOffsetEstimator *estimator =
1093 &std::get<0>(filter.second);
1094
1095 if (estimator->a_timestamps().size() == 0 &&
1096 estimator->b_timestamps().size() == 0) {
1097 continue;
1098 }
1099
1100 if (VLOG_IS_ON(1)) {
1101 estimator->LogFit(prefix);
1102 }
1103
1104 const Node *const node_a = std::get<0>(filter.first);
1105 const Node *const node_b = std::get<1>(filter.first);
1106
1107 const size_t node_a_index =
1108 configuration::GetNodeIndex(configuration(), node_a);
1109 const size_t node_b_index =
1110 configuration::GetNodeIndex(configuration(), node_b);
1111
1112 const double recovered_slope =
1113 slope(node_b_index) / slope(node_a_index) - 1.0;
1114 const int64_t recovered_offset =
1115 offset(node_b_index).count() - offset(node_a_index).count() *
1116 slope(node_b_index) /
1117 slope(node_a_index);
1118
1119 VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
1120 << " (error " << recovered_slope - estimator->fit().slope() << ") "
1121 << " offset " << std::setprecision(20) << recovered_offset
1122 << " (error "
1123 << recovered_offset - estimator->fit().offset().count() << ")";
1124
1125 const aos::distributed_clock::time_point a0 =
1126 states_[node_a_index]->ToDistributedClock(
1127 std::get<0>(estimator->a_timestamps()[0]));
1128 const aos::distributed_clock::time_point a1 =
1129 states_[node_a_index]->ToDistributedClock(
1130 std::get<0>(estimator->a_timestamps()[1]));
1131
1132 VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
1133 << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
1134 << " distributed -> " << node_b->name()->string_view() << " "
1135 << states_[node_b_index]->FromDistributedClock(a0) << " should be "
1136 << aos::monotonic_clock::time_point(
1137 std::chrono::nanoseconds(static_cast<int64_t>(
1138 std::get<0>(estimator->a_timestamps()[0])
1139 .time_since_epoch()
1140 .count() *
1141 (1.0 + estimator->fit().slope()))) +
1142 estimator->fit().offset())
1143 << ((a0 <= event_loop_factory_->distributed_now())
1144 ? ""
1145 : " After now, investigate");
1146 VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
1147 << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
1148 << " distributed -> " << node_b->name()->string_view() << " "
1149 << states_[node_b_index]->FromDistributedClock(a1) << " should be "
1150 << aos::monotonic_clock::time_point(
1151 std::chrono::nanoseconds(static_cast<int64_t>(
1152 std::get<0>(estimator->a_timestamps()[1])
1153 .time_since_epoch()
1154 .count() *
1155 (1.0 + estimator->fit().slope()))) +
1156 estimator->fit().offset())
1157 << ((event_loop_factory_->distributed_now() <= a1)
1158 ? ""
1159 : " Before now, investigate");
1160
1161 const aos::distributed_clock::time_point b0 =
1162 states_[node_b_index]->ToDistributedClock(
1163 std::get<0>(estimator->b_timestamps()[0]));
1164 const aos::distributed_clock::time_point b1 =
1165 states_[node_b_index]->ToDistributedClock(
1166 std::get<0>(estimator->b_timestamps()[1]));
1167
1168 VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
1169 << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
1170 << " distributed -> " << node_a->name()->string_view() << " "
1171 << states_[node_a_index]->FromDistributedClock(b0)
1172 << ((b0 <= event_loop_factory_->distributed_now())
1173 ? ""
1174 : " After now, investigate");
1175 VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
1176 << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
1177 << " distributed -> " << node_a->name()->string_view() << " "
1178 << states_[node_a_index]->FromDistributedClock(b1)
1179 << ((event_loop_factory_->distributed_now() <= b1)
1180 ? ""
1181 : " Before now, investigate");
1182 }
1183}
1184
1185message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
Austin Schuh8bd96322020-02-13 21:18:22 -08001186 const Node *node_a, const Node *node_b) {
1187 CHECK_NE(node_a, node_b);
1188 CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
1189 CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
1190
1191 if (node_a > node_b) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001192 return GetFilter(node_b, node_a);
Austin Schuh8bd96322020-02-13 21:18:22 -08001193 }
1194
1195 auto tuple = std::make_tuple(node_a, node_b);
1196
1197 auto it = filters_.find(tuple);
1198
1199 if (it == filters_.end()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001200 auto &x =
1201 filters_
1202 .insert(std::make_pair(
1203 tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
1204 node_a, node_b))))
1205 .first->second;
Austin Schuh8bd96322020-02-13 21:18:22 -08001206 if (FLAGS_timestamps_to_csv) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001207 std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
1208 "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
1209 node_b->name()->string_view()));
1210 std::get<0>(x).SetRevCsvFileName(absl::StrCat(
1211 "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
1212 node_a->name()->string_view()));
Austin Schuh8bd96322020-02-13 21:18:22 -08001213 }
1214
Austin Schuh2f8fd752020-09-01 22:38:28 -07001215 return &std::get<0>(x);
Austin Schuh8bd96322020-02-13 21:18:22 -08001216 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001217 return &std::get<0>(it->second);
Austin Schuh8bd96322020-02-13 21:18:22 -08001218 }
1219}
1220
Austin Schuhe309d2a2019-11-29 13:25:21 -08001221void LogReader::Register(EventLoop *event_loop) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001222 State *state =
1223 states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
1224 .get();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001225
Austin Schuh858c9f32020-08-31 16:56:12 -07001226 state->set_event_loop(event_loop);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001227
Tyler Chatow67ddb032020-01-12 14:30:04 -08001228 // We don't run timing reports when trying to print out logged data, because
1229 // otherwise we would end up printing out the timing reports themselves...
1230 // This is only really relevant when we are replaying into a simulation.
Austin Schuh6f3babe2020-01-26 20:34:50 -08001231 event_loop->SkipTimingReport();
1232 event_loop->SkipAosLog();
Austin Schuh39788ff2019-12-01 18:22:57 -08001233
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001234 for (size_t logged_channel_index = 0;
1235 logged_channel_index < logged_configuration()->channels()->size();
1236 ++logged_channel_index) {
1237 const Channel *channel = RemapChannel(
1238 event_loop,
1239 logged_configuration()->channels()->Get(logged_channel_index));
Austin Schuh8bd96322020-02-13 21:18:22 -08001240
Austin Schuh2f8fd752020-09-01 22:38:28 -07001241 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001242 aos::Sender<MessageHeader> *remote_timestamp_sender = nullptr;
1243
1244 State *source_state = nullptr;
Austin Schuh8bd96322020-02-13 21:18:22 -08001245
1246 if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
1247 configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001248 // We've got a message which is being forwarded to this node.
1249 const Node *source_node = configuration::GetNode(
Austin Schuh8bd96322020-02-13 21:18:22 -08001250 event_loop->configuration(), channel->source_node()->string_view());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001251 filter = GetFilter(event_loop->node(), source_node);
Austin Schuh8bd96322020-02-13 21:18:22 -08001252
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001253 // Delivery timestamps are supposed to be logged back on the source node.
1254 // Configure remote timestamps to be sent.
1255 const bool delivery_time_is_logged =
1256 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
1257 channel, event_loop->node(), source_node);
1258
1259 source_state =
1260 states_[configuration::GetNodeIndex(configuration(), source_node)]
1261 .get();
1262
1263 if (delivery_time_is_logged) {
1264 remote_timestamp_sender =
1265 source_state->RemoteTimestampSender(event_loop->node());
Austin Schuh8bd96322020-02-13 21:18:22 -08001266 }
1267 }
Austin Schuh858c9f32020-08-31 16:56:12 -07001268
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001269 state->SetChannel(
1270 logged_channel_index,
1271 configuration::ChannelIndex(event_loop->configuration(), channel),
1272 event_loop->MakeRawSender(channel), filter, remote_timestamp_sender,
1273 source_state);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001274 }
1275
Austin Schuh6aa77be2020-02-22 21:06:40 -08001276 // If we didn't find any log files with data in them, we won't ever get a
1277 // callback or be live. So skip the rest of the setup.
Austin Schuh287d43d2020-12-04 20:19:33 -08001278 if (state->OldestMessageTime() == monotonic_clock::max_time) {
Austin Schuh6aa77be2020-02-22 21:06:40 -08001279 return;
1280 }
1281
Austin Schuh858c9f32020-08-31 16:56:12 -07001282 state->set_timer_handler(event_loop->AddTimer([this, state]() {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001283 VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
1284 << "at " << state->event_loop()->context().monotonic_event_time
1285 << " now " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001286 if (state->OldestMessageTime() == monotonic_clock::max_time) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001287 --live_nodes_;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001288 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001289 if (live_nodes_ == 0) {
1290 event_loop_factory_->Exit();
1291 }
James Kuszmaul314f1672020-01-03 20:02:08 -08001292 return;
1293 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001294 if (VLOG_IS_ON(1)) {
1295 LogFit("Offset was");
1296 }
1297
1298 bool update_time;
Austin Schuh287d43d2020-12-04 20:19:33 -08001299 TimestampedMessage timestamped_message = state->PopOldest(&update_time);
Austin Schuh05b70472020-01-01 17:11:17 -08001300
Austin Schuhe309d2a2019-11-29 13:25:21 -08001301 const monotonic_clock::time_point monotonic_now =
Austin Schuh858c9f32020-08-31 16:56:12 -07001302 state->event_loop()->context().monotonic_event_time;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001303 if (!FLAGS_skip_order_validation) {
Austin Schuh287d43d2020-12-04 20:19:33 -08001304 CHECK(monotonic_now == timestamped_message.monotonic_event_time)
Austin Schuh2f8fd752020-09-01 22:38:28 -07001305 << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
1306 << monotonic_now << " trying to send "
Austin Schuh287d43d2020-12-04 20:19:33 -08001307 << timestamped_message.monotonic_event_time << " failure "
Austin Schuh2f8fd752020-09-01 22:38:28 -07001308 << state->DebugString();
Austin Schuh287d43d2020-12-04 20:19:33 -08001309 } else if (monotonic_now != timestamped_message.monotonic_event_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001310 LOG(WARNING) << "Check failed: monotonic_now == "
Austin Schuh287d43d2020-12-04 20:19:33 -08001311 "timestamped_message.monotonic_event_time) ("
Austin Schuh2f8fd752020-09-01 22:38:28 -07001312 << monotonic_now << " vs. "
Austin Schuh287d43d2020-12-04 20:19:33 -08001313 << timestamped_message.monotonic_event_time
Austin Schuh2f8fd752020-09-01 22:38:28 -07001314 << "): " << FlatbufferToJson(state->event_loop()->node())
1315 << " Now " << monotonic_now << " trying to send "
Austin Schuh287d43d2020-12-04 20:19:33 -08001316 << timestamped_message.monotonic_event_time << " failure "
Austin Schuh2f8fd752020-09-01 22:38:28 -07001317 << state->DebugString();
1318 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001319
Austin Schuh287d43d2020-12-04 20:19:33 -08001320 if (timestamped_message.monotonic_event_time >
Austin Schuh858c9f32020-08-31 16:56:12 -07001321 state->monotonic_start_time() ||
Austin Schuh15649d62019-12-28 16:36:38 -08001322 event_loop_factory_ != nullptr) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001323 if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
Austin Schuh858c9f32020-08-31 16:56:12 -07001324 !state->at_end()) ||
Austin Schuh287d43d2020-12-04 20:19:33 -08001325 timestamped_message.data.span().size() != 0u) {
1326 CHECK_NE(timestamped_message.data.span().size(), 0u)
Austin Schuh05b70472020-01-01 17:11:17 -08001327 << ": Got a message without data. Forwarding entry which was "
Austin Schuh2f8fd752020-09-01 22:38:28 -07001328 "not matched? Use --skip_missing_forwarding_entries to "
Brian Silverman87ac0402020-09-17 14:47:01 -07001329 "ignore this.";
Austin Schuh92547522019-12-28 14:33:43 -08001330
Austin Schuh2f8fd752020-09-01 22:38:28 -07001331 if (update_time) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001332 // Confirm that the message was sent on the sending node before the
1333 // destination node (this node). As a proxy, do this by making sure
1334 // that time on the source node is past when the message was sent.
Austin Schuh2f8fd752020-09-01 22:38:28 -07001335 if (!FLAGS_skip_order_validation) {
Austin Schuh287d43d2020-12-04 20:19:33 -08001336 CHECK_LT(
1337 timestamped_message.monotonic_remote_time,
1338 state->monotonic_remote_now(timestamped_message.channel_index))
Austin Schuh2f8fd752020-09-01 22:38:28 -07001339 << state->event_loop()->node()->name()->string_view() << " to "
Austin Schuh287d43d2020-12-04 20:19:33 -08001340 << state->remote_node(timestamped_message.channel_index)
1341 ->name()
1342 ->string_view()
Austin Schuh2f8fd752020-09-01 22:38:28 -07001343 << " " << state->DebugString();
Austin Schuh287d43d2020-12-04 20:19:33 -08001344 } else if (timestamped_message.monotonic_remote_time >=
1345 state->monotonic_remote_now(
1346 timestamped_message.channel_index)) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001347 LOG(WARNING)
Austin Schuh287d43d2020-12-04 20:19:33 -08001348 << "Check failed: timestamped_message.monotonic_remote_time < "
1349 "state->monotonic_remote_now(timestamped_message.channel_"
1350 "index) ("
1351 << timestamped_message.monotonic_remote_time << " vs. "
1352 << state->monotonic_remote_now(
1353 timestamped_message.channel_index)
1354 << ") " << state->event_loop()->node()->name()->string_view()
1355 << " to "
1356 << state->remote_node(timestamped_message.channel_index)
1357 ->name()
1358 ->string_view()
1359 << " currently " << timestamped_message.monotonic_event_time
Austin Schuh2f8fd752020-09-01 22:38:28 -07001360 << " ("
1361 << state->ToDistributedClock(
Austin Schuh287d43d2020-12-04 20:19:33 -08001362 timestamped_message.monotonic_event_time)
Austin Schuh2f8fd752020-09-01 22:38:28 -07001363 << ") remote event time "
Austin Schuh287d43d2020-12-04 20:19:33 -08001364 << timestamped_message.monotonic_remote_time << " ("
Austin Schuh2f8fd752020-09-01 22:38:28 -07001365 << state->RemoteToDistributedClock(
Austin Schuh287d43d2020-12-04 20:19:33 -08001366 timestamped_message.channel_index,
1367 timestamped_message.monotonic_remote_time)
Austin Schuh2f8fd752020-09-01 22:38:28 -07001368 << ") " << state->DebugString();
1369 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001370
1371 if (FLAGS_timestamps_to_csv) {
1372 if (offset_fp_ == nullptr) {
1373 offset_fp_ = fopen("/tmp/offsets.csv", "w");
1374 fprintf(
1375 offset_fp_,
1376 "# time_since_start, offset node 0, offset node 1, ...\n");
Austin Schuh287d43d2020-12-04 20:19:33 -08001377 first_time_ = timestamped_message.realtime_event_time;
Austin Schuh8bd96322020-02-13 21:18:22 -08001378 }
1379
1380 fprintf(offset_fp_, "%.9f",
1381 std::chrono::duration_cast<std::chrono::duration<double>>(
Austin Schuh287d43d2020-12-04 20:19:33 -08001382 timestamped_message.realtime_event_time - first_time_)
Austin Schuh8bd96322020-02-13 21:18:22 -08001383 .count());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001384 for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
1385 fprintf(offset_fp_, ", %.9f",
1386 time_offset_matrix_(i, 0) +
1387 time_slope_matrix_(i, 0) *
1388 chrono::duration<double>(
1389 event_loop_factory_->distributed_now()
1390 .time_since_epoch())
1391 .count());
Austin Schuh8bd96322020-02-13 21:18:22 -08001392 }
1393 fprintf(offset_fp_, "\n");
1394 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001395 }
1396
Austin Schuh15649d62019-12-28 16:36:38 -08001397 // If we have access to the factory, use it to fix the realtime time.
Austin Schuh287d43d2020-12-04 20:19:33 -08001398 state->SetRealtimeOffset(timestamped_message.monotonic_event_time,
1399 timestamped_message.realtime_event_time);
Austin Schuh15649d62019-12-28 16:36:38 -08001400
Austin Schuh2f8fd752020-09-01 22:38:28 -07001401 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
Austin Schuh287d43d2020-12-04 20:19:33 -08001402 << timestamped_message.monotonic_event_time;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001403 // TODO(austin): std::move channel_data in and make that efficient in
1404 // simulation.
Austin Schuh287d43d2020-12-04 20:19:33 -08001405 state->Send(std::move(timestamped_message));
Austin Schuh2f8fd752020-09-01 22:38:28 -07001406 } else if (state->at_end() && !ignore_missing_data_) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001407 // We are at the end of the log file and found missing data. Finish
Austin Schuh2f8fd752020-09-01 22:38:28 -07001408 // reading the rest of the log file and call it quits. We don't want
1409 // to replay partial data.
Austin Schuh858c9f32020-08-31 16:56:12 -07001410 while (state->OldestMessageTime() != monotonic_clock::max_time) {
1411 bool update_time_dummy;
1412 state->PopOldest(&update_time_dummy);
Austin Schuh8bd96322020-02-13 21:18:22 -08001413 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001414 } else {
Austin Schuh287d43d2020-12-04 20:19:33 -08001415 CHECK(timestamped_message.data.span().data() == nullptr) << ": Nullptr";
Austin Schuh92547522019-12-28 14:33:43 -08001416 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001417 } else {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001418 LOG(WARNING)
1419 << "Not sending data from before the start of the log file. "
Austin Schuh287d43d2020-12-04 20:19:33 -08001420 << timestamped_message.monotonic_event_time.time_since_epoch().count()
Austin Schuh6f3babe2020-01-26 20:34:50 -08001421 << " start " << monotonic_start_time().time_since_epoch().count()
Austin Schuhd85baf82020-10-19 11:50:12 -07001422 << " "
Austin Schuh287d43d2020-12-04 20:19:33 -08001423 << FlatbufferToJson(timestamped_message.data,
Austin Schuhd85baf82020-10-19 11:50:12 -07001424 {.multi_line = false, .max_vector_size = 100});
Austin Schuhe309d2a2019-11-29 13:25:21 -08001425 }
1426
Austin Schuh858c9f32020-08-31 16:56:12 -07001427 const monotonic_clock::time_point next_time = state->OldestMessageTime();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001428 if (next_time != monotonic_clock::max_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001429 VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
1430 << "wakeup for " << next_time << "("
1431 << state->ToDistributedClock(next_time)
1432 << " distributed), now is " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001433 state->Setup(next_time);
James Kuszmaul314f1672020-01-03 20:02:08 -08001434 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001435 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1436 << "No next message, scheduling shutdown";
1437 // Set a timer up immediately after now to die. If we don't do this,
1438 // then the senders waiting on the message we just read will never get
1439 // called.
Austin Schuheecb9282020-01-08 17:43:30 -08001440 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001441 state->Setup(monotonic_now + event_loop_factory_->send_delay() +
1442 std::chrono::nanoseconds(1));
Austin Schuheecb9282020-01-08 17:43:30 -08001443 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001444 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001445
Austin Schuh2f8fd752020-09-01 22:38:28 -07001446 // Once we make this call, the current time changes. So do everything
1447 // which involves time before changing it. That especially includes
1448 // sending the message.
1449 if (update_time) {
1450 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1451 << "updating offsets";
1452
1453 std::vector<aos::monotonic_clock::time_point> before_times;
1454 before_times.resize(states_.size());
1455 std::transform(states_.begin(), states_.end(), before_times.begin(),
1456 [](const std::unique_ptr<State> &state) {
1457 return state->monotonic_now();
1458 });
1459
1460 for (size_t i = 0; i < states_.size(); ++i) {
Brian Silvermand90905f2020-09-23 14:42:56 -07001461 VLOG(1) << MaybeNodeName(states_[i]->event_loop()->node()) << "before "
1462 << states_[i]->monotonic_now();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001463 }
1464
Austin Schuh8bd96322020-02-13 21:18:22 -08001465 UpdateOffsets();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001466 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
1467 << state->monotonic_now();
1468
1469 for (size_t i = 0; i < states_.size(); ++i) {
Brian Silvermand90905f2020-09-23 14:42:56 -07001470 VLOG(1) << MaybeNodeName(states_[i]->event_loop()->node()) << "after "
1471 << states_[i]->monotonic_now();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001472 }
1473
1474 // TODO(austin): We should be perfect.
1475 const std::chrono::nanoseconds kTolerance{3};
1476 if (!FLAGS_skip_order_validation) {
1477 CHECK_GE(next_time, state->monotonic_now())
1478 << ": Time skipped the next event.";
1479
1480 for (size_t i = 0; i < states_.size(); ++i) {
1481 CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
1482 << ": Time changed too much on node "
1483 << MaybeNodeName(states_[i]->event_loop()->node());
1484 CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
1485 << ": Time changed too much on node "
1486 << states_[i]->event_loop()->node()->name()->string_view();
1487 }
1488 } else {
1489 if (next_time < state->monotonic_now()) {
1490 LOG(WARNING) << "Check failed: next_time >= "
1491 "state->monotonic_now() ("
1492 << next_time << " vs. " << state->monotonic_now()
1493 << "): Time skipped the next event.";
1494 }
1495 for (size_t i = 0; i < states_.size(); ++i) {
1496 if (states_[i]->monotonic_now() >= before_times[i] - kTolerance) {
1497 LOG(WARNING) << "Check failed: "
1498 "states_[i]->monotonic_now() "
1499 ">= before_times[i] - kTolerance ("
1500 << states_[i]->monotonic_now() << " vs. "
1501 << before_times[i] - kTolerance
1502 << ") : Time changed too much on node "
1503 << MaybeNodeName(states_[i]->event_loop()->node());
1504 }
1505 if (states_[i]->monotonic_now() <= before_times[i] + kTolerance) {
1506 LOG(WARNING) << "Check failed: "
1507 "states_[i]->monotonic_now() "
1508 "<= before_times[i] + kTolerance ("
1509 << states_[i]->monotonic_now() << " vs. "
1510 << before_times[i] - kTolerance
1511 << ") : Time changed too much on node "
1512 << MaybeNodeName(states_[i]->event_loop()->node());
1513 }
1514 }
1515 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001516 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001517
1518 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
1519 << state->event_loop()->context().monotonic_event_time << " now "
1520 << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001521 }));
Austin Schuhe309d2a2019-11-29 13:25:21 -08001522
Austin Schuh6f3babe2020-01-26 20:34:50 -08001523 ++live_nodes_;
1524
Austin Schuh858c9f32020-08-31 16:56:12 -07001525 if (state->OldestMessageTime() != monotonic_clock::max_time) {
1526 event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
Austin Schuhe309d2a2019-11-29 13:25:21 -08001527 }
1528}
1529
1530void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001531 // Make sure that things get destroyed in the correct order, rather than
1532 // relying on getting the order correct in the class definition.
Austin Schuh8bd96322020-02-13 21:18:22 -08001533 for (std::unique_ptr<State> &state : states_) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001534 state->Deregister();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001535 }
Austin Schuh92547522019-12-28 14:33:43 -08001536
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001537 event_loop_factory_unique_ptr_.reset();
1538 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -08001539}
1540
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001541void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
1542 std::string_view add_prefix) {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001543 for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
1544 const Channel *const channel = logged_configuration()->channels()->Get(ii);
1545 if (channel->name()->str() == name &&
1546 channel->type()->string_view() == type) {
1547 CHECK_EQ(0u, remapped_channels_.count(ii))
1548 << "Already remapped channel "
1549 << configuration::CleanedChannelToString(channel);
1550 remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
1551 VLOG(1) << "Remapping channel "
1552 << configuration::CleanedChannelToString(channel)
1553 << " to have name " << remapped_channels_[ii];
Austin Schuh6331ef92020-01-07 18:28:09 -08001554 MakeRemappedConfig();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001555 return;
1556 }
1557 }
1558 LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
1559 << type;
1560}
1561
Austin Schuh01b4c352020-09-21 23:09:39 -07001562void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
1563 const Node *node,
1564 std::string_view add_prefix) {
1565 VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
1566 const Channel *remapped_channel =
1567 configuration::GetChannel(logged_configuration(), name, type, "", node);
1568 CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
1569 << "\", \"type\": \"" << type << "\"}";
1570 VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
1571 << "\"}";
1572 VLOG(1) << "Remapped "
1573 << aos::configuration::StrippedChannelToString(remapped_channel);
1574
1575 // We want to make /spray on node 0 go to /0/spray by snooping the maps. And
1576 // we want it to degrade if the heuristics fail to just work.
1577 //
1578 // The easiest way to do this is going to be incredibly specific and verbose.
1579 // Look up /spray, to /0/spray. Then, prefix the result with /original to get
1580 // /original/0/spray. Then, create a map from /original/spray to
1581 // /original/0/spray for just the type we were asked for.
1582 if (name != remapped_channel->name()->string_view()) {
1583 MapT new_map;
1584 new_map.match = std::make_unique<ChannelT>();
1585 new_map.match->name = absl::StrCat(add_prefix, name);
1586 new_map.match->type = type;
1587 if (node != nullptr) {
1588 new_map.match->source_node = node->name()->str();
1589 }
1590 new_map.rename = std::make_unique<ChannelT>();
1591 new_map.rename->name =
1592 absl::StrCat(add_prefix, remapped_channel->name()->string_view());
1593 maps_.emplace_back(std::move(new_map));
1594 }
1595
1596 const size_t channel_index =
1597 configuration::ChannelIndex(logged_configuration(), remapped_channel);
1598 CHECK_EQ(0u, remapped_channels_.count(channel_index))
1599 << "Already remapped channel "
1600 << configuration::CleanedChannelToString(remapped_channel);
1601 remapped_channels_[channel_index] =
1602 absl::StrCat(add_prefix, remapped_channel->name()->string_view());
1603 MakeRemappedConfig();
1604}
1605
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001606void LogReader::MakeRemappedConfig() {
Austin Schuh8bd96322020-02-13 21:18:22 -08001607 for (std::unique_ptr<State> &state : states_) {
Austin Schuh6aa77be2020-02-22 21:06:40 -08001608 if (state) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001609 CHECK(!state->event_loop())
Austin Schuh6aa77be2020-02-22 21:06:40 -08001610 << ": Can't change the mapping after the events are scheduled.";
1611 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001612 }
Austin Schuhac0771c2020-01-07 18:36:30 -08001613
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001614 // If no remapping occurred and we are using the original config, then there
1615 // is nothing interesting to do here.
1616 if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001617 remapped_configuration_ = logged_configuration();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001618 return;
1619 }
1620 // Config to copy Channel definitions from. Use the specified
1621 // replay_configuration_ if it has been provided.
1622 const Configuration *const base_config = replay_configuration_ == nullptr
1623 ? logged_configuration()
1624 : replay_configuration_;
1625 // The remapped config will be identical to the base_config, except that it
1626 // will have a bunch of extra channels in the channel list, which are exact
1627 // copies of the remapped channels, but with different names.
1628 // Because the flatbuffers API is a pain to work with, this requires a bit of
1629 // a song-and-dance to get copied over.
1630 // The order of operations is to:
1631 // 1) Make a flatbuffer builder for a config that will just contain a list of
1632 // the new channels that we want to add.
1633 // 2) For each channel that we are remapping:
1634 // a) Make a buffer/builder and construct into it a Channel table that only
1635 // contains the new name for the channel.
1636 // b) Merge the new channel with just the name into the channel that we are
1637 // trying to copy, built in the flatbuffer builder made in 1. This gives
1638 // us the new channel definition that we need.
1639 // 3) Using this list of offsets, build the Configuration of just new
1640 // Channels.
1641 // 4) Merge the Configuration with the new Channels into the base_config.
1642 // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
1643 // chance to sanitize the config.
1644
1645 // This is the builder that we use for the config containing all the new
1646 // channels.
1647 flatbuffers::FlatBufferBuilder new_config_fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -08001648 new_config_fbb.ForceDefaults(true);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001649 std::vector<flatbuffers::Offset<Channel>> channel_offsets;
1650 for (auto &pair : remapped_channels_) {
1651 // This is the builder that we use for creating the Channel with just the
1652 // new name.
1653 flatbuffers::FlatBufferBuilder new_name_fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -08001654 new_name_fbb.ForceDefaults(true);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001655 const flatbuffers::Offset<flatbuffers::String> name_offset =
1656 new_name_fbb.CreateString(pair.second);
1657 ChannelBuilder new_name_builder(new_name_fbb);
1658 new_name_builder.add_name(name_offset);
1659 new_name_fbb.Finish(new_name_builder.Finish());
1660 const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001661 // Retrieve the channel that we want to copy, confirming that it is
1662 // actually present in base_config.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001663 const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
1664 base_config, logged_configuration()->channels()->Get(pair.first), "",
1665 nullptr));
1666 // Actually create the new channel and put it into the vector of Offsets
1667 // that we will use to create the new Configuration.
1668 channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
1669 reinterpret_cast<const flatbuffers::Table *>(base_channel),
1670 reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
1671 &new_config_fbb));
1672 }
1673 // Create the Configuration containing the new channels that we want to add.
Austin Schuh01b4c352020-09-21 23:09:39 -07001674 const auto new_channel_vector_offsets =
Austin Schuhfa895892020-01-07 20:07:41 -08001675 new_config_fbb.CreateVector(channel_offsets);
Austin Schuh01b4c352020-09-21 23:09:39 -07001676
1677 // Now create the new maps.
1678 std::vector<flatbuffers::Offset<Map>> map_offsets;
1679 for (const MapT &map : maps_) {
1680 const flatbuffers::Offset<flatbuffers::String> match_name_offset =
1681 new_config_fbb.CreateString(map.match->name);
1682 const flatbuffers::Offset<flatbuffers::String> match_type_offset =
1683 new_config_fbb.CreateString(map.match->type);
1684 const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
1685 new_config_fbb.CreateString(map.rename->name);
1686 flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
1687 if (!map.match->source_node.empty()) {
1688 match_source_node_offset =
1689 new_config_fbb.CreateString(map.match->source_node);
1690 }
1691 Channel::Builder match_builder(new_config_fbb);
1692 match_builder.add_name(match_name_offset);
1693 match_builder.add_type(match_type_offset);
1694 if (!map.match->source_node.empty()) {
1695 match_builder.add_source_node(match_source_node_offset);
1696 }
1697 const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
1698
1699 Channel::Builder rename_builder(new_config_fbb);
1700 rename_builder.add_name(rename_name_offset);
1701 const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
1702
1703 Map::Builder map_builder(new_config_fbb);
1704 map_builder.add_match(match_offset);
1705 map_builder.add_rename(rename_offset);
1706 map_offsets.emplace_back(map_builder.Finish());
1707 }
1708
1709 const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);
1710
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001711 ConfigurationBuilder new_config_builder(new_config_fbb);
Austin Schuh01b4c352020-09-21 23:09:39 -07001712 new_config_builder.add_channels(new_channel_vector_offsets);
1713 new_config_builder.add_maps(new_maps_offsets);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001714 new_config_fbb.Finish(new_config_builder.Finish());
1715 const FlatbufferDetachedBuffer<Configuration> new_name_config =
1716 new_config_fbb.Release();
1717 // Merge the new channels configuration into the base_config, giving us the
1718 // remapped configuration.
1719 remapped_configuration_buffer_ =
1720 std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
1721 MergeFlatBuffers<Configuration>(base_config,
1722 &new_name_config.message()));
1723 // Call MergeConfiguration to deal with sanitizing the config.
1724 remapped_configuration_buffer_ =
1725 std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
1726 configuration::MergeConfiguration(*remapped_configuration_buffer_));
1727
1728 remapped_configuration_ = &remapped_configuration_buffer_->message();
1729}
1730
Austin Schuh6f3babe2020-01-26 20:34:50 -08001731const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
1732 const Channel *channel) {
1733 std::string_view channel_name = channel->name()->string_view();
1734 std::string_view channel_type = channel->type()->string_view();
1735 const int channel_index =
1736 configuration::ChannelIndex(logged_configuration(), channel);
1737 // If the channel is remapped, find the correct channel name to use.
1738 if (remapped_channels_.count(channel_index) > 0) {
Austin Schuhee711052020-08-24 16:06:09 -07001739 VLOG(3) << "Got remapped channel on "
Austin Schuh6f3babe2020-01-26 20:34:50 -08001740 << configuration::CleanedChannelToString(channel);
1741 channel_name = remapped_channels_[channel_index];
1742 }
1743
Austin Schuhee711052020-08-24 16:06:09 -07001744 VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001745 const Channel *remapped_channel = configuration::GetChannel(
1746 event_loop->configuration(), channel_name, channel_type,
1747 event_loop->name(), event_loop->node());
1748
1749 CHECK(remapped_channel != nullptr)
1750 << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
1751 << channel_type << "\"} because it is not in the provided configuration.";
1752
1753 return remapped_channel;
1754}
1755
Austin Schuh287d43d2020-12-04 20:19:33 -08001756LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper)
1757 : timestamp_mapper_(std::move(timestamp_mapper)) {}
1758
1759void LogReader::State::AddPeer(State *peer) {
1760 if (timestamp_mapper_ && peer->timestamp_mapper_) {
1761 timestamp_mapper_->AddPeer(peer->timestamp_mapper_.get());
1762 }
1763}
Austin Schuh858c9f32020-08-31 16:56:12 -07001764
1765EventLoop *LogReader::State::SetNodeEventLoopFactory(
1766 NodeEventLoopFactory *node_event_loop_factory) {
1767 node_event_loop_factory_ = node_event_loop_factory;
1768 event_loop_unique_ptr_ =
1769 node_event_loop_factory_->MakeEventLoop("log_reader");
1770 return event_loop_unique_ptr_.get();
1771}
1772
1773void LogReader::State::SetChannelCount(size_t count) {
1774 channels_.resize(count);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001775 remote_timestamp_senders_.resize(count);
Austin Schuh858c9f32020-08-31 16:56:12 -07001776 filters_.resize(count);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001777 channel_source_state_.resize(count);
1778 factory_channel_index_.resize(count);
1779 queue_index_map_.resize(count);
Austin Schuh858c9f32020-08-31 16:56:12 -07001780}
1781
1782void LogReader::State::SetChannel(
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001783 size_t logged_channel_index, size_t factory_channel_index,
1784 std::unique_ptr<RawSender> sender,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001785 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001786 aos::Sender<MessageHeader> *remote_timestamp_sender, State *source_state) {
1787 channels_[logged_channel_index] = std::move(sender);
1788 filters_[logged_channel_index] = filter;
1789 remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
1790
1791 if (source_state) {
1792 channel_source_state_[logged_channel_index] = source_state;
1793
1794 if (remote_timestamp_sender != nullptr) {
1795 source_state->queue_index_map_[logged_channel_index] =
1796 std::make_unique<std::vector<State::SentTimestamp>>();
1797 }
1798 }
1799
1800 factory_channel_index_[logged_channel_index] = factory_channel_index;
1801}
1802
Austin Schuh287d43d2020-12-04 20:19:33 -08001803bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
1804 aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001805 uint32_t remote_queue_index = 0xffffffff;
1806
Austin Schuh287d43d2020-12-04 20:19:33 -08001807 if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
1808 std::vector<SentTimestamp> *queue_index_map = CHECK_NOTNULL(
1809 CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
1810 ->queue_index_map_[timestamped_message.channel_index]
1811 .get());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001812
1813 SentTimestamp search;
Austin Schuh287d43d2020-12-04 20:19:33 -08001814 search.monotonic_event_time = timestamped_message.monotonic_remote_time;
1815 search.realtime_event_time = timestamped_message.realtime_remote_time;
1816 search.queue_index = timestamped_message.remote_queue_index;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001817
1818 // Find the sent time if available.
1819 auto element = std::lower_bound(
1820 queue_index_map->begin(), queue_index_map->end(), search,
1821 [](SentTimestamp a, SentTimestamp b) {
1822 if (b.monotonic_event_time < a.monotonic_event_time) {
1823 return false;
1824 }
1825 if (b.monotonic_event_time > a.monotonic_event_time) {
1826 return true;
1827 }
1828
1829 if (b.queue_index < a.queue_index) {
1830 return false;
1831 }
1832 if (b.queue_index > a.queue_index) {
1833 return true;
1834 }
1835
1836 CHECK_EQ(a.realtime_event_time, b.realtime_event_time);
1837 return false;
1838 });
1839
1840 // TODO(austin): Be a bit more principled here, but we will want to do that
1841 // after the logger rewrite. We hit this when one node finishes, but the
1842 // other node isn't done yet. So there is no send time, but there is a
1843 // receive time.
1844 if (element != queue_index_map->end()) {
1845 CHECK_EQ(element->monotonic_event_time,
Austin Schuh287d43d2020-12-04 20:19:33 -08001846 timestamped_message.monotonic_remote_time);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001847 CHECK_EQ(element->realtime_event_time,
Austin Schuh287d43d2020-12-04 20:19:33 -08001848 timestamped_message.realtime_remote_time);
1849 CHECK_EQ(element->queue_index, timestamped_message.remote_queue_index);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001850
1851 remote_queue_index = element->actual_queue_index;
1852 }
1853 }
1854
1855 // Send! Use the replayed queue index here instead of the logged queue index
1856 // for the remote queue index. This makes re-logging work.
Austin Schuh287d43d2020-12-04 20:19:33 -08001857 const bool sent = sender->Send(
1858 timestamped_message.data.message().data()->Data(),
1859 timestamped_message.data.message().data()->size(),
1860 timestamped_message.monotonic_remote_time,
1861 timestamped_message.realtime_remote_time, remote_queue_index);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001862 if (!sent) return false;
1863
Austin Schuh287d43d2020-12-04 20:19:33 -08001864 if (queue_index_map_[timestamped_message.channel_index]) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001865 SentTimestamp timestamp;
Austin Schuh287d43d2020-12-04 20:19:33 -08001866 timestamp.monotonic_event_time = timestamped_message.monotonic_event_time;
1867 timestamp.realtime_event_time = timestamped_message.realtime_event_time;
1868 timestamp.queue_index = timestamped_message.queue_index;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001869 timestamp.actual_queue_index = sender->sent_queue_index();
Austin Schuh287d43d2020-12-04 20:19:33 -08001870 queue_index_map_[timestamped_message.channel_index]->emplace_back(
1871 timestamp);
1872 } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
1873 nullptr) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001874 aos::Sender<MessageHeader>::Builder builder =
Austin Schuh287d43d2020-12-04 20:19:33 -08001875 remote_timestamp_senders_[timestamped_message.channel_index]
1876 ->MakeBuilder();
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001877
1878 logger::MessageHeader::Builder message_header_builder =
1879 builder.MakeBuilder<logger::MessageHeader>();
1880
1881 message_header_builder.add_channel_index(
Austin Schuh287d43d2020-12-04 20:19:33 -08001882 factory_channel_index_[timestamped_message.channel_index]);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001883
1884 // Swap the remote and sent metrics. They are from the sender's
1885 // perspective, not the receiver's perspective.
1886 message_header_builder.add_monotonic_sent_time(
1887 sender->monotonic_sent_time().time_since_epoch().count());
1888 message_header_builder.add_realtime_sent_time(
1889 sender->realtime_sent_time().time_since_epoch().count());
1890 message_header_builder.add_queue_index(sender->sent_queue_index());
1891
1892 message_header_builder.add_monotonic_remote_time(
Austin Schuh287d43d2020-12-04 20:19:33 -08001893 timestamped_message.monotonic_remote_time.time_since_epoch().count());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001894 message_header_builder.add_realtime_remote_time(
Austin Schuh287d43d2020-12-04 20:19:33 -08001895 timestamped_message.realtime_remote_time.time_since_epoch().count());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001896
1897 message_header_builder.add_remote_queue_index(remote_queue_index);
1898
1899 builder.Send(message_header_builder.Finish());
1900 }
1901
1902 return true;
1903}
1904
1905aos::Sender<MessageHeader> *LogReader::State::RemoteTimestampSender(
1906 const Node *delivered_node) {
1907 auto sender = remote_timestamp_senders_map_.find(delivered_node);
1908
1909 if (sender == remote_timestamp_senders_map_.end()) {
1910 sender = remote_timestamp_senders_map_
1911 .emplace(std::make_pair(
1912 delivered_node,
1913 event_loop()->MakeSender<MessageHeader>(
1914 absl::StrCat("/aos/remote_timestamps/",
1915 delivered_node->name()->string_view()))))
1916 .first;
1917 }
1918
1919 return &(sender->second);
Austin Schuh858c9f32020-08-31 16:56:12 -07001920}
1921
Austin Schuh287d43d2020-12-04 20:19:33 -08001922TimestampedMessage LogReader::State::PopOldest(bool *update_time) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001923 CHECK_GT(sorted_messages_.size(), 0u);
1924
Austin Schuh287d43d2020-12-04 20:19:33 -08001925 std::tuple<TimestampedMessage, message_bridge::NoncausalOffsetEstimator *>
Austin Schuh858c9f32020-08-31 16:56:12 -07001926 result = std::move(sorted_messages_.front());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001927 VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
Austin Schuh858c9f32020-08-31 16:56:12 -07001928 << std::get<0>(result).monotonic_event_time;
1929 sorted_messages_.pop_front();
1930 SeedSortedMessages();
1931
Austin Schuh287d43d2020-12-04 20:19:33 -08001932 if (std::get<1>(result) != nullptr) {
1933 *update_time = std::get<1>(result)->Pop(
Austin Schuh2f8fd752020-09-01 22:38:28 -07001934 event_loop_->node(), std::get<0>(result).monotonic_event_time);
1935 } else {
1936 *update_time = false;
1937 }
Austin Schuh287d43d2020-12-04 20:19:33 -08001938 return std::move(std::get<0>(result));
Austin Schuh858c9f32020-08-31 16:56:12 -07001939}
1940
1941monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
1942 if (sorted_messages_.size() > 0) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001943 VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
Austin Schuh858c9f32020-08-31 16:56:12 -07001944 << std::get<0>(sorted_messages_.front()).monotonic_event_time;
1945 return std::get<0>(sorted_messages_.front()).monotonic_event_time;
1946 }
1947
Austin Schuh287d43d2020-12-04 20:19:33 -08001948 TimestampedMessage *m =
1949 timestamp_mapper_ ? timestamp_mapper_->Front() : nullptr;
1950 if (m == nullptr) {
1951 return monotonic_clock::max_time;
1952 }
1953 return m->monotonic_event_time;
Austin Schuh858c9f32020-08-31 16:56:12 -07001954}
1955
1956void LogReader::State::SeedSortedMessages() {
Austin Schuh287d43d2020-12-04 20:19:33 -08001957 if (!timestamp_mapper_) return;
Austin Schuh858c9f32020-08-31 16:56:12 -07001958 const aos::monotonic_clock::time_point end_queue_time =
1959 (sorted_messages_.size() > 0
1960 ? std::get<0>(sorted_messages_.front()).monotonic_event_time
Austin Schuh287d43d2020-12-04 20:19:33 -08001961 : timestamp_mapper_->monotonic_start_time()) +
Austin Schuh858c9f32020-08-31 16:56:12 -07001962 std::chrono::seconds(2);
1963
1964 while (true) {
Austin Schuh287d43d2020-12-04 20:19:33 -08001965 TimestampedMessage *m = timestamp_mapper_->Front();
1966 if (m == nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001967 return;
1968 }
1969 if (sorted_messages_.size() > 0) {
1970 // Stop placing sorted messages on the list once we have 2 seconds
1971 // queued up (but queue at least until the log starts.
1972 if (end_queue_time <
1973 std::get<0>(sorted_messages_.back()).monotonic_event_time) {
1974 return;
1975 }
1976 }
1977
Austin Schuh2f8fd752020-09-01 22:38:28 -07001978 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
1979
Austin Schuh287d43d2020-12-04 20:19:33 -08001980 TimestampedMessage timestamped_message = std::move(*m);
1981 timestamp_mapper_->PopFront();
Austin Schuh858c9f32020-08-31 16:56:12 -07001982
Austin Schuh2f8fd752020-09-01 22:38:28 -07001983 // Skip any messages without forwarding information.
Austin Schuh287d43d2020-12-04 20:19:33 -08001984 if (timestamped_message.monotonic_remote_time != monotonic_clock::min_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001985 // Got a forwarding timestamp!
Austin Schuh287d43d2020-12-04 20:19:33 -08001986 filter = filters_[timestamped_message.channel_index];
Austin Schuh2f8fd752020-09-01 22:38:28 -07001987
1988 CHECK(filter != nullptr);
1989
1990 // Call the correct method depending on if we are the forward or
1991 // reverse direction here.
1992 filter->Sample(event_loop_->node(),
Austin Schuh287d43d2020-12-04 20:19:33 -08001993 timestamped_message.monotonic_event_time,
1994 timestamped_message.monotonic_remote_time);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001995 }
Austin Schuh287d43d2020-12-04 20:19:33 -08001996 sorted_messages_.emplace_back(std::move(timestamped_message), filter);
Austin Schuh858c9f32020-08-31 16:56:12 -07001997 }
1998}
1999
2000void LogReader::State::Deregister() {
2001 for (size_t i = 0; i < channels_.size(); ++i) {
2002 channels_[i].reset();
2003 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07002004 remote_timestamp_senders_map_.clear();
Austin Schuh858c9f32020-08-31 16:56:12 -07002005 event_loop_unique_ptr_.reset();
2006 event_loop_ = nullptr;
2007 timer_handler_ = nullptr;
2008 node_event_loop_factory_ = nullptr;
2009}
2010
Austin Schuhe309d2a2019-11-29 13:25:21 -08002011} // namespace logger
2012} // namespace aos