blob: 637d1aebf4d0057c90ae64527ab49a83c538fd67 [file] [log] [blame]
James Kuszmaul38735e82019-12-07 16:42:06 -08001#include "aos/events/logging/logger.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -08002
3#include <fcntl.h>
Austin Schuh4c4e0092019-12-22 16:18:03 -08004#include <limits.h>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8#include <vector>
9
Austin Schuh8bd96322020-02-13 21:18:22 -080010#include "Eigen/Dense"
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "absl/strings/escaping.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "absl/types/span.h"
13#include "aos/events/event_loop.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080014#include "aos/events/logging/logger_generated.h"
Austin Schuh64fab802020-09-09 22:47:47 -070015#include "aos/events/logging/uuid.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080016#include "aos/flatbuffer_merge.h"
Austin Schuh288479d2019-12-18 19:47:52 -080017#include "aos/network/team_number.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080018#include "aos/time/time.h"
19#include "flatbuffers/flatbuffers.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070020#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080021
Austin Schuh15649d62019-12-28 16:36:38 -080022DEFINE_bool(skip_missing_forwarding_entries, false,
23 "If true, drop any forwarding entries with missing data. If "
24 "false, CHECK.");
Austin Schuhe309d2a2019-11-29 13:25:21 -080025
Austin Schuh8bd96322020-02-13 21:18:22 -080026DEFINE_bool(timestamps_to_csv, false,
27 "If true, write all the time synchronization information to a set "
28 "of CSV files in /tmp/. This should only be needed when debugging "
29 "time synchronization.");
30
Austin Schuh2f8fd752020-09-01 22:38:28 -070031DEFINE_bool(skip_order_validation, false,
32 "If true, ignore any out of orderness in replay");
33
Austin Schuhe309d2a2019-11-29 13:25:21 -080034namespace aos {
35namespace logger {
Austin Schuhe309d2a2019-11-29 13:25:21 -080036namespace chrono = std::chrono;
37
Austin Schuh64fab802020-09-09 22:47:47 -070038void LogNamer::UpdateHeader(
39 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
40 const UUID &uuid, int parts_index) {
41 header->mutable_message()->mutate_parts_index(parts_index);
42 CHECK_EQ(uuid.string_view().size(),
43 header->mutable_message()->mutable_parts_uuid()->size());
44 std::copy(uuid.string_view().begin(), uuid.string_view().end(),
45 reinterpret_cast<char *>(
46 header->mutable_message()->mutable_parts_uuid()->Data()));
47}
48
Austin Schuh2f8fd752020-09-01 22:38:28 -070049void MultiNodeLogNamer::WriteHeader(
Austin Schuh64fab802020-09-09 22:47:47 -070050 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
Austin Schuh2f8fd752020-09-01 22:38:28 -070051 const Node *node) {
52 if (node == this->node()) {
Austin Schuh64fab802020-09-09 22:47:47 -070053 UpdateHeader(header, uuid_, part_number_);
54 data_writer_->WriteSizedFlatbuffer(header->full_span());
Austin Schuh2f8fd752020-09-01 22:38:28 -070055 } else {
56 for (std::pair<const Channel *const, DataWriter> &data_writer :
57 data_writers_) {
58 if (node == data_writer.second.node) {
Austin Schuh64fab802020-09-09 22:47:47 -070059 UpdateHeader(header, data_writer.second.uuid,
60 data_writer.second.part_number);
61 data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
Austin Schuh2f8fd752020-09-01 22:38:28 -070062 }
63 }
64 }
65}
66
// Starts a new log part for |node|: bumps the part index, opens fresh
// file(s), and writes the restamped header into each new part.
void MultiNodeLogNamer::Rotate(
    const Node *node,
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
  if (node == this->node()) {
    // Our own node: advance the part counter and swap a freshly opened
    // writer into place before re-emitting the header.
    ++part_number_;
    *data_writer_ = std::move(*OpenDataWriter());
    UpdateHeader(header, uuid_, part_number_);
    data_writer_->WriteSizedFlatbuffer(header->full_span());
  } else {
    // Remote node: rotate each per-channel writer logging for that node.
    for (std::pair<const Channel *const, DataWriter> &data_writer :
         data_writers_) {
      if (node == data_writer.second.node) {
        ++data_writer.second.part_number;
        data_writer.second.rotate(data_writer.first, &data_writer.second);
        UpdateHeader(header, data_writer.second.uuid,
                     data_writer.second.part_number);
        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
      }
    }
  }
}
88
// Convenience constructor: logs locally under |base_name| by delegating to
// the LogNamer-based constructor with a LocalLogNamer for this node.
Logger::Logger(std::string_view base_name, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
             event_loop, polling_period) {}
93
// Main constructor: discovers every channel which needs logging on this
// node (data, delivery timestamps, and forwarded remote timestamps), makes
// fetchers/writers for each, pre-builds per-node headers, and schedules
// logging to start once the event loop runs.
Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : event_loop_(event_loop),
      uuid_(UUID::Random()),
      log_namer_(std::move(log_namer)),
      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
      polling_period_(polling_period),
      // Only multi-node configs have server statistics to fetch; single-node
      // gets a default (empty) fetcher.
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
  int channel_index = 0;

  // Find all the nodes which are logging timestamps on our node.
  std::set<const Node *> timestamp_logger_nodes;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
        !channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      const Node *other_node = configuration::GetNode(
          event_loop_->configuration(), connection->name()->string_view());

      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_nodes.insert(other_node);
      }
    }
  }

  std::map<const Channel *, const Node *> timestamp_logger_channels;

  // Now that we have all the nodes accumulated, make remote timestamp loggers
  // for them.
  for (const Node *node : timestamp_logger_nodes) {
    const Channel *channel = configuration::GetChannel(
        event_loop_->configuration(),
        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
        logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
        event_loop_->node());

    CHECK(channel != nullptr)
        << ": Remote timestamps are logged on "
        << event_loop_->node()->name()->string_view()
        << " but can't find channel /aos/remote_timestamps/"
        << node->name()->string_view();
    timestamp_logger_channels.insert(std::make_pair(channel, node));
  }

  const size_t our_node_index = configuration::GetNodeIndex(
      event_loop_->configuration(), event_loop_->node());

  // Walk every channel in the config and decide which kinds of logging it
  // needs, building a FetcherStruct per channel that gets any logging.
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    FetcherStruct fs;
    fs.node_index = our_node_index;
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
                                 channel, event_loop_->node()) &&
                             is_readable;

    const bool log_delivery_times =
        (event_loop_->node() == nullptr)
            ? false
            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                  channel, event_loop_->node(), event_loop_->node());

    // Now, detect a MessageHeader timestamp logger where we should just log
    // the contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();
    const Node *timestamp_node =
        log_contents ? timestamp_logger_channels.find(channel)->second
                     : nullptr;

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.writer = log_namer_->MakeWriter(channel);
        if (!is_local) {
          // Data for a remote channel is logged as a remote message so
          // replay knows where it came from.
          fs.log_type = LogType::kLogRemoteMessage;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.contents_writer =
            log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
        // Forwarded timestamps are attributed to the remote node, not us.
        fs.node_index = configuration::GetNodeIndex(
            event_loop_->configuration(), timestamp_node);
      }
      fs.channel_index = channel_index;
      fs.written = false;
      fetchers_.emplace_back(std::move(fs));
    }
    ++channel_index;
  }

  // One state slot per node in multi-node mode, a single slot otherwise.
  node_state_.resize(configuration::MultiNode(event_loop_->configuration())
                         ? event_loop_->configuration()->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(event_loop_->configuration(), node);

    node_state_[node_index].log_file_header = MakeHeader(node);
  }

  // When things start, we want to log the header, then the most recent
  // messages available on each fetcher to capture the previous state, then
  // start polling.
  event_loop_->OnRun([this]() { StartLogging(); });
}
224
Austin Schuh2f8fd752020-09-01 22:38:28 -0700225void Logger::StartLogging() {
226 // Grab data from each channel right before we declare the log file started
227 // so we can capture the latest message on each channel. This lets us have
228 // non periodic messages with configuration that now get logged.
229 for (FetcherStruct &f : fetchers_) {
230 f.written = !f.fetcher->Fetch();
231 }
232
233 // Clear out any old timestamps in case we are re-starting logging.
234 for (size_t i = 0; i < node_state_.size(); ++i) {
235 SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
236 }
237
238 WriteHeader();
239
240 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
241 << " start_time " << last_synchronized_time_;
242
243 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
244 polling_period_);
245}
246
// Picks the log start time and writes the (start-time-stamped) header for
// every node this logger is responsible for.
void Logger::WriteHeader() {
  if (configuration::MultiNode(event_loop_->configuration())) {
    server_statistics_fetcher_.Fetch();
  }

  aos::monotonic_clock::time_point monotonic_start_time =
      event_loop_->monotonic_now();
  aos::realtime_clock::time_point realtime_start_time =
      event_loop_->realtime_now();

  // We need to pick a point in time to declare the log file "started".  This
  // starts here.  It needs to be after everything is fetched so that the
  // fetchers are all pointed at the most recent message before the start
  // time.
  last_synchronized_time_ = monotonic_start_time;

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(event_loop_->configuration(), node);
    // Remote nodes may not have a usable offset yet; this is best-effort.
    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                         realtime_start_time);
    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
  }
}
Austin Schuh8bd96322020-02-13 21:18:22 -0800271
Austin Schuh2f8fd752020-09-01 22:38:28 -0700272void Logger::WriteMissingTimestamps() {
273 if (configuration::MultiNode(event_loop_->configuration())) {
274 server_statistics_fetcher_.Fetch();
275 } else {
276 return;
277 }
278
279 if (server_statistics_fetcher_.get() == nullptr) {
280 return;
281 }
282
283 for (const Node *node : log_namer_->nodes()) {
284 const int node_index =
285 configuration::GetNodeIndex(event_loop_->configuration(), node);
286 if (MaybeUpdateTimestamp(
287 node, node_index,
288 server_statistics_fetcher_.context().monotonic_event_time,
289 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh64fab802020-09-09 22:47:47 -0700290 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700291 }
292 }
293}
294
295void Logger::SetStartTime(size_t node_index,
296 aos::monotonic_clock::time_point monotonic_start_time,
297 aos::realtime_clock::time_point realtime_start_time) {
298 node_state_[node_index].monotonic_start_time = monotonic_start_time;
299 node_state_[node_index].realtime_start_time = realtime_start_time;
300 node_state_[node_index]
301 .log_file_header.mutable_message()
302 ->mutate_monotonic_start_time(
303 std::chrono::duration_cast<std::chrono::nanoseconds>(
304 monotonic_start_time.time_since_epoch())
305 .count());
306 if (node_state_[node_index]
307 .log_file_header.mutable_message()
308 ->has_realtime_start_time()) {
309 node_state_[node_index]
310 .log_file_header.mutable_message()
311 ->mutate_realtime_start_time(
312 std::chrono::duration_cast<std::chrono::nanoseconds>(
313 realtime_start_time.time_since_epoch())
314 .count());
315 }
316}
317
// Tries to establish the start time for |node|.  Returns true when the
// start time was just set (the caller should rotate that node's log so the
// new header gets written), false when it was already set or cannot be
// determined yet.
bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (configuration::MultiNode(event_loop_->configuration())) {
    if (event_loop_->node() == node) {
      // There are no offsets to compute for ourself, so always succeed.
      SetStartTime(node_index, monotonic_start_time, realtime_start_time);
      return true;
    } else if (server_statistics_fetcher_.get() != nullptr) {
      // We must be a remote node now.  Look for the connection and see if it
      // is connected.

      for (const message_bridge::ServerConnection *connection :
           *server_statistics_fetcher_->connections()) {
        if (connection->node()->name()->string_view() !=
            node->name()->string_view()) {
          continue;
        }

        // Without a live connection there is no clock offset to trust.
        if (connection->state() != message_bridge::State::CONNECTED) {
          VLOG(1) << node->name()->string_view()
                  << " is not connected, can't start it yet.";
          break;
        }

        if (!connection->has_monotonic_offset()) {
          VLOG(1) << "Missing monotonic offset for setting start time for node "
                  << aos::FlatbufferToJson(node);
          break;
        }

        VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);

        // Found it and it is connected.  Compensate and go.
        monotonic_start_time +=
            std::chrono::nanoseconds(connection->monotonic_offset());

        SetStartTime(node_index, monotonic_start_time, realtime_start_time);
        return true;
      }
    }
  } else {
    // Single node: our clock is the only clock, so just record it.
    SetStartTime(node_index, monotonic_start_time, realtime_start_time);
    return true;
  }
  return false;
}
371
// Builds the size-prefixed LogFileHeader flatbuffer which begins every log
// part written for |node|.  Start times are stamped in later via
// SetStartTime / UpdateHeader; here they are filled with sentinels.
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  // TODO(austin): Compress this much more efficiently.  There are a bunch of
  // duplicated schemas.
  flatbuffers::Offset<aos::Configuration> configuration_offset =
      CopyFlatBuffer(event_loop_->configuration(), &fbb);

  flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(network::GetHostname());

  flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
      fbb.CreateString(uuid_.string_view());

  // Placeholder parts UUID; UpdateHeader overwrites it in place (same
  // length) before each part is written.
  flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      fbb.CreateString("00000000-0000-4000-8000-000000000000");

  flatbuffers::Offset<Node> node_offset;

  if (configuration::MultiNode(event_loop_->configuration())) {
    node_offset = CopyFlatBuffer(node, &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
  }

  log_file_header_builder.add_configuration(configuration_offset);
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary.  And the reverse could happen for another
  // message.  Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
          .count());

  // min_time sentinel: the real start time is mutated in later.
  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  // Only our own node's header gets a realtime start time field at all.
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  log_file_header_builder.add_logger_uuid(logger_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(0);

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  return fbb.Release();
}
436
// Starts a new log part for every node this logger writes, re-emitting each
// node's header into its fresh part.
void Logger::Rotate() {
  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(event_loop_->configuration(), node);
    log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
  }
}
444
// Drains every fetcher, writing all messages with an event time before |t|
// to the appropriate writers (data, delivery timestamps, and forwarded
// remote timestamp contents), then advances the synchronization point.
void Logger::LogUntil(monotonic_clock::time_point t) {
  WriteMissingTimestamps();

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      if (f.written) {
        if (!f.fetcher->FetchNext()) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        } else {
          f.written = false;
        }
      }

      CHECK(!f.written);

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time < t) {
        if (f.writer != nullptr) {
          // Write!
          // Size the builder for the payload plus the largest header seen so
          // far to avoid reallocating mid-pack.
          flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                             max_header_size_);
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index, f.log_type));

          VLOG(2) << "Writing data as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.writer->filename() << " data "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          // Track the largest header overhead for future pre-sizing.
          max_header_size_ = std::max(
              max_header_size_, fbb.GetSize() - f.fetcher->context().size);
          f.writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.timestamp_writer != nullptr) {
          // And now handle timestamps.
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index,
                                             LogType::kLogDeliveryTimeOnly));

          VLOG(2) << "Writing timestamps as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.timestamp_writer->filename() << " timestamp "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.contents_writer != nullptr) {
          // And now handle the special message contents channel.  Copy the
          // message into a FlatBufferBuilder and save it to disk.
          // TODO(austin): We can be more efficient here when we start to
          // care...
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          const MessageHeader *msg =
              flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);

          logger::MessageHeader::Builder message_header_builder(fbb);

          // Note: this must match the same order as MessageBridgeServer and
          // PackMessage.  We want identical headers to have identical
          // on-the-wire formats to make comparing them easier.
          message_header_builder.add_channel_index(msg->channel_index());

          message_header_builder.add_queue_index(msg->queue_index());
          message_header_builder.add_monotonic_sent_time(
              msg->monotonic_sent_time());
          message_header_builder.add_realtime_sent_time(
              msg->realtime_sent_time());

          message_header_builder.add_monotonic_remote_time(
              msg->monotonic_remote_time());
          message_header_builder.add_realtime_remote_time(
              msg->realtime_remote_time());
          message_header_builder.add_remote_queue_index(
              msg->remote_queue_index());

          fbb.FinishSizePrefixed(message_header_builder.Finish());

          f.contents_writer->QueueSizedFlatbuffer(&fbb);
        }

        f.written = true;
      } else {
        // Message is at or past the sync point; leave it for the next pass.
        break;
      }
    }
  }
  last_synchronized_time_ = t;
}
552
Austin Schuhe309d2a2019-11-29 13:25:21 -0800553void Logger::DoLogData() {
554 // We want to guarentee that messages aren't out of order by more than
555 // max_out_of_order_duration. To do this, we need sync points. Every write
556 // cycle should be a sync point.
Austin Schuhfa895892020-01-07 20:07:41 -0800557 const monotonic_clock::time_point monotonic_now =
558 event_loop_->monotonic_now();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800559
560 do {
561 // Move the sync point up by at most polling_period. This forces one sync
562 // per iteration, even if it is small.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700563 LogUntil(
564 std::min(last_synchronized_time_ + polling_period_, monotonic_now));
Austin Schuhe309d2a2019-11-29 13:25:21 -0800565
Austin Schuhe309d2a2019-11-29 13:25:21 -0800566 // If we missed cycles, we could be pretty far behind. Spin until we are
567 // caught up.
568 } while (last_synchronized_time_ + polling_period_ < monotonic_now);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800569}
570
Austin Schuh5212cad2020-09-09 23:12:09 -0700571std::vector<std::vector<std::string>> SortParts(
572 const std::vector<std::string> &parts) {
573 // Start by grouping all parts by UUID, and extracting the part index.
574 std::map<std::string, std::vector<std::pair<std::string, int>>> parts_list;
575
576 // Sort part files without UUIDs and part indexes as well. Extract everything
577 // useful from the log in the first pass, then sort later.
578 struct LogPart {
579 std::string filename;
580 monotonic_clock::time_point start_time;
581 monotonic_clock::time_point first_message_time;
582 };
583
584 std::vector<LogPart> old_parts;
585
586 for (const std::string &part : parts) {
587 FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
588
589 // Looks like an old log. No UUID, index, and also single node. We have
590 // little to no multi-node log files in the wild without part UUIDs and
591 // indexes which we care much about.
592 if (!log_header.message().has_parts_uuid() &&
593 !log_header.message().has_parts_index() &&
594 !log_header.message().has_node()) {
595 LogPart log_part;
596 log_part.filename = part;
597 log_part.start_time = monotonic_clock::time_point(
598 chrono::nanoseconds(log_header.message().monotonic_start_time()));
599 FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
600 log_part.first_message_time = monotonic_clock::time_point(
601 chrono::nanoseconds(first_message.message().monotonic_sent_time()));
602 old_parts.emplace_back(std::move(log_part));
603 continue;
604 }
605
606 CHECK(log_header.message().has_parts_uuid());
607 CHECK(log_header.message().has_parts_index());
608
609 const std::string parts_uuid = log_header.message().parts_uuid()->str();
610 auto it = parts_list.find(parts_uuid);
611 if (it == parts_list.end()) {
612 it = parts_list
613 .insert(std::make_pair(
614 parts_uuid, std::vector<std::pair<std::string, int>>{}))
615 .first;
616 }
617 it->second.emplace_back(
618 std::make_pair(part, log_header.message().parts_index()));
619 }
620
621 CHECK_NE(old_parts.empty(), parts_list.empty())
622 << ": Can't have a mix of old and new parts.";
623
624 if (!old_parts.empty()) {
625 // Confirm they all have the same start time. Old loggers always used the
626 // same start time.
627 for (const LogPart &p : old_parts) {
628 CHECK_EQ(old_parts[0].start_time, p.start_time);
629 }
630 // Sort by the oldest message in each file.
631 std::sort(old_parts.begin(), old_parts.end(),
632 [](const LogPart &a, const LogPart &b) {
633 return a.first_message_time < b.first_message_time;
634 });
635
636 // Produce the final form.
637 std::vector<std::string> sorted_old_parts;
638 sorted_old_parts.reserve(old_parts.size());
639 for (LogPart &p : old_parts) {
640 sorted_old_parts.emplace_back(std::move(p.filename));
641 }
642 return std::vector<std::vector<std::string>>{std::move(sorted_old_parts)};
643 }
644
645 // Now, sort them and produce the final vector form.
646 std::vector<std::vector<std::string>> result;
647 result.reserve(parts_list.size());
648 for (auto &part : parts_list) {
649 std::sort(part.second.begin(), part.second.end(),
650 [](const std::pair<std::string, int> &a,
651 const std::pair<std::string, int> &b) {
652 return a.second < b.second;
653 });
654 std::vector<std::string> result_line;
655 result_line.reserve(part.second.size());
656 for (std::pair<std::string, int> &p : part.second) {
657 result_line.emplace_back(std::move(p.first));
658 }
659 result.emplace_back(std::move(result_line));
660 }
661 return result;
662}
663
// Convenience constructor for a single log file; delegates to the
// multi-file constructor.
LogReader::LogReader(std::string_view filename,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::string>{std::string(filename)},
                replay_configuration) {}
668
// Convenience constructor for one list of part files; delegates to the
// list-of-lists constructor.
LogReader::LogReader(const std::vector<std::string> &filenames,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::vector<std::string>>{filenames},
                replay_configuration) {}
673
// Main constructor: reads the header from the first part file, builds the
// remapped replay configuration, validates it against |replay_configuration|
// (if provided), and sizes the per-node state.
LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                     const Configuration *replay_configuration)
    : filenames_(filenames),
      log_file_header_(ReadHeader(filenames[0][0])),
      replay_configuration_(replay_configuration) {
  MakeRemappedConfig();

  if (replay_configuration) {
    CHECK_EQ(configuration::MultiNode(configuration()),
             configuration::MultiNode(replay_configuration))
        << ": Log file and replay config need to both be multi or single "
           "node.";
  }

  if (!configuration::MultiNode(configuration())) {
    // Single node: one state fed by a merger over all the files.
    states_.emplace_back(
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
  } else {
    if (replay_configuration) {
      // Every logged node must exist in the replay config.
      CHECK_EQ(logged_configuration()->nodes()->size(),
               replay_configuration->nodes()->size())
          << ": Log file and replay config need to have matching nodes "
             "lists.";
      for (const Node *node : *logged_configuration()->nodes()) {
        if (configuration::GetNode(replay_configuration, node) == nullptr) {
          LOG(FATAL) << "Found node " << FlatbufferToJson(node)
                     << " in logged config that is not present in the replay "
                        "config.";
        }
      }
    }
    // States are filled in lazily during Register.
    states_.resize(configuration()->nodes()->size());
  }
}
708
// Tears down the reader.  Deregisters from a factory we own, closes the
// debug CSV if open, and wipes buffers to make use-after-free obvious.
LogReader::~LogReader() {
  if (event_loop_factory_unique_ptr_) {
    Deregister();
  } else if (event_loop_factory_ != nullptr) {
    // A borrowed factory must be cleaned up by the caller before we die.
    LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
                  "is destroyed";
  }
  if (offset_fp_ != nullptr) {
    fclose(offset_fp_);
  }
  // Zero out some buffers.  It's easy to do use-after-frees on these, so
  // make it more obvious.
  if (remapped_configuration_buffer_) {
    remapped_configuration_buffer_->Wipe();
  }
  log_file_header_.Wipe();
}
Austin Schuhe309d2a2019-11-29 13:25:21 -0800726
// Returns the configuration that was recorded in the log file header (as
// opposed to the possibly-remapped configuration used for replay).
const Configuration *LogReader::logged_configuration() const {
  return log_file_header_.message().configuration();
}
730
// Returns the configuration replay actually runs against: the logged
// configuration with any requested channel remappings applied.
const Configuration *LogReader::configuration() const {
  return remapped_configuration_;
}
734
std::vector<const Node *> LogReader::Nodes() const {
  // Because the Node pointer will only be valid if it actually points to
  // memory owned by remapped_configuration_, we need to wait for the
  // remapped_configuration_ to be populated before accessing it.
  //
  // Also, note, that when ever a map is changed, the nodes in here are
  // invalidated (they point into the old remapped configuration buffer).
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register before the node() pointer will be valid.";
  return configuration::GetNodes(remapped_configuration_);
}
Austin Schuh15649d62019-12-28 16:36:38 -0800746
Austin Schuh6f3babe2020-01-26 20:34:50 -0800747monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800748 State *state =
749 states_[configuration::GetNodeIndex(configuration(), node)].get();
750 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
751
Austin Schuh858c9f32020-08-31 16:56:12 -0700752 return state->monotonic_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800753}
754
Austin Schuh6f3babe2020-01-26 20:34:50 -0800755realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800756 State *state =
757 states_[configuration::GetNodeIndex(configuration(), node)].get();
758 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
759
Austin Schuh858c9f32020-08-31 16:56:12 -0700760 return state->realtime_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800761}
762
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800763void LogReader::Register() {
764 event_loop_factory_unique_ptr_ =
Austin Schuhac0771c2020-01-07 18:36:30 -0800765 std::make_unique<SimulatedEventLoopFactory>(configuration());
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800766 Register(event_loop_factory_unique_ptr_.get());
767}
768
// Registers the reader against the provided simulated event loop factory:
// creates per-node replay state, seeds the inter-node clock-offset solver,
// advances simulated time to the common start time, and disables message
// forwarding so replayed messages are not double-delivered.
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;

  // Build a State (with its own ChannelMerger over the log files) for every
  // node, and register an event loop for each one.
  for (const Node *node : configuration::GetNodes(configuration())) {
    const size_t node_index =
        configuration::GetNodeIndex(configuration(), node);
    states_[node_index] =
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
    State *state = states_[node_index].get();

    Register(state->SetNodeEventLoopFactory(
        event_loop_factory_->GetNodeEventLoopFactory(node)));
  }
  // live_nodes_ is incremented by the per-node Register when a node has data;
  // if none do, the logs can't possibly match this config.
  if (live_nodes_ == 0) {
    LOG(FATAL)
        << "Don't have logs from any of the nodes in the replay config--are "
           "you sure that the replay config matches the original config?";
  }

  // We need to now seed our per-node time offsets and get everything set up
  // to run.
  const size_t num_nodes = nodes_count();

  // It is easiest to solve for per node offsets with a matrix rather than
  // trying to solve the equations by hand.  So let's get after it.
  //
  // Now, build up the map matrix.
  //
  // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
  //
  // One row per filter pair plus one row constraining the average to the
  // distributed clock.  Exact rationals (mpq_class) avoid rounding error.
  map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
      filters_.size() + 1, num_nodes);
  slope_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
          filters_.size() + 1, num_nodes);

  offset_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  last_valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);

  time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
  time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);

  // All times should average out to the distributed clock: row 0 weights
  // every node equally.
  for (int i = 0; i < map_matrix_.cols(); ++i) {
    // 1/num_nodes.
    map_matrix_(0, i) = mpq_class(1, num_nodes);
  }
  valid_matrix_(0) = true;

  {
    // Now, add the a - b -> sample elements, one row per node pair.  The
    // estimators write their slope/offset/validity directly into our
    // matrices through the pointers set below.
    size_t i = 1;
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
             &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      const size_t node_a_index =
          configuration::GetNodeIndex(configuration(), node_a);
      const size_t node_b_index =
          configuration::GetNodeIndex(configuration(), node_b);

      // -a
      map_matrix_(i, node_a_index) = mpq_class(-1);
      // +b
      map_matrix_(i, node_b_index) = mpq_class(1);

      // -> sample
      std::get<0>(filter.second)
          .set_slope_pointer(&slope_matrix_(i, node_a_index));
      std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));

      valid_matrix_(i) = false;
      std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));

      ++i;
    }
  }

  // Pre-read messages so the offset estimators have initial samples.
  for (std::unique_ptr<State> &state : states_) {
    state->SeedSortedMessages();
  }

  // Rank of the map matrix tells you if all the nodes are in communication
  // with each other, which tells you if the offsets are observable.
  const size_t connected_nodes =
      Eigen::FullPivLU<
          Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
          .rank();

  // We don't need to support isolated nodes until someone has a real use
  // case.
  CHECK_EQ(connected_nodes, num_nodes)
      << ": There is a node which isn't communicating with the rest.";

  // And solve.
  UpdateOffsets();

  // We want to start the log file at the last start time of the log files
  // from all the nodes.  Compute how long each node's simulation needs to run
  // to move time to this point.
  distributed_clock::time_point start_time = distributed_clock::min_time;

  // TODO(austin): We want an "OnStart" callback for each node rather than
  // running until the last node.

  for (std::unique_ptr<State> &state : states_) {
    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
            << MaybeNodeName(state->event_loop()->node()) << "now "
            << state->monotonic_now();
    // And start computing the start time on the distributed clock now that
    // that works.
    start_time = std::max(
        start_time, state->ToDistributedClock(state->monotonic_start_time()));
  }

  CHECK_GE(start_time, distributed_clock::epoch())
      << ": Hmm, we have a node starting before the start of time. Offset "
         "everything.";

  // Forwarding is tracked per channel.  If it is enabled, we want to turn it
  // off.  Otherwise messages replayed will get forwarded across to the other
  // nodes, and also replayed on the other nodes.  This may not satisfy all
  // our users, but it'll start the discussion.
  if (configuration::MultiNode(event_loop_factory_->configuration())) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      const Channel *channel = logged_configuration()->channels()->Get(i);
      const Node *node = configuration::GetNode(
          configuration(), channel->source_node()->string_view());

      State *state =
          states_[configuration::GetNodeIndex(configuration(), node)].get();

      const Channel *remapped_channel =
          RemapChannel(state->event_loop(), channel);

      event_loop_factory_->DisableForwarding(remapped_channel);
    }

    // If we are replaying a log, we don't want a bunch of redundant messages
    // from both the real message bridge and simulated message bridge.
    event_loop_factory_->DisableStatistics();
  }

  // While we are starting the system up, we might be relying on matching data
  // to timestamps on log files where the timestamp log file starts before the
  // data.  In this case, it is reasonable to expect missing data.
  ignore_missing_data_ = true;
  VLOG(1) << "Running until " << start_time << " in Register";
  event_loop_factory_->RunFor(start_time.time_since_epoch());
  VLOG(1) << "At start time";
  // Now that we are running for real, missing data means that the log file is
  // corrupted or went wrong.
  ignore_missing_data_ = false;

  for (std::unique_ptr<State> &state : states_) {
    // Make the RT clock be correct before handing it to the user.
    if (state->realtime_start_time() != realtime_clock::min_time) {
      state->SetRealtimeOffset(state->monotonic_start_time(),
                               state->realtime_start_time());
    }
    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
            << MaybeNodeName(state->event_loop()->node()) << "now "
            << state->monotonic_now();
  }

  // Record the starting simulated times so the CSV debug output has a
  // reference point for each direction of each filter pair.
  if (FLAGS_timestamps_to_csv) {
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
             &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      std::get<0>(filter.second)
          .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
                               ->monotonic_now());
      std::get<0>(filter.second)
          .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
                               ->monotonic_now());
    }
  }
}
955
// Re-solves the clock-offset system from the latest estimator samples and
// pushes the resulting per-node slope/offset into each State so simulated
// node clocks map onto the distributed clock correctly.
void LogReader::UpdateOffsets() {
  VLOG(2) << "Samples are " << offset_matrix_;
  VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
  // SolveOffsets solves offset_matrix_ = (map_matrix_ + slope_matrix_) * t
  // for the per-node time transforms.
  std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
  Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
                           "]");
  VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
          << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);

  // Hand each node its freshly-solved distributed-clock transform.
  size_t node_index = 0;
  for (std::unique_ptr<State> &state : states_) {
    state->SetDistributedOffset(offset(node_index), slope(node_index));
    VLOG(1) << "Offset for node " << node_index << " "
            << MaybeNodeName(state->event_loop()->node()) << "is "
            << aos::distributed_clock::time_point(offset(node_index))
            << " slope " << std::setprecision(9) << std::fixed
            << slope(node_index);
    ++node_index;
  }

  if (VLOG_IS_ON(1)) {
    LogFit("Offset is");
  }
}
980
// Debug helper: logs, for every node pair with timestamp samples, how well
// the solved per-node clock transforms reproduce each estimator's fitted
// line, and round-trips sample timestamps through the distributed clock to
// flag inconsistencies.  Output only appears at VLOG(1).
void LogReader::LogFit(std::string_view prefix) {
  for (std::unique_ptr<State> &state : states_) {
    VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
            << state->monotonic_now() << " distributed "
            << event_loop_factory_->distributed_now();
  }

  for (std::pair<const std::tuple<const Node *, const Node *>,
                 std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
       filters_) {
    message_bridge::NoncausalOffsetEstimator *estimator =
        &std::get<0>(filter.second);

    // Nothing to report for a pair which has no samples yet.
    if (estimator->a_timestamps().size() == 0 &&
        estimator->b_timestamps().size() == 0) {
      continue;
    }

    if (VLOG_IS_ON(1)) {
      estimator->LogFit(prefix);
    }

    const Node *const node_a = std::get<0>(filter.first);
    const Node *const node_b = std::get<1>(filter.first);

    const size_t node_a_index =
        configuration::GetNodeIndex(configuration(), node_a);
    const size_t node_b_index =
        configuration::GetNodeIndex(configuration(), node_b);

    // Reconstruct the a->b relative slope/offset implied by the global
    // per-node solution so it can be compared against the pairwise fit.
    const double recovered_slope =
        slope(node_b_index) / slope(node_a_index) - 1.0;
    const int64_t recovered_offset =
        offset(node_b_index).count() - offset(node_a_index).count() *
                                           slope(node_b_index) /
                                           slope(node_a_index);

    VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
            << " (error " << recovered_slope - estimator->fit().slope() << ") "
            << " offset " << std::setprecision(20) << recovered_offset
            << " (error "
            << recovered_offset - estimator->fit().offset().count() << ")";

    // Map node a's first/last sample times onto the distributed clock.
    const aos::distributed_clock::time_point a0 =
        states_[node_a_index]->ToDistributedClock(
            std::get<0>(estimator->a_timestamps()[0]));
    const aos::distributed_clock::time_point a1 =
        states_[node_a_index]->ToDistributedClock(
            std::get<0>(estimator->a_timestamps()[1]));

    VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
            << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
            << " distributed -> " << node_b->name()->string_view() << " "
            << states_[node_b_index]->FromDistributedClock(a0) << " should be "
            << aos::monotonic_clock::time_point(
                   std::chrono::nanoseconds(static_cast<int64_t>(
                       std::get<0>(estimator->a_timestamps()[0])
                           .time_since_epoch()
                           .count() *
                       (1.0 + estimator->fit().slope()))) +
                   estimator->fit().offset())
            << ((a0 <= event_loop_factory_->distributed_now())
                    ? ""
                    : " After now, investigate");
    VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
            << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
            << " distributed -> " << node_b->name()->string_view() << " "
            << states_[node_b_index]->FromDistributedClock(a1) << " should be "
            << aos::monotonic_clock::time_point(
                   std::chrono::nanoseconds(static_cast<int64_t>(
                       std::get<0>(estimator->a_timestamps()[1])
                           .time_since_epoch()
                           .count() *
                       (1.0 + estimator->fit().slope()))) +
                   estimator->fit().offset())
            << ((event_loop_factory_->distributed_now() <= a1)
                    ? ""
                    : " Before now, investigate");

    // And the same round-trip check in the b->a direction.
    const aos::distributed_clock::time_point b0 =
        states_[node_b_index]->ToDistributedClock(
            std::get<0>(estimator->b_timestamps()[0]));
    const aos::distributed_clock::time_point b1 =
        states_[node_b_index]->ToDistributedClock(
            std::get<0>(estimator->b_timestamps()[1]));

    VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
            << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
            << " distributed -> " << node_a->name()->string_view() << " "
            << states_[node_a_index]->FromDistributedClock(b0)
            << ((b0 <= event_loop_factory_->distributed_now())
                    ? ""
                    : " After now, investigate");
    VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
            << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
            << " distributed -> " << node_a->name()->string_view() << " "
            << states_[node_a_index]->FromDistributedClock(b1)
            << ((event_loop_factory_->distributed_now() <= b1)
                    ? ""
                    : " Before now, investigate");
  }
}
1083
1084message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
Austin Schuh8bd96322020-02-13 21:18:22 -08001085 const Node *node_a, const Node *node_b) {
1086 CHECK_NE(node_a, node_b);
1087 CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
1088 CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
1089
1090 if (node_a > node_b) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001091 return GetFilter(node_b, node_a);
Austin Schuh8bd96322020-02-13 21:18:22 -08001092 }
1093
1094 auto tuple = std::make_tuple(node_a, node_b);
1095
1096 auto it = filters_.find(tuple);
1097
1098 if (it == filters_.end()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001099 auto &x =
1100 filters_
1101 .insert(std::make_pair(
1102 tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
1103 node_a, node_b))))
1104 .first->second;
Austin Schuh8bd96322020-02-13 21:18:22 -08001105 if (FLAGS_timestamps_to_csv) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001106 std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
1107 "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
1108 node_b->name()->string_view()));
1109 std::get<0>(x).SetRevCsvFileName(absl::StrCat(
1110 "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
1111 node_a->name()->string_view()));
Austin Schuh8bd96322020-02-13 21:18:22 -08001112 }
1113
Austin Schuh2f8fd752020-09-01 22:38:28 -07001114 return &std::get<0>(x);
Austin Schuh8bd96322020-02-13 21:18:22 -08001115 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001116 return &std::get<0>(it->second);
Austin Schuh8bd96322020-02-13 21:18:22 -08001117 }
1118}
1119
Austin Schuh8bd96322020-02-13 21:18:22 -08001120
Austin Schuhe309d2a2019-11-29 13:25:21 -08001121void LogReader::Register(EventLoop *event_loop) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001122 State *state =
1123 states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
1124 .get();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001125
Austin Schuh858c9f32020-08-31 16:56:12 -07001126 state->set_event_loop(event_loop);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001127
Tyler Chatow67ddb032020-01-12 14:30:04 -08001128 // We don't run timing reports when trying to print out logged data, because
1129 // otherwise we would end up printing out the timing reports themselves...
1130 // This is only really relevant when we are replaying into a simulation.
Austin Schuh6f3babe2020-01-26 20:34:50 -08001131 event_loop->SkipTimingReport();
1132 event_loop->SkipAosLog();
Austin Schuh39788ff2019-12-01 18:22:57 -08001133
Austin Schuh858c9f32020-08-31 16:56:12 -07001134 const bool has_data = state->SetNode();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001135
Austin Schuh858c9f32020-08-31 16:56:12 -07001136 state->SetChannelCount(logged_configuration()->channels()->size());
Austin Schuh8bd96322020-02-13 21:18:22 -08001137
Austin Schuh858c9f32020-08-31 16:56:12 -07001138 for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001139 const Channel *channel =
1140 RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
Austin Schuh6331ef92020-01-07 18:28:09 -08001141
Austin Schuh858c9f32020-08-31 16:56:12 -07001142 NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001143 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
Austin Schuh8bd96322020-02-13 21:18:22 -08001144
1145 if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
1146 configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
1147 const Node *target_node = configuration::GetNode(
1148 event_loop->configuration(), channel->source_node()->string_view());
Austin Schuh858c9f32020-08-31 16:56:12 -07001149 filter = GetFilter(event_loop->node(), target_node);
Austin Schuh8bd96322020-02-13 21:18:22 -08001150
1151 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001152 channel_target_event_loop_factory =
Austin Schuh8bd96322020-02-13 21:18:22 -08001153 event_loop_factory_->GetNodeEventLoopFactory(target_node);
1154 }
1155 }
Austin Schuh858c9f32020-08-31 16:56:12 -07001156
1157 state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
1158 channel_target_event_loop_factory);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001159 }
1160
Austin Schuh6aa77be2020-02-22 21:06:40 -08001161 // If we didn't find any log files with data in them, we won't ever get a
1162 // callback or be live. So skip the rest of the setup.
1163 if (!has_data) {
1164 return;
1165 }
1166
Austin Schuh858c9f32020-08-31 16:56:12 -07001167 state->set_timer_handler(event_loop->AddTimer([this, state]() {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001168 VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
1169 << "at " << state->event_loop()->context().monotonic_event_time
1170 << " now " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001171 if (state->OldestMessageTime() == monotonic_clock::max_time) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001172 --live_nodes_;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001173 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001174 if (live_nodes_ == 0) {
1175 event_loop_factory_->Exit();
1176 }
James Kuszmaul314f1672020-01-03 20:02:08 -08001177 return;
1178 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001179 TimestampMerger::DeliveryTimestamp channel_timestamp;
Austin Schuh05b70472020-01-01 17:11:17 -08001180 int channel_index;
1181 FlatbufferVector<MessageHeader> channel_data =
1182 FlatbufferVector<MessageHeader>::Empty();
1183
Austin Schuh2f8fd752020-09-01 22:38:28 -07001184 if (VLOG_IS_ON(1)) {
1185 LogFit("Offset was");
1186 }
1187
1188 bool update_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001189 std::tie(channel_timestamp, channel_index, channel_data) =
Austin Schuh2f8fd752020-09-01 22:38:28 -07001190 state->PopOldest(&update_time);
Austin Schuh05b70472020-01-01 17:11:17 -08001191
Austin Schuhe309d2a2019-11-29 13:25:21 -08001192 const monotonic_clock::time_point monotonic_now =
Austin Schuh858c9f32020-08-31 16:56:12 -07001193 state->event_loop()->context().monotonic_event_time;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001194 if (!FLAGS_skip_order_validation) {
1195 CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
1196 << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
1197 << monotonic_now << " trying to send "
1198 << channel_timestamp.monotonic_event_time << " failure "
1199 << state->DebugString();
1200 } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
1201 LOG(WARNING) << "Check failed: monotonic_now == "
1202 "channel_timestamp.monotonic_event_time) ("
1203 << monotonic_now << " vs. "
1204 << channel_timestamp.monotonic_event_time
1205 << "): " << FlatbufferToJson(state->event_loop()->node())
1206 << " Now " << monotonic_now << " trying to send "
1207 << channel_timestamp.monotonic_event_time << " failure "
1208 << state->DebugString();
1209 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001210
Austin Schuh6f3babe2020-01-26 20:34:50 -08001211 if (channel_timestamp.monotonic_event_time >
Austin Schuh858c9f32020-08-31 16:56:12 -07001212 state->monotonic_start_time() ||
Austin Schuh15649d62019-12-28 16:36:38 -08001213 event_loop_factory_ != nullptr) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001214 if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
Austin Schuh858c9f32020-08-31 16:56:12 -07001215 !state->at_end()) ||
Austin Schuh05b70472020-01-01 17:11:17 -08001216 channel_data.message().data() != nullptr) {
1217 CHECK(channel_data.message().data() != nullptr)
1218 << ": Got a message without data. Forwarding entry which was "
Austin Schuh2f8fd752020-09-01 22:38:28 -07001219 "not matched? Use --skip_missing_forwarding_entries to "
1220 "ignore "
Austin Schuh15649d62019-12-28 16:36:38 -08001221 "this.";
Austin Schuh92547522019-12-28 14:33:43 -08001222
Austin Schuh2f8fd752020-09-01 22:38:28 -07001223 if (update_time) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001224 // Confirm that the message was sent on the sending node before the
1225 // destination node (this node). As a proxy, do this by making sure
1226 // that time on the source node is past when the message was sent.
Austin Schuh2f8fd752020-09-01 22:38:28 -07001227 if (!FLAGS_skip_order_validation) {
1228 CHECK_LT(channel_timestamp.monotonic_remote_time,
1229 state->monotonic_remote_now(channel_index))
1230 << state->event_loop()->node()->name()->string_view() << " to "
1231 << state->remote_node(channel_index)->name()->string_view()
1232 << " " << state->DebugString();
1233 } else if (channel_timestamp.monotonic_remote_time >=
1234 state->monotonic_remote_now(channel_index)) {
1235 LOG(WARNING)
1236 << "Check failed: channel_timestamp.monotonic_remote_time < "
1237 "state->monotonic_remote_now(channel_index) ("
1238 << channel_timestamp.monotonic_remote_time << " vs. "
1239 << state->monotonic_remote_now(channel_index) << ") "
1240 << state->event_loop()->node()->name()->string_view() << " to "
1241 << state->remote_node(channel_index)->name()->string_view()
1242 << " currently " << channel_timestamp.monotonic_event_time
1243 << " ("
1244 << state->ToDistributedClock(
1245 channel_timestamp.monotonic_event_time)
1246 << ") remote event time "
1247 << channel_timestamp.monotonic_remote_time << " ("
1248 << state->RemoteToDistributedClock(
1249 channel_index, channel_timestamp.monotonic_remote_time)
1250 << ") " << state->DebugString();
1251 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001252
1253 if (FLAGS_timestamps_to_csv) {
1254 if (offset_fp_ == nullptr) {
1255 offset_fp_ = fopen("/tmp/offsets.csv", "w");
1256 fprintf(
1257 offset_fp_,
1258 "# time_since_start, offset node 0, offset node 1, ...\n");
1259 first_time_ = channel_timestamp.realtime_event_time;
1260 }
1261
1262 fprintf(offset_fp_, "%.9f",
1263 std::chrono::duration_cast<std::chrono::duration<double>>(
1264 channel_timestamp.realtime_event_time - first_time_)
1265 .count());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001266 for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
1267 fprintf(offset_fp_, ", %.9f",
1268 time_offset_matrix_(i, 0) +
1269 time_slope_matrix_(i, 0) *
1270 chrono::duration<double>(
1271 event_loop_factory_->distributed_now()
1272 .time_since_epoch())
1273 .count());
Austin Schuh8bd96322020-02-13 21:18:22 -08001274 }
1275 fprintf(offset_fp_, "\n");
1276 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001277 }
1278
Austin Schuh15649d62019-12-28 16:36:38 -08001279 // If we have access to the factory, use it to fix the realtime time.
Austin Schuh858c9f32020-08-31 16:56:12 -07001280 state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
1281 channel_timestamp.realtime_event_time);
Austin Schuh15649d62019-12-28 16:36:38 -08001282
Austin Schuh2f8fd752020-09-01 22:38:28 -07001283 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
1284 << channel_timestamp.monotonic_event_time;
1285 // TODO(austin): std::move channel_data in and make that efficient in
1286 // simulation.
Austin Schuh858c9f32020-08-31 16:56:12 -07001287 state->Send(channel_index, channel_data.message().data()->Data(),
1288 channel_data.message().data()->size(),
1289 channel_timestamp.monotonic_remote_time,
1290 channel_timestamp.realtime_remote_time,
1291 channel_timestamp.remote_queue_index);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001292 } else if (state->at_end() && !ignore_missing_data_) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001293 // We are at the end of the log file and found missing data. Finish
Austin Schuh2f8fd752020-09-01 22:38:28 -07001294 // reading the rest of the log file and call it quits. We don't want
1295 // to replay partial data.
Austin Schuh858c9f32020-08-31 16:56:12 -07001296 while (state->OldestMessageTime() != monotonic_clock::max_time) {
1297 bool update_time_dummy;
1298 state->PopOldest(&update_time_dummy);
Austin Schuh8bd96322020-02-13 21:18:22 -08001299 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001300 } else {
1301 CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
Austin Schuh92547522019-12-28 14:33:43 -08001302 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001303 } else {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001304 LOG(WARNING)
1305 << "Not sending data from before the start of the log file. "
1306 << channel_timestamp.monotonic_event_time.time_since_epoch().count()
1307 << " start " << monotonic_start_time().time_since_epoch().count()
1308 << " " << FlatbufferToJson(channel_data);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001309 }
1310
Austin Schuh858c9f32020-08-31 16:56:12 -07001311 const monotonic_clock::time_point next_time = state->OldestMessageTime();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001312 if (next_time != monotonic_clock::max_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001313 VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
1314 << "wakeup for " << next_time << "("
1315 << state->ToDistributedClock(next_time)
1316 << " distributed), now is " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001317 state->Setup(next_time);
James Kuszmaul314f1672020-01-03 20:02:08 -08001318 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001319 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1320 << "No next message, scheduling shutdown";
1321 // Set a timer up immediately after now to die. If we don't do this,
1322 // then the senders waiting on the message we just read will never get
1323 // called.
Austin Schuheecb9282020-01-08 17:43:30 -08001324 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001325 state->Setup(monotonic_now + event_loop_factory_->send_delay() +
1326 std::chrono::nanoseconds(1));
Austin Schuheecb9282020-01-08 17:43:30 -08001327 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001328 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001329
Austin Schuh2f8fd752020-09-01 22:38:28 -07001330 // Once we make this call, the current time changes. So do everything
1331 // which involves time before changing it. That especially includes
1332 // sending the message.
1333 if (update_time) {
1334 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1335 << "updating offsets";
1336
1337 std::vector<aos::monotonic_clock::time_point> before_times;
1338 before_times.resize(states_.size());
1339 std::transform(states_.begin(), states_.end(), before_times.begin(),
1340 [](const std::unique_ptr<State> &state) {
1341 return state->monotonic_now();
1342 });
1343
1344 for (size_t i = 0; i < states_.size(); ++i) {
1345 VLOG(1) << MaybeNodeName(
1346 states_[i]->event_loop()->node())
1347 << "before " << states_[i]->monotonic_now();
1348 }
1349
Austin Schuh8bd96322020-02-13 21:18:22 -08001350 UpdateOffsets();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001351 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
1352 << state->monotonic_now();
1353
1354 for (size_t i = 0; i < states_.size(); ++i) {
1355 VLOG(1) << MaybeNodeName(
1356 states_[i]->event_loop()->node())
1357 << "after " << states_[i]->monotonic_now();
1358 }
1359
1360 // TODO(austin): We should be perfect.
1361 const std::chrono::nanoseconds kTolerance{3};
1362 if (!FLAGS_skip_order_validation) {
1363 CHECK_GE(next_time, state->monotonic_now())
1364 << ": Time skipped the next event.";
1365
1366 for (size_t i = 0; i < states_.size(); ++i) {
1367 CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
1368 << ": Time changed too much on node "
1369 << MaybeNodeName(states_[i]->event_loop()->node());
1370 CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
1371 << ": Time changed too much on node "
1372 << states_[i]->event_loop()->node()->name()->string_view();
1373 }
1374 } else {
1375 if (next_time < state->monotonic_now()) {
1376 LOG(WARNING) << "Check failed: next_time >= "
1377 "state->monotonic_now() ("
1378 << next_time << " vs. " << state->monotonic_now()
1379 << "): Time skipped the next event.";
1380 }
1381 for (size_t i = 0; i < states_.size(); ++i) {
1382 if (states_[i]->monotonic_now() >= before_times[i] - kTolerance) {
1383 LOG(WARNING) << "Check failed: "
1384 "states_[i]->monotonic_now() "
1385 ">= before_times[i] - kTolerance ("
1386 << states_[i]->monotonic_now() << " vs. "
1387 << before_times[i] - kTolerance
1388 << ") : Time changed too much on node "
1389 << MaybeNodeName(states_[i]->event_loop()->node());
1390 }
1391 if (states_[i]->monotonic_now() <= before_times[i] + kTolerance) {
1392 LOG(WARNING) << "Check failed: "
1393 "states_[i]->monotonic_now() "
1394 "<= before_times[i] + kTolerance ("
1395 << states_[i]->monotonic_now() << " vs. "
1396 << before_times[i] - kTolerance
1397 << ") : Time changed too much on node "
1398 << MaybeNodeName(states_[i]->event_loop()->node());
1399 }
1400 }
1401 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001402 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001403
1404 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
1405 << state->event_loop()->context().monotonic_event_time << " now "
1406 << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001407 }));
Austin Schuhe309d2a2019-11-29 13:25:21 -08001408
Austin Schuh6f3babe2020-01-26 20:34:50 -08001409 ++live_nodes_;
1410
Austin Schuh858c9f32020-08-31 16:56:12 -07001411 if (state->OldestMessageTime() != monotonic_clock::max_time) {
1412 event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
Austin Schuhe309d2a2019-11-29 13:25:21 -08001413 }
1414}
1415
1416void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001417 // Make sure that things get destroyed in the correct order, rather than
1418 // relying on getting the order correct in the class definition.
Austin Schuh8bd96322020-02-13 21:18:22 -08001419 for (std::unique_ptr<State> &state : states_) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001420 state->Deregister();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001421 }
Austin Schuh92547522019-12-28 14:33:43 -08001422
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001423 event_loop_factory_unique_ptr_.reset();
1424 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -08001425}
1426
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001427void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
1428 std::string_view add_prefix) {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001429 for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
1430 const Channel *const channel = logged_configuration()->channels()->Get(ii);
1431 if (channel->name()->str() == name &&
1432 channel->type()->string_view() == type) {
1433 CHECK_EQ(0u, remapped_channels_.count(ii))
1434 << "Already remapped channel "
1435 << configuration::CleanedChannelToString(channel);
1436 remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
1437 VLOG(1) << "Remapping channel "
1438 << configuration::CleanedChannelToString(channel)
1439 << " to have name " << remapped_channels_[ii];
Austin Schuh6331ef92020-01-07 18:28:09 -08001440 MakeRemappedConfig();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001441 return;
1442 }
1443 }
1444 LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
1445 << type;
1446}
1447
// Rebuilds remapped_configuration_ so that it contains a Channel entry for
// every renamed channel registered in remapped_channels_, layered on top of
// either the logged configuration or the user-provided replay configuration.
void LogReader::MakeRemappedConfig() {
  // Remapping changes which channels exist, so it is only legal before any
  // per-node event loops have been created and events scheduled.
  for (std::unique_ptr<State> &state : states_) {
    if (state) {
      CHECK(!state->event_loop())
          << ": Can't change the mapping after the events are scheduled.";
    }
  }

  // If no remapping occurred and we are using the original config, then there
  // is nothing interesting to do here.
  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
    remapped_configuration_ = logged_configuration();
    return;
  }
  // Config to copy Channel definitions from. Use the specified
  // replay_configuration_ if it has been provided.
  const Configuration *const base_config = replay_configuration_ == nullptr
                                               ? logged_configuration()
                                               : replay_configuration_;
  // The remapped config will be identical to the base_config, except that it
  // will have a bunch of extra channels in the channel list, which are exact
  // copies of the remapped channels, but with different names.
  // Because the flatbuffers API is a pain to work with, this requires a bit of
  // a song-and-dance to get copied over.
  // The order of operations is to:
  // 1) Make a flatbuffer builder for a config that will just contain a list of
  //    the new channels that we want to add.
  // 2) For each channel that we are remapping:
  //    a) Make a buffer/builder and construct into it a Channel table that only
  //       contains the new name for the channel.
  //    b) Merge the new channel with just the name into the channel that we are
  //       trying to copy, built in the flatbuffer builder made in 1. This gives
  //       us the new channel definition that we need.
  // 3) Using this list of offsets, build the Configuration of just new
  //    Channels.
  // 4) Merge the Configuration with the new Channels into the base_config.
  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
  //    chance to sanitize the config.

  // This is the builder that we use for the config containing all the new
  // channels.
  flatbuffers::FlatBufferBuilder new_config_fbb;
  // ForceDefaults so fields that equal their default values are still written
  // out; the merge logic relies on the fields being physically present.
  new_config_fbb.ForceDefaults(true);
  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
  for (auto &pair : remapped_channels_) {
    // This is the builder that we use for creating the Channel with just the
    // new name.
    flatbuffers::FlatBufferBuilder new_name_fbb;
    new_name_fbb.ForceDefaults(true);
    const flatbuffers::Offset<flatbuffers::String> name_offset =
        new_name_fbb.CreateString(pair.second);
    ChannelBuilder new_name_builder(new_name_fbb);
    new_name_builder.add_name(name_offset);
    new_name_fbb.Finish(new_name_builder.Finish());
    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
    // Retrieve the channel that we want to copy, confirming that it is
    // actually present in base_config.
    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
        base_config, logged_configuration()->channels()->Get(pair.first), "",
        nullptr));
    // Actually create the new channel and put it into the vector of Offsets
    // that we will use to create the new Configuration.
    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
        reinterpret_cast<const flatbuffers::Table *>(base_channel),
        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
        &new_config_fbb));
  }
  // Create the Configuration containing the new channels that we want to add.
  const auto new_name_vector_offsets =
      new_config_fbb.CreateVector(channel_offsets);
  ConfigurationBuilder new_config_builder(new_config_fbb);
  new_config_builder.add_channels(new_name_vector_offsets);
  new_config_fbb.Finish(new_config_builder.Finish());
  const FlatbufferDetachedBuffer<Configuration> new_name_config =
      new_config_fbb.Release();
  // Merge the new channels configuration into the base_config, giving us the
  // remapped configuration.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          MergeFlatBuffers<Configuration>(base_config,
                                          &new_name_config.message()));
  // Call MergeConfiguration to deal with sanitizing the config.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          configuration::MergeConfiguration(*remapped_configuration_buffer_));

  remapped_configuration_ = &remapped_configuration_buffer_->message();
}
1536
Austin Schuh6f3babe2020-01-26 20:34:50 -08001537const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
1538 const Channel *channel) {
1539 std::string_view channel_name = channel->name()->string_view();
1540 std::string_view channel_type = channel->type()->string_view();
1541 const int channel_index =
1542 configuration::ChannelIndex(logged_configuration(), channel);
1543 // If the channel is remapped, find the correct channel name to use.
1544 if (remapped_channels_.count(channel_index) > 0) {
Austin Schuhee711052020-08-24 16:06:09 -07001545 VLOG(3) << "Got remapped channel on "
Austin Schuh6f3babe2020-01-26 20:34:50 -08001546 << configuration::CleanedChannelToString(channel);
1547 channel_name = remapped_channels_[channel_index];
1548 }
1549
Austin Schuhee711052020-08-24 16:06:09 -07001550 VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001551 const Channel *remapped_channel = configuration::GetChannel(
1552 event_loop->configuration(), channel_name, channel_type,
1553 event_loop->name(), event_loop->node());
1554
1555 CHECK(remapped_channel != nullptr)
1556 << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
1557 << channel_type << "\"} because it is not in the provided configuration.";
1558
1559 return remapped_channel;
1560}
1561
// Takes ownership of the ChannelMerger that supplies this node's sorted
// stream of logged messages.
LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
    : channel_merger_(std::move(channel_merger)) {}
1564
1565EventLoop *LogReader::State::SetNodeEventLoopFactory(
1566 NodeEventLoopFactory *node_event_loop_factory) {
1567 node_event_loop_factory_ = node_event_loop_factory;
1568 event_loop_unique_ptr_ =
1569 node_event_loop_factory_->MakeEventLoop("log_reader");
1570 return event_loop_unique_ptr_.get();
1571}
1572
1573void LogReader::State::SetChannelCount(size_t count) {
1574 channels_.resize(count);
1575 filters_.resize(count);
1576 channel_target_event_loop_factory_.resize(count);
1577}
1578
1579void LogReader::State::SetChannel(
1580 size_t channel, std::unique_ptr<RawSender> sender,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001581 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh858c9f32020-08-31 16:56:12 -07001582 NodeEventLoopFactory *channel_target_event_loop_factory) {
1583 channels_[channel] = std::move(sender);
1584 filters_[channel] = filter;
1585 channel_target_event_loop_factory_[channel] =
1586 channel_target_event_loop_factory;
1587}
1588
// Removes and returns the oldest staged message (timestamp, channel index,
// payload) for this node.  *update_time is set to true iff popping this
// message's sample out of its offset estimator changed the time-offset
// estimate, meaning the caller must recompute clock offsets before using
// the clocks again.
std::tuple<TimestampMerger::DeliveryTimestamp, int,
           FlatbufferVector<MessageHeader>>
LogReader::State::PopOldest(bool *update_time) {
  CHECK_GT(sorted_messages_.size(), 0u);

  // The staged tuple carries a fourth element (the offset estimator) that is
  // consumed here and not returned to the caller.
  std::tuple<TimestampMerger::DeliveryTimestamp, int,
             FlatbufferVector<MessageHeader>,
             message_bridge::NoncausalOffsetEstimator *>
      result = std::move(sorted_messages_.front());
  VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
          << std::get<0>(result).monotonic_event_time;
  sorted_messages_.pop_front();
  // Refill the staging queue so roughly 2 seconds of messages stay buffered.
  SeedSortedMessages();

  if (std::get<3>(result) != nullptr) {
    // This message had forwarding information; pop the matching sample from
    // the estimator and report whether the offset estimate moved.
    *update_time = std::get<3>(result)->Pop(
        event_loop_->node(), std::get<0>(result).monotonic_event_time);
  } else {
    *update_time = false;
  }
  return std::make_tuple(std::get<0>(result), std::get<1>(result),
                         std::move(std::get<2>(result)));
}
1612
1613monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
1614 if (sorted_messages_.size() > 0) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001615 VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
Austin Schuh858c9f32020-08-31 16:56:12 -07001616 << std::get<0>(sorted_messages_.front()).monotonic_event_time;
1617 return std::get<0>(sorted_messages_.front()).monotonic_event_time;
1618 }
1619
1620 return channel_merger_->OldestMessageTime();
1621}
1622
// Pulls messages out of the ChannelMerger and stages them (together with the
// matching time-offset estimator, when forwarding info is present) into
// sorted_messages_ until roughly 2 seconds of data are buffered beyond the
// front message.
void LogReader::State::SeedSortedMessages() {
  // Queue until 2 seconds past the front staged message, or 2 seconds past
  // the log's start time when nothing is staged yet.
  const aos::monotonic_clock::time_point end_queue_time =
      (sorted_messages_.size() > 0
           ? std::get<0>(sorted_messages_.front()).monotonic_event_time
           : channel_merger_->monotonic_start_time()) +
      std::chrono::seconds(2);

  while (true) {
    if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
      // The log is exhausted; nothing more to stage.
      return;
    }
    if (sorted_messages_.size() > 0) {
      // Stop placing sorted messages on the list once we have 2 seconds
      // queued up (but queue at least until the log starts).
      if (end_queue_time <
          std::get<0>(sorted_messages_.back()).monotonic_event_time) {
        return;
      }
    }

    TimestampMerger::DeliveryTimestamp channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    message_bridge::NoncausalOffsetEstimator *filter = nullptr;

    std::tie(channel_timestamp, channel_index, channel_data) =
        channel_merger_->PopOldest();

    // Skip any messages without forwarding information.
    if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
      // Got a forwarding timestamp!
      filter = filters_[channel_index];

      CHECK(filter != nullptr);

      // Call the correct method depending on if we are the forward or
      // reverse direction here.
      filter->Sample(event_loop_->node(),
                     channel_timestamp.monotonic_event_time,
                     channel_timestamp.monotonic_remote_time);
    }
    // Stage the message along with its (possibly null) filter so PopOldest()
    // can pop the matching estimator sample when it delivers the message.
    sorted_messages_.emplace_back(channel_timestamp, channel_index,
                                  std::move(channel_data), filter);
  }
}
1670
1671void LogReader::State::Deregister() {
1672 for (size_t i = 0; i < channels_.size(); ++i) {
1673 channels_[i].reset();
1674 }
1675 event_loop_unique_ptr_.reset();
1676 event_loop_ = nullptr;
1677 timer_handler_ = nullptr;
1678 node_event_loop_factory_ = nullptr;
1679}
1680
Austin Schuhe309d2a2019-11-29 13:25:21 -08001681} // namespace logger
1682} // namespace aos