blob: 85809b1edb73f2dd226d5bc4b7d3deaa53e32d6d [file] [log] [blame]
James Kuszmaul38735e82019-12-07 16:42:06 -08001#include "aos/events/logging/logger.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -08002
3#include <fcntl.h>
Austin Schuh4c4e0092019-12-22 16:18:03 -08004#include <limits.h>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8#include <vector>
9
Austin Schuh8bd96322020-02-13 21:18:22 -080010#include "Eigen/Dense"
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "absl/strings/escaping.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "absl/types/span.h"
13#include "aos/events/event_loop.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080014#include "aos/events/logging/logger_generated.h"
Austin Schuh64fab802020-09-09 22:47:47 -070015#include "aos/events/logging/uuid.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080016#include "aos/flatbuffer_merge.h"
Austin Schuh288479d2019-12-18 19:47:52 -080017#include "aos/network/team_number.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080018#include "aos/time/time.h"
19#include "flatbuffers/flatbuffers.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070020#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080021
// Command-line flags controlling log replay/write behavior.
DEFINE_bool(skip_missing_forwarding_entries, false,
            "If true, drop any forwarding entries with missing data. If "
            "false, CHECK.");

// Debugging aid: dumps the clock-offset solver state to CSV files.
DEFINE_bool(timestamps_to_csv, false,
            "If true, write all the time synchronization information to a set "
            "of CSV files in /tmp/. This should only be needed when debugging "
            "time synchronization.");

// Escape hatch for logs with known ordering violations.
DEFINE_bool(skip_order_validation, false,
            "If true, ignore any out of orderness in replay");
33
Austin Schuhe309d2a2019-11-29 13:25:21 -080034namespace aos {
35namespace logger {
Austin Schuhe309d2a2019-11-29 13:25:21 -080036namespace chrono = std::chrono;
37
Austin Schuh64fab802020-09-09 22:47:47 -070038void LogNamer::UpdateHeader(
39 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
40 const UUID &uuid, int parts_index) {
41 header->mutable_message()->mutate_parts_index(parts_index);
42 CHECK_EQ(uuid.string_view().size(),
43 header->mutable_message()->mutable_parts_uuid()->size());
44 std::copy(uuid.string_view().begin(), uuid.string_view().end(),
45 reinterpret_cast<char *>(
46 header->mutable_message()->mutable_parts_uuid()->Data()));
47}
48
Austin Schuh2f8fd752020-09-01 22:38:28 -070049void MultiNodeLogNamer::WriteHeader(
Austin Schuh64fab802020-09-09 22:47:47 -070050 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
Austin Schuh2f8fd752020-09-01 22:38:28 -070051 const Node *node) {
52 if (node == this->node()) {
Austin Schuh64fab802020-09-09 22:47:47 -070053 UpdateHeader(header, uuid_, part_number_);
54 data_writer_->WriteSizedFlatbuffer(header->full_span());
Austin Schuh2f8fd752020-09-01 22:38:28 -070055 } else {
56 for (std::pair<const Channel *const, DataWriter> &data_writer :
57 data_writers_) {
58 if (node == data_writer.second.node) {
Austin Schuh64fab802020-09-09 22:47:47 -070059 UpdateHeader(header, data_writer.second.uuid,
60 data_writer.second.part_number);
61 data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
Austin Schuh2f8fd752020-09-01 22:38:28 -070062 }
63 }
64 }
65}
66
67void MultiNodeLogNamer::Rotate(
68 const Node *node,
Austin Schuh64fab802020-09-09 22:47:47 -070069 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070070 if (node == this->node()) {
71 ++part_number_;
72 *data_writer_ = std::move(*OpenDataWriter());
Austin Schuh64fab802020-09-09 22:47:47 -070073 UpdateHeader(header, uuid_, part_number_);
74 data_writer_->WriteSizedFlatbuffer(header->full_span());
Austin Schuh2f8fd752020-09-01 22:38:28 -070075 } else {
76 for (std::pair<const Channel *const, DataWriter> &data_writer :
77 data_writers_) {
78 if (node == data_writer.second.node) {
79 ++data_writer.second.part_number;
80 data_writer.second.rotate(data_writer.first, &data_writer.second);
Austin Schuh64fab802020-09-09 22:47:47 -070081 UpdateHeader(header, data_writer.second.uuid,
82 data_writer.second.part_number);
83 data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
Austin Schuh2f8fd752020-09-01 22:38:28 -070084 }
85 }
86 }
87}
88
// Convenience constructor: logs locally via a LocalLogNamer rooted at
// |base_name| for this event loop's node.
Logger::Logger(std::string_view base_name, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
             event_loop, polling_period) {}
93
// Main constructor.  Walks the configuration, decides per channel what needs
// to be logged (data, delivery timestamps, and/or forwarded remote-timestamp
// contents), creates the matching writers/fetchers, pre-builds a header per
// logged node, and schedules logging to start when the event loop runs.
Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : event_loop_(event_loop),
      uuid_(UUID::Random()),
      log_namer_(std::move(log_namer)),
      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
      polling_period_(polling_period),
      // Only a multi-node configuration has ServerStatistics to fetch; a
      // single-node logger keeps a default (empty) fetcher.
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
  int channel_index = 0;

  // Find all the nodes which are logging timestamps on our node.
  std::set<const Node *> timestamp_logger_nodes;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
        !channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      const Node *other_node = configuration::GetNode(
          event_loop_->configuration(), connection->name()->string_view());

      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_nodes.insert(other_node);
      }
    }
  }

  // Maps each remote-timestamp channel to the node whose timestamps it
  // carries.
  std::map<const Channel *, const Node *> timestamp_logger_channels;

  // Now that we have all the nodes accumulated, make remote timestamp loggers
  // for them.
  for (const Node *node : timestamp_logger_nodes) {
    const Channel *channel = configuration::GetChannel(
        event_loop_->configuration(),
        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
        logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
        event_loop_->node());

    CHECK(channel != nullptr)
        << ": Remote timestamps are logged on "
        << event_loop_->node()->name()->string_view()
        << " but can't find channel /aos/remote_timestamps/"
        << node->name()->string_view();
    timestamp_logger_channels.insert(std::make_pair(channel, node));
  }

  const size_t our_node_index = configuration::GetNodeIndex(
      event_loop_->configuration(), event_loop_->node());

  // Build one FetcherStruct per channel that needs any kind of logging.
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    FetcherStruct fs;
    fs.node_index = our_node_index;
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
                                 channel, event_loop_->node()) &&
                             is_readable;

    const bool log_delivery_times =
        (event_loop_->node() == nullptr)
            ? false
            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                  channel, event_loop_->node(), event_loop_->node());

    // Now, detect a MessageHeader timestamp logger where we should just log
    // the contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();
    const Node *timestamp_node =
        log_contents ? timestamp_logger_channels.find(channel)->second
                     : nullptr;

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << "  Delivery times";
        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
      }
      if (log_message) {
        VLOG(1) << "  Data";
        fs.writer = log_namer_->MakeWriter(channel);
        if (!is_local) {
          // Messages that originate remotely get logged in remote form.
          fs.log_type = LogType::kLogRemoteMessage;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.contents_writer =
            log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
        // Contents are attributed to the remote node, not ours.
        fs.node_index = configuration::GetNodeIndex(
            event_loop_->configuration(), timestamp_node);
      }
      fs.channel_index = channel_index;
      fs.written = false;
      fetchers_.emplace_back(std::move(fs));
    }
    ++channel_index;
  }

  // One NodeState per node in a multi-node config; exactly one otherwise.
  node_state_.resize(configuration::MultiNode(event_loop_->configuration())
                         ? event_loop_->configuration()->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(event_loop_->configuration(), node);

    node_state_[node_index].log_file_header = MakeHeader(node);
  }

  // When things start, we want to log the header, then the most recent
  // messages available on each fetcher to capture the previous state, then
  // start polling.
  event_loop_->OnRun([this]() { StartLogging(); });
}
224
Austin Schuh2f8fd752020-09-01 22:38:28 -0700225void Logger::StartLogging() {
226 // Grab data from each channel right before we declare the log file started
227 // so we can capture the latest message on each channel. This lets us have
228 // non periodic messages with configuration that now get logged.
229 for (FetcherStruct &f : fetchers_) {
230 f.written = !f.fetcher->Fetch();
231 }
232
233 // Clear out any old timestamps in case we are re-starting logging.
234 for (size_t i = 0; i < node_state_.size(); ++i) {
235 SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
236 }
237
238 WriteHeader();
239
240 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
241 << " start_time " << last_synchronized_time_;
242
243 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
244 polling_period_);
245}
246
Austin Schuhfa895892020-01-07 20:07:41 -0800247void Logger::WriteHeader() {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700248 if (configuration::MultiNode(event_loop_->configuration())) {
249 server_statistics_fetcher_.Fetch();
250 }
251
252 aos::monotonic_clock::time_point monotonic_start_time =
253 event_loop_->monotonic_now();
254 aos::realtime_clock::time_point realtime_start_time =
255 event_loop_->realtime_now();
256
257 // We need to pick a point in time to declare the log file "started". This
258 // starts here. It needs to be after everything is fetched so that the
259 // fetchers are all pointed at the most recent message before the start
260 // time.
261 last_synchronized_time_ = monotonic_start_time;
262
Austin Schuh6f3babe2020-01-26 20:34:50 -0800263 for (const Node *node : log_namer_->nodes()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700264 const int node_index =
265 configuration::GetNodeIndex(event_loop_->configuration(), node);
266 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
267 realtime_start_time);
Austin Schuh64fab802020-09-09 22:47:47 -0700268 log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800269 }
270}
Austin Schuh8bd96322020-02-13 21:18:22 -0800271
Austin Schuh2f8fd752020-09-01 22:38:28 -0700272void Logger::WriteMissingTimestamps() {
273 if (configuration::MultiNode(event_loop_->configuration())) {
274 server_statistics_fetcher_.Fetch();
275 } else {
276 return;
277 }
278
279 if (server_statistics_fetcher_.get() == nullptr) {
280 return;
281 }
282
283 for (const Node *node : log_namer_->nodes()) {
284 const int node_index =
285 configuration::GetNodeIndex(event_loop_->configuration(), node);
286 if (MaybeUpdateTimestamp(
287 node, node_index,
288 server_statistics_fetcher_.context().monotonic_event_time,
289 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh64fab802020-09-09 22:47:47 -0700290 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700291 }
292 }
293}
294
295void Logger::SetStartTime(size_t node_index,
296 aos::monotonic_clock::time_point monotonic_start_time,
297 aos::realtime_clock::time_point realtime_start_time) {
298 node_state_[node_index].monotonic_start_time = monotonic_start_time;
299 node_state_[node_index].realtime_start_time = realtime_start_time;
300 node_state_[node_index]
301 .log_file_header.mutable_message()
302 ->mutate_monotonic_start_time(
303 std::chrono::duration_cast<std::chrono::nanoseconds>(
304 monotonic_start_time.time_since_epoch())
305 .count());
306 if (node_state_[node_index]
307 .log_file_header.mutable_message()
308 ->has_realtime_start_time()) {
309 node_state_[node_index]
310 .log_file_header.mutable_message()
311 ->mutate_realtime_start_time(
312 std::chrono::duration_cast<std::chrono::nanoseconds>(
313 realtime_start_time.time_since_epoch())
314 .count());
315 }
316}
317
// Attempts to set the start time for |node|.  Returns true on success:
// immediately for our own node (or any node in single-node mode), and for a
// remote node only once message_bridge reports it CONNECTED with a known
// clock offset, which is applied to |monotonic_start_time| before storing.
bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (configuration::MultiNode(event_loop_->configuration())) {
    if (event_loop_->node() == node) {
      // There are no offsets to compute for ourself, so always succeed.
      SetStartTime(node_index, monotonic_start_time, realtime_start_time);
      return true;
    } else if (server_statistics_fetcher_.get() != nullptr) {
      // We must be a remote node now.  Look for the connection and see if it
      // is connected.

      for (const message_bridge::ServerConnection *connection :
           *server_statistics_fetcher_->connections()) {
        // Skip connections for other nodes.
        if (connection->node()->name()->string_view() !=
            node->name()->string_view()) {
          continue;
        }

        if (connection->state() != message_bridge::State::CONNECTED) {
          VLOG(1) << node->name()->string_view()
                  << " is not connected, can't start it yet.";
          break;
        }

        if (!connection->has_monotonic_offset()) {
          VLOG(1) << "Missing monotonic offset for setting start time for node "
                  << aos::FlatbufferToJson(node);
          break;
        }

        VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);

        // Found it and it is connected.  Compensate and go.
        monotonic_start_time +=
            std::chrono::nanoseconds(connection->monotonic_offset());

        SetStartTime(node_index, monotonic_start_time, realtime_start_time);
        return true;
      }
    }
  } else {
    // Single node: nothing to synchronize against.
    SetStartTime(node_index, monotonic_start_time, realtime_start_time);
    return true;
  }
  return false;
}
371
// Builds the LogFileHeader flatbuffer for |node|: configuration copy,
// hostname, logger UUID, a placeholder parts UUID (rewritten by the namer on
// write/rotate), and sentinel (min_time) start times that SetStartTime later
// mutates in place.
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  // Force-write default values so the start-time fields exist and can be
  // mutated in place later.
  fbb.ForceDefaults(true);

  // TODO(austin): Compress this much more efficiently.  There are a bunch of
  // duplicated schemas.
  flatbuffers::Offset<aos::Configuration> configuration_offset =
      CopyFlatBuffer(event_loop_->configuration(), &fbb);

  flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(network::GetHostname());

  flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
      fbb.CreateString(uuid_.string_view());

  // Placeholder parts UUID; UpdateHeader overwrites it byte-for-byte, so the
  // placeholder must have the canonical UUID length.
  flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      fbb.CreateString("00000000-0000-4000-8000-000000000000");

  flatbuffers::Offset<Node> node_offset;

  if (configuration::MultiNode(event_loop_->configuration())) {
    node_offset = CopyFlatBuffer(node, &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
  }

  log_file_header_builder.add_configuration(configuration_offset);
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary.  And the reverse could happen for another
  // message.  Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
          .count());

  // Start times are written as min_time sentinels here and patched via
  // mutate_*_start_time once the real start time is known.
  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  // Only our own node's header carries a realtime start time field.
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  log_file_header_builder.add_logger_uuid(logger_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(0);

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  return fbb.Release();
}
436
437void Logger::Rotate() {
438 for (const Node *node : log_namer_->nodes()) {
439 const int node_index =
440 configuration::GetNodeIndex(event_loop_->configuration(), node);
Austin Schuh64fab802020-09-09 22:47:47 -0700441 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700442 }
443}
444
// Drains every fetcher and writes all messages with an event time strictly
// before |t| to their data / timestamp / contents writers, then advances
// last_synchronized_time_ to t.  Each call forms one sync point.
void Logger::LogUntil(monotonic_clock::time_point t) {
  WriteMissingTimestamps();

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      // Only fetch a new message once the previous one has been written.
      if (f.written) {
        if (!f.fetcher->FetchNext()) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        } else {
          f.written = false;
        }
      }

      CHECK(!f.written);

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time < t) {
        if (f.writer != nullptr) {
          // Write!
          // Pre-size the builder using the largest header seen so far to
          // avoid reallocation while packing.
          flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                             max_header_size_);
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index, f.log_type));

          VLOG(2) << "Writing data as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.writer->filename() << " data "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          // Track the largest header overhead observed for future pre-sizing.
          max_header_size_ = std::max(
              max_header_size_, fbb.GetSize() - f.fetcher->context().size);
          f.writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.timestamp_writer != nullptr) {
          // And now handle timestamps.
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index,
                                             LogType::kLogDeliveryTimeOnly));

          VLOG(2) << "Writing timestamps as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.timestamp_writer->filename() << " timestamp "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.contents_writer != nullptr) {
          // And now handle the special message contents channel.  Copy the
          // message into a FlatBufferBuilder and save it to disk.
          // TODO(austin): We can be more efficient here when we start to
          // care...
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          const MessageHeader *msg =
              flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);

          logger::MessageHeader::Builder message_header_builder(fbb);

          // Note: this must match the same order as MessageBridgeServer and
          // PackMessage.  We want identical headers to have identical
          // on-the-wire formats to make comparing them easier.
          message_header_builder.add_channel_index(msg->channel_index());

          message_header_builder.add_queue_index(msg->queue_index());
          message_header_builder.add_monotonic_sent_time(
              msg->monotonic_sent_time());
          message_header_builder.add_realtime_sent_time(
              msg->realtime_sent_time());

          message_header_builder.add_monotonic_remote_time(
              msg->monotonic_remote_time());
          message_header_builder.add_realtime_remote_time(
              msg->realtime_remote_time());
          message_header_builder.add_remote_queue_index(
              msg->remote_queue_index());

          fbb.FinishSizePrefixed(message_header_builder.Finish());

          f.contents_writer->QueueSizedFlatbuffer(&fbb);
        }

        f.written = true;
      } else {
        // Message is at or past the sync point; leave it for the next cycle.
        break;
      }
    }
  }
  last_synchronized_time_ = t;
}
552
Austin Schuhe309d2a2019-11-29 13:25:21 -0800553void Logger::DoLogData() {
554 // We want to guarentee that messages aren't out of order by more than
555 // max_out_of_order_duration. To do this, we need sync points. Every write
556 // cycle should be a sync point.
Austin Schuhfa895892020-01-07 20:07:41 -0800557 const monotonic_clock::time_point monotonic_now =
558 event_loop_->monotonic_now();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800559
560 do {
561 // Move the sync point up by at most polling_period. This forces one sync
562 // per iteration, even if it is small.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700563 LogUntil(
564 std::min(last_synchronized_time_ + polling_period_, monotonic_now));
Austin Schuhe309d2a2019-11-29 13:25:21 -0800565
Austin Schuhe309d2a2019-11-29 13:25:21 -0800566 // If we missed cycles, we could be pretty far behind. Spin until we are
567 // caught up.
568 } while (last_synchronized_time_ + polling_period_ < monotonic_now);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800569}
570
// Convenience constructor for a single log file.
LogReader::LogReader(std::string_view filename,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::string>{std::string(filename)},
                replay_configuration) {}
575
// Convenience constructor for one ordered list of log parts.
LogReader::LogReader(const std::vector<std::string> &filenames,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::vector<std::string>>{filenames},
                replay_configuration) {}
580
// Main constructor.  Reads the header from the first log part, builds the
// remapped configuration, validates the replay configuration against the
// logged one, and sets up per-node State slots.
LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                     const Configuration *replay_configuration)
    : filenames_(filenames),
      log_file_header_(ReadHeader(filenames[0][0])),
      replay_configuration_(replay_configuration) {
  MakeRemappedConfig();

  if (replay_configuration) {
    CHECK_EQ(configuration::MultiNode(configuration()),
             configuration::MultiNode(replay_configuration))
        << ": Log file and replay config need to both be multi or single "
           "node.";
  }

  if (!configuration::MultiNode(configuration())) {
    // Single node: exactly one State, merging all the parts.
    states_.emplace_back(
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
  } else {
    if (replay_configuration) {
      // Every logged node must exist in the replay configuration.
      CHECK_EQ(logged_configuration()->nodes()->size(),
               replay_configuration->nodes()->size())
          << ": Log file and replay config need to have matching nodes "
             "lists.";
      for (const Node *node : *logged_configuration()->nodes()) {
        if (configuration::GetNode(replay_configuration, node) == nullptr) {
          LOG(FATAL) << "Found node " << FlatbufferToJson(node)
                     << " in logged config that is not present in the replay "
                        "config.";
        }
      }
    }
    // States are populated lazily in Register().
    states_.resize(configuration()->nodes()->size());
  }
}
615
// Destructor.  Deregisters if we own the event loop factory; CHECK-fails if
// an externally-owned factory is still registered, since it must outlive us.
LogReader::~LogReader() {
  if (event_loop_factory_unique_ptr_) {
    Deregister();
  } else if (event_loop_factory_ != nullptr) {
    LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
                  "is destroyed";
  }
  // Close the clock-offset debug CSV if it was opened.
  if (offset_fp_ != nullptr) {
    fclose(offset_fp_);
  }
  // Zero out some buffers.  It's easy to do use-after-frees on these, so
  // make it more obvious.
  if (remapped_configuration_buffer_) {
    remapped_configuration_buffer_->Wipe();
  }
  log_file_header_.Wipe();
}
Austin Schuhe309d2a2019-11-29 13:25:21 -0800633
// Returns the configuration recorded in the log file header.
const Configuration *LogReader::logged_configuration() const {
  return log_file_header_.message().configuration();
}
637
// Returns the (possibly remapped) configuration used for replay.
const Configuration *LogReader::configuration() const {
  return remapped_configuration_;
}
641
// Returns the nodes from the remapped configuration.  Only valid after
// Register(); the pointers are invalidated whenever a remapping changes.
std::vector<const Node *> LogReader::Nodes() const {
  // Because the Node pointer will only be valid if it actually points to
  // memory owned by remapped_configuration_, we need to wait for the
  // remapped_configuration_ to be populated before accessing it.
  //
  // Also note that whenever a map is changed, the nodes in here are
  // invalidated.
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register before the node() pointer will be valid.";
  return configuration::GetNodes(remapped_configuration_);
}
Austin Schuh15649d62019-12-28 16:36:38 -0800653
Austin Schuh6f3babe2020-01-26 20:34:50 -0800654monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800655 State *state =
656 states_[configuration::GetNodeIndex(configuration(), node)].get();
657 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
658
Austin Schuh858c9f32020-08-31 16:56:12 -0700659 return state->monotonic_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800660}
661
Austin Schuh6f3babe2020-01-26 20:34:50 -0800662realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800663 State *state =
664 states_[configuration::GetNodeIndex(configuration(), node)].get();
665 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
666
Austin Schuh858c9f32020-08-31 16:56:12 -0700667 return state->realtime_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800668}
669
// Convenience overload: creates and owns a SimulatedEventLoopFactory built
// from the replay configuration, then registers against it.
void LogReader::Register() {
  event_loop_factory_unique_ptr_ =
      std::make_unique<SimulatedEventLoopFactory>(configuration());
  Register(event_loop_factory_unique_ptr_.get());
}
675
// Registers the log reader against an externally owned simulated event loop
// factory.  This builds per-node replay State objects, wires up the time
// synchronization problem (solving per-node clock offsets relative to the
// distributed clock), runs simulated time forward to the latest node start
// time, and disables forwarding/statistics so replayed messages are not
// duplicated by the simulated message bridge.
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;

  // Create a State per node backed by a ChannelMerger over the log files, and
  // register an event loop for each one.
  for (const Node *node : configuration::GetNodes(configuration())) {
    const size_t node_index =
        configuration::GetNodeIndex(configuration(), node);
    states_[node_index] =
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
    State *state = states_[node_index].get();

    Register(state->SetNodeEventLoopFactory(
        event_loop_factory_->GetNodeEventLoopFactory(node)));
  }
  // live_nodes_ is incremented by Register(EventLoop *) only for nodes whose
  // logs actually contain data.
  if (live_nodes_ == 0) {
    LOG(FATAL)
        << "Don't have logs from any of the nodes in the replay config--are "
           "you sure that the replay config matches the original config?";
  }

  // We need to now seed our per-node time offsets and get everything set up
  // to run.
  const size_t num_nodes = nodes_count();

  // It is easiest to solve for per node offsets with a matrix rather than
  // trying to solve the equations by hand.  So let's get after it.
  //
  // Now, build up the map matrix.
  //
  // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
  //
  // Row 0 anchors the average of all node times to the distributed clock;
  // each subsequent row encodes one pairwise a->b offset sample.  mpq_class
  // (exact rationals) is used so the rank computation below is exact.
  map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
      filters_.size() + 1, num_nodes);
  slope_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
          filters_.size() + 1, num_nodes);

  offset_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  last_valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);

  time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
  time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);

  // All times should average out to the distributed clock.
  for (int i = 0; i < map_matrix_.cols(); ++i) {
    // 1/num_nodes.
    map_matrix_(0, i) = mpq_class(1, num_nodes);
  }
  valid_matrix_(0) = true;

  {
    // Now, add the a - b -> sample elements.  Each filter (one per node pair)
    // writes its slope/offset/validity directly into the matrices via the
    // pointers handed out here.
    size_t i = 1;
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
             &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      const size_t node_a_index =
          configuration::GetNodeIndex(configuration(), node_a);
      const size_t node_b_index =
          configuration::GetNodeIndex(configuration(), node_b);

      // -a
      map_matrix_(i, node_a_index) = mpq_class(-1);
      // +b
      map_matrix_(i, node_b_index) = mpq_class(1);

      // -> sample
      std::get<0>(filter.second)
          .set_slope_pointer(&slope_matrix_(i, node_a_index));
      std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));

      valid_matrix_(i) = false;
      std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));

      ++i;
    }
  }

  // Pre-read messages so each node has something queued before solving.
  for (std::unique_ptr<State> &state : states_) {
    state->SeedSortedMessages();
  }

  // Rank of the map matrix tells you if all the nodes are in communication
  // with each other, which tells you if the offsets are observable.
  const size_t connected_nodes =
      Eigen::FullPivLU<
          Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
          .rank();

  // We don't need to support isolated nodes until someone has a real use
  // case.
  CHECK_EQ(connected_nodes, num_nodes)
      << ": There is a node which isn't communicating with the rest.";

  // And solve.
  UpdateOffsets();

  // We want to start the log file at the last start time of the log files
  // from all the nodes.  Compute how long each node's simulation needs to run
  // to move time to this point.
  distributed_clock::time_point start_time = distributed_clock::min_time;

  // TODO(austin): We want an "OnStart" callback for each node rather than
  // running until the last node.

  for (std::unique_ptr<State> &state : states_) {
    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
            << MaybeNodeName(state->event_loop()->node()) << "now "
            << state->monotonic_now();
    // And start computing the start time on the distributed clock now that
    // that works.
    start_time = std::max(
        start_time, state->ToDistributedClock(state->monotonic_start_time()));
  }

  CHECK_GE(start_time, distributed_clock::epoch())
      << ": Hmm, we have a node starting before the start of time.  Offset "
         "everything.";

  // Forwarding is tracked per channel.  If it is enabled, we want to turn it
  // off.  Otherwise messages replayed will get forwarded across to the other
  // nodes, and also replayed on the other nodes.  This may not satisfy all
  // our users, but it'll start the discussion.
  if (configuration::MultiNode(event_loop_factory_->configuration())) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      const Channel *channel = logged_configuration()->channels()->Get(i);
      const Node *node = configuration::GetNode(
          configuration(), channel->source_node()->string_view());

      State *state =
          states_[configuration::GetNodeIndex(configuration(), node)].get();

      const Channel *remapped_channel =
          RemapChannel(state->event_loop(), channel);

      event_loop_factory_->DisableForwarding(remapped_channel);
    }

    // If we are replaying a log, we don't want a bunch of redundant messages
    // from both the real message bridge and simulated message bridge.
    event_loop_factory_->DisableStatistics();
  }

  // While we are starting the system up, we might be relying on matching data
  // to timestamps on log files where the timestamp log file starts before the
  // data.  In this case, it is reasonable to expect missing data.
  ignore_missing_data_ = true;
  VLOG(1) << "Running until " << start_time << " in Register";
  event_loop_factory_->RunFor(start_time.time_since_epoch());
  VLOG(1) << "At start time";
  // Now that we are running for real, missing data means that the log file is
  // corrupted or went wrong.
  ignore_missing_data_ = false;

  for (std::unique_ptr<State> &state : states_) {
    // Make the RT clock be correct before handing it to the user.
    if (state->realtime_start_time() != realtime_clock::min_time) {
      state->SetRealtimeOffset(state->monotonic_start_time(),
                               state->realtime_start_time());
    }
    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
            << MaybeNodeName(state->event_loop()->node()) << "now "
            << state->monotonic_now();
  }

  if (FLAGS_timestamps_to_csv) {
    // Seed the CSV debug output with the current per-node times so the
    // forward/reverse timestamp files start at a consistent point.
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
             &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      std::get<0>(filter.second)
          .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
                               ->monotonic_now());
      std::get<0>(filter.second)
          .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
                               ->monotonic_now());
    }
  }
}
862
Austin Schuh2f8fd752020-09-01 22:38:28 -0700863void LogReader::UpdateOffsets() {
864 VLOG(2) << "Samples are " << offset_matrix_;
865 VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
866 std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
867 Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
868 "]");
869 VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
870 << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
871
872 size_t node_index = 0;
873 for (std::unique_ptr<State> &state : states_) {
874 state->SetDistributedOffset(offset(node_index), slope(node_index));
875 VLOG(1) << "Offset for node " << node_index << " "
876 << MaybeNodeName(state->event_loop()->node()) << "is "
877 << aos::distributed_clock::time_point(offset(node_index))
878 << " slope " << std::setprecision(9) << std::fixed
879 << slope(node_index);
880 ++node_index;
881 }
882
883 if (VLOG_IS_ON(1)) {
884 LogFit("Offset is");
885 }
886}
887
// Diagnostic-only helper: logs, for every node pair with timestamp samples,
// how well the solved distributed-clock fit reproduces the pairwise estimator
// fit, and round-trips sample timestamps through the distributed clock to
// flag inconsistencies.  Only emits output at VLOG(1) and above.
void LogReader::LogFit(std::string_view prefix) {
  for (std::unique_ptr<State> &state : states_) {
    VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
            << state->monotonic_now() << " distributed "
            << event_loop_factory_->distributed_now();
  }

  for (std::pair<const std::tuple<const Node *, const Node *>,
                 std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
       filters_) {
    message_bridge::NoncausalOffsetEstimator *estimator =
        &std::get<0>(filter.second);

    // Nothing to report for pairs with no samples yet.
    if (estimator->a_timestamps().size() == 0 &&
        estimator->b_timestamps().size() == 0) {
      continue;
    }

    if (VLOG_IS_ON(1)) {
      estimator->LogFit(prefix);
    }

    const Node *const node_a = std::get<0>(filter.first);
    const Node *const node_b = std::get<1>(filter.first);

    const size_t node_a_index =
        configuration::GetNodeIndex(configuration(), node_a);
    const size_t node_b_index =
        configuration::GetNodeIndex(configuration(), node_b);

    // Reconstruct the pairwise a->b slope/offset implied by the global
    // per-node solution so it can be compared against the estimator's own
    // pairwise fit (the "error" terms below).
    const double recovered_slope =
        slope(node_b_index) / slope(node_a_index) - 1.0;
    const int64_t recovered_offset =
        offset(node_b_index).count() - offset(node_a_index).count() *
                                           slope(node_b_index) /
                                           slope(node_a_index);

    VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
            << " (error " << recovered_slope - estimator->fit().slope() << ") "
            << " offset " << std::setprecision(20) << recovered_offset
            << " (error "
            << recovered_offset - estimator->fit().offset().count() << ")";

    // Round-trip the first and last A-side samples through the distributed
    // clock and compare against what the pairwise fit predicts.
    const aos::distributed_clock::time_point a0 =
        states_[node_a_index]->ToDistributedClock(
            std::get<0>(estimator->a_timestamps()[0]));
    const aos::distributed_clock::time_point a1 =
        states_[node_a_index]->ToDistributedClock(
            std::get<0>(estimator->a_timestamps()[1]));

    VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
            << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
            << " distributed -> " << node_b->name()->string_view() << " "
            << states_[node_b_index]->FromDistributedClock(a0) << " should be "
            << aos::monotonic_clock::time_point(
                   std::chrono::nanoseconds(static_cast<int64_t>(
                       std::get<0>(estimator->a_timestamps()[0])
                           .time_since_epoch()
                           .count() *
                       (1.0 + estimator->fit().slope()))) +
                   estimator->fit().offset())
            << ((a0 <= event_loop_factory_->distributed_now())
                    ? ""
                    : " After now, investigate");
    VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
            << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
            << " distributed -> " << node_b->name()->string_view() << " "
            << states_[node_b_index]->FromDistributedClock(a1) << " should be "
            << aos::monotonic_clock::time_point(
                   std::chrono::nanoseconds(static_cast<int64_t>(
                       std::get<0>(estimator->a_timestamps()[1])
                           .time_since_epoch()
                           .count() *
                       (1.0 + estimator->fit().slope()))) +
                   estimator->fit().offset())
            << ((event_loop_factory_->distributed_now() <= a1)
                    ? ""
                    : " Before now, investigate");

    // Same round-trip check for the B-side samples.
    const aos::distributed_clock::time_point b0 =
        states_[node_b_index]->ToDistributedClock(
            std::get<0>(estimator->b_timestamps()[0]));
    const aos::distributed_clock::time_point b1 =
        states_[node_b_index]->ToDistributedClock(
            std::get<0>(estimator->b_timestamps()[1]));

    VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
            << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
            << " distributed -> " << node_a->name()->string_view() << " "
            << states_[node_a_index]->FromDistributedClock(b0)
            << ((b0 <= event_loop_factory_->distributed_now())
                    ? ""
                    : " After now, investigate");
    VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
            << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
            << " distributed -> " << node_a->name()->string_view() << " "
            << states_[node_a_index]->FromDistributedClock(b1)
            << ((event_loop_factory_->distributed_now() <= b1)
                    ? ""
                    : " Before now, investigate");
  }
}
990
991message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
Austin Schuh8bd96322020-02-13 21:18:22 -0800992 const Node *node_a, const Node *node_b) {
993 CHECK_NE(node_a, node_b);
994 CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
995 CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
996
997 if (node_a > node_b) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700998 return GetFilter(node_b, node_a);
Austin Schuh8bd96322020-02-13 21:18:22 -0800999 }
1000
1001 auto tuple = std::make_tuple(node_a, node_b);
1002
1003 auto it = filters_.find(tuple);
1004
1005 if (it == filters_.end()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001006 auto &x =
1007 filters_
1008 .insert(std::make_pair(
1009 tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
1010 node_a, node_b))))
1011 .first->second;
Austin Schuh8bd96322020-02-13 21:18:22 -08001012 if (FLAGS_timestamps_to_csv) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001013 std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
1014 "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
1015 node_b->name()->string_view()));
1016 std::get<0>(x).SetRevCsvFileName(absl::StrCat(
1017 "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
1018 node_a->name()->string_view()));
Austin Schuh8bd96322020-02-13 21:18:22 -08001019 }
1020
Austin Schuh2f8fd752020-09-01 22:38:28 -07001021 return &std::get<0>(x);
Austin Schuh8bd96322020-02-13 21:18:22 -08001022 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001023 return &std::get<0>(it->second);
Austin Schuh8bd96322020-02-13 21:18:22 -08001024 }
1025}
1026
Austin Schuh8bd96322020-02-13 21:18:22 -08001027
Austin Schuhe309d2a2019-11-29 13:25:21 -08001028void LogReader::Register(EventLoop *event_loop) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001029 State *state =
1030 states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
1031 .get();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001032
Austin Schuh858c9f32020-08-31 16:56:12 -07001033 state->set_event_loop(event_loop);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001034
Tyler Chatow67ddb032020-01-12 14:30:04 -08001035 // We don't run timing reports when trying to print out logged data, because
1036 // otherwise we would end up printing out the timing reports themselves...
1037 // This is only really relevant when we are replaying into a simulation.
Austin Schuh6f3babe2020-01-26 20:34:50 -08001038 event_loop->SkipTimingReport();
1039 event_loop->SkipAosLog();
Austin Schuh39788ff2019-12-01 18:22:57 -08001040
Austin Schuh858c9f32020-08-31 16:56:12 -07001041 const bool has_data = state->SetNode();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001042
Austin Schuh858c9f32020-08-31 16:56:12 -07001043 state->SetChannelCount(logged_configuration()->channels()->size());
Austin Schuh8bd96322020-02-13 21:18:22 -08001044
Austin Schuh858c9f32020-08-31 16:56:12 -07001045 for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001046 const Channel *channel =
1047 RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
Austin Schuh6331ef92020-01-07 18:28:09 -08001048
Austin Schuh858c9f32020-08-31 16:56:12 -07001049 NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001050 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
Austin Schuh8bd96322020-02-13 21:18:22 -08001051
1052 if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
1053 configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
1054 const Node *target_node = configuration::GetNode(
1055 event_loop->configuration(), channel->source_node()->string_view());
Austin Schuh858c9f32020-08-31 16:56:12 -07001056 filter = GetFilter(event_loop->node(), target_node);
Austin Schuh8bd96322020-02-13 21:18:22 -08001057
1058 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001059 channel_target_event_loop_factory =
Austin Schuh8bd96322020-02-13 21:18:22 -08001060 event_loop_factory_->GetNodeEventLoopFactory(target_node);
1061 }
1062 }
Austin Schuh858c9f32020-08-31 16:56:12 -07001063
1064 state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
1065 channel_target_event_loop_factory);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001066 }
1067
Austin Schuh6aa77be2020-02-22 21:06:40 -08001068 // If we didn't find any log files with data in them, we won't ever get a
1069 // callback or be live. So skip the rest of the setup.
1070 if (!has_data) {
1071 return;
1072 }
1073
Austin Schuh858c9f32020-08-31 16:56:12 -07001074 state->set_timer_handler(event_loop->AddTimer([this, state]() {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001075 VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
1076 << "at " << state->event_loop()->context().monotonic_event_time
1077 << " now " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001078 if (state->OldestMessageTime() == monotonic_clock::max_time) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001079 --live_nodes_;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001080 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001081 if (live_nodes_ == 0) {
1082 event_loop_factory_->Exit();
1083 }
James Kuszmaul314f1672020-01-03 20:02:08 -08001084 return;
1085 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001086 TimestampMerger::DeliveryTimestamp channel_timestamp;
Austin Schuh05b70472020-01-01 17:11:17 -08001087 int channel_index;
1088 FlatbufferVector<MessageHeader> channel_data =
1089 FlatbufferVector<MessageHeader>::Empty();
1090
Austin Schuh2f8fd752020-09-01 22:38:28 -07001091 if (VLOG_IS_ON(1)) {
1092 LogFit("Offset was");
1093 }
1094
1095 bool update_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001096 std::tie(channel_timestamp, channel_index, channel_data) =
Austin Schuh2f8fd752020-09-01 22:38:28 -07001097 state->PopOldest(&update_time);
Austin Schuh05b70472020-01-01 17:11:17 -08001098
Austin Schuhe309d2a2019-11-29 13:25:21 -08001099 const monotonic_clock::time_point monotonic_now =
Austin Schuh858c9f32020-08-31 16:56:12 -07001100 state->event_loop()->context().monotonic_event_time;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001101 if (!FLAGS_skip_order_validation) {
1102 CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
1103 << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
1104 << monotonic_now << " trying to send "
1105 << channel_timestamp.monotonic_event_time << " failure "
1106 << state->DebugString();
1107 } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
1108 LOG(WARNING) << "Check failed: monotonic_now == "
1109 "channel_timestamp.monotonic_event_time) ("
1110 << monotonic_now << " vs. "
1111 << channel_timestamp.monotonic_event_time
1112 << "): " << FlatbufferToJson(state->event_loop()->node())
1113 << " Now " << monotonic_now << " trying to send "
1114 << channel_timestamp.monotonic_event_time << " failure "
1115 << state->DebugString();
1116 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001117
Austin Schuh6f3babe2020-01-26 20:34:50 -08001118 if (channel_timestamp.monotonic_event_time >
Austin Schuh858c9f32020-08-31 16:56:12 -07001119 state->monotonic_start_time() ||
Austin Schuh15649d62019-12-28 16:36:38 -08001120 event_loop_factory_ != nullptr) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001121 if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
Austin Schuh858c9f32020-08-31 16:56:12 -07001122 !state->at_end()) ||
Austin Schuh05b70472020-01-01 17:11:17 -08001123 channel_data.message().data() != nullptr) {
1124 CHECK(channel_data.message().data() != nullptr)
1125 << ": Got a message without data. Forwarding entry which was "
Austin Schuh2f8fd752020-09-01 22:38:28 -07001126 "not matched? Use --skip_missing_forwarding_entries to "
1127 "ignore "
Austin Schuh15649d62019-12-28 16:36:38 -08001128 "this.";
Austin Schuh92547522019-12-28 14:33:43 -08001129
Austin Schuh2f8fd752020-09-01 22:38:28 -07001130 if (update_time) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001131 // Confirm that the message was sent on the sending node before the
1132 // destination node (this node). As a proxy, do this by making sure
1133 // that time on the source node is past when the message was sent.
Austin Schuh2f8fd752020-09-01 22:38:28 -07001134 if (!FLAGS_skip_order_validation) {
1135 CHECK_LT(channel_timestamp.monotonic_remote_time,
1136 state->monotonic_remote_now(channel_index))
1137 << state->event_loop()->node()->name()->string_view() << " to "
1138 << state->remote_node(channel_index)->name()->string_view()
1139 << " " << state->DebugString();
1140 } else if (channel_timestamp.monotonic_remote_time >=
1141 state->monotonic_remote_now(channel_index)) {
1142 LOG(WARNING)
1143 << "Check failed: channel_timestamp.monotonic_remote_time < "
1144 "state->monotonic_remote_now(channel_index) ("
1145 << channel_timestamp.monotonic_remote_time << " vs. "
1146 << state->monotonic_remote_now(channel_index) << ") "
1147 << state->event_loop()->node()->name()->string_view() << " to "
1148 << state->remote_node(channel_index)->name()->string_view()
1149 << " currently " << channel_timestamp.monotonic_event_time
1150 << " ("
1151 << state->ToDistributedClock(
1152 channel_timestamp.monotonic_event_time)
1153 << ") remote event time "
1154 << channel_timestamp.monotonic_remote_time << " ("
1155 << state->RemoteToDistributedClock(
1156 channel_index, channel_timestamp.monotonic_remote_time)
1157 << ") " << state->DebugString();
1158 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001159
1160 if (FLAGS_timestamps_to_csv) {
1161 if (offset_fp_ == nullptr) {
1162 offset_fp_ = fopen("/tmp/offsets.csv", "w");
1163 fprintf(
1164 offset_fp_,
1165 "# time_since_start, offset node 0, offset node 1, ...\n");
1166 first_time_ = channel_timestamp.realtime_event_time;
1167 }
1168
1169 fprintf(offset_fp_, "%.9f",
1170 std::chrono::duration_cast<std::chrono::duration<double>>(
1171 channel_timestamp.realtime_event_time - first_time_)
1172 .count());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001173 for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
1174 fprintf(offset_fp_, ", %.9f",
1175 time_offset_matrix_(i, 0) +
1176 time_slope_matrix_(i, 0) *
1177 chrono::duration<double>(
1178 event_loop_factory_->distributed_now()
1179 .time_since_epoch())
1180 .count());
Austin Schuh8bd96322020-02-13 21:18:22 -08001181 }
1182 fprintf(offset_fp_, "\n");
1183 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001184 }
1185
Austin Schuh15649d62019-12-28 16:36:38 -08001186 // If we have access to the factory, use it to fix the realtime time.
Austin Schuh858c9f32020-08-31 16:56:12 -07001187 state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
1188 channel_timestamp.realtime_event_time);
Austin Schuh15649d62019-12-28 16:36:38 -08001189
Austin Schuh2f8fd752020-09-01 22:38:28 -07001190 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
1191 << channel_timestamp.monotonic_event_time;
1192 // TODO(austin): std::move channel_data in and make that efficient in
1193 // simulation.
Austin Schuh858c9f32020-08-31 16:56:12 -07001194 state->Send(channel_index, channel_data.message().data()->Data(),
1195 channel_data.message().data()->size(),
1196 channel_timestamp.monotonic_remote_time,
1197 channel_timestamp.realtime_remote_time,
1198 channel_timestamp.remote_queue_index);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001199 } else if (state->at_end() && !ignore_missing_data_) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001200 // We are at the end of the log file and found missing data. Finish
Austin Schuh2f8fd752020-09-01 22:38:28 -07001201 // reading the rest of the log file and call it quits. We don't want
1202 // to replay partial data.
Austin Schuh858c9f32020-08-31 16:56:12 -07001203 while (state->OldestMessageTime() != monotonic_clock::max_time) {
1204 bool update_time_dummy;
1205 state->PopOldest(&update_time_dummy);
Austin Schuh8bd96322020-02-13 21:18:22 -08001206 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001207 } else {
1208 CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
Austin Schuh92547522019-12-28 14:33:43 -08001209 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001210 } else {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001211 LOG(WARNING)
1212 << "Not sending data from before the start of the log file. "
1213 << channel_timestamp.monotonic_event_time.time_since_epoch().count()
1214 << " start " << monotonic_start_time().time_since_epoch().count()
1215 << " " << FlatbufferToJson(channel_data);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001216 }
1217
Austin Schuh858c9f32020-08-31 16:56:12 -07001218 const monotonic_clock::time_point next_time = state->OldestMessageTime();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001219 if (next_time != monotonic_clock::max_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001220 VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
1221 << "wakeup for " << next_time << "("
1222 << state->ToDistributedClock(next_time)
1223 << " distributed), now is " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001224 state->Setup(next_time);
James Kuszmaul314f1672020-01-03 20:02:08 -08001225 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001226 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1227 << "No next message, scheduling shutdown";
1228 // Set a timer up immediately after now to die. If we don't do this,
1229 // then the senders waiting on the message we just read will never get
1230 // called.
Austin Schuheecb9282020-01-08 17:43:30 -08001231 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001232 state->Setup(monotonic_now + event_loop_factory_->send_delay() +
1233 std::chrono::nanoseconds(1));
Austin Schuheecb9282020-01-08 17:43:30 -08001234 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001235 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001236
Austin Schuh2f8fd752020-09-01 22:38:28 -07001237 // Once we make this call, the current time changes. So do everything
1238 // which involves time before changing it. That especially includes
1239 // sending the message.
1240 if (update_time) {
1241 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1242 << "updating offsets";
1243
1244 std::vector<aos::monotonic_clock::time_point> before_times;
1245 before_times.resize(states_.size());
1246 std::transform(states_.begin(), states_.end(), before_times.begin(),
1247 [](const std::unique_ptr<State> &state) {
1248 return state->monotonic_now();
1249 });
1250
1251 for (size_t i = 0; i < states_.size(); ++i) {
1252 VLOG(1) << MaybeNodeName(
1253 states_[i]->event_loop()->node())
1254 << "before " << states_[i]->monotonic_now();
1255 }
1256
Austin Schuh8bd96322020-02-13 21:18:22 -08001257 UpdateOffsets();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001258 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
1259 << state->monotonic_now();
1260
1261 for (size_t i = 0; i < states_.size(); ++i) {
1262 VLOG(1) << MaybeNodeName(
1263 states_[i]->event_loop()->node())
1264 << "after " << states_[i]->monotonic_now();
1265 }
1266
1267 // TODO(austin): We should be perfect.
1268 const std::chrono::nanoseconds kTolerance{3};
1269 if (!FLAGS_skip_order_validation) {
1270 CHECK_GE(next_time, state->monotonic_now())
1271 << ": Time skipped the next event.";
1272
1273 for (size_t i = 0; i < states_.size(); ++i) {
1274 CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
1275 << ": Time changed too much on node "
1276 << MaybeNodeName(states_[i]->event_loop()->node());
1277 CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
1278 << ": Time changed too much on node "
1279 << states_[i]->event_loop()->node()->name()->string_view();
1280 }
1281 } else {
1282 if (next_time < state->monotonic_now()) {
1283 LOG(WARNING) << "Check failed: next_time >= "
1284 "state->monotonic_now() ("
1285 << next_time << " vs. " << state->monotonic_now()
1286 << "): Time skipped the next event.";
1287 }
1288 for (size_t i = 0; i < states_.size(); ++i) {
1289 if (states_[i]->monotonic_now() >= before_times[i] - kTolerance) {
1290 LOG(WARNING) << "Check failed: "
1291 "states_[i]->monotonic_now() "
1292 ">= before_times[i] - kTolerance ("
1293 << states_[i]->monotonic_now() << " vs. "
1294 << before_times[i] - kTolerance
1295 << ") : Time changed too much on node "
1296 << MaybeNodeName(states_[i]->event_loop()->node());
1297 }
1298 if (states_[i]->monotonic_now() <= before_times[i] + kTolerance) {
1299 LOG(WARNING) << "Check failed: "
1300 "states_[i]->monotonic_now() "
1301 "<= before_times[i] + kTolerance ("
1302 << states_[i]->monotonic_now() << " vs. "
1303 << before_times[i] - kTolerance
1304 << ") : Time changed too much on node "
1305 << MaybeNodeName(states_[i]->event_loop()->node());
1306 }
1307 }
1308 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001309 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001310
1311 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
1312 << state->event_loop()->context().monotonic_event_time << " now "
1313 << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001314 }));
Austin Schuhe309d2a2019-11-29 13:25:21 -08001315
Austin Schuh6f3babe2020-01-26 20:34:50 -08001316 ++live_nodes_;
1317
Austin Schuh858c9f32020-08-31 16:56:12 -07001318 if (state->OldestMessageTime() != monotonic_clock::max_time) {
1319 event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
Austin Schuhe309d2a2019-11-29 13:25:21 -08001320 }
1321}
1322
1323void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001324 // Make sure that things get destroyed in the correct order, rather than
1325 // relying on getting the order correct in the class definition.
Austin Schuh8bd96322020-02-13 21:18:22 -08001326 for (std::unique_ptr<State> &state : states_) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001327 state->Deregister();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001328 }
Austin Schuh92547522019-12-28 14:33:43 -08001329
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001330 event_loop_factory_unique_ptr_.reset();
1331 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -08001332}
1333
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001334void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
1335 std::string_view add_prefix) {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001336 for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
1337 const Channel *const channel = logged_configuration()->channels()->Get(ii);
1338 if (channel->name()->str() == name &&
1339 channel->type()->string_view() == type) {
1340 CHECK_EQ(0u, remapped_channels_.count(ii))
1341 << "Already remapped channel "
1342 << configuration::CleanedChannelToString(channel);
1343 remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
1344 VLOG(1) << "Remapping channel "
1345 << configuration::CleanedChannelToString(channel)
1346 << " to have name " << remapped_channels_[ii];
Austin Schuh6331ef92020-01-07 18:28:09 -08001347 MakeRemappedConfig();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001348 return;
1349 }
1350 }
1351 LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
1352 << type;
1353}
1354
// Rebuilds remapped_configuration_ so that it contains every channel from the
// base config plus a copy of each remapped channel under its new name.
void LogReader::MakeRemappedConfig() {
  // Remapping is only legal before Register() has scheduled events; verify no
  // State has an event loop yet.
  for (std::unique_ptr<State> &state : states_) {
    if (state) {
      CHECK(!state->event_loop())
          << ": Can't change the mapping after the events are scheduled.";
    }
  }

  // If no remapping occurred and we are using the original config, then there
  // is nothing interesting to do here.
  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
    remapped_configuration_ = logged_configuration();
    return;
  }
  // Config to copy Channel definitions from. Use the specified
  // replay_configuration_ if it has been provided.
  const Configuration *const base_config = replay_configuration_ == nullptr
                                               ? logged_configuration()
                                               : replay_configuration_;
  // The remapped config will be identical to the base_config, except that it
  // will have a bunch of extra channels in the channel list, which are exact
  // copies of the remapped channels, but with different names.
  // Because the flatbuffers API is a pain to work with, this requires a bit of
  // a song-and-dance to get copied over.
  // The order of operations is to:
  // 1) Make a flatbuffer builder for a config that will just contain a list of
  //    the new channels that we want to add.
  // 2) For each channel that we are remapping:
  //    a) Make a buffer/builder and construct into it a Channel table that only
  //       contains the new name for the channel.
  //    b) Merge the new channel with just the name into the channel that we are
  //       trying to copy, built in the flatbuffer builder made in 1. This gives
  //       us the new channel definition that we need.
  // 3) Using this list of offsets, build the Configuration of just new
  //    Channels.
  // 4) Merge the Configuration with the new Channels into the base_config.
  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
  //    chance to sanitize the config.

  // This is the builder that we use for the config containing all the new
  // channels.
  flatbuffers::FlatBufferBuilder new_config_fbb;
  new_config_fbb.ForceDefaults(true);
  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
  for (auto &pair : remapped_channels_) {
    // This is the builder that we use for creating the Channel with just the
    // new name.
    flatbuffers::FlatBufferBuilder new_name_fbb;
    new_name_fbb.ForceDefaults(true);
    const flatbuffers::Offset<flatbuffers::String> name_offset =
        new_name_fbb.CreateString(pair.second);
    ChannelBuilder new_name_builder(new_name_fbb);
    new_name_builder.add_name(name_offset);
    new_name_fbb.Finish(new_name_builder.Finish());
    // Release() detaches the buffer so it stays alive while we merge from it.
    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
    // Retrieve the channel that we want to copy, confirming that it is
    // actually present in base_config.
    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
        base_config, logged_configuration()->channels()->Get(pair.first), "",
        nullptr));
    // Actually create the new channel and put it into the vector of Offsets
    // that we will use to create the new Configuration.
    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
        reinterpret_cast<const flatbuffers::Table *>(base_channel),
        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
        &new_config_fbb));
  }
  // Create the Configuration containing the new channels that we want to add.
  const auto new_name_vector_offsets =
      new_config_fbb.CreateVector(channel_offsets);
  ConfigurationBuilder new_config_builder(new_config_fbb);
  new_config_builder.add_channels(new_name_vector_offsets);
  new_config_fbb.Finish(new_config_builder.Finish());
  const FlatbufferDetachedBuffer<Configuration> new_name_config =
      new_config_fbb.Release();
  // Merge the new channels configuration into the base_config, giving us the
  // remapped configuration.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          MergeFlatBuffers<Configuration>(base_config,
                                          &new_name_config.message()));
  // Call MergeConfiguration to deal with sanitizing the config.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          configuration::MergeConfiguration(*remapped_configuration_buffer_));

  remapped_configuration_ = &remapped_configuration_buffer_->message();
}
1443
Austin Schuh6f3babe2020-01-26 20:34:50 -08001444const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
1445 const Channel *channel) {
1446 std::string_view channel_name = channel->name()->string_view();
1447 std::string_view channel_type = channel->type()->string_view();
1448 const int channel_index =
1449 configuration::ChannelIndex(logged_configuration(), channel);
1450 // If the channel is remapped, find the correct channel name to use.
1451 if (remapped_channels_.count(channel_index) > 0) {
Austin Schuhee711052020-08-24 16:06:09 -07001452 VLOG(3) << "Got remapped channel on "
Austin Schuh6f3babe2020-01-26 20:34:50 -08001453 << configuration::CleanedChannelToString(channel);
1454 channel_name = remapped_channels_[channel_index];
1455 }
1456
Austin Schuhee711052020-08-24 16:06:09 -07001457 VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001458 const Channel *remapped_channel = configuration::GetChannel(
1459 event_loop->configuration(), channel_name, channel_type,
1460 event_loop->name(), event_loop->node());
1461
1462 CHECK(remapped_channel != nullptr)
1463 << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
1464 << channel_type << "\"} because it is not in the provided configuration.";
1465
1466 return remapped_channel;
1467}
1468
// Takes ownership of the ChannelMerger that supplies this node's messages in
// sorted order.
LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
    : channel_merger_(std::move(channel_merger)) {}
1471
1472EventLoop *LogReader::State::SetNodeEventLoopFactory(
1473 NodeEventLoopFactory *node_event_loop_factory) {
1474 node_event_loop_factory_ = node_event_loop_factory;
1475 event_loop_unique_ptr_ =
1476 node_event_loop_factory_->MakeEventLoop("log_reader");
1477 return event_loop_unique_ptr_.get();
1478}
1479
1480void LogReader::State::SetChannelCount(size_t count) {
1481 channels_.resize(count);
1482 filters_.resize(count);
1483 channel_target_event_loop_factory_.resize(count);
1484}
1485
1486void LogReader::State::SetChannel(
1487 size_t channel, std::unique_ptr<RawSender> sender,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001488 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh858c9f32020-08-31 16:56:12 -07001489 NodeEventLoopFactory *channel_target_event_loop_factory) {
1490 channels_[channel] = std::move(sender);
1491 filters_[channel] = filter;
1492 channel_target_event_loop_factory_[channel] =
1493 channel_target_event_loop_factory;
1494}
1495
// Removes and returns the oldest buffered message for this node: its delivery
// timestamps, the logged channel index, and the message data.  *update_time is
// set to the result of the offset estimator's Pop() for forwarded messages
// (whether the time fit changed), or false when the message carried no filter.
std::tuple<TimestampMerger::DeliveryTimestamp, int,
           FlatbufferVector<MessageHeader>>
LogReader::State::PopOldest(bool *update_time) {
  CHECK_GT(sorted_messages_.size(), 0u);

  // Move the front entry (timestamp, channel index, data, filter) out of the
  // queue before popping it.
  std::tuple<TimestampMerger::DeliveryTimestamp, int,
             FlatbufferVector<MessageHeader>,
             message_bridge::NoncausalOffsetEstimator *>
      result = std::move(sorted_messages_.front());
  VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
          << std::get<0>(result).monotonic_event_time;
  sorted_messages_.pop_front();
  // Top the local queue back up so it stays ahead of replay.
  SeedSortedMessages();

  // Forwarded messages carry an offset estimator; pop the sample that was
  // added when this message was queued.
  if (std::get<3>(result) != nullptr) {
    *update_time = std::get<3>(result)->Pop(
        event_loop_->node(), std::get<0>(result).monotonic_event_time);
  } else {
    *update_time = false;
  }
  return std::make_tuple(std::get<0>(result), std::get<1>(result),
                         std::move(std::get<2>(result)));
}
1519
1520monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
1521 if (sorted_messages_.size() > 0) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001522 VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
Austin Schuh858c9f32020-08-31 16:56:12 -07001523 << std::get<0>(sorted_messages_.front()).monotonic_event_time;
1524 return std::get<0>(sorted_messages_.front()).monotonic_event_time;
1525 }
1526
1527 return channel_merger_->OldestMessageTime();
1528}
1529
// Pulls messages out of the ChannelMerger into sorted_messages_ until roughly
// 2 seconds of data past the oldest buffered message (or past the log start,
// when nothing is buffered yet) is queued locally.  Also feeds forwarding
// timestamps into the per-channel offset estimators as messages are queued.
void LogReader::State::SeedSortedMessages() {
  // Queue cutoff: 2 seconds after the front message, or 2 seconds after the
  // log start time when the local queue is empty.
  const aos::monotonic_clock::time_point end_queue_time =
      (sorted_messages_.size() > 0
           ? std::get<0>(sorted_messages_.front()).monotonic_event_time
           : channel_merger_->monotonic_start_time()) +
      std::chrono::seconds(2);

  while (true) {
    // max_time means the merger has no more messages; nothing left to queue.
    if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
      return;
    }
    if (sorted_messages_.size() > 0) {
      // Stop placing sorted messages on the list once we have 2 seconds
      // queued up (but queue at least until the log starts).
      if (end_queue_time <
          std::get<0>(sorted_messages_.back()).monotonic_event_time) {
        return;
      }
    }

    TimestampMerger::DeliveryTimestamp channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    message_bridge::NoncausalOffsetEstimator *filter = nullptr;

    std::tie(channel_timestamp, channel_index, channel_data) =
        channel_merger_->PopOldest();

    // Skip any messages without forwarding information.
    if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
      // Got a forwarding timestamp!
      filter = filters_[channel_index];

      CHECK(filter != nullptr);

      // Feed the (local event time, remote event time) pair into the offset
      // estimator for this channel; the estimator picks the direction based on
      // which node we are.
      filter->Sample(event_loop_->node(),
                     channel_timestamp.monotonic_event_time,
                     channel_timestamp.monotonic_remote_time);
    }
    // Stash the filter alongside the message so PopOldest() can pop the
    // matching sample later.
    sorted_messages_.emplace_back(channel_timestamp, channel_index,
                                  std::move(channel_data), filter);
  }
}
1577
1578void LogReader::State::Deregister() {
1579 for (size_t i = 0; i < channels_.size(); ++i) {
1580 channels_[i].reset();
1581 }
1582 event_loop_unique_ptr_.reset();
1583 event_loop_ = nullptr;
1584 timer_handler_ = nullptr;
1585 node_event_loop_factory_ = nullptr;
1586}
1587
Austin Schuhe309d2a2019-11-29 13:25:21 -08001588} // namespace logger
1589} // namespace aos