blob: caccf03a5a7ed5f7aaec72bfbc2cc9b63ee4d99b [file] [log] [blame]
James Kuszmaul38735e82019-12-07 16:42:06 -08001#include "aos/events/logging/logger.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -08002
3#include <fcntl.h>
Austin Schuh4c4e0092019-12-22 16:18:03 -08004#include <limits.h>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8#include <vector>
9
Austin Schuh8bd96322020-02-13 21:18:22 -080010#include "Eigen/Dense"
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "absl/strings/escaping.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "absl/types/span.h"
13#include "aos/events/event_loop.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080014#include "aos/events/logging/logger_generated.h"
Austin Schuh64fab802020-09-09 22:47:47 -070015#include "aos/events/logging/uuid.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080016#include "aos/flatbuffer_merge.h"
Austin Schuh288479d2019-12-18 19:47:52 -080017#include "aos/network/team_number.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080018#include "aos/time/time.h"
19#include "flatbuffers/flatbuffers.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070020#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080021
// Command-line flags controlling logging/replay behavior.  These are read at
// runtime by the Logger/LogReader code below.
DEFINE_bool(skip_missing_forwarding_entries, false,
            "If true, drop any forwarding entries with missing data. If "
            "false, CHECK.");

DEFINE_bool(timestamps_to_csv, false,
            "If true, write all the time synchronization information to a set "
            "of CSV files in /tmp/. This should only be needed when debugging "
            "time synchronization.");

DEFINE_bool(skip_order_validation, false,
            "If true, ignore any out of orderness in replay");
33
Austin Schuhe309d2a2019-11-29 13:25:21 -080034namespace aos {
35namespace logger {
Austin Schuhe309d2a2019-11-29 13:25:21 -080036namespace chrono = std::chrono;
37
Austin Schuh2f8fd752020-09-01 22:38:28 -070038
39Logger::Logger(std::string_view base_name, EventLoop *event_loop,
Austin Schuhe309d2a2019-11-29 13:25:21 -080040 std::chrono::milliseconds polling_period)
Austin Schuh0c297012020-09-16 18:41:59 -070041 : Logger(base_name, event_loop, event_loop->configuration(),
42 polling_period) {}
43Logger::Logger(std::string_view base_name, EventLoop *event_loop,
44 const Configuration *configuration,
45 std::chrono::milliseconds polling_period)
Austin Schuh2f8fd752020-09-01 22:38:28 -070046 : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
Austin Schuh0c297012020-09-16 18:41:59 -070047 event_loop, configuration, polling_period) {}
48Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
49 std::chrono::milliseconds polling_period)
50 : Logger(std::move(log_namer), event_loop, event_loop->configuration(),
51 polling_period) {}
Austin Schuh6f3babe2020-01-26 20:34:50 -080052
// Primary constructor.  Wires up a fetcher (plus the appropriate data /
// timestamp / contents writers) for every channel that must be logged from
// this node, and schedules logging to start once the event loop runs.
Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
               const Configuration *configuration,
               std::chrono::milliseconds polling_period)
    : event_loop_(event_loop),
      uuid_(UUID::Random()),
      log_namer_(std::move(log_namer)),
      configuration_(configuration),
      name_(network::GetHostname()),
      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
      polling_period_(polling_period),
      // Only multi-node configs have /aos ServerStatistics; single-node gets a
      // default (never-valid) fetcher.
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
  int channel_index = 0;

  // Find all the nodes which are logging timestamps on our node.
  std::set<const Node *> timestamp_logger_nodes;
  for (const Channel *channel : *configuration_->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
        !channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      const Node *other_node = configuration::GetNode(
          configuration_, connection->name()->string_view());

      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_nodes.insert(other_node);
      }
    }
  }

  std::map<const Channel *, const Node *> timestamp_logger_channels;

  // Now that we have all the nodes accumulated, make remote timestamp loggers
  // for them.
  for (const Node *node : timestamp_logger_nodes) {
    const Channel *channel = configuration::GetChannel(
        configuration_,
        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
        logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
        event_loop_->node());

    CHECK(channel != nullptr)
        << ": Remote timestamps are logged on "
        << event_loop_->node()->name()->string_view()
        << " but can't find channel /aos/remote_timestamps/"
        << node->name()->string_view();
    timestamp_logger_channels.insert(std::make_pair(channel, node));
  }

  const size_t our_node_index = configuration::GetNodeIndex(
      configuration_, event_loop_->node());

  for (const Channel *config_channel : *configuration_->channels()) {
    // The MakeRawFetcher method needs a channel which is in the event loop
    // configuration() object, not the configuration_ object.  Go look that up
    // from the config.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());

    FetcherStruct fs;
    fs.node_index = our_node_index;
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
                                 channel, event_loop_->node()) &&
                             is_readable;

    const bool log_delivery_times =
        (event_loop_->node() == nullptr)
            ? false
            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                  channel, event_loop_->node(), event_loop_->node());

    // Now, detect a MessageHeader timestamp logger where we should just log the
    // contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();
    const Node *timestamp_node =
        log_contents ? timestamp_logger_channels.find(channel)->second
                     : nullptr;

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << "  Delivery times";
        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
      }
      if (log_message) {
        VLOG(1) << "  Data";
        fs.writer = log_namer_->MakeWriter(channel);
        if (!is_local) {
          // Messages forwarded from another node are logged in remote form.
          fs.log_type = LogType::kLogRemoteMessage;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.contents_writer =
            log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
        // Contents channels are accounted against the remote node, not us.
        fs.node_index =
            configuration::GetNodeIndex(configuration_, timestamp_node);
      }
      fs.channel_index = channel_index;
      fs.written = false;
      fetchers_.emplace_back(std::move(fs));
    }
    ++channel_index;
  }

  // One NodeState per node in multi-node configs; a single slot otherwise.
  node_state_.resize(configuration::MultiNode(configuration_)
                         ? configuration_->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(configuration_, node);

    node_state_[node_index].log_file_header = MakeHeader(node);
  }

  // When things start, we want to log the header, then the most recent
  // messages available on each fetcher to capture the previous state, then
  // start polling.
  event_loop_->OnRun([this]() { StartLogging(); });
}
193
Austin Schuh0c297012020-09-16 18:41:59 -0700194Logger::~Logger() {
195 // If we are replaying a log file, or in simulation, we want to force the last
196 // bit of data to be logged. The easiest way to deal with this is to poll
197 // everything as we go to destroy the class, ie, shut down the logger, and
198 // write it to disk.
199 DoLogData();
200}
201
Austin Schuh2f8fd752020-09-01 22:38:28 -0700202void Logger::StartLogging() {
203 // Grab data from each channel right before we declare the log file started
204 // so we can capture the latest message on each channel. This lets us have
205 // non periodic messages with configuration that now get logged.
206 for (FetcherStruct &f : fetchers_) {
207 f.written = !f.fetcher->Fetch();
208 }
209
210 // Clear out any old timestamps in case we are re-starting logging.
211 for (size_t i = 0; i < node_state_.size(); ++i) {
212 SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
213 }
214
215 WriteHeader();
216
217 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
218 << " start_time " << last_synchronized_time_;
219
220 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
221 polling_period_);
222}
223
Austin Schuhfa895892020-01-07 20:07:41 -0800224void Logger::WriteHeader() {
Austin Schuh0c297012020-09-16 18:41:59 -0700225 if (configuration::MultiNode(configuration_)) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700226 server_statistics_fetcher_.Fetch();
227 }
228
229 aos::monotonic_clock::time_point monotonic_start_time =
230 event_loop_->monotonic_now();
231 aos::realtime_clock::time_point realtime_start_time =
232 event_loop_->realtime_now();
233
234 // We need to pick a point in time to declare the log file "started". This
235 // starts here. It needs to be after everything is fetched so that the
236 // fetchers are all pointed at the most recent message before the start
237 // time.
238 last_synchronized_time_ = monotonic_start_time;
239
Austin Schuh6f3babe2020-01-26 20:34:50 -0800240 for (const Node *node : log_namer_->nodes()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700241 const int node_index =
Austin Schuh0c297012020-09-16 18:41:59 -0700242 configuration::GetNodeIndex(configuration_, node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700243 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
244 realtime_start_time);
Austin Schuh64fab802020-09-09 22:47:47 -0700245 log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800246 }
247}
Austin Schuh8bd96322020-02-13 21:18:22 -0800248
Austin Schuh2f8fd752020-09-01 22:38:28 -0700249void Logger::WriteMissingTimestamps() {
Austin Schuh0c297012020-09-16 18:41:59 -0700250 if (configuration::MultiNode(configuration_)) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700251 server_statistics_fetcher_.Fetch();
252 } else {
253 return;
254 }
255
256 if (server_statistics_fetcher_.get() == nullptr) {
257 return;
258 }
259
260 for (const Node *node : log_namer_->nodes()) {
261 const int node_index =
Austin Schuh0c297012020-09-16 18:41:59 -0700262 configuration::GetNodeIndex(configuration_, node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700263 if (MaybeUpdateTimestamp(
264 node, node_index,
265 server_statistics_fetcher_.context().monotonic_event_time,
266 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh64fab802020-09-09 22:47:47 -0700267 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700268 }
269 }
270}
271
272void Logger::SetStartTime(size_t node_index,
273 aos::monotonic_clock::time_point monotonic_start_time,
274 aos::realtime_clock::time_point realtime_start_time) {
275 node_state_[node_index].monotonic_start_time = monotonic_start_time;
276 node_state_[node_index].realtime_start_time = realtime_start_time;
277 node_state_[node_index]
278 .log_file_header.mutable_message()
279 ->mutate_monotonic_start_time(
280 std::chrono::duration_cast<std::chrono::nanoseconds>(
281 monotonic_start_time.time_since_epoch())
282 .count());
283 if (node_state_[node_index]
284 .log_file_header.mutable_message()
285 ->has_realtime_start_time()) {
286 node_state_[node_index]
287 .log_file_header.mutable_message()
288 ->mutate_realtime_start_time(
289 std::chrono::duration_cast<std::chrono::nanoseconds>(
290 realtime_start_time.time_since_epoch())
291 .count());
292 }
293}
294
// Attempts to set the start time for node.  Returns true if the start time
// was (newly) set, false if it was already set or cannot be computed yet.
bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (configuration::MultiNode(configuration_)) {
    if (event_loop_->node() == node) {
      // There are no offsets to compute for ourself, so always succeed.
      SetStartTime(node_index, monotonic_start_time, realtime_start_time);
      return true;
    } else if (server_statistics_fetcher_.get() != nullptr) {
      // We must be a remote node now.  Look for the connection and see if it is
      // connected.

      for (const message_bridge::ServerConnection *connection :
           *server_statistics_fetcher_->connections()) {
        if (connection->node()->name()->string_view() !=
            node->name()->string_view()) {
          continue;
        }

        if (connection->state() != message_bridge::State::CONNECTED) {
          VLOG(1) << node->name()->string_view()
                  << " is not connected, can't start it yet.";
          break;
        }

        if (!connection->has_monotonic_offset()) {
          VLOG(1) << "Missing monotonic offset for setting start time for node "
                  << aos::FlatbufferToJson(node);
          break;
        }

        VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);

        // Found it and it is connected.  Compensate and go.
        monotonic_start_time +=
            std::chrono::nanoseconds(connection->monotonic_offset());

        SetStartTime(node_index, monotonic_start_time, realtime_start_time);
        return true;
      }
    }
  } else {
    // Single node: nothing to synchronize against.
    SetStartTime(node_index, monotonic_start_time, realtime_start_time);
    return true;
  }
  return false;
}
348
// Builds the size-prefixed LogFileHeader flatbuffer for node.  Start times
// are initialized to min_time placeholders and patched later by
// SetStartTime().
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  // TODO(austin): Compress this much more efficiently.  There are a bunch of
  // duplicated schemas.
  flatbuffers::Offset<aos::Configuration> configuration_offset =
      CopyFlatBuffer(configuration_, &fbb);

  flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
      fbb.CreateString(uuid_.string_view());

  // Placeholder parts UUID (a fixed nil-style v4 UUID).
  flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      fbb.CreateString("00000000-0000-4000-8000-000000000000");

  flatbuffers::Offset<Node> node_offset;

  if (configuration::MultiNode(configuration_)) {
    node_offset = CopyFlatBuffer(node, &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
  }

  log_file_header_builder.add_configuration(configuration_offset);
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary.  And the reverse could happen for another
  // message.  Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
          .count());

  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  // Realtime start time is only meaningful for our own node's header.
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  log_file_header_builder.add_logger_uuid(logger_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(0);

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  return fbb.Release();
}
413
414void Logger::Rotate() {
415 for (const Node *node : log_namer_->nodes()) {
416 const int node_index =
Austin Schuh0c297012020-09-16 18:41:59 -0700417 configuration::GetNodeIndex(configuration_, node);
Austin Schuh64fab802020-09-09 22:47:47 -0700418 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700419 }
420}
421
// Drains every fetcher and writes all messages with an event time before t,
// then advances the sync point (last_synchronized_time_) to t.
void Logger::LogUntil(monotonic_clock::time_point t) {
  WriteMissingTimestamps();

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      if (f.written) {
        if (!f.fetcher->FetchNext()) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        } else {
          f.written = false;
        }
      }

      CHECK(!f.written);

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time < t) {
        if (f.writer != nullptr) {
          // Write!
          // Size the builder for the message plus the largest header seen so
          // far to avoid reallocation in the common case.
          flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                             max_header_size_);
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index, f.log_type));

          VLOG(2) << "Writing data as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.writer->filename() << " data "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          max_header_size_ = std::max(
              max_header_size_, fbb.GetSize() - f.fetcher->context().size);
          f.writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.timestamp_writer != nullptr) {
          // And now handle timestamps.
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index,
                                             LogType::kLogDeliveryTimeOnly));

          VLOG(2) << "Writing timestamps as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.timestamp_writer->filename() << " timestamp "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.contents_writer != nullptr) {
          // And now handle the special message contents channel.  Copy the
          // message into a FlatBufferBuilder and save it to disk.
          // TODO(austin): We can be more efficient here when we start to
          // care...
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          const MessageHeader *msg =
              flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);

          logger::MessageHeader::Builder message_header_builder(fbb);

          // Note: this must match the same order as MessageBridgeServer and
          // PackMessage.  We want identical headers to have identical
          // on-the-wire formats to make comparing them easier.
          message_header_builder.add_channel_index(msg->channel_index());

          message_header_builder.add_queue_index(msg->queue_index());
          message_header_builder.add_monotonic_sent_time(
              msg->monotonic_sent_time());
          message_header_builder.add_realtime_sent_time(
              msg->realtime_sent_time());

          message_header_builder.add_monotonic_remote_time(
              msg->monotonic_remote_time());
          message_header_builder.add_realtime_remote_time(
              msg->realtime_remote_time());
          message_header_builder.add_remote_queue_index(
              msg->remote_queue_index());

          fbb.FinishSizePrefixed(message_header_builder.Finish());

          f.contents_writer->QueueSizedFlatbuffer(&fbb);
        }

        f.written = true;
      } else {
        // This message is at or past t; leave it for the next sync point.
        break;
      }
    }
  }
  last_synchronized_time_ = t;
}
529
Austin Schuhe309d2a2019-11-29 13:25:21 -0800530void Logger::DoLogData() {
531 // We want to guarentee that messages aren't out of order by more than
532 // max_out_of_order_duration. To do this, we need sync points. Every write
533 // cycle should be a sync point.
Austin Schuhfa895892020-01-07 20:07:41 -0800534 const monotonic_clock::time_point monotonic_now =
535 event_loop_->monotonic_now();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800536
537 do {
538 // Move the sync point up by at most polling_period. This forces one sync
539 // per iteration, even if it is small.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700540 LogUntil(
541 std::min(last_synchronized_time_ + polling_period_, monotonic_now));
Austin Schuhe309d2a2019-11-29 13:25:21 -0800542
Austin Schuhe309d2a2019-11-29 13:25:21 -0800543 // If we missed cycles, we could be pretty far behind. Spin until we are
544 // caught up.
545 } while (last_synchronized_time_ + polling_period_ < monotonic_now);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800546}
547
// Groups log part files into ordered lists, one list per parts UUID.  Old
// logs (no UUID/index/node in the header) are instead ordered by the time of
// their first message.  CHECK-fails on a mix of old and new style parts.
std::vector<std::vector<std::string>> SortParts(
    const std::vector<std::string> &parts) {
  // Start by grouping all parts by UUID, and extracting the part index.
  std::map<std::string, std::vector<std::pair<std::string, int>>> parts_list;

  // Sort part files without UUIDs and part indexes as well.  Extract everything
  // useful from the log in the first pass, then sort later.
  struct LogPart {
    std::string filename;
    monotonic_clock::time_point start_time;
    monotonic_clock::time_point first_message_time;
  };

  std::vector<LogPart> old_parts;

  for (const std::string &part : parts) {
    FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);

    // Looks like an old log.  No UUID, index, and also single node.  We have
    // little to no multi-node log files in the wild without part UUIDs and
    // indexes which we care much about.
    if (!log_header.message().has_parts_uuid() &&
        !log_header.message().has_parts_index() &&
        !log_header.message().has_node()) {
      LogPart log_part;
      log_part.filename = part;
      log_part.start_time = monotonic_clock::time_point(
          chrono::nanoseconds(log_header.message().monotonic_start_time()));
      FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
      log_part.first_message_time = monotonic_clock::time_point(
          chrono::nanoseconds(first_message.message().monotonic_sent_time()));
      old_parts.emplace_back(std::move(log_part));
      continue;
    }

    CHECK(log_header.message().has_parts_uuid());
    CHECK(log_header.message().has_parts_index());

    const std::string parts_uuid = log_header.message().parts_uuid()->str();
    auto it = parts_list.find(parts_uuid);
    if (it == parts_list.end()) {
      it = parts_list
               .insert(std::make_pair(
                   parts_uuid, std::vector<std::pair<std::string, int>>{}))
               .first;
    }
    it->second.emplace_back(
        std::make_pair(part, log_header.message().parts_index()));
  }

  // Exactly one of the two styles must be present.
  CHECK_NE(old_parts.empty(), parts_list.empty())
      << ": Can't have a mix of old and new parts.";

  if (!old_parts.empty()) {
    // Confirm they all have the same start time.  Old loggers always used the
    // same start time.
    for (const LogPart &p : old_parts) {
      CHECK_EQ(old_parts[0].start_time, p.start_time);
    }
    // Sort by the oldest message in each file.
    std::sort(old_parts.begin(), old_parts.end(),
              [](const LogPart &a, const LogPart &b) {
                return a.first_message_time < b.first_message_time;
              });

    // Produce the final form.
    std::vector<std::string> sorted_old_parts;
    sorted_old_parts.reserve(old_parts.size());
    for (LogPart &p : old_parts) {
      sorted_old_parts.emplace_back(std::move(p.filename));
    }
    return std::vector<std::vector<std::string>>{std::move(sorted_old_parts)};
  }

  // Now, sort them and produce the final vector form.
  std::vector<std::vector<std::string>> result;
  result.reserve(parts_list.size());
  for (auto &part : parts_list) {
    // Order each UUID's parts by their recorded part index.
    std::sort(part.second.begin(), part.second.end(),
              [](const std::pair<std::string, int> &a,
                 const std::pair<std::string, int> &b) {
                return a.second < b.second;
              });
    std::vector<std::string> result_line;
    result_line.reserve(part.second.size());
    for (std::pair<std::string, int> &p : part.second) {
      result_line.emplace_back(std::move(p.first));
    }
    result.emplace_back(std::move(result_line));
  }
  return result;
}
640
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800641LogReader::LogReader(std::string_view filename,
642 const Configuration *replay_configuration)
Austin Schuhfa895892020-01-07 20:07:41 -0800643 : LogReader(std::vector<std::string>{std::string(filename)},
644 replay_configuration) {}
645
646LogReader::LogReader(const std::vector<std::string> &filenames,
647 const Configuration *replay_configuration)
Austin Schuh6f3babe2020-01-26 20:34:50 -0800648 : LogReader(std::vector<std::vector<std::string>>{filenames},
649 replay_configuration) {}
650
// Primary constructor.  Reads the header of the first part file, builds the
// remapped replay configuration, and sets up per-node replay state.
LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                     const Configuration *replay_configuration)
    : filenames_(filenames),
      log_file_header_(ReadHeader(filenames[0][0])),
      replay_configuration_(replay_configuration) {
  MakeRemappedConfig();

  // The log and the replay config must agree on single- vs multi-node.
  if (replay_configuration) {
    CHECK_EQ(configuration::MultiNode(configuration()),
             configuration::MultiNode(replay_configuration))
        << ": Log file and replay config need to both be multi or single "
           "node.";
  }

  if (!configuration::MultiNode(configuration())) {
    // Single node: one State fed by a merger over all the files.
    states_.emplace_back(
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
  } else {
    if (replay_configuration) {
      // Every logged node must exist in the replay config.
      CHECK_EQ(logged_configuration()->nodes()->size(),
               replay_configuration->nodes()->size())
          << ": Log file and replay config need to have matching nodes "
             "lists.";
      for (const Node *node : *logged_configuration()->nodes()) {
        if (configuration::GetNode(replay_configuration, node) == nullptr) {
          LOG(FATAL) << "Found node " << FlatbufferToJson(node)
                     << " in logged config that is not present in the replay "
                        "config.";
        }
      }
    }
    // States are filled in lazily per node during Register().
    states_.resize(configuration()->nodes()->size());
  }
}
685
LogReader::~LogReader() {
  // If we own the event loop factory, deregister ourselves before it goes
  // away.  If the caller owns it, they must have already called Deregister.
  if (event_loop_factory_unique_ptr_) {
    Deregister();
  } else if (event_loop_factory_ != nullptr) {
    LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
                  "is destroyed";
  }
  // Close the timestamp-offset CSV debug file if it was opened.
  if (offset_fp_ != nullptr) {
    fclose(offset_fp_);
  }
  // Zero out some buffers.  It's easy to do use-after-frees on these, so make
  // it more obvious.
  if (remapped_configuration_buffer_) {
    remapped_configuration_buffer_->Wipe();
  }
  log_file_header_.Wipe();
}
Austin Schuhe309d2a2019-11-29 13:25:21 -0800703
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800704const Configuration *LogReader::logged_configuration() const {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800705 return log_file_header_.message().configuration();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800706}
707
// Returns the configuration replay runs against: the remapped configuration,
// as opposed to the one captured in the log header (see
// logged_configuration()).
const Configuration *LogReader::configuration() const {
  return remapped_configuration_;
}
711
// Returns the list of nodes from the remapped (replay) configuration.
std::vector<const Node *> LogReader::Nodes() const {
  // Because the Node pointer will only be valid if it actually points to
  // memory owned by remapped_configuration_, we need to wait for the
  // remapped_configuration_ to be populated before accessing it.
  //
  // Also, note, that when ever a map is changed, the nodes in here are
  // invalidated.
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register before the node() pointer will be valid.";
  return configuration::GetNodes(remapped_configuration_);
}
Austin Schuh15649d62019-12-28 16:36:38 -0800723
Austin Schuh6f3babe2020-01-26 20:34:50 -0800724monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800725 State *state =
726 states_[configuration::GetNodeIndex(configuration(), node)].get();
727 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
728
Austin Schuh858c9f32020-08-31 16:56:12 -0700729 return state->monotonic_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800730}
731
Austin Schuh6f3babe2020-01-26 20:34:50 -0800732realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800733 State *state =
734 states_[configuration::GetNodeIndex(configuration(), node)].get();
735 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
736
Austin Schuh858c9f32020-08-31 16:56:12 -0700737 return state->realtime_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800738}
739
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800740void LogReader::Register() {
741 event_loop_factory_unique_ptr_ =
Austin Schuhac0771c2020-01-07 18:36:30 -0800742 std::make_unique<SimulatedEventLoopFactory>(configuration());
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800743 Register(event_loop_factory_unique_ptr_.get());
744}
745
// Registers the reader against an externally-provided simulated event loop
// factory.  This builds a replay State per node, sets up the inter-node
// clock-offset estimation problem (an Eigen linear system over exact mpq
// rationals), disables simulated forwarding/statistics so replayed messages
// are not duplicated, and fast-forwards simulated time to the latest
// per-node log start time.
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;

  // Create a State (backed by a ChannelMerger over our log files) for each
  // node and register its event loop.
  for (const Node *node : configuration::GetNodes(configuration())) {
    const size_t node_index =
        configuration::GetNodeIndex(configuration(), node);
    states_[node_index] =
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
    State *state = states_[node_index].get();

    Register(state->SetNodeEventLoopFactory(
        event_loop_factory_->GetNodeEventLoopFactory(node)));
  }
  // live_nodes_ is incremented as nodes with data register (see
  // Register(EventLoop *)); zero here means no log matched the config.
  if (live_nodes_ == 0) {
    LOG(FATAL)
        << "Don't have logs from any of the nodes in the replay config--are "
           "you sure that the replay config matches the original config?";
  }

  // We need to now seed our per-node time offsets and get everything set up
  // to run.
  const size_t num_nodes = nodes_count();

  // It is easiest to solve for per node offsets with a matrix rather than
  // trying to solve the equations by hand.  So let's get after it.
  //
  // Now, build up the map matrix.
  //
  // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
  //
  // One row per filter (node pair) plus one row constraining the average,
  // one column per node.
  map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
      filters_.size() + 1, num_nodes);
  slope_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
          filters_.size() + 1, num_nodes);

  offset_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  last_valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);

  time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
  time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);

  // All times should average out to the distributed clock.  This is row 0 of
  // the system, and it is always valid.
  for (int i = 0; i < map_matrix_.cols(); ++i) {
    // 1/num_nodes.
    map_matrix_(0, i) = mpq_class(1, num_nodes);
  }
  valid_matrix_(0) = true;

  {
    // Now, add the a - b -> sample elements.  Each filter contributes one row
    // of the system; the estimator writes its slope/offset/validity directly
    // through the pointers handed to it here.
    size_t i = 1;
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
             &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      const size_t node_a_index =
          configuration::GetNodeIndex(configuration(), node_a);
      const size_t node_b_index =
          configuration::GetNodeIndex(configuration(), node_b);

      // -a
      map_matrix_(i, node_a_index) = mpq_class(-1);
      // +b
      map_matrix_(i, node_b_index) = mpq_class(1);

      // -> sample
      std::get<0>(filter.second)
          .set_slope_pointer(&slope_matrix_(i, node_a_index));
      std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));

      valid_matrix_(i) = false;
      std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));

      ++i;
    }
  }

  // Prime each node's message queues before solving for time.
  for (std::unique_ptr<State> &state : states_) {
    state->SeedSortedMessages();
  }

  // Rank of the map matrix tells you if all the nodes are in communication
  // with each other, which tells you if the offsets are observable.
  const size_t connected_nodes =
      Eigen::FullPivLU<
          Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
          .rank();

  // We don't need to support isolated nodes until someone has a real use
  // case.
  CHECK_EQ(connected_nodes, num_nodes)
      << ": There is a node which isn't communicating with the rest.";

  // And solve.
  UpdateOffsets();

  // We want to start the log file at the last start time of the log files
  // from all the nodes.  Compute how long each node's simulation needs to run
  // to move time to this point.
  distributed_clock::time_point start_time = distributed_clock::min_time;

  // TODO(austin): We want an "OnStart" callback for each node rather than
  // running until the last node.

  for (std::unique_ptr<State> &state : states_) {
    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
            << MaybeNodeName(state->event_loop()->node()) << "now "
            << state->monotonic_now();
    // And start computing the start time on the distributed clock now that
    // that works.
    start_time = std::max(
        start_time, state->ToDistributedClock(state->monotonic_start_time()));
  }

  CHECK_GE(start_time, distributed_clock::epoch())
      << ": Hmm, we have a node starting before the start of time. Offset "
         "everything.";

  // Forwarding is tracked per channel.  If it is enabled, we want to turn it
  // off.  Otherwise messages replayed will get forwarded across to the other
  // nodes, and also replayed on the other nodes.  This may not satisfy all
  // our users, but it'll start the discussion.
  if (configuration::MultiNode(event_loop_factory_->configuration())) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      const Channel *channel = logged_configuration()->channels()->Get(i);
      const Node *node = configuration::GetNode(
          configuration(), channel->source_node()->string_view());

      State *state =
          states_[configuration::GetNodeIndex(configuration(), node)].get();

      const Channel *remapped_channel =
          RemapChannel(state->event_loop(), channel);

      event_loop_factory_->DisableForwarding(remapped_channel);
    }

    // If we are replaying a log, we don't want a bunch of redundant messages
    // from both the real message bridge and simulated message bridge.
    event_loop_factory_->DisableStatistics();
  }

  // While we are starting the system up, we might be relying on matching data
  // to timestamps on log files where the timestamp log file starts before the
  // data.  In this case, it is reasonable to expect missing data.
  ignore_missing_data_ = true;
  VLOG(1) << "Running until " << start_time << " in Register";
  event_loop_factory_->RunFor(start_time.time_since_epoch());
  VLOG(1) << "At start time";
  // Now that we are running for real, missing data means that the log file is
  // corrupted or went wrong.
  ignore_missing_data_ = false;

  for (std::unique_ptr<State> &state : states_) {
    // Make the RT clock be correct before handing it to the user.
    if (state->realtime_start_time() != realtime_clock::min_time) {
      state->SetRealtimeOffset(state->monotonic_start_time(),
                               state->realtime_start_time());
    }
    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
            << MaybeNodeName(state->event_loop()->node()) << "now "
            << state->monotonic_now();
  }

  // Seed the CSV debug output with the current per-direction times so the
  // estimator traces start from "now".
  if (FLAGS_timestamps_to_csv) {
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
             &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      std::get<0>(filter.second)
          .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
                               ->monotonic_now());
      std::get<0>(filter.second)
          .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
                               ->monotonic_now());
    }
  }
}
932
Austin Schuh2f8fd752020-09-01 22:38:28 -0700933void LogReader::UpdateOffsets() {
934 VLOG(2) << "Samples are " << offset_matrix_;
935 VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
936 std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
937 Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
938 "]");
939 VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
940 << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
941
942 size_t node_index = 0;
943 for (std::unique_ptr<State> &state : states_) {
944 state->SetDistributedOffset(offset(node_index), slope(node_index));
945 VLOG(1) << "Offset for node " << node_index << " "
946 << MaybeNodeName(state->event_loop()->node()) << "is "
947 << aos::distributed_clock::time_point(offset(node_index))
948 << " slope " << std::setprecision(9) << std::fixed
949 << slope(node_index);
950 ++node_index;
951 }
952
953 if (VLOG_IS_ON(1)) {
954 LogFit("Offset is");
955 }
956}
957
// Logs (at VLOG(1)) the quality of the current clock fit for every node
// pair: the slope/offset recovered from the solved system versus each
// estimator's own fit, and round-trip conversions of the estimators'
// boundary timestamps through the distributed clock.  Pure diagnostics; no
// state is modified.
void LogReader::LogFit(std::string_view prefix) {
  for (std::unique_ptr<State> &state : states_) {
    VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
            << state->monotonic_now() << " distributed "
            << event_loop_factory_->distributed_now();
  }

  for (std::pair<const std::tuple<const Node *, const Node *>,
                 std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
       filters_) {
    message_bridge::NoncausalOffsetEstimator *estimator =
        &std::get<0>(filter.second);

    // Nothing to report for a filter which has seen no timestamps yet.
    if (estimator->a_timestamps().size() == 0 &&
        estimator->b_timestamps().size() == 0) {
      continue;
    }

    if (VLOG_IS_ON(1)) {
      estimator->LogFit(prefix);
    }

    const Node *const node_a = std::get<0>(filter.first);
    const Node *const node_b = std::get<1>(filter.first);

    const size_t node_a_index =
        configuration::GetNodeIndex(configuration(), node_a);
    const size_t node_b_index =
        configuration::GetNodeIndex(configuration(), node_b);

    // Reconstruct the a->b slope/offset implied by the global solution so it
    // can be compared against the estimator's local fit.
    const double recovered_slope =
        slope(node_b_index) / slope(node_a_index) - 1.0;
    const int64_t recovered_offset =
        offset(node_b_index).count() - offset(node_a_index).count() *
                                           slope(node_b_index) /
                                           slope(node_a_index);

    VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
            << " (error " << recovered_slope - estimator->fit().slope() << ") "
            << " offset " << std::setprecision(20) << recovered_offset
            << " (error "
            << recovered_offset - estimator->fit().offset().count() << ")";

    // Map node a's first and last sample times onto the distributed clock.
    const aos::distributed_clock::time_point a0 =
        states_[node_a_index]->ToDistributedClock(
            std::get<0>(estimator->a_timestamps()[0]));
    const aos::distributed_clock::time_point a1 =
        states_[node_a_index]->ToDistributedClock(
            std::get<0>(estimator->a_timestamps()[1]));

    // Round-trip each sample into node b's clock and compare against what the
    // estimator's linear fit predicts; flag samples outside "now".
    VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
            << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
            << " distributed -> " << node_b->name()->string_view() << " "
            << states_[node_b_index]->FromDistributedClock(a0) << " should be "
            << aos::monotonic_clock::time_point(
                   std::chrono::nanoseconds(static_cast<int64_t>(
                       std::get<0>(estimator->a_timestamps()[0])
                           .time_since_epoch()
                           .count() *
                       (1.0 + estimator->fit().slope()))) +
                   estimator->fit().offset())
            << ((a0 <= event_loop_factory_->distributed_now())
                    ? ""
                    : " After now, investigate");
    VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
            << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
            << " distributed -> " << node_b->name()->string_view() << " "
            << states_[node_b_index]->FromDistributedClock(a1) << " should be "
            << aos::monotonic_clock::time_point(
                   std::chrono::nanoseconds(static_cast<int64_t>(
                       std::get<0>(estimator->a_timestamps()[1])
                           .time_since_epoch()
                           .count() *
                       (1.0 + estimator->fit().slope()))) +
                   estimator->fit().offset())
            << ((event_loop_factory_->distributed_now() <= a1)
                    ? ""
                    : " Before now, investigate");

    // Same check in the reverse direction, from node b's samples.
    const aos::distributed_clock::time_point b0 =
        states_[node_b_index]->ToDistributedClock(
            std::get<0>(estimator->b_timestamps()[0]));
    const aos::distributed_clock::time_point b1 =
        states_[node_b_index]->ToDistributedClock(
            std::get<0>(estimator->b_timestamps()[1]));

    VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
            << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
            << " distributed -> " << node_a->name()->string_view() << " "
            << states_[node_a_index]->FromDistributedClock(b0)
            << ((b0 <= event_loop_factory_->distributed_now())
                    ? ""
                    : " After now, investigate");
    VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
            << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
            << " distributed -> " << node_a->name()->string_view() << " "
            << states_[node_a_index]->FromDistributedClock(b1)
            << ((event_loop_factory_->distributed_now() <= b1)
                    ? ""
                    : " Before now, investigate");
  }
}
1060
1061message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
Austin Schuh8bd96322020-02-13 21:18:22 -08001062 const Node *node_a, const Node *node_b) {
1063 CHECK_NE(node_a, node_b);
1064 CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
1065 CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
1066
1067 if (node_a > node_b) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001068 return GetFilter(node_b, node_a);
Austin Schuh8bd96322020-02-13 21:18:22 -08001069 }
1070
1071 auto tuple = std::make_tuple(node_a, node_b);
1072
1073 auto it = filters_.find(tuple);
1074
1075 if (it == filters_.end()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001076 auto &x =
1077 filters_
1078 .insert(std::make_pair(
1079 tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
1080 node_a, node_b))))
1081 .first->second;
Austin Schuh8bd96322020-02-13 21:18:22 -08001082 if (FLAGS_timestamps_to_csv) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001083 std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
1084 "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
1085 node_b->name()->string_view()));
1086 std::get<0>(x).SetRevCsvFileName(absl::StrCat(
1087 "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
1088 node_a->name()->string_view()));
Austin Schuh8bd96322020-02-13 21:18:22 -08001089 }
1090
Austin Schuh2f8fd752020-09-01 22:38:28 -07001091 return &std::get<0>(x);
Austin Schuh8bd96322020-02-13 21:18:22 -08001092 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001093 return &std::get<0>(it->second);
Austin Schuh8bd96322020-02-13 21:18:22 -08001094 }
1095}
1096
Austin Schuh8bd96322020-02-13 21:18:22 -08001097
Austin Schuhe309d2a2019-11-29 13:25:21 -08001098void LogReader::Register(EventLoop *event_loop) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001099 State *state =
1100 states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
1101 .get();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001102
Austin Schuh858c9f32020-08-31 16:56:12 -07001103 state->set_event_loop(event_loop);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001104
Tyler Chatow67ddb032020-01-12 14:30:04 -08001105 // We don't run timing reports when trying to print out logged data, because
1106 // otherwise we would end up printing out the timing reports themselves...
1107 // This is only really relevant when we are replaying into a simulation.
Austin Schuh6f3babe2020-01-26 20:34:50 -08001108 event_loop->SkipTimingReport();
1109 event_loop->SkipAosLog();
Austin Schuh39788ff2019-12-01 18:22:57 -08001110
Austin Schuh858c9f32020-08-31 16:56:12 -07001111 const bool has_data = state->SetNode();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001112
Austin Schuh858c9f32020-08-31 16:56:12 -07001113 state->SetChannelCount(logged_configuration()->channels()->size());
Austin Schuh8bd96322020-02-13 21:18:22 -08001114
Austin Schuh858c9f32020-08-31 16:56:12 -07001115 for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001116 const Channel *channel =
1117 RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
Austin Schuh6331ef92020-01-07 18:28:09 -08001118
Austin Schuh858c9f32020-08-31 16:56:12 -07001119 NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001120 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
Austin Schuh8bd96322020-02-13 21:18:22 -08001121
1122 if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
1123 configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
1124 const Node *target_node = configuration::GetNode(
1125 event_loop->configuration(), channel->source_node()->string_view());
Austin Schuh858c9f32020-08-31 16:56:12 -07001126 filter = GetFilter(event_loop->node(), target_node);
Austin Schuh8bd96322020-02-13 21:18:22 -08001127
1128 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001129 channel_target_event_loop_factory =
Austin Schuh8bd96322020-02-13 21:18:22 -08001130 event_loop_factory_->GetNodeEventLoopFactory(target_node);
1131 }
1132 }
Austin Schuh858c9f32020-08-31 16:56:12 -07001133
1134 state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
1135 channel_target_event_loop_factory);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001136 }
1137
Austin Schuh6aa77be2020-02-22 21:06:40 -08001138 // If we didn't find any log files with data in them, we won't ever get a
1139 // callback or be live. So skip the rest of the setup.
1140 if (!has_data) {
1141 return;
1142 }
1143
Austin Schuh858c9f32020-08-31 16:56:12 -07001144 state->set_timer_handler(event_loop->AddTimer([this, state]() {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001145 VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
1146 << "at " << state->event_loop()->context().monotonic_event_time
1147 << " now " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001148 if (state->OldestMessageTime() == monotonic_clock::max_time) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001149 --live_nodes_;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001150 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001151 if (live_nodes_ == 0) {
1152 event_loop_factory_->Exit();
1153 }
James Kuszmaul314f1672020-01-03 20:02:08 -08001154 return;
1155 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001156 TimestampMerger::DeliveryTimestamp channel_timestamp;
Austin Schuh05b70472020-01-01 17:11:17 -08001157 int channel_index;
1158 FlatbufferVector<MessageHeader> channel_data =
1159 FlatbufferVector<MessageHeader>::Empty();
1160
Austin Schuh2f8fd752020-09-01 22:38:28 -07001161 if (VLOG_IS_ON(1)) {
1162 LogFit("Offset was");
1163 }
1164
1165 bool update_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001166 std::tie(channel_timestamp, channel_index, channel_data) =
Austin Schuh2f8fd752020-09-01 22:38:28 -07001167 state->PopOldest(&update_time);
Austin Schuh05b70472020-01-01 17:11:17 -08001168
Austin Schuhe309d2a2019-11-29 13:25:21 -08001169 const monotonic_clock::time_point monotonic_now =
Austin Schuh858c9f32020-08-31 16:56:12 -07001170 state->event_loop()->context().monotonic_event_time;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001171 if (!FLAGS_skip_order_validation) {
1172 CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
1173 << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
1174 << monotonic_now << " trying to send "
1175 << channel_timestamp.monotonic_event_time << " failure "
1176 << state->DebugString();
1177 } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
1178 LOG(WARNING) << "Check failed: monotonic_now == "
1179 "channel_timestamp.monotonic_event_time) ("
1180 << monotonic_now << " vs. "
1181 << channel_timestamp.monotonic_event_time
1182 << "): " << FlatbufferToJson(state->event_loop()->node())
1183 << " Now " << monotonic_now << " trying to send "
1184 << channel_timestamp.monotonic_event_time << " failure "
1185 << state->DebugString();
1186 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001187
Austin Schuh6f3babe2020-01-26 20:34:50 -08001188 if (channel_timestamp.monotonic_event_time >
Austin Schuh858c9f32020-08-31 16:56:12 -07001189 state->monotonic_start_time() ||
Austin Schuh15649d62019-12-28 16:36:38 -08001190 event_loop_factory_ != nullptr) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001191 if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
Austin Schuh858c9f32020-08-31 16:56:12 -07001192 !state->at_end()) ||
Austin Schuh05b70472020-01-01 17:11:17 -08001193 channel_data.message().data() != nullptr) {
1194 CHECK(channel_data.message().data() != nullptr)
1195 << ": Got a message without data. Forwarding entry which was "
Austin Schuh2f8fd752020-09-01 22:38:28 -07001196 "not matched? Use --skip_missing_forwarding_entries to "
1197 "ignore "
Austin Schuh15649d62019-12-28 16:36:38 -08001198 "this.";
Austin Schuh92547522019-12-28 14:33:43 -08001199
Austin Schuh2f8fd752020-09-01 22:38:28 -07001200 if (update_time) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001201 // Confirm that the message was sent on the sending node before the
1202 // destination node (this node). As a proxy, do this by making sure
1203 // that time on the source node is past when the message was sent.
Austin Schuh2f8fd752020-09-01 22:38:28 -07001204 if (!FLAGS_skip_order_validation) {
1205 CHECK_LT(channel_timestamp.monotonic_remote_time,
1206 state->monotonic_remote_now(channel_index))
1207 << state->event_loop()->node()->name()->string_view() << " to "
1208 << state->remote_node(channel_index)->name()->string_view()
1209 << " " << state->DebugString();
1210 } else if (channel_timestamp.monotonic_remote_time >=
1211 state->monotonic_remote_now(channel_index)) {
1212 LOG(WARNING)
1213 << "Check failed: channel_timestamp.monotonic_remote_time < "
1214 "state->monotonic_remote_now(channel_index) ("
1215 << channel_timestamp.monotonic_remote_time << " vs. "
1216 << state->monotonic_remote_now(channel_index) << ") "
1217 << state->event_loop()->node()->name()->string_view() << " to "
1218 << state->remote_node(channel_index)->name()->string_view()
1219 << " currently " << channel_timestamp.monotonic_event_time
1220 << " ("
1221 << state->ToDistributedClock(
1222 channel_timestamp.monotonic_event_time)
1223 << ") remote event time "
1224 << channel_timestamp.monotonic_remote_time << " ("
1225 << state->RemoteToDistributedClock(
1226 channel_index, channel_timestamp.monotonic_remote_time)
1227 << ") " << state->DebugString();
1228 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001229
1230 if (FLAGS_timestamps_to_csv) {
1231 if (offset_fp_ == nullptr) {
1232 offset_fp_ = fopen("/tmp/offsets.csv", "w");
1233 fprintf(
1234 offset_fp_,
1235 "# time_since_start, offset node 0, offset node 1, ...\n");
1236 first_time_ = channel_timestamp.realtime_event_time;
1237 }
1238
1239 fprintf(offset_fp_, "%.9f",
1240 std::chrono::duration_cast<std::chrono::duration<double>>(
1241 channel_timestamp.realtime_event_time - first_time_)
1242 .count());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001243 for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
1244 fprintf(offset_fp_, ", %.9f",
1245 time_offset_matrix_(i, 0) +
1246 time_slope_matrix_(i, 0) *
1247 chrono::duration<double>(
1248 event_loop_factory_->distributed_now()
1249 .time_since_epoch())
1250 .count());
Austin Schuh8bd96322020-02-13 21:18:22 -08001251 }
1252 fprintf(offset_fp_, "\n");
1253 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001254 }
1255
Austin Schuh15649d62019-12-28 16:36:38 -08001256 // If we have access to the factory, use it to fix the realtime time.
Austin Schuh858c9f32020-08-31 16:56:12 -07001257 state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
1258 channel_timestamp.realtime_event_time);
Austin Schuh15649d62019-12-28 16:36:38 -08001259
Austin Schuh2f8fd752020-09-01 22:38:28 -07001260 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
1261 << channel_timestamp.monotonic_event_time;
1262 // TODO(austin): std::move channel_data in and make that efficient in
1263 // simulation.
Austin Schuh858c9f32020-08-31 16:56:12 -07001264 state->Send(channel_index, channel_data.message().data()->Data(),
1265 channel_data.message().data()->size(),
1266 channel_timestamp.monotonic_remote_time,
1267 channel_timestamp.realtime_remote_time,
1268 channel_timestamp.remote_queue_index);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001269 } else if (state->at_end() && !ignore_missing_data_) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001270 // We are at the end of the log file and found missing data. Finish
Austin Schuh2f8fd752020-09-01 22:38:28 -07001271 // reading the rest of the log file and call it quits. We don't want
1272 // to replay partial data.
Austin Schuh858c9f32020-08-31 16:56:12 -07001273 while (state->OldestMessageTime() != monotonic_clock::max_time) {
1274 bool update_time_dummy;
1275 state->PopOldest(&update_time_dummy);
Austin Schuh8bd96322020-02-13 21:18:22 -08001276 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001277 } else {
1278 CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
Austin Schuh92547522019-12-28 14:33:43 -08001279 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001280 } else {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001281 LOG(WARNING)
1282 << "Not sending data from before the start of the log file. "
1283 << channel_timestamp.monotonic_event_time.time_since_epoch().count()
1284 << " start " << monotonic_start_time().time_since_epoch().count()
1285 << " " << FlatbufferToJson(channel_data);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001286 }
1287
Austin Schuh858c9f32020-08-31 16:56:12 -07001288 const monotonic_clock::time_point next_time = state->OldestMessageTime();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001289 if (next_time != monotonic_clock::max_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001290 VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
1291 << "wakeup for " << next_time << "("
1292 << state->ToDistributedClock(next_time)
1293 << " distributed), now is " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001294 state->Setup(next_time);
James Kuszmaul314f1672020-01-03 20:02:08 -08001295 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001296 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1297 << "No next message, scheduling shutdown";
1298 // Set a timer up immediately after now to die. If we don't do this,
1299 // then the senders waiting on the message we just read will never get
1300 // called.
Austin Schuheecb9282020-01-08 17:43:30 -08001301 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001302 state->Setup(monotonic_now + event_loop_factory_->send_delay() +
1303 std::chrono::nanoseconds(1));
Austin Schuheecb9282020-01-08 17:43:30 -08001304 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001305 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001306
Austin Schuh2f8fd752020-09-01 22:38:28 -07001307 // Once we make this call, the current time changes. So do everything
1308 // which involves time before changing it. That especially includes
1309 // sending the message.
1310 if (update_time) {
1311 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1312 << "updating offsets";
1313
1314 std::vector<aos::monotonic_clock::time_point> before_times;
1315 before_times.resize(states_.size());
1316 std::transform(states_.begin(), states_.end(), before_times.begin(),
1317 [](const std::unique_ptr<State> &state) {
1318 return state->monotonic_now();
1319 });
1320
1321 for (size_t i = 0; i < states_.size(); ++i) {
1322 VLOG(1) << MaybeNodeName(
1323 states_[i]->event_loop()->node())
1324 << "before " << states_[i]->monotonic_now();
1325 }
1326
Austin Schuh8bd96322020-02-13 21:18:22 -08001327 UpdateOffsets();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001328 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
1329 << state->monotonic_now();
1330
1331 for (size_t i = 0; i < states_.size(); ++i) {
1332 VLOG(1) << MaybeNodeName(
1333 states_[i]->event_loop()->node())
1334 << "after " << states_[i]->monotonic_now();
1335 }
1336
1337 // TODO(austin): We should be perfect.
1338 const std::chrono::nanoseconds kTolerance{3};
1339 if (!FLAGS_skip_order_validation) {
1340 CHECK_GE(next_time, state->monotonic_now())
1341 << ": Time skipped the next event.";
1342
1343 for (size_t i = 0; i < states_.size(); ++i) {
1344 CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
1345 << ": Time changed too much on node "
1346 << MaybeNodeName(states_[i]->event_loop()->node());
1347 CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
1348 << ": Time changed too much on node "
1349 << states_[i]->event_loop()->node()->name()->string_view();
1350 }
1351 } else {
1352 if (next_time < state->monotonic_now()) {
1353 LOG(WARNING) << "Check failed: next_time >= "
1354 "state->monotonic_now() ("
1355 << next_time << " vs. " << state->monotonic_now()
1356 << "): Time skipped the next event.";
1357 }
1358 for (size_t i = 0; i < states_.size(); ++i) {
1359 if (states_[i]->monotonic_now() >= before_times[i] - kTolerance) {
1360 LOG(WARNING) << "Check failed: "
1361 "states_[i]->monotonic_now() "
1362 ">= before_times[i] - kTolerance ("
1363 << states_[i]->monotonic_now() << " vs. "
1364 << before_times[i] - kTolerance
1365 << ") : Time changed too much on node "
1366 << MaybeNodeName(states_[i]->event_loop()->node());
1367 }
1368 if (states_[i]->monotonic_now() <= before_times[i] + kTolerance) {
1369 LOG(WARNING) << "Check failed: "
1370 "states_[i]->monotonic_now() "
1371 "<= before_times[i] + kTolerance ("
1372 << states_[i]->monotonic_now() << " vs. "
1373 << before_times[i] - kTolerance
1374 << ") : Time changed too much on node "
1375 << MaybeNodeName(states_[i]->event_loop()->node());
1376 }
1377 }
1378 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001379 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001380
1381 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
1382 << state->event_loop()->context().monotonic_event_time << " now "
1383 << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001384 }));
Austin Schuhe309d2a2019-11-29 13:25:21 -08001385
Austin Schuh6f3babe2020-01-26 20:34:50 -08001386 ++live_nodes_;
1387
Austin Schuh858c9f32020-08-31 16:56:12 -07001388 if (state->OldestMessageTime() != monotonic_clock::max_time) {
1389 event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
Austin Schuhe309d2a2019-11-29 13:25:21 -08001390 }
1391}
1392
1393void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001394 // Make sure that things get destroyed in the correct order, rather than
1395 // relying on getting the order correct in the class definition.
Austin Schuh8bd96322020-02-13 21:18:22 -08001396 for (std::unique_ptr<State> &state : states_) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001397 state->Deregister();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001398 }
Austin Schuh92547522019-12-28 14:33:43 -08001399
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001400 event_loop_factory_unique_ptr_.reset();
1401 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -08001402}
1403
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001404void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
1405 std::string_view add_prefix) {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001406 for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
1407 const Channel *const channel = logged_configuration()->channels()->Get(ii);
1408 if (channel->name()->str() == name &&
1409 channel->type()->string_view() == type) {
1410 CHECK_EQ(0u, remapped_channels_.count(ii))
1411 << "Already remapped channel "
1412 << configuration::CleanedChannelToString(channel);
1413 remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
1414 VLOG(1) << "Remapping channel "
1415 << configuration::CleanedChannelToString(channel)
1416 << " to have name " << remapped_channels_[ii];
Austin Schuh6331ef92020-01-07 18:28:09 -08001417 MakeRemappedConfig();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001418 return;
1419 }
1420 }
1421 LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
1422 << type;
1423}
1424
// Rebuilds remapped_configuration_ from the base configuration plus the
// channels recorded in remapped_channels_.  Must be called before any event
// loops are created, since senders are looked up against this config.
void LogReader::MakeRemappedConfig() {
  for (std::unique_ptr<State> &state : states_) {
    if (state) {
      CHECK(!state->event_loop())
          << ": Can't change the mapping after the events are scheduled.";
    }
  }

  // If no remapping occurred and we are using the original config, then there
  // is nothing interesting to do here.
  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
    remapped_configuration_ = logged_configuration();
    return;
  }
  // Config to copy Channel definitions from. Use the specified
  // replay_configuration_ if it has been provided.
  const Configuration *const base_config = replay_configuration_ == nullptr
                                               ? logged_configuration()
                                               : replay_configuration_;
  // The remapped config will be identical to the base_config, except that it
  // will have a bunch of extra channels in the channel list, which are exact
  // copies of the remapped channels, but with different names.
  // Because the flatbuffers API is a pain to work with, this requires a bit of
  // a song-and-dance to get copied over.
  // The order of operations is to:
  // 1) Make a flatbuffer builder for a config that will just contain a list of
  //    the new channels that we want to add.
  // 2) For each channel that we are remapping:
  //    a) Make a buffer/builder and construct into it a Channel table that only
  //       contains the new name for the channel.
  //    b) Merge the new channel with just the name into the channel that we are
  //       trying to copy, built in the flatbuffer builder made in 1. This gives
  //       us the new channel definition that we need.
  // 3) Using this list of offsets, build the Configuration of just new
  //    Channels.
  // 4) Merge the Configuration with the new Channels into the base_config.
  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
  //    chance to sanitize the config.

  // This is the builder that we use for the config containing all the new
  // channels.
  flatbuffers::FlatBufferBuilder new_config_fbb;
  // ForceDefaults so fields with default values are still serialized and
  // survive the merge.
  new_config_fbb.ForceDefaults(true);
  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
  for (auto &pair : remapped_channels_) {
    // This is the builder that we use for creating the Channel with just the
    // new name.
    flatbuffers::FlatBufferBuilder new_name_fbb;
    new_name_fbb.ForceDefaults(true);
    const flatbuffers::Offset<flatbuffers::String> name_offset =
        new_name_fbb.CreateString(pair.second);
    ChannelBuilder new_name_builder(new_name_fbb);
    new_name_builder.add_name(name_offset);
    new_name_fbb.Finish(new_name_builder.Finish());
    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
    // Retrieve the channel that we want to copy, confirming that it is
    // actually present in base_config.
    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
        base_config, logged_configuration()->channels()->Get(pair.first), "",
        nullptr));
    // Actually create the new channel and put it into the vector of Offsets
    // that we will use to create the new Configuration.
    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
        reinterpret_cast<const flatbuffers::Table *>(base_channel),
        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
        &new_config_fbb));
  }
  // Create the Configuration containing the new channels that we want to add.
  const auto new_name_vector_offsets =
      new_config_fbb.CreateVector(channel_offsets);
  ConfigurationBuilder new_config_builder(new_config_fbb);
  new_config_builder.add_channels(new_name_vector_offsets);
  new_config_fbb.Finish(new_config_builder.Finish());
  const FlatbufferDetachedBuffer<Configuration> new_name_config =
      new_config_fbb.Release();
  // Merge the new channels configuration into the base_config, giving us the
  // remapped configuration.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          MergeFlatBuffers<Configuration>(base_config,
                                          &new_name_config.message()));
  // Call MergeConfiguration to deal with sanitizing the config.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          configuration::MergeConfiguration(*remapped_configuration_buffer_));

  // remapped_configuration_ points into remapped_configuration_buffer_, which
  // this class owns, so the pointer stays valid for the reader's lifetime.
  remapped_configuration_ = &remapped_configuration_buffer_->message();
}
1513
Austin Schuh6f3babe2020-01-26 20:34:50 -08001514const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
1515 const Channel *channel) {
1516 std::string_view channel_name = channel->name()->string_view();
1517 std::string_view channel_type = channel->type()->string_view();
1518 const int channel_index =
1519 configuration::ChannelIndex(logged_configuration(), channel);
1520 // If the channel is remapped, find the correct channel name to use.
1521 if (remapped_channels_.count(channel_index) > 0) {
Austin Schuhee711052020-08-24 16:06:09 -07001522 VLOG(3) << "Got remapped channel on "
Austin Schuh6f3babe2020-01-26 20:34:50 -08001523 << configuration::CleanedChannelToString(channel);
1524 channel_name = remapped_channels_[channel_index];
1525 }
1526
Austin Schuhee711052020-08-24 16:06:09 -07001527 VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001528 const Channel *remapped_channel = configuration::GetChannel(
1529 event_loop->configuration(), channel_name, channel_type,
1530 event_loop->name(), event_loop->node());
1531
1532 CHECK(remapped_channel != nullptr)
1533 << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
1534 << channel_type << "\"} because it is not in the provided configuration.";
1535
1536 return remapped_channel;
1537}
1538
// Takes ownership of the ChannelMerger that sorts and supplies this node's
// logged messages for the duration of replay.
LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
    : channel_merger_(std::move(channel_merger)) {}
1541
1542EventLoop *LogReader::State::SetNodeEventLoopFactory(
1543 NodeEventLoopFactory *node_event_loop_factory) {
1544 node_event_loop_factory_ = node_event_loop_factory;
1545 event_loop_unique_ptr_ =
1546 node_event_loop_factory_->MakeEventLoop("log_reader");
1547 return event_loop_unique_ptr_.get();
1548}
1549
1550void LogReader::State::SetChannelCount(size_t count) {
1551 channels_.resize(count);
1552 filters_.resize(count);
1553 channel_target_event_loop_factory_.resize(count);
1554}
1555
1556void LogReader::State::SetChannel(
1557 size_t channel, std::unique_ptr<RawSender> sender,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001558 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh858c9f32020-08-31 16:56:12 -07001559 NodeEventLoopFactory *channel_target_event_loop_factory) {
1560 channels_[channel] = std::move(sender);
1561 filters_[channel] = filter;
1562 channel_target_event_loop_factory_[channel] =
1563 channel_target_event_loop_factory;
1564}
1565
1566std::tuple<TimestampMerger::DeliveryTimestamp, int,
1567 FlatbufferVector<MessageHeader>>
1568LogReader::State::PopOldest(bool *update_time) {
1569 CHECK_GT(sorted_messages_.size(), 0u);
1570
1571 std::tuple<TimestampMerger::DeliveryTimestamp, int,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001572 FlatbufferVector<MessageHeader>,
1573 message_bridge::NoncausalOffsetEstimator *>
Austin Schuh858c9f32020-08-31 16:56:12 -07001574 result = std::move(sorted_messages_.front());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001575 VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
Austin Schuh858c9f32020-08-31 16:56:12 -07001576 << std::get<0>(result).monotonic_event_time;
1577 sorted_messages_.pop_front();
1578 SeedSortedMessages();
1579
Austin Schuh2f8fd752020-09-01 22:38:28 -07001580 if (std::get<3>(result) != nullptr) {
1581 *update_time = std::get<3>(result)->Pop(
1582 event_loop_->node(), std::get<0>(result).monotonic_event_time);
1583 } else {
1584 *update_time = false;
1585 }
Austin Schuh858c9f32020-08-31 16:56:12 -07001586 return std::make_tuple(std::get<0>(result), std::get<1>(result),
1587 std::move(std::get<2>(result)));
1588}
1589
1590monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
1591 if (sorted_messages_.size() > 0) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001592 VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
Austin Schuh858c9f32020-08-31 16:56:12 -07001593 << std::get<0>(sorted_messages_.front()).monotonic_event_time;
1594 return std::get<0>(sorted_messages_.front()).monotonic_event_time;
1595 }
1596
1597 return channel_merger_->OldestMessageTime();
1598}
1599
// Pulls messages out of the channel merger into sorted_messages_ until about
// 2 seconds of data (past the current front, or past the log start) is
// staged, feeding each forwarded message's timestamps into its offset filter
// as it goes.
void LogReader::State::SeedSortedMessages() {
  const aos::monotonic_clock::time_point end_queue_time =
      (sorted_messages_.size() > 0
           ? std::get<0>(sorted_messages_.front()).monotonic_event_time
           : channel_merger_->monotonic_start_time()) +
      std::chrono::seconds(2);

  while (true) {
    // max_time means the merger has no more messages; nothing left to stage.
    if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
      return;
    }
    if (sorted_messages_.size() > 0) {
      // Stop placing sorted messages on the list once we have 2 seconds
      // queued up (but queue at least until the log starts).
      if (end_queue_time <
          std::get<0>(sorted_messages_.back()).monotonic_event_time) {
        return;
      }
    }

    TimestampMerger::DeliveryTimestamp channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    message_bridge::NoncausalOffsetEstimator *filter = nullptr;

    std::tie(channel_timestamp, channel_index, channel_data) =
        channel_merger_->PopOldest();

    // Messages without forwarding information carry no filter; only sample
    // the offset estimator for forwarded messages.
    if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
      // Got a forwarding timestamp!
      filter = filters_[channel_index];

      CHECK(filter != nullptr);

      // Call the correct method depending on if we are the forward or
      // reverse direction here.
      filter->Sample(event_loop_->node(),
                     channel_timestamp.monotonic_event_time,
                     channel_timestamp.monotonic_remote_time);
    }
    // The filter pointer rides along so PopOldest() can Pop() it later.
    sorted_messages_.emplace_back(channel_timestamp, channel_index,
                                  std::move(channel_data), filter);
  }
}
1647
1648void LogReader::State::Deregister() {
1649 for (size_t i = 0; i < channels_.size(); ++i) {
1650 channels_[i].reset();
1651 }
1652 event_loop_unique_ptr_.reset();
1653 event_loop_ = nullptr;
1654 timer_handler_ = nullptr;
1655 node_event_loop_factory_ = nullptr;
1656}
1657
Austin Schuhe309d2a2019-11-29 13:25:21 -08001658} // namespace logger
1659} // namespace aos