#include "aos/events/logging/log_writer.h"

#include <functional>
#include <map>
#include <vector>

#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/team_number.h"
#include "aos/network/timestamp_channel.h"

namespace aos {
namespace logger {
namespace {
using message_bridge::RemoteMessage;
}  // namespace

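// The constructor scans the configuration to decide, for each channel, whether
// this node should log the message data, the delivery timestamps, and/or the
// forwarded RemoteMessage timestamp contents, and builds a FetcherStruct for
// every channel that needs any of them.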
Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
               std::function<bool(const Channel *)> should_log)
    : event_loop_(event_loop),
      configuration_(configuration),
      name_(network::GetHostname()),
      timer_handler_(event_loop_->AddTimer(
          [this]() { DoLogData(event_loop_->monotonic_now()); })),
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());

  std::map<const Channel *, const Node *> timestamp_logger_channels;

  message_bridge::ChannelTimestampFinder finder(event_loop_);
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
      continue;
    }
    if (!channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        const Node *other_node = configuration::GetNode(
            event_loop_->configuration(), connection->name()->string_view());

        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_channels.insert(
            std::make_pair(finder.ForChannel(channel, connection), other_node));
      }
    }
  }

  const size_t our_node_index =
      configuration::GetNodeIndex(configuration_, event_loop_->node());

  for (size_t channel_index = 0;
       channel_index < configuration_->channels()->size(); ++channel_index) {
    const Channel *const config_channel =
        configuration_->channels()->Get(channel_index);
    // MakeRawFetcher needs a channel from the event loop's configuration()
    // object, not from the configuration_ object, so look the corresponding
    // channel up there.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());
    CHECK(channel != nullptr)
        << ": Failed to look up channel "
        << aos::configuration::CleanedChannelToString(config_channel);
    if (!should_log(channel)) {
      continue;
    }

    FetcherStruct fs;
    fs.channel_index = channel_index;
    fs.channel = channel;

    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
        channel, event_loop_->node());
    const bool log_message = is_logged && is_readable;

    bool log_delivery_times = false;
    if (event_loop_->node() != nullptr) {
      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
          channel, event_loop_->node(), event_loop_->node());
    }

    // Now, detect a RemoteMessage timestamp logger where we should just log
    // the contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.wants_timestamp_writer = true;
        fs.timestamp_node_index = our_node_index;
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.wants_writer = true;
        if (!is_local) {
          const Node *source_node = configuration::GetNode(
              configuration_, channel->source_node()->string_view());
          fs.data_node_index =
              configuration::GetNodeIndex(configuration_, source_node);
          fs.log_type = LogType::kLogRemoteMessage;
        } else {
          fs.data_node_index = our_node_index;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
        fs.wants_contents_writer = true;
        fs.contents_node_index =
            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
      }
      fetchers_.emplace_back(std::move(fs));
    }
  }

  // When we are logging remote timestamps, we need to be able to translate
  // from the channel index that the event loop uses to the channel index in
  // the configuration written to the log file.
  event_loop_to_logged_channel_index_.resize(
      event_loop->configuration()->channels()->size(), -1);
  for (size_t event_loop_channel_index = 0;
       event_loop_channel_index <
       event_loop->configuration()->channels()->size();
       ++event_loop_channel_index) {
    const Channel *event_loop_channel =
        event_loop->configuration()->channels()->Get(event_loop_channel_index);

    const Channel *logged_channel = aos::configuration::GetChannel(
        configuration_, event_loop_channel->name()->string_view(),
        event_loop_channel->type()->string_view(), "",
        configuration::GetNode(configuration_, event_loop_->node()));

    if (logged_channel != nullptr) {
      event_loop_to_logged_channel_index_[event_loop_channel_index] =
          configuration::ChannelIndex(configuration_, logged_channel);
    }
  }
}

Logger::~Logger() {
  if (log_namer_) {
    // If we are replaying a log file, or in simulation, we want to force the
    // last bit of data to be logged. The easiest way to deal with this is to
    // poll everything as we go to destroy the class, i.e., shut down the
    // logger, and write it to disk.
    StopLogging(event_loop_->monotonic_now());
  }
}

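// Starts logging to the provided LogNamer. A rough usage sketch, with the
// should_log predicate, log_namer, and log_start_uuid all supplied by the
// caller:
//
//   Logger logger(event_loop, configuration,
//                 [](const Channel *) { return true; });
//   logger.StartLogging(std::move(log_namer), log_start_uuid);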
void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
                          std::string_view log_start_uuid) {
  CHECK(!log_namer_) << ": Already logging";
  log_namer_ = std::move(log_namer);

  std::string config_sha256;
  if (separate_config_) {
    flatbuffers::FlatBufferBuilder fbb;
    flatbuffers::Offset<aos::Configuration> configuration_offset =
        CopyFlatBuffer(configuration_, &fbb);
    LogFileHeader::Builder log_file_header_builder(fbb);
    log_file_header_builder.add_configuration(configuration_offset);
    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
        fbb.Release());
    config_sha256 = Sha256(config_header.span());
    LOG(INFO) << "Config sha256 of " << config_sha256;
    log_namer_->WriteConfiguration(&config_header, config_sha256);
  }

  log_event_uuid_ = UUID::Random();
  log_start_uuid_ = log_start_uuid;
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());

  // We want to do as much work as possible before the initial Fetch. Time
  // between that and actually starting to log opens up the possibility of
  // falling off the end of the queue during that time.

  for (FetcherStruct &f : fetchers_) {
    if (f.wants_writer) {
      f.writer = log_namer_->MakeWriter(f.channel);
    }
    if (f.wants_timestamp_writer) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
    }
    if (f.wants_contents_writer) {
      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
          f.channel, CHECK_NOTNULL(f.timestamp_node));
    }
  }

  CHECK(node_state_.empty());
  node_state_.resize(configuration::MultiNode(configuration_)
                         ? configuration_->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);

    node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
  }

  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel. This lets
  // non-periodic messages, such as configuration messages, still get logged.
  for (FetcherStruct &f : fetchers_) {
    const auto start = event_loop_->monotonic_now();
    const bool got_new = f.fetcher->Fetch();
    const auto end = event_loop_->monotonic_now();
    RecordFetchResult(start, end, got_new, &f);

    // If there is a message, mark it as unwritten so it gets logged below;
    // otherwise there is nothing to write yet.
    f.written = f.fetcher->context().data == nullptr;
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < node_state_.size(); ++i) {
    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
                 monotonic_clock::min_time, realtime_clock::min_time);
  }

  WriteHeader();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
            << " start_time " << last_synchronized_time_ << " boot uuid "
            << event_loop_->boot_uuid();

  // Force logging up until the start of the log file now, so the messages at
  // the start are always ordered before the rest of the messages.
  // Note: this ship may have already sailed, but we don't have to make it
  // worse.
  // TODO(austin): Test...
  //
  // This is safe to call here since we have set last_synchronized_time_ to the
  // same time as in the header, and all the data before it should be logged
  // without ordering concerns.
  LogUntil(last_synchronized_time_);

  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}

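// Stops logging and returns the LogNamer to the caller so a fresh logging
// session can be started later.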
std::unique_ptr<LogNamer> Logger::StopLogging(
    aos::monotonic_clock::time_point end_time) {
  CHECK(log_namer_) << ": Not logging right now";

  if (end_time != aos::monotonic_clock::min_time) {
    DoLogData(end_time);
  }
  timer_handler_->Disable();

  for (FetcherStruct &f : fetchers_) {
    f.writer = nullptr;
    f.timestamp_writer = nullptr;
    f.contents_writer = nullptr;
  }
  node_state_.clear();

  log_event_uuid_ = UUID::Zero();
  log_start_uuid_ = std::string();

  return std::move(log_namer_);
}

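// Declares the log start time and writes (or defers) the header for each node
// being logged.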
void Logger::WriteHeader() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  }

  aos::monotonic_clock::time_point monotonic_start_time =
      event_loop_->monotonic_now();
  aos::realtime_clock::time_point realtime_start_time =
      event_loop_->realtime_now();

  // We need to pick a point in time to declare the log file "started". That
  // point is now. It needs to be after everything is fetched so that the
  // fetchers are all pointed at the most recent message before the start
  // time.
  last_synchronized_time_ = monotonic_start_time;

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                         realtime_start_time);
    MaybeWriteHeader(node_index, node);
  }
}

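// Convenience overload: looks up the Node for node_index (nullptr in
// single-node configurations) and forwards to the two-argument
// MaybeWriteHeader below.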
void Logger::MaybeWriteHeader(int node_index) {
  if (configuration::MultiNode(configuration_)) {
    return MaybeWriteHeader(node_index,
                            configuration_->nodes()->Get(node_index));
  } else {
    return MaybeWriteHeader(node_index, nullptr);
  }
}

void Logger::MaybeWriteHeader(int node_index, const Node *node) {
  // This function is responsible for writing the header when it both has
  // valid data and needs to be written.
  if (node_state_[node_index].header_written &&
      node_state_[node_index].header_valid) {
    // The header has been written and is valid, nothing to do.
    return;
  }
  if (!node_state_[node_index].has_source_node_boot_uuid) {
    // Can't write a header if we don't have the boot UUID.
    return;
  }

  // WriteHeader writes the first header in a log file. We want to do this
  // only once.
  //
  // Rotate rewrites the same header with a new part ID, but keeps the same
  // part UUID. We don't want that when things reboot, because that implies
  // that parts go together across a reboot.
  //
  // Reboot resets the parts UUID. So, once we've written a header the first
  // time, we want to use Reboot to rotate the log and reset the parts UUID.
  //
  // header_valid is cleared whenever the remote reboots.
  if (node_state_[node_index].header_written) {
    log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
  } else {
    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);

    node_state_[node_index].header_written = true;
  }
  node_state_[node_index].header_valid = true;
}

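// For multi-node logs, revisits nodes whose start time could not be computed
// yet and fills it in once ServerStatistics shows the node as connected with
// a valid monotonic offset.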
void Logger::WriteMissingTimestamps() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  } else {
    return;
  }

  if (server_statistics_fetcher_.get() == nullptr) {
    return;
  }

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    if (MaybeUpdateTimestamp(
            node, node_index,
            server_statistics_fetcher_.context().monotonic_event_time,
            server_statistics_fetcher_.context().realtime_event_time)) {
      CHECK(node_state_[node_index].header_written);
      CHECK(node_state_[node_index].header_valid);
      log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
    } else {
      MaybeWriteHeader(node_index, node);
    }
  }
}

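// Records the start times for a node and mirrors them into that node's
// in-memory log file header so they land in the next header written out.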
void Logger::SetStartTime(
    size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time,
    aos::monotonic_clock::time_point logger_monotonic_start_time,
    aos::realtime_clock::time_point logger_realtime_start_time) {
  node_state_[node_index].monotonic_start_time = monotonic_start_time;
  node_state_[node_index].realtime_start_time = realtime_start_time;
  node_state_[node_index]
      .log_file_header.mutable_message()
      ->mutate_monotonic_start_time(
          std::chrono::duration_cast<std::chrono::nanoseconds>(
              monotonic_start_time.time_since_epoch())
              .count());

  // Add logger start times if they are available in the log file header.
  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_logger_monotonic_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_logger_monotonic_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                logger_monotonic_start_time.time_since_epoch())
                .count());
  }

  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_logger_realtime_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_logger_realtime_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                logger_realtime_start_time.time_since_epoch())
                .count());
  }

  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_realtime_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_realtime_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                realtime_start_time.time_since_epoch())
                .count());
  }
}

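// Attempts to compute the start time for a node. For the local node this
// always succeeds; for a remote node it only succeeds once the message bridge
// reports the node as connected with a valid monotonic offset. Returns true
// if the start time was just set.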
bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (event_loop_->node() == node ||
      !configuration::MultiNode(configuration_)) {
    // There are no offsets to compute for ourselves, so always succeed.
    SetStartTime(node_index, monotonic_start_time, realtime_start_time,
                 monotonic_start_time, realtime_start_time);
    node_state_[node_index].SetBootUUID(event_loop_->boot_uuid());
    return true;
  } else if (server_statistics_fetcher_.get() != nullptr) {
    // We must be a remote node now. Look for the connection and see if it is
    // connected.

    for (const message_bridge::ServerConnection *connection :
         *server_statistics_fetcher_->connections()) {
      if (connection->node()->name()->string_view() !=
          node->name()->string_view()) {
        continue;
      }

      if (connection->state() != message_bridge::State::CONNECTED) {
        VLOG(1) << node->name()->string_view()
                << " is not connected, can't start it yet.";
        break;
      }

      if (!connection->has_monotonic_offset()) {
        VLOG(1) << "Missing monotonic offset for setting start time for node "
                << aos::FlatbufferToJson(node);
        break;
      }

      // Found it and it is connected. Compensate and go.
      SetStartTime(node_index,
                   monotonic_start_time +
                       std::chrono::nanoseconds(connection->monotonic_offset()),
                   realtime_start_time, monotonic_start_time,
                   realtime_start_time);
      return true;
    }
  }
  return false;
}

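// Builds the LogFileHeader flatbuffer for the provided node, embedding either
// the full configuration or just its sha256 depending on separate_config_.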
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node, std::string_view config_sha256) {
  // Now build the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (!separate_config_) {
    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
  } else {
    CHECK(!config_sha256.empty());
  }

  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  CHECK(log_event_uuid_ != UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      log_event_uuid_.PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      logger_instance_uuid_.PackString(&fbb);

  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (!log_start_uuid_.empty()) {
    log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
  }

  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  if (!config_sha256.empty()) {
    config_sha256_offset = fbb.CreateString(config_sha256);
  }

  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      fbb.CreateString("00000000-0000-4000-8000-000000000000");

  flatbuffers::Offset<Node> node_offset;
  flatbuffers::Offset<Node> logger_node_offset;

  if (configuration::MultiNode(configuration_)) {
    node_offset = RecursiveCopyFlatBuffer(node, &fbb);
    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }
  // The worst-case theoretical out-of-order duration is twice the polling
  // period. One message could get logged right after the boundary, but be
  // from right before the next boundary, and the reverse could happen for
  // another message. Report back 3x to be extra safe, and because the cost
  // isn't huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::nanoseconds(3 * polling_period_).count());

  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  } else {
    log_file_header_builder.add_logger_monotonic_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            monotonic_clock::min_time.time_since_epoch())
            .count());
    log_file_header_builder.add_logger_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);
  log_file_header_builder.add_source_node_boot_uuid(
      source_node_boot_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(0);

  log_file_header_builder.add_configuration_sha256(0);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}

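// Resets the fetch and copy timing statistics accumulated while logging.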
void Logger::ResetStatisics() {
  max_message_fetch_time_ = std::chrono::nanoseconds::zero();
  max_message_fetch_time_channel_ = -1;
  max_message_fetch_time_size_ = -1;
  total_message_fetch_time_ = std::chrono::nanoseconds::zero();
  total_message_fetch_count_ = 0;
  total_message_fetch_bytes_ = 0;
  total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
  total_nop_fetch_count_ = 0;
  max_copy_time_ = std::chrono::nanoseconds::zero();
  max_copy_time_channel_ = -1;
  max_copy_time_size_ = -1;
  total_copy_time_ = std::chrono::nanoseconds::zero();
  total_copy_count_ = 0;
  total_copy_bytes_ = 0;
}

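// Rotates the log for every node, starting a new part file while keeping the
// same parts UUID.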
void Logger::Rotate() {
  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
  }
}

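// Writes every pending message with an event time before t to disk: data,
// delivery timestamps, and remote timestamp contents alike.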
void Logger::LogUntil(monotonic_clock::time_point t) {
  // Grab the latest ServerStatistics message. This will always have the
  // opportunity to be >= the current time, so it will always represent any
  // reboots which may have happened.
  WriteMissingTimestamps();

  int our_node_index = aos::configuration::GetNodeIndex(
      event_loop_->configuration(), event_loop_->node());

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      if (f.written) {
        const auto start = event_loop_->monotonic_now();
        const bool got_new = f.fetcher->FetchNext();
        const auto end = event_loop_->monotonic_now();
        RecordFetchResult(start, end, got_new, &f);
        if (!got_new) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        }
        f.written = false;
      }

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time >= t) {
        break;
      }
      if (f.writer != nullptr) {
        // Only check if the boot UUID has changed if this is data from
        // another node. Our UUID can't change without restarting the
        // application.
        if (our_node_index != f.data_node_index) {
          // And update our boot UUID if the UUID has changed.
          if (node_state_[f.data_node_index].SetBootUUID(
                  f.fetcher->context().remote_boot_uuid)) {
            MaybeWriteHeader(f.data_node_index);
          }
        }

        // Write!
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                           max_header_size_);
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index, f.log_type));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing data as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.writer->filename() << " data "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        max_header_size_ = std::max(
            max_header_size_, fbb.GetSize() - f.fetcher->context().size);
        CHECK(node_state_[f.data_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.writer->QueueSizedFlatbuffer(&fbb);
      }

      if (f.timestamp_writer != nullptr) {
        // And now handle timestamps.
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index,
                                           LogType::kLogDeliveryTimeOnly));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing timestamps as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.timestamp_writer->filename() << " timestamp "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        CHECK(node_state_[f.timestamp_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
      }

      if (f.contents_writer != nullptr) {
        const auto start = event_loop_->monotonic_now();
        // And now handle the special message contents channel. Copy the
        // message into a FlatBufferBuilder and save it to disk.
        // TODO(austin): We can be more efficient here when we start to
        // care...
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        const RemoteMessage *msg =
            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);

        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
        if (node_state_[f.contents_node_index].SetBootUUID(
                UUID::FromVector(msg->boot_uuid()))) {
          MaybeWriteHeader(f.contents_node_index);
        }

        logger::MessageHeader::Builder message_header_builder(fbb);

        // TODO(austin): This needs to check the channel_index and confirm
        // that it should be logged before squirreling away the timestamp to
        // disk. We don't want to log irrelevant timestamps.

        // Note: this must match the same order as MessageBridgeServer and
        // PackMessage. We want identical headers to have identical
        // on-the-wire formats to make comparing them easier.

        // Translate from the channel index that the event loop uses to the
        // channel index in the log file.
        message_header_builder.add_channel_index(
            event_loop_to_logged_channel_index_[msg->channel_index()]);

        message_header_builder.add_queue_index(msg->queue_index());
        message_header_builder.add_monotonic_sent_time(
            msg->monotonic_sent_time());
        message_header_builder.add_realtime_sent_time(
            msg->realtime_sent_time());

        message_header_builder.add_monotonic_remote_time(
            msg->monotonic_remote_time());
        message_header_builder.add_realtime_remote_time(
            msg->realtime_remote_time());
        message_header_builder.add_remote_queue_index(
            msg->remote_queue_index());

        message_header_builder.add_monotonic_timestamp_time(
            f.fetcher->context()
                .monotonic_event_time.time_since_epoch()
                .count());

        fbb.FinishSizePrefixed(message_header_builder.Finish());
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        CHECK(node_state_[f.contents_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.contents_writer->QueueSizedFlatbuffer(&fbb);
      }

      f.written = true;
    }
  }
  last_synchronized_time_ = t;
}

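// Polls all channels and logs everything up to end_time, advancing in steps
// of at most polling_period_ so each write cycle stays a valid sync point.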
void Logger::DoLogData(const monotonic_clock::time_point end_time) {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration. To do this, we need sync points. Every write
  // cycle should be a sync point.

  do {
    // Move the sync point up by at most polling_period. This forces one sync
    // per iteration, even if it is small.
    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));

    on_logged_period_();

    // If we missed cycles, we could be pretty far behind. Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < end_time);
}

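// Accumulates fetch timing statistics, tracking no-op fetches separately from
// fetches that returned a new message.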
void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
                               aos::monotonic_clock::time_point end,
                               bool got_new, FetcherStruct *fetcher) {
  const auto duration = end - start;
  if (!got_new) {
    ++total_nop_fetch_count_;
    total_nop_fetch_time_ += duration;
    return;
  }
  ++total_message_fetch_count_;
  total_message_fetch_bytes_ += fetcher->fetcher->context().size;
  total_message_fetch_time_ += duration;
  if (duration > max_message_fetch_time_) {
    max_message_fetch_time_ = duration;
    max_message_fetch_time_channel_ = fetcher->channel_index;
    max_message_fetch_time_size_ = fetcher->fetcher->context().size;
  }
}

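// Accumulates statistics for the time spent copying a fetched message into
// the flatbuffer that gets queued to the writer.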
void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
                                     aos::monotonic_clock::time_point end,
                                     FetcherStruct *fetcher) {
  const auto duration = end - start;
  total_copy_time_ += duration;
  ++total_copy_count_;
  total_copy_bytes_ += fetcher->fetcher->context().size;
  if (duration > max_copy_time_) {
    max_copy_time_ = duration;
    max_copy_time_channel_ = fetcher->channel_index;
    max_copy_time_size_ = fetcher->fetcher->context().size;
  }
}

}  // namespace logger
}  // namespace aos