#include "aos/events/logging/log_writer.h"

#include <functional>
#include <map>
#include <vector>

#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/team_number.h"
#include "aos/network/timestamp_channel.h"

namespace aos {
namespace logger {
namespace {
using message_bridge::RemoteMessage;
}  // namespace
18
19Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
20 std::function<bool(const Channel *)> should_log)
21 : event_loop_(event_loop),
22 configuration_(configuration),
23 name_(network::GetHostname()),
24 timer_handler_(event_loop_->AddTimer(
25 [this]() { DoLogData(event_loop_->monotonic_now()); })),
26 server_statistics_fetcher_(
27 configuration::MultiNode(event_loop_->configuration())
28 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
29 "/aos")
30 : aos::Fetcher<message_bridge::ServerStatistics>()) {
31 VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
32
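  // Map from each remote timestamp channel that this node should log to the
  // node whose delivery timestamps it carries, built by walking every channel
  // we forward to another node.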
  std::map<const Channel *, const Node *> timestamp_logger_channels;

  message_bridge::ChannelTimestampFinder finder(event_loop_);
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
      continue;
    }
    if (!channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        const Node *other_node = configuration::GetNode(
            event_loop_->configuration(), connection->name()->string_view());

        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_channels.insert(
            std::make_pair(finder.ForChannel(channel, connection), other_node));
      }
    }
  }

  const size_t our_node_index =
      configuration::GetNodeIndex(configuration_, event_loop_->node());

  for (size_t channel_index = 0;
       channel_index < configuration_->channels()->size(); ++channel_index) {
    const Channel *const config_channel =
        configuration_->channels()->Get(channel_index);
    // The MakeRawFetcher method needs a channel which is in the event loop
    // configuration() object, not the configuration_ object.  Go look that up
    // from the config.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());
    CHECK(channel != nullptr)
        << ": Failed to look up channel "
        << aos::configuration::CleanedChannelToString(config_channel);
    if (!should_log(channel)) {
      continue;
    }

    FetcherStruct fs;
    fs.channel_index = channel_index;
    fs.channel = channel;

    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
        channel, event_loop_->node());
    const bool log_message = is_logged && is_readable;

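    // Delivery times only matter when this node is the destination of a
    // forwarded channel and the config says to log them here.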
    bool log_delivery_times = false;
    if (event_loop_->node() != nullptr) {
      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
          channel, event_loop_->node(), event_loop_->node());
    }

    // Now, detect a RemoteMessage timestamp logger where we should just log
    // the contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();

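    // Only make a fetcher if at least one kind of output is needed: local
    // data, delivery timestamps for messages forwarded to us, or the contents
    // of a remote timestamp channel.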
    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.wants_timestamp_writer = true;
        fs.timestamp_node_index = our_node_index;
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.wants_writer = true;
        if (!is_local) {
          const Node *source_node = configuration::GetNode(
              configuration_, channel->source_node()->string_view());
          fs.data_node_index =
              configuration::GetNodeIndex(configuration_, source_node);
          fs.log_type = LogType::kLogRemoteMessage;
        } else {
          fs.data_node_index = our_node_index;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
        fs.wants_contents_writer = true;
        fs.contents_node_index =
            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
      }
      fetchers_.emplace_back(std::move(fs));
    }
  }

  // When we are logging remote timestamps, we need to be able to translate
  // from the channel index that the event loop uses to the channel index in
  // the config in the log file.
  event_loop_to_logged_channel_index_.resize(
      event_loop->configuration()->channels()->size(), -1);
  for (size_t event_loop_channel_index = 0;
       event_loop_channel_index <
       event_loop->configuration()->channels()->size();
       ++event_loop_channel_index) {
    const Channel *event_loop_channel =
        event_loop->configuration()->channels()->Get(event_loop_channel_index);

    const Channel *logged_channel = aos::configuration::GetChannel(
        configuration_, event_loop_channel->name()->string_view(),
        event_loop_channel->type()->string_view(), "",
        configuration::GetNode(configuration_, event_loop_->node()));

    if (logged_channel != nullptr) {
      event_loop_to_logged_channel_index_[event_loop_channel_index] =
          configuration::ChannelIndex(configuration_, logged_channel);
    }
  }
}

Logger::~Logger() {
  if (log_namer_) {
    // If we are replaying a log file, or in simulation, we want to force the
    // last bit of data to be logged.  The easiest way to deal with this is to
    // poll everything as we go to destroy the class, i.e., shut down the
    // logger, and write it to disk.
    StopLogging(event_loop_->monotonic_now());
  }
}

void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
                          std::string_view log_start_uuid) {
  CHECK(!log_namer_) << ": Already logging";
  log_namer_ = std::move(log_namer);

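  // When the configuration is stored separately from the data, write it out
  // once now and remember its sha256 so each part header can reference it.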
  std::string config_sha256;
  if (separate_config_) {
    flatbuffers::FlatBufferBuilder fbb;
    flatbuffers::Offset<aos::Configuration> configuration_offset =
        CopyFlatBuffer(configuration_, &fbb);
    LogFileHeader::Builder log_file_header_builder(fbb);
    log_file_header_builder.add_configuration(configuration_offset);
    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
        fbb.Release());
    config_sha256 = Sha256(config_header.span());
    LOG(INFO) << "Config sha256 of " << config_sha256;
    log_namer_->WriteConfiguration(&config_header, config_sha256);
  }

  log_event_uuid_ = UUID::Random();
  log_start_uuid_ = log_start_uuid;
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());

  // We want to do as much work as possible before the initial Fetch.  Time
  // between that and actually starting to log opens up the possibility of
  // falling off the end of the queue during that time.

  for (FetcherStruct &f : fetchers_) {
    if (f.wants_writer) {
      f.writer = log_namer_->MakeWriter(f.channel);
    }
    if (f.wants_timestamp_writer) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
    }
    if (f.wants_contents_writer) {
      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
          f.channel, CHECK_NOTNULL(f.timestamp_node));
    }
  }

  CHECK(node_state_.empty());
  node_state_.resize(configuration::MultiNode(configuration_)
                         ? configuration_->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);

    node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
  }

  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel.  This lets us have
  // non-periodic messages with configuration that now get logged.
  for (FetcherStruct &f : fetchers_) {
    const auto start = event_loop_->monotonic_now();
    const bool got_new = f.fetcher->Fetch();
    const auto end = event_loop_->monotonic_now();
    RecordFetchResult(start, end, got_new, &f);

    // If there is a message, we want to write it.
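    // written == true means nothing is pending for this fetcher, so a channel
    // with no message yet counts as already written.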
    f.written = f.fetcher->context().data == nullptr;
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < node_state_.size(); ++i) {
    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
                 monotonic_clock::min_time, realtime_clock::min_time);
  }

  WriteHeader();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
            << " start_time " << last_synchronized_time_;

  // Force logging up until the start of the log file now, so the messages at
  // the start are always ordered before the rest of the messages.
  // Note: this ship may have already sailed, but we don't have to make it
  // worse.
  // TODO(austin): Test...
  LogUntil(last_synchronized_time_);

  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}

std::unique_ptr<LogNamer> Logger::StopLogging(
    aos::monotonic_clock::time_point end_time) {
  CHECK(log_namer_) << ": Not logging right now";

  if (end_time != aos::monotonic_clock::min_time) {
    LogUntil(end_time);
  }
  timer_handler_->Disable();

  for (FetcherStruct &f : fetchers_) {
    f.writer = nullptr;
    f.timestamp_writer = nullptr;
    f.contents_writer = nullptr;
  }
  node_state_.clear();

  log_event_uuid_ = UUID::Zero();
  log_start_uuid_ = std::string();

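  // Hand the LogNamer back to the caller, who now controls when the remaining
  // buffered data is flushed and the files are closed.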
  return std::move(log_namer_);
}

void Logger::WriteHeader() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  }

  aos::monotonic_clock::time_point monotonic_start_time =
      event_loop_->monotonic_now();
  aos::realtime_clock::time_point realtime_start_time =
      event_loop_->realtime_now();

  // We need to pick a point in time to declare the log file "started".  This
  // starts here.  It needs to be after everything is fetched so that the
  // fetchers are all pointed at the most recent message before the start
  // time.
  last_synchronized_time_ = monotonic_start_time;

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                         realtime_start_time);
    MaybeWriteHeader(node_index, node);
  }
}

void Logger::MaybeWriteHeader(int node_index) {
  if (configuration::MultiNode(configuration_)) {
    return MaybeWriteHeader(node_index,
                            configuration_->nodes()->Get(node_index));
  } else {
    return MaybeWriteHeader(node_index, nullptr);
  }
}

void Logger::MaybeWriteHeader(int node_index, const Node *node) {
  // This function is responsible for writing the header when the header both
  // has valid data, and when it needs to be written.
  if (node_state_[node_index].header_written &&
      node_state_[node_index].header_valid) {
    // The header has been written and is valid, nothing to do.
    return;
  }
  if (!node_state_[node_index].has_source_node_boot_uuid) {
    // Can't write a header if we don't have the boot UUID.
    return;
  }

  // WriteHeader writes the first header in a log file.  We want to do this
  // only once.
  //
  // Rotate rewrites the same header with a new part ID, but keeps the same
  // part UUID.  We don't want that when things reboot, because that implies
  // that parts go together across a reboot.
  //
  // Reboot resets the parts UUID.  So, once we've written a header the first
  // time, we want to use Reboot to rotate the log and reset the parts UUID.
  //
  // header_valid is cleared whenever the remote reboots.
  if (node_state_[node_index].header_written) {
    log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
  } else {
    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);

    node_state_[node_index].header_written = true;
  }
  node_state_[node_index].header_valid = true;
}

void Logger::WriteMissingTimestamps() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  } else {
    return;
  }

  if (server_statistics_fetcher_.get() == nullptr) {
    return;
  }

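  // For each node, try to fill in its start time now that we have fresh
  // ServerStatistics.  The first time a node's start time becomes known, we
  // rotate so a new part starts with a header carrying the just-set times.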
  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    if (MaybeUpdateTimestamp(
            node, node_index,
            server_statistics_fetcher_.context().monotonic_event_time,
            server_statistics_fetcher_.context().realtime_event_time)) {
      CHECK(node_state_[node_index].header_written);
      CHECK(node_state_[node_index].header_valid);
      log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
    } else {
      MaybeWriteHeader(node_index, node);
    }
  }
}

void Logger::SetStartTime(
    size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time,
    aos::monotonic_clock::time_point logger_monotonic_start_time,
    aos::realtime_clock::time_point logger_realtime_start_time) {
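  // Patch the cached header in place so any subsequent WriteHeader, Reboot, or
  // Rotate call picks up the new start times.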
  node_state_[node_index].monotonic_start_time = monotonic_start_time;
  node_state_[node_index].realtime_start_time = realtime_start_time;
  node_state_[node_index]
      .log_file_header.mutable_message()
      ->mutate_monotonic_start_time(
          std::chrono::duration_cast<std::chrono::nanoseconds>(
              monotonic_start_time.time_since_epoch())
              .count());

  // Add logger start times if they are available in the log file header.
  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_logger_monotonic_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_logger_monotonic_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                logger_monotonic_start_time.time_since_epoch())
                .count());
  }

  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_logger_realtime_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_logger_realtime_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                logger_realtime_start_time.time_since_epoch())
                .count());
  }

  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_realtime_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_realtime_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                realtime_start_time.time_since_epoch())
                .count());
  }
}

bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (event_loop_->node() == node ||
      !configuration::MultiNode(configuration_)) {
    // There are no offsets to compute for ourselves, so always succeed.
    SetStartTime(node_index, monotonic_start_time, realtime_start_time,
                 monotonic_start_time, realtime_start_time);
    node_state_[node_index].SetBootUUID(event_loop_->boot_uuid());
    return true;
  } else if (server_statistics_fetcher_.get() != nullptr) {
    // We must be a remote node now.  Look for the connection and see if it is
    // connected.

    for (const message_bridge::ServerConnection *connection :
         *server_statistics_fetcher_->connections()) {
      if (connection->node()->name()->string_view() !=
          node->name()->string_view()) {
        continue;
      }

      if (connection->state() != message_bridge::State::CONNECTED) {
        VLOG(1) << node->name()->string_view()
                << " is not connected, can't start it yet.";
        break;
      }

      // Update the boot UUID as soon as we know we are connected.
      if (!connection->has_boot_uuid()) {
        VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
        break;
      }

      if (!node_state_[node_index].has_source_node_boot_uuid ||
          node_state_[node_index].source_node_boot_uuid !=
              connection->boot_uuid()->string_view()) {
        node_state_[node_index].SetBootUUID(
            connection->boot_uuid()->string_view());
      }

      if (!connection->has_monotonic_offset()) {
        VLOG(1) << "Missing monotonic offset for setting start time for node "
                << aos::FlatbufferToJson(node);
        break;
      }

      // Found it and it is connected.  Compensate and go.
      SetStartTime(node_index,
                   monotonic_start_time +
                       std::chrono::nanoseconds(connection->monotonic_offset()),
                   realtime_start_time, monotonic_start_time,
                   realtime_start_time);
      return true;
    }
  }
  return false;
}

aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node, std::string_view config_sha256) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (!separate_config_) {
    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
  } else {
    CHECK(!config_sha256.empty());
  }

  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  CHECK(log_event_uuid_ != UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      log_event_uuid_.PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      logger_instance_uuid_.PackString(&fbb);

  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (!log_start_uuid_.empty()) {
    log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
  }

  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  if (!config_sha256.empty()) {
    config_sha256_offset = fbb.CreateString(config_sha256);
  }

  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

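  // A placeholder parts UUID with the full string length, presumably so the
  // LogNamer can mutate it in place when it assigns the real per-part UUID.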
  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      fbb.CreateString("00000000-0000-4000-8000-000000000000");

  flatbuffers::Offset<Node> node_offset;
  flatbuffers::Offset<Node> logger_node_offset;

  if (configuration::MultiNode(configuration_)) {
    node_offset = RecursiveCopyFlatBuffer(node, &fbb);
    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }
  // The theoretical worst-case out-of-order duration is twice the polling
  // period: one message could get logged right after a boundary, but be from
  // right before the next boundary, and the reverse could happen for another
  // message.  Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::nanoseconds(3 * polling_period_).count());

  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  } else {
    log_file_header_builder.add_logger_monotonic_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            monotonic_clock::min_time.time_since_epoch())
            .count());
    log_file_header_builder.add_logger_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(
      logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);
  log_file_header_builder.add_source_node_boot_uuid(
      source_node_boot_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(0);

  log_file_header_builder.add_configuration_sha256(0);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}

void Logger::ResetStatisics() {
  max_message_fetch_time_ = std::chrono::nanoseconds::zero();
  max_message_fetch_time_channel_ = -1;
  max_message_fetch_time_size_ = -1;
  total_message_fetch_time_ = std::chrono::nanoseconds::zero();
  total_message_fetch_count_ = 0;
  total_message_fetch_bytes_ = 0;
  total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
  total_nop_fetch_count_ = 0;
  max_copy_time_ = std::chrono::nanoseconds::zero();
  max_copy_time_channel_ = -1;
  max_copy_time_size_ = -1;
  total_copy_time_ = std::chrono::nanoseconds::zero();
  total_copy_count_ = 0;
  total_copy_bytes_ = 0;
}

void Logger::Rotate() {
  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
  }
}

void Logger::LogUntil(monotonic_clock::time_point t) {
  // Grab the latest ServerStatistics message.  This will always have the
  // opportunity to be >= to the current time, so it will always represent any
  // reboots which may have happened.
  WriteMissingTimestamps();

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
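    // Drain this fetcher until we hit a message at or after t.  Each message
    // may produce a data entry, a delivery-timestamp entry, and/or a remote
    // timestamp contents entry, depending on which writers are configured.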
    while (true) {
      if (f.written) {
        const auto start = event_loop_->monotonic_now();
        const bool got_new = f.fetcher->FetchNext();
        const auto end = event_loop_->monotonic_now();
        RecordFetchResult(start, end, got_new, &f);
        if (!got_new) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        }
        f.written = false;
      }

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time >= t) {
        break;
      }
      if (f.writer != nullptr) {
        // Write!
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                           max_header_size_);
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index, f.log_type));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing data as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.writer->filename() << " data "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        max_header_size_ = std::max(max_header_size_,
                                    fbb.GetSize() - f.fetcher->context().size);
        CHECK(node_state_[f.data_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.writer->QueueSizedFlatbuffer(&fbb);
      }

      if (f.timestamp_writer != nullptr) {
        // And now handle timestamps.
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index,
                                           LogType::kLogDeliveryTimeOnly));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing timestamps as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.timestamp_writer->filename() << " timestamp "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        CHECK(node_state_[f.timestamp_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
      }

      if (f.contents_writer != nullptr) {
        const auto start = event_loop_->monotonic_now();
        // And now handle the special message contents channel.  Copy the
        // message into a FlatBufferBuilder and save it to disk.
        // TODO(austin): We can be more efficient here when we start to
        // care...
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        const RemoteMessage *msg =
            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);

        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
        if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
            node_state_[f.contents_node_index].source_node_boot_uuid !=
                msg->boot_uuid()->string_view()) {
          node_state_[f.contents_node_index].SetBootUUID(
              msg->boot_uuid()->string_view());

          MaybeWriteHeader(f.contents_node_index);
        }

        logger::MessageHeader::Builder message_header_builder(fbb);

        // TODO(austin): This needs to check the channel_index and confirm
        // that it should be logged before squirreling away the timestamp to
        // disk.  We don't want to log irrelevant timestamps.

        // Note: this must match the same order as MessageBridgeServer and
        // PackMessage.  We want identical headers to have identical
        // on-the-wire formats to make comparing them easier.

        // Translate from the channel index that the event loop uses to the
        // channel index in the log file.
        message_header_builder.add_channel_index(
            event_loop_to_logged_channel_index_[msg->channel_index()]);

        message_header_builder.add_queue_index(msg->queue_index());
        message_header_builder.add_monotonic_sent_time(
            msg->monotonic_sent_time());
        message_header_builder.add_realtime_sent_time(
            msg->realtime_sent_time());

        message_header_builder.add_monotonic_remote_time(
            msg->monotonic_remote_time());
        message_header_builder.add_realtime_remote_time(
            msg->realtime_remote_time());
        message_header_builder.add_remote_queue_index(
            msg->remote_queue_index());

        message_header_builder.add_monotonic_timestamp_time(
            f.fetcher->context()
                .monotonic_event_time.time_since_epoch()
                .count());

        fbb.FinishSizePrefixed(message_header_builder.Finish());
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        CHECK(node_state_[f.contents_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.contents_writer->QueueSizedFlatbuffer(&fbb);
      }

      f.written = true;
    }
  }
  last_synchronized_time_ = t;
}

void Logger::DoLogData(const monotonic_clock::time_point end_time) {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration.  To do this, we need sync points.  Every write
  // cycle should be a sync point.

  do {
    // Move the sync point up by at most polling_period.  This forces one sync
    // per iteration, even if it is small.
    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));

    on_logged_period_();

    // If we missed cycles, we could be pretty far behind.  Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < end_time);
}

void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
                               aos::monotonic_clock::time_point end,
                               bool got_new, FetcherStruct *fetcher) {
  const auto duration = end - start;
  if (!got_new) {
    ++total_nop_fetch_count_;
    total_nop_fetch_time_ += duration;
    return;
  }
  ++total_message_fetch_count_;
  total_message_fetch_bytes_ += fetcher->fetcher->context().size;
  total_message_fetch_time_ += duration;
  if (duration > max_message_fetch_time_) {
    max_message_fetch_time_ = duration;
    max_message_fetch_time_channel_ = fetcher->channel_index;
    max_message_fetch_time_size_ = fetcher->fetcher->context().size;
  }
}

void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
                                     aos::monotonic_clock::time_point end,
                                     FetcherStruct *fetcher) {
  const auto duration = end - start;
  total_copy_time_ += duration;
  ++total_copy_count_;
  total_copy_bytes_ += fetcher->fetcher->context().size;
  if (duration > max_copy_time_) {
    max_copy_time_ = duration;
    max_copy_time_channel_ = fetcher->channel_index;
    max_copy_time_size_ = fetcher->fetcher->context().size;
  }
}

}  // namespace logger
}  // namespace aos