blob: 3e6c24b1d90c83b17c04d8088c42926bcb4201a3 [file] [log] [blame]
James Kuszmaul38735e82019-12-07 16:42:06 -08001#include "aos/events/logging/logger.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -08002
3#include <fcntl.h>
Austin Schuh4c4e0092019-12-22 16:18:03 -08004#include <limits.h>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8#include <vector>
9
Austin Schuh8bd96322020-02-13 21:18:22 -080010#include "Eigen/Dense"
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "absl/strings/escaping.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "absl/types/span.h"
13#include "aos/events/event_loop.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080014#include "aos/events/logging/logger_generated.h"
Austin Schuh64fab802020-09-09 22:47:47 -070015#include "aos/events/logging/uuid.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080016#include "aos/flatbuffer_merge.h"
Austin Schuh288479d2019-12-18 19:47:52 -080017#include "aos/network/team_number.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080018#include "aos/time/time.h"
19#include "flatbuffers/flatbuffers.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070020#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080021
// Command-line flags controlling logger / replay behavior.
DEFINE_bool(skip_missing_forwarding_entries, false,
            "If true, drop any forwarding entries with missing data. If "
            "false, CHECK.");

// Debugging aid: dumps the clock-offset estimation data used for multi-node
// time synchronization.
DEFINE_bool(timestamps_to_csv, false,
            "If true, write all the time synchronization information to a set "
            "of CSV files in /tmp/. This should only be needed when debugging "
            "time synchronization.");

DEFINE_bool(skip_order_validation, false,
            "If true, ignore any out of orderness in replay");
33
Austin Schuhe309d2a2019-11-29 13:25:21 -080034namespace aos {
35namespace logger {
Austin Schuhe309d2a2019-11-29 13:25:21 -080036namespace chrono = std::chrono;
37
Austin Schuh2f8fd752020-09-01 22:38:28 -070038
// Convenience constructor: log to a local file set derived from base_name,
// using the event loop's own configuration.
Logger::Logger(std::string_view base_name, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : Logger(base_name, event_loop, event_loop->configuration(),
             polling_period) {}

// Convenience constructor: log to a local file set derived from base_name,
// but with an explicitly supplied configuration.
Logger::Logger(std::string_view base_name, EventLoop *event_loop,
               const Configuration *configuration,
               std::chrono::milliseconds polling_period)
    : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
             event_loop, configuration, polling_period) {}

// Convenience constructor: custom LogNamer, configuration taken from the
// event loop.  All three delegate to the main constructor below.
Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : Logger(std::move(log_namer), event_loop, event_loop->configuration(),
             polling_period) {}
Austin Schuh6f3babe2020-01-26 20:34:50 -080052
// Main constructor.  Walks the configuration to decide, per channel, what
// needs to be logged on this node (data, delivery timestamps, and/or
// forwarded remote-timestamp contents), builds a fetcher for each such
// channel, and pre-builds a log file header per logged node.  Logging itself
// starts from the event loop's OnRun callback.
Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
               const Configuration *configuration,
               std::chrono::milliseconds polling_period)
    : event_loop_(event_loop),
      uuid_(UUID::Random()),
      log_namer_(std::move(log_namer)),
      configuration_(configuration),
      name_(network::GetHostname()),
      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
      polling_period_(polling_period),
      // Only a multi-node configuration has ServerStatistics to fetch; on a
      // single node leave the fetcher default-constructed (empty).
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
  int channel_index = 0;

  // Find all the nodes which are logging timestamps on our node.
  std::set<const Node *> timestamp_logger_nodes;
  for (const Channel *channel : *configuration_->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
        !channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      const Node *other_node = configuration::GetNode(
          configuration_, connection->name()->string_view());

      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_nodes.insert(other_node);
      }
    }
  }

  // Maps each remote-timestamp channel to the node whose timestamps it
  // carries.
  std::map<const Channel *, const Node *> timestamp_logger_channels;

  // Now that we have all the nodes accumulated, make remote timestamp loggers
  // for them.
  for (const Node *node : timestamp_logger_nodes) {
    const Channel *channel = configuration::GetChannel(
        configuration_,
        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
        logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
        event_loop_->node());

    CHECK(channel != nullptr)
        << ": Remote timestamps are logged on "
        << event_loop_->node()->name()->string_view()
        << " but can't find channel /aos/remote_timestamps/"
        << node->name()->string_view();
    timestamp_logger_channels.insert(std::make_pair(channel, node));
  }

  const size_t our_node_index = configuration::GetNodeIndex(
      configuration_, event_loop_->node());

  for (const Channel *config_channel : *configuration_->channels()) {
    // The MakeRawFetcher method needs a channel which is in the event loop
    // configuration() object, not the configuration_ object.  Go look that up
    // from the config.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());

    FetcherStruct fs;
    fs.node_index = our_node_index;
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    // Only log message data for channels that are both marked logged and
    // actually readable here.
    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
                                 channel, event_loop_->node()) &&
                             is_readable;

    const bool log_delivery_times =
        (event_loop_->node() == nullptr)
            ? false
            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                  channel, event_loop_->node(), event_loop_->node());

    // Now, detect a MessageHeader timestamp logger where we should just log the
    // contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();
    const Node *timestamp_node =
        log_contents ? timestamp_logger_channels.find(channel)->second
                     : nullptr;

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.writer = log_namer_->MakeWriter(channel);
        if (!is_local) {
          // Messages we only receive (not send) are logged as remote.
          fs.log_type = LogType::kLogRemoteMessage;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.contents_writer =
            log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
        // Forwarded timestamp contents are attributed to the remote node.
        fs.node_index =
            configuration::GetNodeIndex(configuration_, timestamp_node);
      }
      fs.channel_index = channel_index;
      fs.written = false;
      fetchers_.emplace_back(std::move(fs));
    }
    ++channel_index;
  }

  // One NodeState per node in a multi-node config; a single slot otherwise.
  node_state_.resize(configuration::MultiNode(configuration_)
                         ? configuration_->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(configuration_, node);

    node_state_[node_index].log_file_header = MakeHeader(node);
  }

  // When things start, we want to log the header, then the most recent
  // messages available on each fetcher to capture the previous state, then
  // start polling.
  event_loop_->OnRun([this]() { StartLogging(); });
}
193
Logger::~Logger() {
  // If we are replaying a log file, or in simulation, we want to force the last
  // bit of data to be logged.  The easiest way to deal with this is to poll
  // everything as we go to destroy the class, ie, shut down the logger, and
  // write it to disk.
  DoLogData();
}
201
// Snapshots the latest message on every logged channel, resets per-node start
// times, writes the log file headers, and arms the polling timer.
void Logger::StartLogging() {
  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel.  This lets us have
  // non periodic messages with configuration that now get logged.
  for (FetcherStruct &f : fetchers_) {
    // Fetch() returning false means there is nothing to write yet, so mark
    // the channel as already written.
    f.written = !f.fetcher->Fetch();
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < node_state_.size(); ++i) {
    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
  }

  WriteHeader();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
            << " start_time " << last_synchronized_time_;

  // From here on, DoLogData runs every polling_period_.
  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}
223
// Declares the log started "now", updates each node's header with the best
// available start time, and writes the headers out through the namer.
void Logger::WriteHeader() {
  if (configuration::MultiNode(configuration_)) {
    // Refresh clock-offset data so remote node start times can be computed.
    server_statistics_fetcher_.Fetch();
  }

  aos::monotonic_clock::time_point monotonic_start_time =
      event_loop_->monotonic_now();
  aos::realtime_clock::time_point realtime_start_time =
      event_loop_->realtime_now();

  // We need to pick a point in time to declare the log file "started".  This
  // starts here.  It needs to be after everything is fetched so that the
  // fetchers are all pointed at the most recent message before the start
  // time.
  last_synchronized_time_ = monotonic_start_time;

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(configuration_, node);
    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                         realtime_start_time);
    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
  }
}
Austin Schuh8bd96322020-02-13 21:18:22 -0800248
// For multi-node logs, retries computing start times for nodes that were not
// connected at WriteHeader time.  When a node's start time is newly resolved,
// its log is rotated so the next part carries the corrected header.
void Logger::WriteMissingTimestamps() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  } else {
    // Single node: start times were fully resolved in WriteHeader.
    return;
  }

  if (server_statistics_fetcher_.get() == nullptr) {
    // No statistics message yet, so no offsets to work with.
    return;
  }

  for (const Node *node : log_namer_->nodes()) {
    const int node_index =
        configuration::GetNodeIndex(configuration_, node);
    if (MaybeUpdateTimestamp(
            node, node_index,
            server_statistics_fetcher_.context().monotonic_event_time,
            server_statistics_fetcher_.context().realtime_event_time)) {
      log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
    }
  }
}
271
272void Logger::SetStartTime(size_t node_index,
273 aos::monotonic_clock::time_point monotonic_start_time,
274 aos::realtime_clock::time_point realtime_start_time) {
275 node_state_[node_index].monotonic_start_time = monotonic_start_time;
276 node_state_[node_index].realtime_start_time = realtime_start_time;
277 node_state_[node_index]
278 .log_file_header.mutable_message()
279 ->mutate_monotonic_start_time(
280 std::chrono::duration_cast<std::chrono::nanoseconds>(
281 monotonic_start_time.time_since_epoch())
282 .count());
283 if (node_state_[node_index]
284 .log_file_header.mutable_message()
285 ->has_realtime_start_time()) {
286 node_state_[node_index]
287 .log_file_header.mutable_message()
288 ->mutate_realtime_start_time(
289 std::chrono::duration_cast<std::chrono::nanoseconds>(
290 realtime_start_time.time_since_epoch())
291 .count());
292 }
293}
294
// Attempts to set the start time for node_index.  Returns true when the start
// time was set on this call, false when it was already set or cannot be
// determined yet (remote node disconnected or missing a clock offset).
bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (configuration::MultiNode(configuration_)) {
    if (event_loop_->node() == node) {
      // There are no offsets to compute for ourself, so always succeed.
      SetStartTime(node_index, monotonic_start_time, realtime_start_time);
      return true;
    } else if (server_statistics_fetcher_.get() != nullptr) {
      // We must be a remote node now.  Look for the connection and see if it is
      // connected.

      for (const message_bridge::ServerConnection *connection :
           *server_statistics_fetcher_->connections()) {
        if (connection->node()->name()->string_view() !=
            node->name()->string_view()) {
          continue;
        }

        if (connection->state() != message_bridge::State::CONNECTED) {
          VLOG(1) << node->name()->string_view()
                  << " is not connected, can't start it yet.";
          break;
        }

        if (!connection->has_monotonic_offset()) {
          VLOG(1) << "Missing monotonic offset for setting start time for node "
                  << aos::FlatbufferToJson(node);
          break;
        }

        VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);

        // Found it and it is connected.  Compensate and go.
        monotonic_start_time +=
            std::chrono::nanoseconds(connection->monotonic_offset());

        SetStartTime(node_index, monotonic_start_time, realtime_start_time);
        return true;
      }
    }
  } else {
    // Single node: no offsets involved, just record the times.
    SetStartTime(node_index, monotonic_start_time, realtime_start_time);
    return true;
  }
  return false;
}
348
// Builds the LogFileHeader flatbuffer for one node.  Start times are written
// as min_time placeholders here; SetStartTime mutates them in place once the
// real start times are known.
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  // TODO(austin): Compress this much more efficiently.  There are a bunch of
  // duplicated schemas.
  flatbuffers::Offset<aos::Configuration> configuration_offset =
      CopyFlatBuffer(configuration_, &fbb);

  flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
      fbb.CreateString(uuid_.string_view());

  // Fixed placeholder parts UUID (nil-style v4); parts_index starts at 0.
  flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      fbb.CreateString("00000000-0000-4000-8000-000000000000");

  flatbuffers::Offset<Node> node_offset;

  if (configuration::MultiNode(configuration_)) {
    node_offset = CopyFlatBuffer(node, &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
  }

  log_file_header_builder.add_configuration(configuration_offset);
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary.  And the reverse could happen for another
  // message.  Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
          .count());

  // min_time placeholder; patched later by SetStartTime.
  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  // Only our own node's header carries a realtime start time field.
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  log_file_header_builder.add_logger_uuid(logger_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(0);

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  return fbb.Release();
}
413
414void Logger::Rotate() {
415 for (const Node *node : log_namer_->nodes()) {
416 const int node_index =
Austin Schuh0c297012020-09-16 18:41:59 -0700417 configuration::GetNodeIndex(configuration_, node);
Austin Schuh64fab802020-09-09 22:47:47 -0700418 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700419 }
420}
421
// Drains every fetcher of all messages with an event time strictly before t,
// writing data, delivery timestamps, and forwarded timestamp contents to the
// appropriate writers, then advances last_synchronized_time_ to t.
void Logger::LogUntil(monotonic_clock::time_point t) {
  WriteMissingTimestamps();

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      if (f.written) {
        if (!f.fetcher->FetchNext()) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        } else {
          f.written = false;
        }
      }

      CHECK(!f.written);

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time < t) {
        if (f.writer != nullptr) {
          // Write!
          // Size the builder for the message plus the largest header seen so
          // far to avoid reallocation in the common case.
          flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                             max_header_size_);
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index, f.log_type));

          VLOG(2) << "Writing data as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.writer->filename() << " data "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          // Track the largest observed header overhead for future sizing.
          max_header_size_ = std::max(
              max_header_size_, fbb.GetSize() - f.fetcher->context().size);
          f.writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.timestamp_writer != nullptr) {
          // And now handle timestamps.
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                             f.channel_index,
                                             LogType::kLogDeliveryTimeOnly));

          VLOG(2) << "Writing timestamps as node "
                  << FlatbufferToJson(event_loop_->node()) << " for channel "
                  << configuration::CleanedChannelToString(f.fetcher->channel())
                  << " to " << f.timestamp_writer->filename() << " timestamp "
                  << FlatbufferToJson(
                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                             fbb.GetBufferPointer()));

          f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
        }

        if (f.contents_writer != nullptr) {
          // And now handle the special message contents channel.  Copy the
          // message into a FlatBufferBuilder and save it to disk.
          // TODO(austin): We can be more efficient here when we start to
          // care...
          flatbuffers::FlatBufferBuilder fbb;
          fbb.ForceDefaults(true);

          const MessageHeader *msg =
              flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);

          logger::MessageHeader::Builder message_header_builder(fbb);

          // Note: this must match the same order as MessageBridgeServer and
          // PackMessage.  We want identical headers to have identical
          // on-the-wire formats to make comparing them easier.
          message_header_builder.add_channel_index(msg->channel_index());

          message_header_builder.add_queue_index(msg->queue_index());
          message_header_builder.add_monotonic_sent_time(
              msg->monotonic_sent_time());
          message_header_builder.add_realtime_sent_time(
              msg->realtime_sent_time());

          message_header_builder.add_monotonic_remote_time(
              msg->monotonic_remote_time());
          message_header_builder.add_realtime_remote_time(
              msg->realtime_remote_time());
          message_header_builder.add_remote_queue_index(
              msg->remote_queue_index());

          fbb.FinishSizePrefixed(message_header_builder.Finish());

          f.contents_writer->QueueSizedFlatbuffer(&fbb);
        }

        f.written = true;
      } else {
        // This message is at or past t; leave it for the next cycle.
        break;
      }
    }
  }
  last_synchronized_time_ = t;
}
529
Austin Schuhe309d2a2019-11-29 13:25:21 -0800530void Logger::DoLogData() {
531 // We want to guarentee that messages aren't out of order by more than
532 // max_out_of_order_duration. To do this, we need sync points. Every write
533 // cycle should be a sync point.
Austin Schuhfa895892020-01-07 20:07:41 -0800534 const monotonic_clock::time_point monotonic_now =
535 event_loop_->monotonic_now();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800536
537 do {
538 // Move the sync point up by at most polling_period. This forces one sync
539 // per iteration, even if it is small.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700540 LogUntil(
541 std::min(last_synchronized_time_ + polling_period_, monotonic_now));
Austin Schuhe309d2a2019-11-29 13:25:21 -0800542
Austin Schuhe309d2a2019-11-29 13:25:21 -0800543 // If we missed cycles, we could be pretty far behind. Spin until we are
544 // caught up.
545 } while (last_synchronized_time_ + polling_period_ < monotonic_now);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800546}
547
Austin Schuh5212cad2020-09-09 23:12:09 -0700548std::vector<std::vector<std::string>> SortParts(
549 const std::vector<std::string> &parts) {
550 // Start by grouping all parts by UUID, and extracting the part index.
551 std::map<std::string, std::vector<std::pair<std::string, int>>> parts_list;
552
553 // Sort part files without UUIDs and part indexes as well. Extract everything
554 // useful from the log in the first pass, then sort later.
555 struct LogPart {
556 std::string filename;
557 monotonic_clock::time_point start_time;
558 monotonic_clock::time_point first_message_time;
559 };
560
561 std::vector<LogPart> old_parts;
562
563 for (const std::string &part : parts) {
564 FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
565
566 // Looks like an old log. No UUID, index, and also single node. We have
567 // little to no multi-node log files in the wild without part UUIDs and
568 // indexes which we care much about.
569 if (!log_header.message().has_parts_uuid() &&
570 !log_header.message().has_parts_index() &&
571 !log_header.message().has_node()) {
572 LogPart log_part;
573 log_part.filename = part;
574 log_part.start_time = monotonic_clock::time_point(
575 chrono::nanoseconds(log_header.message().monotonic_start_time()));
576 FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
577 log_part.first_message_time = monotonic_clock::time_point(
578 chrono::nanoseconds(first_message.message().monotonic_sent_time()));
579 old_parts.emplace_back(std::move(log_part));
580 continue;
581 }
582
583 CHECK(log_header.message().has_parts_uuid());
584 CHECK(log_header.message().has_parts_index());
585
586 const std::string parts_uuid = log_header.message().parts_uuid()->str();
587 auto it = parts_list.find(parts_uuid);
588 if (it == parts_list.end()) {
589 it = parts_list
590 .insert(std::make_pair(
591 parts_uuid, std::vector<std::pair<std::string, int>>{}))
592 .first;
593 }
594 it->second.emplace_back(
595 std::make_pair(part, log_header.message().parts_index()));
596 }
597
598 CHECK_NE(old_parts.empty(), parts_list.empty())
599 << ": Can't have a mix of old and new parts.";
600
601 if (!old_parts.empty()) {
602 // Confirm they all have the same start time. Old loggers always used the
603 // same start time.
604 for (const LogPart &p : old_parts) {
605 CHECK_EQ(old_parts[0].start_time, p.start_time);
606 }
607 // Sort by the oldest message in each file.
608 std::sort(old_parts.begin(), old_parts.end(),
609 [](const LogPart &a, const LogPart &b) {
610 return a.first_message_time < b.first_message_time;
611 });
612
613 // Produce the final form.
614 std::vector<std::string> sorted_old_parts;
615 sorted_old_parts.reserve(old_parts.size());
616 for (LogPart &p : old_parts) {
617 sorted_old_parts.emplace_back(std::move(p.filename));
618 }
619 return std::vector<std::vector<std::string>>{std::move(sorted_old_parts)};
620 }
621
622 // Now, sort them and produce the final vector form.
623 std::vector<std::vector<std::string>> result;
624 result.reserve(parts_list.size());
625 for (auto &part : parts_list) {
626 std::sort(part.second.begin(), part.second.end(),
627 [](const std::pair<std::string, int> &a,
628 const std::pair<std::string, int> &b) {
629 return a.second < b.second;
630 });
631 std::vector<std::string> result_line;
632 result_line.reserve(part.second.size());
633 for (std::pair<std::string, int> &p : part.second) {
634 result_line.emplace_back(std::move(p.first));
635 }
636 result.emplace_back(std::move(result_line));
637 }
638 return result;
639}
640
// Single-file convenience constructor.
LogReader::LogReader(std::string_view filename,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::string>{std::string(filename)},
                replay_configuration) {}

// Single part-list convenience constructor.
LogReader::LogReader(const std::vector<std::string> &filenames,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::vector<std::string>>{filenames},
                replay_configuration) {}

// Main constructor.  Reads the header of the first file to recover the
// logged configuration, builds the remapped replay configuration, validates
// it against replay_configuration if one was supplied, and sets up per-node
// replay state.
LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                     const Configuration *replay_configuration)
    : filenames_(filenames),
      log_file_header_(ReadHeader(filenames[0][0])),
      replay_configuration_(replay_configuration) {
  MakeRemappedConfig();

  if (replay_configuration) {
    // The log and the replay config must agree on single vs multi node.
    CHECK_EQ(configuration::MultiNode(configuration()),
             configuration::MultiNode(replay_configuration))
        << ": Log file and replay config need to both be multi or single "
           "node.";
  }

  if (!configuration::MultiNode(configuration())) {
    // Single node: one State fed by a ChannelMerger over all the files.
    states_.emplace_back(
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
  } else {
    if (replay_configuration) {
      // Every logged node must exist in the replay config.
      CHECK_EQ(logged_configuration()->nodes()->size(),
               replay_configuration->nodes()->size())
          << ": Log file and replay config need to have matching nodes "
             "lists.";
      for (const Node *node : *logged_configuration()->nodes()) {
        if (configuration::GetNode(replay_configuration, node) == nullptr) {
          LOG(FATAL) << "Found node " << FlatbufferToJson(node)
                     << " in logged config that is not present in the replay "
                        "config.";
        }
      }
    }
    // States are created lazily per node; reserve a slot for each.
    states_.resize(configuration()->nodes()->size());
  }
}
685
LogReader::~LogReader() {
  if (event_loop_factory_unique_ptr_) {
    // We own the factory, so we can safely tear everything down ourselves.
    Deregister();
  } else if (event_loop_factory_ != nullptr) {
    // The caller owns the factory and must Deregister before destroying it.
    LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
                  "is destroyed";
  }
  if (offset_fp_ != nullptr) {
    fclose(offset_fp_);
  }
  // Zero out some buffers.  It's easy to do use-after-frees on these, so make
  // it more obvious.
  if (remapped_configuration_buffer_) {
    remapped_configuration_buffer_->Wipe();
  }
  log_file_header_.Wipe();
}
Austin Schuhe309d2a2019-11-29 13:25:21 -0800703
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800704const Configuration *LogReader::logged_configuration() const {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800705 return log_file_header_.message().configuration();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800706}
707
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800708const Configuration *LogReader::configuration() const {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800709 return remapped_configuration_;
710}
711
// Returns the nodes from the remapped (replay) configuration.  Only valid
// after Register has populated remapped_configuration_.
std::vector<const Node *> LogReader::Nodes() const {
  // Because the Node pointer will only be valid if it actually points to
  // memory owned by remapped_configuration_, we need to wait for the
  // remapped_configuration_ to be populated before accessing it.
  //
  // Also, note, that when ever a map is changed, the nodes in here are
  // invalidated.
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register before the node() pointer will be valid.";
  return configuration::GetNodes(remapped_configuration_);
}
Austin Schuh15649d62019-12-28 16:36:38 -0800723
// Returns the monotonic time at which logging started for the given node.
// The node must be present in the replay configuration (CHECK-fails
// otherwise).
monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
  // Per-node replay state is indexed by the node's position in the config.
  State *state =
      states_[configuration::GetNodeIndex(configuration(), node)].get();
  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);

  return state->monotonic_start_time();
}
731
// Returns the realtime (wall-clock) time at which logging started for the
// given node.  The node must be present in the replay configuration
// (CHECK-fails otherwise).
realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
  // Per-node replay state is indexed by the node's position in the config.
  State *state =
      states_[configuration::GetNodeIndex(configuration(), node)].get();
  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);

  return state->realtime_start_time();
}
739
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800740void LogReader::Register() {
741 event_loop_factory_unique_ptr_ =
Austin Schuhac0771c2020-01-07 18:36:30 -0800742 std::make_unique<SimulatedEventLoopFactory>(configuration());
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800743 Register(event_loop_factory_unique_ptr_.get());
744}
745
// Registers replay against a simulated event loop factory.  This creates a
// per-node State (each backed by a ChannelMerger over the log files), sets up
// the inter-node clock-offset estimation problem as a matrix equation,
// disables message forwarding/statistics that replay would duplicate, and
// advances simulated time to the latest per-node log start time so all nodes
// begin together.
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;

  for (const Node *node : configuration::GetNodes(configuration())) {
    const size_t node_index =
        configuration::GetNodeIndex(configuration(), node);
    states_[node_index] =
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
    State *state = states_[node_index].get();

    // Register(EventLoop*) wires channels and timers for this node's event
    // loop.
    Register(state->SetNodeEventLoopFactory(
        event_loop_factory_->GetNodeEventLoopFactory(node)));
  }
  // NOTE(review): live_nodes_ is presumably incremented inside
  // Register(EventLoop*) for each node that has log data — confirm against
  // that overload.
  if (live_nodes_ == 0) {
    LOG(FATAL)
        << "Don't have logs from any of the nodes in the replay config--are "
           "you sure that the replay config matches the original config?";
  }

  // We need to now seed our per-node time offsets and get everything set up
  // to run.
  const size_t num_nodes = nodes_count();

  // It is easiest to solve for per node offsets with a matrix rather than
  // trying to solve the equations by hand.  So let's get after it.
  //
  // Now, build up the map matrix.
  //
  // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
  //
  // Row 0 anchors the average of all node clocks to the distributed clock;
  // each subsequent row encodes one pairwise offset sample (b - a).  Exact
  // rational arithmetic (mpq_class) is used so the solve is not subject to
  // floating point error.
  map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
      filters_.size() + 1, num_nodes);
  slope_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
          filters_.size() + 1, num_nodes);

  offset_matrix_ =
      Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
  last_valid_matrix_ =
      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);

  time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
  time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);

  // All times should average out to the distributed clock.
  for (int i = 0; i < map_matrix_.cols(); ++i) {
    // 1/num_nodes.
    map_matrix_(0, i) = mpq_class(1, num_nodes);
  }
  valid_matrix_(0) = true;

  {
    // Now, add the a - b -> sample elements.
    size_t i = 1;
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
             &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      const size_t node_a_index =
          configuration::GetNodeIndex(configuration(), node_a);
      const size_t node_b_index =
          configuration::GetNodeIndex(configuration(), node_b);

      // -a
      map_matrix_(i, node_a_index) = mpq_class(-1);
      // +b
      map_matrix_(i, node_b_index) = mpq_class(1);

      // -> sample
      // The estimator writes its slope/offset/validity for this pair directly
      // into row i of the matrices via these pointers as new timestamp
      // samples arrive.
      std::get<0>(filter.second)
          .set_slope_pointer(&slope_matrix_(i, node_a_index));
      std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));

      valid_matrix_(i) = false;
      std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));

      ++i;
    }
  }

  // Prime each node's sorted message queue so OldestMessageTime and the
  // initial offset solve have data to work with.
  for (std::unique_ptr<State> &state : states_) {
    state->SeedSortedMessages();
  }

  // Rank of the map matrix tells you if all the nodes are in communication
  // with each other, which tells you if the offsets are observable.
  const size_t connected_nodes =
      Eigen::FullPivLU<
          Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
          .rank();

  // We don't need to support isolated nodes until someone has a real use
  // case.
  CHECK_EQ(connected_nodes, num_nodes)
      << ": There is a node which isn't communicating with the rest.";

  // And solve.
  UpdateOffsets();

  // We want to start the log file at the last start time of the log files
  // from all the nodes.  Compute how long each node's simulation needs to run
  // to move time to this point.
  distributed_clock::time_point start_time = distributed_clock::min_time;

  // TODO(austin): We want an "OnStart" callback for each node rather than
  // running until the last node.

  for (std::unique_ptr<State> &state : states_) {
    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
            << MaybeNodeName(state->event_loop()->node()) << "now "
            << state->monotonic_now();
    // And start computing the start time on the distributed clock now that
    // that works.
    start_time = std::max(
        start_time, state->ToDistributedClock(state->monotonic_start_time()));
  }

  CHECK_GE(start_time, distributed_clock::epoch())
      << ": Hmm, we have a node starting before the start of time.  Offset "
         "everything.";

  // Forwarding is tracked per channel.  If it is enabled, we want to turn it
  // off.  Otherwise messages replayed will get forwarded across to the other
  // nodes, and also replayed on the other nodes.  This may not satisfy all
  // our users, but it'll start the discussion.
  if (configuration::MultiNode(event_loop_factory_->configuration())) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      const Channel *channel = logged_configuration()->channels()->Get(i);
      const Node *node = configuration::GetNode(
          configuration(), channel->source_node()->string_view());

      State *state =
          states_[configuration::GetNodeIndex(configuration(), node)].get();

      const Channel *remapped_channel =
          RemapChannel(state->event_loop(), channel);

      event_loop_factory_->DisableForwarding(remapped_channel);
    }

    // If we are replaying a log, we don't want a bunch of redundant messages
    // from both the real message bridge and simulated message bridge.
    event_loop_factory_->DisableStatistics();
  }

  // While we are starting the system up, we might be relying on matching data
  // to timestamps on log files where the timestamp log file starts before the
  // data.  In this case, it is reasonable to expect missing data.
  ignore_missing_data_ = true;
  VLOG(1) << "Running until " << start_time << " in Register";
  event_loop_factory_->RunFor(start_time.time_since_epoch());
  VLOG(1) << "At start time";
  // Now that we are running for real, missing data means that the log file is
  // corrupted or went wrong.
  ignore_missing_data_ = false;

  for (std::unique_ptr<State> &state : states_) {
    // Make the RT clock be correct before handing it to the user.
    if (state->realtime_start_time() != realtime_clock::min_time) {
      state->SetRealtimeOffset(state->monotonic_start_time(),
                               state->realtime_start_time());
    }
    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
            << MaybeNodeName(state->event_loop()->node()) << "now "
            << state->monotonic_now();
  }

  // Seed the CSV debug output with each node's simulated time at startup so
  // later samples are plotted relative to a known origin.
  if (FLAGS_timestamps_to_csv) {
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
             &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      std::get<0>(filter.second)
          .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
                               ->monotonic_now());
      std::get<0>(filter.second)
          .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
                               ->monotonic_now());
    }
  }
}
932
Austin Schuh2f8fd752020-09-01 22:38:28 -0700933void LogReader::UpdateOffsets() {
934 VLOG(2) << "Samples are " << offset_matrix_;
935 VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
936 std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
937 Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
938 "]");
939 VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
940 << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
941
942 size_t node_index = 0;
943 for (std::unique_ptr<State> &state : states_) {
944 state->SetDistributedOffset(offset(node_index), slope(node_index));
945 VLOG(1) << "Offset for node " << node_index << " "
946 << MaybeNodeName(state->event_loop()->node()) << "is "
947 << aos::distributed_clock::time_point(offset(node_index))
948 << " slope " << std::setprecision(9) << std::fixed
949 << slope(node_index);
950 ++node_index;
951 }
952
953 if (VLOG_IS_ON(1)) {
954 LogFit("Offset is");
955 }
956}
957
// Logs (at VLOG(1)) the current clock state of every node and, for each
// pairwise timestamp filter, cross-checks the globally solved per-node
// offset/slope against that pair's own estimator fit.  Pure diagnostics: no
// state is modified.
void LogReader::LogFit(std::string_view prefix) {
  for (std::unique_ptr<State> &state : states_) {
    VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
            << state->monotonic_now() << " distributed "
            << event_loop_factory_->distributed_now();
  }

  for (std::pair<const std::tuple<const Node *, const Node *>,
                 std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
       filters_) {
    message_bridge::NoncausalOffsetEstimator *estimator =
        &std::get<0>(filter.second);

    // Nothing to report for a pair which has seen no timestamps yet.
    if (estimator->a_timestamps().size() == 0 &&
        estimator->b_timestamps().size() == 0) {
      continue;
    }

    if (VLOG_IS_ON(1)) {
      estimator->LogFit(prefix);
    }

    const Node *const node_a = std::get<0>(filter.first);
    const Node *const node_b = std::get<1>(filter.first);

    const size_t node_a_index =
        configuration::GetNodeIndex(configuration(), node_a);
    const size_t node_b_index =
        configuration::GetNodeIndex(configuration(), node_b);

    // Reconstruct the b-relative-to-a slope and offset implied by the global
    // solution, to compare against the estimator's direct pairwise fit.
    const double recovered_slope =
        slope(node_b_index) / slope(node_a_index) - 1.0;
    const int64_t recovered_offset =
        offset(node_b_index).count() - offset(node_a_index).count() *
                                           slope(node_b_index) /
                                           slope(node_a_index);

    VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
            << " (error " << recovered_slope - estimator->fit().slope() << ") "
            << " offset " << std::setprecision(20) << recovered_offset
            << " (error "
            << recovered_offset - estimator->fit().offset().count() << ")";

    // Round-trip the oldest and newest a-side timestamps through the
    // distributed clock and compare against the estimator's prediction.
    const aos::distributed_clock::time_point a0 =
        states_[node_a_index]->ToDistributedClock(
            std::get<0>(estimator->a_timestamps()[0]));
    const aos::distributed_clock::time_point a1 =
        states_[node_a_index]->ToDistributedClock(
            std::get<0>(estimator->a_timestamps()[1]));

    VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
            << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
            << " distributed -> " << node_b->name()->string_view() << " "
            << states_[node_b_index]->FromDistributedClock(a0) << " should be "
            << aos::monotonic_clock::time_point(
                   std::chrono::nanoseconds(static_cast<int64_t>(
                       std::get<0>(estimator->a_timestamps()[0])
                           .time_since_epoch()
                           .count() *
                       (1.0 + estimator->fit().slope()))) +
                   estimator->fit().offset())
            << ((a0 <= event_loop_factory_->distributed_now())
                    ? ""
                    : " After now, investigate");
    VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
            << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
            << " distributed -> " << node_b->name()->string_view() << " "
            << states_[node_b_index]->FromDistributedClock(a1) << " should be "
            << aos::monotonic_clock::time_point(
                   std::chrono::nanoseconds(static_cast<int64_t>(
                       std::get<0>(estimator->a_timestamps()[1])
                           .time_since_epoch()
                           .count() *
                       (1.0 + estimator->fit().slope()))) +
                   estimator->fit().offset())
            << ((event_loop_factory_->distributed_now() <= a1)
                    ? ""
                    : " Before now, investigate");

    // Same sanity check in the other direction, using b-side timestamps.
    const aos::distributed_clock::time_point b0 =
        states_[node_b_index]->ToDistributedClock(
            std::get<0>(estimator->b_timestamps()[0]));
    const aos::distributed_clock::time_point b1 =
        states_[node_b_index]->ToDistributedClock(
            std::get<0>(estimator->b_timestamps()[1]));

    VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
            << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
            << " distributed -> " << node_a->name()->string_view() << " "
            << states_[node_a_index]->FromDistributedClock(b0)
            << ((b0 <= event_loop_factory_->distributed_now())
                    ? ""
                    : " After now, investigate");
    VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
            << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
            << " distributed -> " << node_a->name()->string_view() << " "
            << states_[node_a_index]->FromDistributedClock(b1)
            << ((event_loop_factory_->distributed_now() <= b1)
                    ? ""
                    : " Before now, investigate");
  }
}
1060
1061message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
Austin Schuh8bd96322020-02-13 21:18:22 -08001062 const Node *node_a, const Node *node_b) {
1063 CHECK_NE(node_a, node_b);
1064 CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
1065 CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
1066
1067 if (node_a > node_b) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001068 return GetFilter(node_b, node_a);
Austin Schuh8bd96322020-02-13 21:18:22 -08001069 }
1070
1071 auto tuple = std::make_tuple(node_a, node_b);
1072
1073 auto it = filters_.find(tuple);
1074
1075 if (it == filters_.end()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001076 auto &x =
1077 filters_
1078 .insert(std::make_pair(
1079 tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
1080 node_a, node_b))))
1081 .first->second;
Austin Schuh8bd96322020-02-13 21:18:22 -08001082 if (FLAGS_timestamps_to_csv) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001083 std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
1084 "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
1085 node_b->name()->string_view()));
1086 std::get<0>(x).SetRevCsvFileName(absl::StrCat(
1087 "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
1088 node_a->name()->string_view()));
Austin Schuh8bd96322020-02-13 21:18:22 -08001089 }
1090
Austin Schuh2f8fd752020-09-01 22:38:28 -07001091 return &std::get<0>(x);
Austin Schuh8bd96322020-02-13 21:18:22 -08001092 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001093 return &std::get<0>(it->second);
Austin Schuh8bd96322020-02-13 21:18:22 -08001094 }
1095}
1096
Austin Schuh8bd96322020-02-13 21:18:22 -08001097
Austin Schuhe309d2a2019-11-29 13:25:21 -08001098void LogReader::Register(EventLoop *event_loop) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001099 State *state =
1100 states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
1101 .get();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001102
Austin Schuh858c9f32020-08-31 16:56:12 -07001103 state->set_event_loop(event_loop);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001104
Tyler Chatow67ddb032020-01-12 14:30:04 -08001105 // We don't run timing reports when trying to print out logged data, because
1106 // otherwise we would end up printing out the timing reports themselves...
1107 // This is only really relevant when we are replaying into a simulation.
Austin Schuh6f3babe2020-01-26 20:34:50 -08001108 event_loop->SkipTimingReport();
1109 event_loop->SkipAosLog();
Austin Schuh39788ff2019-12-01 18:22:57 -08001110
Austin Schuh858c9f32020-08-31 16:56:12 -07001111 const bool has_data = state->SetNode();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001112
Austin Schuh858c9f32020-08-31 16:56:12 -07001113 state->SetChannelCount(logged_configuration()->channels()->size());
Austin Schuh8bd96322020-02-13 21:18:22 -08001114
Austin Schuh858c9f32020-08-31 16:56:12 -07001115 for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001116 const Channel *channel =
1117 RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
Austin Schuh6331ef92020-01-07 18:28:09 -08001118
Austin Schuh858c9f32020-08-31 16:56:12 -07001119 NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001120 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
Austin Schuh8bd96322020-02-13 21:18:22 -08001121
1122 if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
1123 configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
1124 const Node *target_node = configuration::GetNode(
1125 event_loop->configuration(), channel->source_node()->string_view());
Austin Schuh858c9f32020-08-31 16:56:12 -07001126 filter = GetFilter(event_loop->node(), target_node);
Austin Schuh8bd96322020-02-13 21:18:22 -08001127
1128 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001129 channel_target_event_loop_factory =
Austin Schuh8bd96322020-02-13 21:18:22 -08001130 event_loop_factory_->GetNodeEventLoopFactory(target_node);
1131 }
1132 }
Austin Schuh858c9f32020-08-31 16:56:12 -07001133
1134 state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
1135 channel_target_event_loop_factory);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001136 }
1137
Austin Schuh6aa77be2020-02-22 21:06:40 -08001138 // If we didn't find any log files with data in them, we won't ever get a
1139 // callback or be live. So skip the rest of the setup.
1140 if (!has_data) {
1141 return;
1142 }
1143
Austin Schuh858c9f32020-08-31 16:56:12 -07001144 state->set_timer_handler(event_loop->AddTimer([this, state]() {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001145 VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
1146 << "at " << state->event_loop()->context().monotonic_event_time
1147 << " now " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001148 if (state->OldestMessageTime() == monotonic_clock::max_time) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001149 --live_nodes_;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001150 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001151 if (live_nodes_ == 0) {
1152 event_loop_factory_->Exit();
1153 }
James Kuszmaul314f1672020-01-03 20:02:08 -08001154 return;
1155 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001156 TimestampMerger::DeliveryTimestamp channel_timestamp;
Austin Schuh05b70472020-01-01 17:11:17 -08001157 int channel_index;
1158 FlatbufferVector<MessageHeader> channel_data =
1159 FlatbufferVector<MessageHeader>::Empty();
1160
Austin Schuh2f8fd752020-09-01 22:38:28 -07001161 if (VLOG_IS_ON(1)) {
1162 LogFit("Offset was");
1163 }
1164
1165 bool update_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001166 std::tie(channel_timestamp, channel_index, channel_data) =
Austin Schuh2f8fd752020-09-01 22:38:28 -07001167 state->PopOldest(&update_time);
Austin Schuh05b70472020-01-01 17:11:17 -08001168
Austin Schuhe309d2a2019-11-29 13:25:21 -08001169 const monotonic_clock::time_point monotonic_now =
Austin Schuh858c9f32020-08-31 16:56:12 -07001170 state->event_loop()->context().monotonic_event_time;
Austin Schuh2f8fd752020-09-01 22:38:28 -07001171 if (!FLAGS_skip_order_validation) {
1172 CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
1173 << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
1174 << monotonic_now << " trying to send "
1175 << channel_timestamp.monotonic_event_time << " failure "
1176 << state->DebugString();
1177 } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
1178 LOG(WARNING) << "Check failed: monotonic_now == "
1179 "channel_timestamp.monotonic_event_time) ("
1180 << monotonic_now << " vs. "
1181 << channel_timestamp.monotonic_event_time
1182 << "): " << FlatbufferToJson(state->event_loop()->node())
1183 << " Now " << monotonic_now << " trying to send "
1184 << channel_timestamp.monotonic_event_time << " failure "
1185 << state->DebugString();
1186 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001187
Austin Schuh6f3babe2020-01-26 20:34:50 -08001188 if (channel_timestamp.monotonic_event_time >
Austin Schuh858c9f32020-08-31 16:56:12 -07001189 state->monotonic_start_time() ||
Austin Schuh15649d62019-12-28 16:36:38 -08001190 event_loop_factory_ != nullptr) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001191 if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
Austin Schuh858c9f32020-08-31 16:56:12 -07001192 !state->at_end()) ||
Austin Schuh05b70472020-01-01 17:11:17 -08001193 channel_data.message().data() != nullptr) {
1194 CHECK(channel_data.message().data() != nullptr)
1195 << ": Got a message without data. Forwarding entry which was "
Austin Schuh2f8fd752020-09-01 22:38:28 -07001196 "not matched? Use --skip_missing_forwarding_entries to "
Brian Silverman87ac0402020-09-17 14:47:01 -07001197 "ignore this.";
Austin Schuh92547522019-12-28 14:33:43 -08001198
Austin Schuh2f8fd752020-09-01 22:38:28 -07001199 if (update_time) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001200 // Confirm that the message was sent on the sending node before the
1201 // destination node (this node). As a proxy, do this by making sure
1202 // that time on the source node is past when the message was sent.
Austin Schuh2f8fd752020-09-01 22:38:28 -07001203 if (!FLAGS_skip_order_validation) {
1204 CHECK_LT(channel_timestamp.monotonic_remote_time,
1205 state->monotonic_remote_now(channel_index))
1206 << state->event_loop()->node()->name()->string_view() << " to "
1207 << state->remote_node(channel_index)->name()->string_view()
1208 << " " << state->DebugString();
1209 } else if (channel_timestamp.monotonic_remote_time >=
1210 state->monotonic_remote_now(channel_index)) {
1211 LOG(WARNING)
1212 << "Check failed: channel_timestamp.monotonic_remote_time < "
1213 "state->monotonic_remote_now(channel_index) ("
1214 << channel_timestamp.monotonic_remote_time << " vs. "
1215 << state->monotonic_remote_now(channel_index) << ") "
1216 << state->event_loop()->node()->name()->string_view() << " to "
1217 << state->remote_node(channel_index)->name()->string_view()
1218 << " currently " << channel_timestamp.monotonic_event_time
1219 << " ("
1220 << state->ToDistributedClock(
1221 channel_timestamp.monotonic_event_time)
1222 << ") remote event time "
1223 << channel_timestamp.monotonic_remote_time << " ("
1224 << state->RemoteToDistributedClock(
1225 channel_index, channel_timestamp.monotonic_remote_time)
1226 << ") " << state->DebugString();
1227 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001228
1229 if (FLAGS_timestamps_to_csv) {
1230 if (offset_fp_ == nullptr) {
1231 offset_fp_ = fopen("/tmp/offsets.csv", "w");
1232 fprintf(
1233 offset_fp_,
1234 "# time_since_start, offset node 0, offset node 1, ...\n");
1235 first_time_ = channel_timestamp.realtime_event_time;
1236 }
1237
1238 fprintf(offset_fp_, "%.9f",
1239 std::chrono::duration_cast<std::chrono::duration<double>>(
1240 channel_timestamp.realtime_event_time - first_time_)
1241 .count());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001242 for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
1243 fprintf(offset_fp_, ", %.9f",
1244 time_offset_matrix_(i, 0) +
1245 time_slope_matrix_(i, 0) *
1246 chrono::duration<double>(
1247 event_loop_factory_->distributed_now()
1248 .time_since_epoch())
1249 .count());
Austin Schuh8bd96322020-02-13 21:18:22 -08001250 }
1251 fprintf(offset_fp_, "\n");
1252 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001253 }
1254
Austin Schuh15649d62019-12-28 16:36:38 -08001255 // If we have access to the factory, use it to fix the realtime time.
Austin Schuh858c9f32020-08-31 16:56:12 -07001256 state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
1257 channel_timestamp.realtime_event_time);
Austin Schuh15649d62019-12-28 16:36:38 -08001258
Austin Schuh2f8fd752020-09-01 22:38:28 -07001259 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
1260 << channel_timestamp.monotonic_event_time;
1261 // TODO(austin): std::move channel_data in and make that efficient in
1262 // simulation.
Austin Schuh858c9f32020-08-31 16:56:12 -07001263 state->Send(channel_index, channel_data.message().data()->Data(),
1264 channel_data.message().data()->size(),
1265 channel_timestamp.monotonic_remote_time,
1266 channel_timestamp.realtime_remote_time,
1267 channel_timestamp.remote_queue_index);
Austin Schuh2f8fd752020-09-01 22:38:28 -07001268 } else if (state->at_end() && !ignore_missing_data_) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001269 // We are at the end of the log file and found missing data. Finish
Austin Schuh2f8fd752020-09-01 22:38:28 -07001270 // reading the rest of the log file and call it quits. We don't want
1271 // to replay partial data.
Austin Schuh858c9f32020-08-31 16:56:12 -07001272 while (state->OldestMessageTime() != monotonic_clock::max_time) {
1273 bool update_time_dummy;
1274 state->PopOldest(&update_time_dummy);
Austin Schuh8bd96322020-02-13 21:18:22 -08001275 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001276 } else {
1277 CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
Austin Schuh92547522019-12-28 14:33:43 -08001278 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001279 } else {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001280 LOG(WARNING)
1281 << "Not sending data from before the start of the log file. "
1282 << channel_timestamp.monotonic_event_time.time_since_epoch().count()
1283 << " start " << monotonic_start_time().time_since_epoch().count()
1284 << " " << FlatbufferToJson(channel_data);
Austin Schuhe309d2a2019-11-29 13:25:21 -08001285 }
1286
Austin Schuh858c9f32020-08-31 16:56:12 -07001287 const monotonic_clock::time_point next_time = state->OldestMessageTime();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001288 if (next_time != monotonic_clock::max_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001289 VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
1290 << "wakeup for " << next_time << "("
1291 << state->ToDistributedClock(next_time)
1292 << " distributed), now is " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001293 state->Setup(next_time);
James Kuszmaul314f1672020-01-03 20:02:08 -08001294 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001295 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1296 << "No next message, scheduling shutdown";
1297 // Set a timer up immediately after now to die. If we don't do this,
1298 // then the senders waiting on the message we just read will never get
1299 // called.
Austin Schuheecb9282020-01-08 17:43:30 -08001300 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001301 state->Setup(monotonic_now + event_loop_factory_->send_delay() +
1302 std::chrono::nanoseconds(1));
Austin Schuheecb9282020-01-08 17:43:30 -08001303 }
Austin Schuhe309d2a2019-11-29 13:25:21 -08001304 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001305
Austin Schuh2f8fd752020-09-01 22:38:28 -07001306 // Once we make this call, the current time changes. So do everything
1307 // which involves time before changing it. That especially includes
1308 // sending the message.
1309 if (update_time) {
1310 VLOG(1) << MaybeNodeName(state->event_loop()->node())
1311 << "updating offsets";
1312
1313 std::vector<aos::monotonic_clock::time_point> before_times;
1314 before_times.resize(states_.size());
1315 std::transform(states_.begin(), states_.end(), before_times.begin(),
1316 [](const std::unique_ptr<State> &state) {
1317 return state->monotonic_now();
1318 });
1319
1320 for (size_t i = 0; i < states_.size(); ++i) {
1321 VLOG(1) << MaybeNodeName(
1322 states_[i]->event_loop()->node())
1323 << "before " << states_[i]->monotonic_now();
1324 }
1325
Austin Schuh8bd96322020-02-13 21:18:22 -08001326 UpdateOffsets();
Austin Schuh2f8fd752020-09-01 22:38:28 -07001327 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
1328 << state->monotonic_now();
1329
1330 for (size_t i = 0; i < states_.size(); ++i) {
1331 VLOG(1) << MaybeNodeName(
1332 states_[i]->event_loop()->node())
1333 << "after " << states_[i]->monotonic_now();
1334 }
1335
1336 // TODO(austin): We should be perfect.
1337 const std::chrono::nanoseconds kTolerance{3};
1338 if (!FLAGS_skip_order_validation) {
1339 CHECK_GE(next_time, state->monotonic_now())
1340 << ": Time skipped the next event.";
1341
1342 for (size_t i = 0; i < states_.size(); ++i) {
1343 CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
1344 << ": Time changed too much on node "
1345 << MaybeNodeName(states_[i]->event_loop()->node());
1346 CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
1347 << ": Time changed too much on node "
1348 << states_[i]->event_loop()->node()->name()->string_view();
1349 }
1350 } else {
1351 if (next_time < state->monotonic_now()) {
1352 LOG(WARNING) << "Check failed: next_time >= "
1353 "state->monotonic_now() ("
1354 << next_time << " vs. " << state->monotonic_now()
1355 << "): Time skipped the next event.";
1356 }
1357 for (size_t i = 0; i < states_.size(); ++i) {
1358 if (states_[i]->monotonic_now() >= before_times[i] - kTolerance) {
1359 LOG(WARNING) << "Check failed: "
1360 "states_[i]->monotonic_now() "
1361 ">= before_times[i] - kTolerance ("
1362 << states_[i]->monotonic_now() << " vs. "
1363 << before_times[i] - kTolerance
1364 << ") : Time changed too much on node "
1365 << MaybeNodeName(states_[i]->event_loop()->node());
1366 }
1367 if (states_[i]->monotonic_now() <= before_times[i] + kTolerance) {
1368 LOG(WARNING) << "Check failed: "
1369 "states_[i]->monotonic_now() "
1370 "<= before_times[i] + kTolerance ("
1371 << states_[i]->monotonic_now() << " vs. "
1372 << before_times[i] - kTolerance
1373 << ") : Time changed too much on node "
1374 << MaybeNodeName(states_[i]->event_loop()->node());
1375 }
1376 }
1377 }
Austin Schuh8bd96322020-02-13 21:18:22 -08001378 }
Austin Schuh2f8fd752020-09-01 22:38:28 -07001379
1380 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
1381 << state->event_loop()->context().monotonic_event_time << " now "
1382 << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -07001383 }));
Austin Schuhe309d2a2019-11-29 13:25:21 -08001384
Austin Schuh6f3babe2020-01-26 20:34:50 -08001385 ++live_nodes_;
1386
Austin Schuh858c9f32020-08-31 16:56:12 -07001387 if (state->OldestMessageTime() != monotonic_clock::max_time) {
1388 event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
Austin Schuhe309d2a2019-11-29 13:25:21 -08001389 }
1390}
1391
1392void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001393 // Make sure that things get destroyed in the correct order, rather than
1394 // relying on getting the order correct in the class definition.
Austin Schuh8bd96322020-02-13 21:18:22 -08001395 for (std::unique_ptr<State> &state : states_) {
Austin Schuh858c9f32020-08-31 16:56:12 -07001396 state->Deregister();
Austin Schuhe309d2a2019-11-29 13:25:21 -08001397 }
Austin Schuh92547522019-12-28 14:33:43 -08001398
James Kuszmaul84ff3e52020-01-03 19:48:53 -08001399 event_loop_factory_unique_ptr_.reset();
1400 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -08001401}
1402
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001403void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
1404 std::string_view add_prefix) {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001405 for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
1406 const Channel *const channel = logged_configuration()->channels()->Get(ii);
1407 if (channel->name()->str() == name &&
1408 channel->type()->string_view() == type) {
1409 CHECK_EQ(0u, remapped_channels_.count(ii))
1410 << "Already remapped channel "
1411 << configuration::CleanedChannelToString(channel);
1412 remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
1413 VLOG(1) << "Remapping channel "
1414 << configuration::CleanedChannelToString(channel)
1415 << " to have name " << remapped_channels_[ii];
Austin Schuh6331ef92020-01-07 18:28:09 -08001416 MakeRemappedConfig();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -08001417 return;
1418 }
1419 }
1420 LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
1421 << type;
1422}
1423
// Remaps the logged channel {name, type} as observed from |node|.  The channel
// is first resolved through the logged configuration's maps (so e.g. /spray
// may resolve to /0/spray), the resolved channel is renamed with |add_prefix|,
// and, when the maps renamed it, an equivalent map entry is added so the
// prefixed original name also routes to the prefixed resolved name.
// Rebuilds the remapped configuration before returning.
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
                                   const Node *node,
                                   std::string_view add_prefix) {
  VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
  // Resolve the requested channel through the logged configuration's maps for
  // this node; dies below if it does not exist.
  const Channel *remapped_channel =
      configuration::GetChannel(logged_configuration(), name, type, "", node);
  CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
                                     << "\", \"type\": \"" << type << "\"}";
  VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
          << "\"}";
  VLOG(1) << "Remapped "
          << aos::configuration::StrippedChannelToString(remapped_channel);

  // We want to make /spray on node 0 go to /0/spray by snooping the maps. And
  // we want it to degrade if the heuristics fail to just work.
  //
  // The easiest way to do this is going to be incredibly specific and verbose.
  // Look up /spray, to /0/spray. Then, prefix the result with /original to get
  // /original/0/spray. Then, create a map from /original/spray to
  // /original/0/spray for just the type we were asked for.
  if (name != remapped_channel->name()->string_view()) {
    // The maps renamed the channel, so mirror that renaming for the prefixed
    // names as well.
    MapT new_map;
    new_map.match = std::make_unique<ChannelT>();
    new_map.match->name = absl::StrCat(add_prefix, name);
    new_map.match->type = type;
    if (node != nullptr) {
      // Scope the map to this source node, matching how the original map
      // applied.
      new_map.match->source_node = node->name()->str();
    }
    new_map.rename = std::make_unique<ChannelT>();
    new_map.rename->name =
        absl::StrCat(add_prefix, remapped_channel->name()->string_view());
    maps_.emplace_back(std::move(new_map));
  }

  // Record the rename keyed by the channel's index in the logged config, and
  // refuse to remap the same channel twice.
  const size_t channel_index =
      configuration::ChannelIndex(logged_configuration(), remapped_channel);
  CHECK_EQ(0u, remapped_channels_.count(channel_index))
      << "Already remapped channel "
      << configuration::CleanedChannelToString(remapped_channel);
  remapped_channels_[channel_index] =
      absl::StrCat(add_prefix, remapped_channel->name()->string_view());
  MakeRemappedConfig();
}
1467
// Rebuilds remapped_configuration_ from the base config plus the remapped
// channels and maps accumulated so far.  Must run before any replay event
// loops exist (enforced by the CHECK below), since senders are created
// against the resulting configuration.
void LogReader::MakeRemappedConfig() {
  for (std::unique_ptr<State> &state : states_) {
    if (state) {
      CHECK(!state->event_loop())
          << ": Can't change the mapping after the events are scheduled.";
    }
  }

  // If no remapping occurred and we are using the original config, then there
  // is nothing interesting to do here.
  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
    remapped_configuration_ = logged_configuration();
    return;
  }
  // Config to copy Channel definitions from. Use the specified
  // replay_configuration_ if it has been provided.
  const Configuration *const base_config = replay_configuration_ == nullptr
                                               ? logged_configuration()
                                               : replay_configuration_;
  // The remapped config will be identical to the base_config, except that it
  // will have a bunch of extra channels in the channel list, which are exact
  // copies of the remapped channels, but with different names.
  // Because the flatbuffers API is a pain to work with, this requires a bit of
  // a song-and-dance to get copied over.
  // The order of operations is to:
  // 1) Make a flatbuffer builder for a config that will just contain a list of
  //    the new channels that we want to add.
  // 2) For each channel that we are remapping:
  //    a) Make a buffer/builder and construct into it a Channel table that only
  //       contains the new name for the channel.
  //    b) Merge the new channel with just the name into the channel that we are
  //       trying to copy, built in the flatbuffer builder made in 1. This gives
  //       us the new channel definition that we need.
  // 3) Using this list of offsets, build the Configuration of just new
  //    Channels.
  // 4) Merge the Configuration with the new Channels into the base_config.
  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
  //    chance to sanitize the config.

  // This is the builder that we use for the config containing all the new
  // channels.
  flatbuffers::FlatBufferBuilder new_config_fbb;
  new_config_fbb.ForceDefaults(true);
  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
  for (auto &pair : remapped_channels_) {
    // This is the builder that we use for creating the Channel with just the
    // new name.  It has to be a separate builder: a nested table cannot be
    // built while another table is being built in the same builder.
    flatbuffers::FlatBufferBuilder new_name_fbb;
    new_name_fbb.ForceDefaults(true);
    const flatbuffers::Offset<flatbuffers::String> name_offset =
        new_name_fbb.CreateString(pair.second);
    ChannelBuilder new_name_builder(new_name_fbb);
    new_name_builder.add_name(name_offset);
    new_name_fbb.Finish(new_name_builder.Finish());
    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
    // Retrieve the channel that we want to copy, confirming that it is
    // actually present in base_config.
    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
        base_config, logged_configuration()->channels()->Get(pair.first), "",
        nullptr));
    // Actually create the new channel and put it into the vector of Offsets
    // that we will use to create the new Configuration.
    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
        reinterpret_cast<const flatbuffers::Table *>(base_channel),
        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
        &new_config_fbb));
  }
  // Create the Configuration containing the new channels that we want to add.
  const auto new_channel_vector_offsets =
      new_config_fbb.CreateVector(channel_offsets);

  // Now create the new maps (registered via the node-aware
  // RemapLoggedChannel overload) so the prefixed original names still route
  // to the prefixed resolved names.
  std::vector<flatbuffers::Offset<Map>> map_offsets;
  for (const MapT &map : maps_) {
    const flatbuffers::Offset<flatbuffers::String> match_name_offset =
        new_config_fbb.CreateString(map.match->name);
    const flatbuffers::Offset<flatbuffers::String> match_type_offset =
        new_config_fbb.CreateString(map.match->type);
    const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
        new_config_fbb.CreateString(map.rename->name);
    flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
    if (!map.match->source_node.empty()) {
      match_source_node_offset =
          new_config_fbb.CreateString(map.match->source_node);
    }
    Channel::Builder match_builder(new_config_fbb);
    match_builder.add_name(match_name_offset);
    match_builder.add_type(match_type_offset);
    if (!map.match->source_node.empty()) {
      match_builder.add_source_node(match_source_node_offset);
    }
    const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();

    Channel::Builder rename_builder(new_config_fbb);
    rename_builder.add_name(rename_name_offset);
    const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();

    Map::Builder map_builder(new_config_fbb);
    map_builder.add_match(match_offset);
    map_builder.add_rename(rename_offset);
    map_offsets.emplace_back(map_builder.Finish());
  }

  const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);

  ConfigurationBuilder new_config_builder(new_config_fbb);
  new_config_builder.add_channels(new_channel_vector_offsets);
  new_config_builder.add_maps(new_maps_offsets);
  new_config_fbb.Finish(new_config_builder.Finish());
  const FlatbufferDetachedBuffer<Configuration> new_name_config =
      new_config_fbb.Release();
  // Merge the new channels configuration into the base_config, giving us the
  // remapped configuration.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          MergeFlatBuffers<Configuration>(base_config,
                                          &new_name_config.message()));
  // Call MergeConfiguration to deal with sanitizing the config.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          configuration::MergeConfiguration(*remapped_configuration_buffer_));

  remapped_configuration_ = &remapped_configuration_buffer_->message();
}
1592
Austin Schuh6f3babe2020-01-26 20:34:50 -08001593const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
1594 const Channel *channel) {
1595 std::string_view channel_name = channel->name()->string_view();
1596 std::string_view channel_type = channel->type()->string_view();
1597 const int channel_index =
1598 configuration::ChannelIndex(logged_configuration(), channel);
1599 // If the channel is remapped, find the correct channel name to use.
1600 if (remapped_channels_.count(channel_index) > 0) {
Austin Schuhee711052020-08-24 16:06:09 -07001601 VLOG(3) << "Got remapped channel on "
Austin Schuh6f3babe2020-01-26 20:34:50 -08001602 << configuration::CleanedChannelToString(channel);
1603 channel_name = remapped_channels_[channel_index];
1604 }
1605
Austin Schuhee711052020-08-24 16:06:09 -07001606 VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001607 const Channel *remapped_channel = configuration::GetChannel(
1608 event_loop->configuration(), channel_name, channel_type,
1609 event_loop->name(), event_loop->node());
1610
1611 CHECK(remapped_channel != nullptr)
1612 << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
1613 << channel_type << "\"} because it is not in the provided configuration.";
1614
1615 return remapped_channel;
1616}
1617
// Takes ownership of the ChannelMerger which supplies this State's sorted
// stream of log messages (see OldestMessageTime()/SeedSortedMessages()).
LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
    : channel_merger_(std::move(channel_merger)) {}
1620
1621EventLoop *LogReader::State::SetNodeEventLoopFactory(
1622 NodeEventLoopFactory *node_event_loop_factory) {
1623 node_event_loop_factory_ = node_event_loop_factory;
1624 event_loop_unique_ptr_ =
1625 node_event_loop_factory_->MakeEventLoop("log_reader");
1626 return event_loop_unique_ptr_.get();
1627}
1628
1629void LogReader::State::SetChannelCount(size_t count) {
1630 channels_.resize(count);
1631 filters_.resize(count);
1632 channel_target_event_loop_factory_.resize(count);
1633}
1634
1635void LogReader::State::SetChannel(
1636 size_t channel, std::unique_ptr<RawSender> sender,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001637 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh858c9f32020-08-31 16:56:12 -07001638 NodeEventLoopFactory *channel_target_event_loop_factory) {
1639 channels_[channel] = std::move(sender);
1640 filters_[channel] = filter;
1641 channel_target_event_loop_factory_[channel] =
1642 channel_target_event_loop_factory;
1643}
1644
// Pops the next message (in event-time order) off the locally queued sorted
// messages.  Refills the queue, runs the message's timestamp filter (when the
// message carried forwarding information), and reports through *update_time
// whether the filter wants the clock offsets recomputed.  Requires that the
// queue be non-empty (i.e. OldestMessageTime() != max_time).
std::tuple<TimestampMerger::DeliveryTimestamp, int,
           FlatbufferVector<MessageHeader>>
LogReader::State::PopOldest(bool *update_time) {
  CHECK_GT(sorted_messages_.size(), 0u);

  // The queued tuple also carries the filter pointer recorded by
  // SeedSortedMessages(); it is stripped off before returning.
  std::tuple<TimestampMerger::DeliveryTimestamp, int,
             FlatbufferVector<MessageHeader>,
             message_bridge::NoncausalOffsetEstimator *>
      result = std::move(sorted_messages_.front());
  VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
          << std::get<0>(result).monotonic_event_time;
  sorted_messages_.pop_front();
  // Keep ~2 seconds of messages queued ahead.
  SeedSortedMessages();

  if (std::get<3>(result) != nullptr) {
    // A filter was attached (forwarded message): pop its sample and ask
    // whether the offset estimate changed.
    *update_time = std::get<3>(result)->Pop(
        event_loop_->node(), std::get<0>(result).monotonic_event_time);
  } else {
    *update_time = false;
  }
  return std::make_tuple(std::get<0>(result), std::get<1>(result),
                         std::move(std::get<2>(result)));
}
1668
1669monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
1670 if (sorted_messages_.size() > 0) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001671 VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
Austin Schuh858c9f32020-08-31 16:56:12 -07001672 << std::get<0>(sorted_messages_.front()).monotonic_event_time;
1673 return std::get<0>(sorted_messages_.front()).monotonic_event_time;
1674 }
1675
1676 return channel_merger_->OldestMessageTime();
1677}
1678
// Pulls messages out of the channel merger into sorted_messages_ until about
// two seconds of data is queued ahead (measured from the front of the queue,
// or from the log start time when the queue is empty).  Feeds each forwarded
// message's remote/event timestamp pair into its offset-estimation filter as
// it is queued.
void LogReader::State::SeedSortedMessages() {
  const aos::monotonic_clock::time_point end_queue_time =
      (sorted_messages_.size() > 0
           ? std::get<0>(sorted_messages_.front()).monotonic_event_time
           : channel_merger_->monotonic_start_time()) +
      std::chrono::seconds(2);

  while (true) {
    // max_time means the merger has no more messages; stop.
    if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
      return;
    }
    if (sorted_messages_.size() > 0) {
      // Stop placing sorted messages on the list once we have 2 seconds
      // queued up (but queue at least until the log starts).
      if (end_queue_time <
          std::get<0>(sorted_messages_.back()).monotonic_event_time) {
        return;
      }
    }

    TimestampMerger::DeliveryTimestamp channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    message_bridge::NoncausalOffsetEstimator *filter = nullptr;

    std::tie(channel_timestamp, channel_index, channel_data) =
        channel_merger_->PopOldest();

    // Skip any messages without forwarding information.
    if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
      // Got a forwarding timestamp!
      filter = filters_[channel_index];

      CHECK(filter != nullptr);

      // Call the correct method depending on if we are the forward or
      // reverse direction here.
      filter->Sample(event_loop_->node(),
                     channel_timestamp.monotonic_event_time,
                     channel_timestamp.monotonic_remote_time);
    }
    // Stash the filter alongside the message so PopOldest() can pop the
    // corresponding sample later.
    sorted_messages_.emplace_back(channel_timestamp, channel_index,
                                  std::move(channel_data), filter);
  }
}
1726
1727void LogReader::State::Deregister() {
1728 for (size_t i = 0; i < channels_.size(); ++i) {
1729 channels_[i].reset();
1730 }
1731 event_loop_unique_ptr_.reset();
1732 event_loop_ = nullptr;
1733 timer_handler_ = nullptr;
1734 node_event_loop_factory_ = nullptr;
1735}
1736
Austin Schuhe309d2a2019-11-29 13:25:21 -08001737} // namespace logger
1738} // namespace aos