blob: 0b08c94fc373371961fbbdef037e5b24e915903e [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_writer.h"
2
3#include <functional>
4#include <map>
5#include <vector>
6
7#include "aos/configuration.h"
8#include "aos/events/event_loop.h"
9#include "aos/network/message_bridge_server_generated.h"
10#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080011#include "aos/network/timestamp_channel.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080012
13namespace aos {
14namespace logger {
15namespace {
16using message_bridge::RemoteMessage;
17} // namespace
18
19Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
20 std::function<bool(const Channel *)> should_log)
21 : event_loop_(event_loop),
22 configuration_(configuration),
23 name_(network::GetHostname()),
24 timer_handler_(event_loop_->AddTimer(
25 [this]() { DoLogData(event_loop_->monotonic_now()); })),
26 server_statistics_fetcher_(
27 configuration::MultiNode(event_loop_->configuration())
28 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
29 "/aos")
30 : aos::Fetcher<message_bridge::ServerStatistics>()) {
31 VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
32
Austin Schuhb06f03b2021-02-17 22:00:37 -080033 std::map<const Channel *, const Node *> timestamp_logger_channels;
34
Austin Schuh61e973f2021-02-21 21:43:56 -080035 message_bridge::ChannelTimestampFinder finder(event_loop_);
36 for (const Channel *channel : *event_loop_->configuration()->channels()) {
37 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080038 continue;
39 }
Austin Schuh61e973f2021-02-21 21:43:56 -080040 if (!channel->has_destination_nodes()) {
41 continue;
42 }
43 for (const Connection *connection : *channel->destination_nodes()) {
44 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
45 connection, event_loop_->node())) {
46 const Node *other_node = configuration::GetNode(
47 event_loop_->configuration(), connection->name()->string_view());
48
49 VLOG(1) << "Timestamps are logged from "
50 << FlatbufferToJson(other_node);
51 timestamp_logger_channels.insert(
52 std::make_pair(finder.ForChannel(channel, connection), other_node));
53 }
54 }
Austin Schuhb06f03b2021-02-17 22:00:37 -080055 }
56
57 const size_t our_node_index =
58 configuration::GetNodeIndex(configuration_, event_loop_->node());
59
60 for (size_t channel_index = 0;
61 channel_index < configuration_->channels()->size(); ++channel_index) {
62 const Channel *const config_channel =
63 configuration_->channels()->Get(channel_index);
64 // The MakeRawFetcher method needs a channel which is in the event loop
65 // configuration() object, not the configuration_ object. Go look that up
66 // from the config.
67 const Channel *channel = aos::configuration::GetChannel(
68 event_loop_->configuration(), config_channel->name()->string_view(),
69 config_channel->type()->string_view(), "", event_loop_->node());
70 CHECK(channel != nullptr)
71 << ": Failed to look up channel "
72 << aos::configuration::CleanedChannelToString(config_channel);
73 if (!should_log(channel)) {
74 continue;
75 }
76
77 FetcherStruct fs;
78 fs.channel_index = channel_index;
79 fs.channel = channel;
80
81 const bool is_local =
82 configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
83
84 const bool is_readable =
85 configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
86 const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
87 channel, event_loop_->node());
88 const bool log_message = is_logged && is_readable;
89
90 bool log_delivery_times = false;
91 if (event_loop_->node() != nullptr) {
92 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
93 channel, event_loop_->node(), event_loop_->node());
94 }
95
96 // Now, detect a RemoteMessage timestamp logger where we should just log the
97 // contents to a file directly.
98 const bool log_contents = timestamp_logger_channels.find(channel) !=
99 timestamp_logger_channels.end();
100
101 if (log_message || log_delivery_times || log_contents) {
102 fs.fetcher = event_loop->MakeRawFetcher(channel);
103 VLOG(1) << "Logging channel "
104 << configuration::CleanedChannelToString(channel);
105
106 if (log_delivery_times) {
107 VLOG(1) << " Delivery times";
108 fs.wants_timestamp_writer = true;
109 fs.timestamp_node_index = our_node_index;
110 }
111 if (log_message) {
112 VLOG(1) << " Data";
113 fs.wants_writer = true;
114 if (!is_local) {
115 const Node *source_node = configuration::GetNode(
116 configuration_, channel->source_node()->string_view());
117 fs.data_node_index =
118 configuration::GetNodeIndex(configuration_, source_node);
119 fs.log_type = LogType::kLogRemoteMessage;
120 } else {
121 fs.data_node_index = our_node_index;
122 }
123 }
124 if (log_contents) {
125 VLOG(1) << "Timestamp logger channel "
126 << configuration::CleanedChannelToString(channel);
127 fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
128 fs.wants_contents_writer = true;
129 fs.contents_node_index =
130 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
131 }
132 fetchers_.emplace_back(std::move(fs));
133 }
134 }
135
136 // When we are logging remote timestamps, we need to be able to translate from
137 // the channel index that the event loop uses to the channel index in the
138 // config in the log file.
139 event_loop_to_logged_channel_index_.resize(
140 event_loop->configuration()->channels()->size(), -1);
141 for (size_t event_loop_channel_index = 0;
142 event_loop_channel_index <
143 event_loop->configuration()->channels()->size();
144 ++event_loop_channel_index) {
145 const Channel *event_loop_channel =
146 event_loop->configuration()->channels()->Get(event_loop_channel_index);
147
148 const Channel *logged_channel = aos::configuration::GetChannel(
149 configuration_, event_loop_channel->name()->string_view(),
150 event_loop_channel->type()->string_view(), "",
151 configuration::GetNode(configuration_, event_loop_->node()));
152
153 if (logged_channel != nullptr) {
154 event_loop_to_logged_channel_index_[event_loop_channel_index] =
155 configuration::ChannelIndex(configuration_, logged_channel);
156 }
157 }
158}
159
160Logger::~Logger() {
161 if (log_namer_) {
162 // If we are replaying a log file, or in simulation, we want to force the
163 // last bit of data to be logged. The easiest way to deal with this is to
164 // poll everything as we go to destroy the class, ie, shut down the logger,
165 // and write it to disk.
166 StopLogging(event_loop_->monotonic_now());
167 }
168}
169
170void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
171 std::string_view log_start_uuid) {
172 CHECK(!log_namer_) << ": Already logging";
173 log_namer_ = std::move(log_namer);
174
175 std::string config_sha256;
176 if (separate_config_) {
177 flatbuffers::FlatBufferBuilder fbb;
178 flatbuffers::Offset<aos::Configuration> configuration_offset =
179 CopyFlatBuffer(configuration_, &fbb);
180 LogFileHeader::Builder log_file_header_builder(fbb);
181 log_file_header_builder.add_configuration(configuration_offset);
182 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
183 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
184 fbb.Release());
185 config_sha256 = Sha256(config_header.span());
186 LOG(INFO) << "Config sha256 of " << config_sha256;
187 log_namer_->WriteConfiguration(&config_header, config_sha256);
188 }
189
190 log_event_uuid_ = UUID::Random();
191 log_start_uuid_ = log_start_uuid;
192 VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
193
194 // We want to do as much work as possible before the initial Fetch. Time
195 // between that and actually starting to log opens up the possibility of
196 // falling off the end of the queue during that time.
197
198 for (FetcherStruct &f : fetchers_) {
199 if (f.wants_writer) {
200 f.writer = log_namer_->MakeWriter(f.channel);
201 }
202 if (f.wants_timestamp_writer) {
203 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
204 }
205 if (f.wants_contents_writer) {
206 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
207 f.channel, CHECK_NOTNULL(f.timestamp_node));
208 }
209 }
210
211 CHECK(node_state_.empty());
212 node_state_.resize(configuration::MultiNode(configuration_)
213 ? configuration_->nodes()->size()
214 : 1u);
215
216 for (const Node *node : log_namer_->nodes()) {
217 const int node_index = configuration::GetNodeIndex(configuration_, node);
218
219 node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
220 }
221
222 // Grab data from each channel right before we declare the log file started
223 // so we can capture the latest message on each channel. This lets us have
224 // non periodic messages with configuration that now get logged.
225 for (FetcherStruct &f : fetchers_) {
226 const auto start = event_loop_->monotonic_now();
227 const bool got_new = f.fetcher->Fetch();
228 const auto end = event_loop_->monotonic_now();
229 RecordFetchResult(start, end, got_new, &f);
230
231 // If there is a message, we want to write it.
232 f.written = f.fetcher->context().data == nullptr;
233 }
234
235 // Clear out any old timestamps in case we are re-starting logging.
236 for (size_t i = 0; i < node_state_.size(); ++i) {
237 SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
238 monotonic_clock::min_time, realtime_clock::min_time);
239 }
240
241 WriteHeader();
242
243 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
Austin Schuhcdd90272021-03-15 12:46:16 -0700244 << " start_time " << last_synchronized_time_ << " boot uuid "
245 << event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800246
247 // Force logging up until the start of the log file now, so the messages at
248 // the start are always ordered before the rest of the messages.
249 // Note: this ship may have already sailed, but we don't have to make it
250 // worse.
251 // TODO(austin): Test...
252 LogUntil(last_synchronized_time_);
253
254 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
255 polling_period_);
256}
257
258std::unique_ptr<LogNamer> Logger::StopLogging(
259 aos::monotonic_clock::time_point end_time) {
260 CHECK(log_namer_) << ": Not logging right now";
261
262 if (end_time != aos::monotonic_clock::min_time) {
263 LogUntil(end_time);
264 }
265 timer_handler_->Disable();
266
267 for (FetcherStruct &f : fetchers_) {
268 f.writer = nullptr;
269 f.timestamp_writer = nullptr;
270 f.contents_writer = nullptr;
271 }
272 node_state_.clear();
273
274 log_event_uuid_ = UUID::Zero();
275 log_start_uuid_ = std::string();
276
277 return std::move(log_namer_);
278}
279
280void Logger::WriteHeader() {
281 if (configuration::MultiNode(configuration_)) {
282 server_statistics_fetcher_.Fetch();
283 }
284
285 aos::monotonic_clock::time_point monotonic_start_time =
286 event_loop_->monotonic_now();
287 aos::realtime_clock::time_point realtime_start_time =
288 event_loop_->realtime_now();
289
290 // We need to pick a point in time to declare the log file "started". This
291 // starts here. It needs to be after everything is fetched so that the
292 // fetchers are all pointed at the most recent message before the start
293 // time.
294 last_synchronized_time_ = monotonic_start_time;
295
296 for (const Node *node : log_namer_->nodes()) {
297 const int node_index = configuration::GetNodeIndex(configuration_, node);
298 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
299 realtime_start_time);
300 MaybeWriteHeader(node_index, node);
301 }
302}
303
304void Logger::MaybeWriteHeader(int node_index) {
305 if (configuration::MultiNode(configuration_)) {
306 return MaybeWriteHeader(node_index,
307 configuration_->nodes()->Get(node_index));
308 } else {
309 return MaybeWriteHeader(node_index, nullptr);
310 }
311}
312
313void Logger::MaybeWriteHeader(int node_index, const Node *node) {
314 // This function is responsible for writing the header when the header both
315 // has valid data, and when it needs to be written.
316 if (node_state_[node_index].header_written &&
317 node_state_[node_index].header_valid) {
318 // The header has been written and is valid, nothing to do.
319 return;
320 }
321 if (!node_state_[node_index].has_source_node_boot_uuid) {
322 // Can't write a header if we don't have the boot UUID.
323 return;
324 }
325
326 // WriteHeader writes the first header in a log file. We want to do this only
327 // once.
328 //
329 // Rotate rewrites the same header with a new part ID, but keeps the same part
330 // UUID. We don't want that when things reboot, because that implies that
331 // parts go together across a reboot.
332 //
333 // Reboot resets the parts UUID. So, once we've written a header the first
334 // time, we want to use Reboot to rotate the log and reset the parts UUID.
335 //
336 // header_valid is cleared whenever the remote reboots.
337 if (node_state_[node_index].header_written) {
338 log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
339 } else {
340 log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
341
342 node_state_[node_index].header_written = true;
343 }
344 node_state_[node_index].header_valid = true;
345}
346
347void Logger::WriteMissingTimestamps() {
348 if (configuration::MultiNode(configuration_)) {
349 server_statistics_fetcher_.Fetch();
350 } else {
351 return;
352 }
353
354 if (server_statistics_fetcher_.get() == nullptr) {
355 return;
356 }
357
358 for (const Node *node : log_namer_->nodes()) {
359 const int node_index = configuration::GetNodeIndex(configuration_, node);
360 if (MaybeUpdateTimestamp(
361 node, node_index,
362 server_statistics_fetcher_.context().monotonic_event_time,
363 server_statistics_fetcher_.context().realtime_event_time)) {
364 CHECK(node_state_[node_index].header_written);
365 CHECK(node_state_[node_index].header_valid);
366 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
367 } else {
368 MaybeWriteHeader(node_index, node);
369 }
370 }
371}
372
373void Logger::SetStartTime(
374 size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
375 aos::realtime_clock::time_point realtime_start_time,
376 aos::monotonic_clock::time_point logger_monotonic_start_time,
377 aos::realtime_clock::time_point logger_realtime_start_time) {
378 node_state_[node_index].monotonic_start_time = monotonic_start_time;
379 node_state_[node_index].realtime_start_time = realtime_start_time;
380 node_state_[node_index]
381 .log_file_header.mutable_message()
382 ->mutate_monotonic_start_time(
383 std::chrono::duration_cast<std::chrono::nanoseconds>(
384 monotonic_start_time.time_since_epoch())
385 .count());
386
387 // Add logger start times if they are available in the log file header.
388 if (node_state_[node_index]
389 .log_file_header.mutable_message()
390 ->has_logger_monotonic_start_time()) {
391 node_state_[node_index]
392 .log_file_header.mutable_message()
393 ->mutate_logger_monotonic_start_time(
394 std::chrono::duration_cast<std::chrono::nanoseconds>(
395 logger_monotonic_start_time.time_since_epoch())
396 .count());
397 }
398
399 if (node_state_[node_index]
400 .log_file_header.mutable_message()
401 ->has_logger_realtime_start_time()) {
402 node_state_[node_index]
403 .log_file_header.mutable_message()
404 ->mutate_logger_realtime_start_time(
405 std::chrono::duration_cast<std::chrono::nanoseconds>(
406 logger_realtime_start_time.time_since_epoch())
407 .count());
408 }
409
410 if (node_state_[node_index]
411 .log_file_header.mutable_message()
412 ->has_realtime_start_time()) {
413 node_state_[node_index]
414 .log_file_header.mutable_message()
415 ->mutate_realtime_start_time(
416 std::chrono::duration_cast<std::chrono::nanoseconds>(
417 realtime_start_time.time_since_epoch())
418 .count());
419 }
420}
421
422bool Logger::MaybeUpdateTimestamp(
423 const Node *node, int node_index,
424 aos::monotonic_clock::time_point monotonic_start_time,
425 aos::realtime_clock::time_point realtime_start_time) {
426 // Bail early if the start times are already set.
427 if (node_state_[node_index].monotonic_start_time !=
428 monotonic_clock::min_time) {
429 return false;
430 }
431 if (event_loop_->node() == node ||
432 !configuration::MultiNode(configuration_)) {
433 // There are no offsets to compute for ourself, so always succeed.
434 SetStartTime(node_index, monotonic_start_time, realtime_start_time,
435 monotonic_start_time, realtime_start_time);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800436 node_state_[node_index].SetBootUUID(event_loop_->boot_uuid());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800437 return true;
438 } else if (server_statistics_fetcher_.get() != nullptr) {
439 // We must be a remote node now. Look for the connection and see if it is
440 // connected.
441
442 for (const message_bridge::ServerConnection *connection :
443 *server_statistics_fetcher_->connections()) {
444 if (connection->node()->name()->string_view() !=
445 node->name()->string_view()) {
446 continue;
447 }
448
449 if (connection->state() != message_bridge::State::CONNECTED) {
450 VLOG(1) << node->name()->string_view()
451 << " is not connected, can't start it yet.";
452 break;
453 }
454
Austin Schuhb06f03b2021-02-17 22:00:37 -0800455 if (!connection->has_monotonic_offset()) {
456 VLOG(1) << "Missing monotonic offset for setting start time for node "
457 << aos::FlatbufferToJson(node);
458 break;
459 }
460
461 // Found it and it is connected. Compensate and go.
462 SetStartTime(node_index,
463 monotonic_start_time +
464 std::chrono::nanoseconds(connection->monotonic_offset()),
465 realtime_start_time, monotonic_start_time,
466 realtime_start_time);
467 return true;
468 }
469 }
470 return false;
471}
472
473aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
474 const Node *node, std::string_view config_sha256) {
475 // Now write the header with this timestamp in it.
476 flatbuffers::FlatBufferBuilder fbb;
477 fbb.ForceDefaults(true);
478
479 flatbuffers::Offset<aos::Configuration> configuration_offset;
480 if (!separate_config_) {
481 configuration_offset = CopyFlatBuffer(configuration_, &fbb);
482 } else {
483 CHECK(!config_sha256.empty());
484 }
485
486 const flatbuffers::Offset<flatbuffers::String> name_offset =
487 fbb.CreateString(name_);
488
489 CHECK(log_event_uuid_ != UUID::Zero());
490 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800491 log_event_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800492
493 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800494 logger_instance_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800495
496 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
497 if (!log_start_uuid_.empty()) {
498 log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
499 }
500
501 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
502 if (!config_sha256.empty()) {
503 config_sha256_offset = fbb.CreateString(config_sha256);
504 }
505
506 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800507 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800508
509 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800510 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800511
512 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
513 fbb.CreateString("00000000-0000-4000-8000-000000000000");
514
515 flatbuffers::Offset<Node> node_offset;
516 flatbuffers::Offset<Node> logger_node_offset;
517
518 if (configuration::MultiNode(configuration_)) {
519 node_offset = RecursiveCopyFlatBuffer(node, &fbb);
520 logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
521 }
522
523 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
524
525 log_file_header_builder.add_name(name_offset);
526
527 // Only add the node if we are running in a multinode configuration.
528 if (node != nullptr) {
529 log_file_header_builder.add_node(node_offset);
530 log_file_header_builder.add_logger_node(logger_node_offset);
531 }
532
533 if (!configuration_offset.IsNull()) {
534 log_file_header_builder.add_configuration(configuration_offset);
535 }
536 // The worst case theoretical out of order is the polling period times 2.
537 // One message could get logged right after the boundary, but be for right
538 // before the next boundary. And the reverse could happen for another
539 // message. Report back 3x to be extra safe, and because the cost isn't
540 // huge on the read side.
541 log_file_header_builder.add_max_out_of_order_duration(
542 std::chrono::nanoseconds(3 * polling_period_).count());
543
544 log_file_header_builder.add_monotonic_start_time(
545 std::chrono::duration_cast<std::chrono::nanoseconds>(
546 monotonic_clock::min_time.time_since_epoch())
547 .count());
548 if (node == event_loop_->node()) {
549 log_file_header_builder.add_realtime_start_time(
550 std::chrono::duration_cast<std::chrono::nanoseconds>(
551 realtime_clock::min_time.time_since_epoch())
552 .count());
553 } else {
554 log_file_header_builder.add_logger_monotonic_start_time(
555 std::chrono::duration_cast<std::chrono::nanoseconds>(
556 monotonic_clock::min_time.time_since_epoch())
557 .count());
558 log_file_header_builder.add_logger_realtime_start_time(
559 std::chrono::duration_cast<std::chrono::nanoseconds>(
560 realtime_clock::min_time.time_since_epoch())
561 .count());
562 }
563
564 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
565 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
566 if (!log_start_uuid_offset.IsNull()) {
567 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
568 }
569 log_file_header_builder.add_logger_node_boot_uuid(
570 logger_node_boot_uuid_offset);
571 log_file_header_builder.add_source_node_boot_uuid(
572 source_node_boot_uuid_offset);
573
574 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
575 log_file_header_builder.add_parts_index(0);
576
577 log_file_header_builder.add_configuration_sha256(0);
578
579 if (!config_sha256_offset.IsNull()) {
580 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
581 }
582
583 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
584 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
585 fbb.Release());
586
587 CHECK(result.Verify()) << ": Built a corrupted header.";
588
589 return result;
590}
591
592void Logger::ResetStatisics() {
593 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
594 max_message_fetch_time_channel_ = -1;
595 max_message_fetch_time_size_ = -1;
596 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
597 total_message_fetch_count_ = 0;
598 total_message_fetch_bytes_ = 0;
599 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
600 total_nop_fetch_count_ = 0;
601 max_copy_time_ = std::chrono::nanoseconds::zero();
602 max_copy_time_channel_ = -1;
603 max_copy_time_size_ = -1;
604 total_copy_time_ = std::chrono::nanoseconds::zero();
605 total_copy_count_ = 0;
606 total_copy_bytes_ = 0;
607}
608
609void Logger::Rotate() {
610 for (const Node *node : log_namer_->nodes()) {
611 const int node_index = configuration::GetNodeIndex(configuration_, node);
612 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
613 }
614}
615
616void Logger::LogUntil(monotonic_clock::time_point t) {
617 // Grab the latest ServerStatistics message. This will always have the
618 // oppertunity to be >= to the current time, so it will always represent any
619 // reboots which may have happened.
620 WriteMissingTimestamps();
621
Austin Schuhcdd90272021-03-15 12:46:16 -0700622 int our_node_index = aos::configuration::GetNodeIndex(
623 event_loop_->configuration(), event_loop_->node());
624
Austin Schuhb06f03b2021-02-17 22:00:37 -0800625 // Write each channel to disk, one at a time.
626 for (FetcherStruct &f : fetchers_) {
627 while (true) {
628 if (f.written) {
629 const auto start = event_loop_->monotonic_now();
630 const bool got_new = f.fetcher->FetchNext();
631 const auto end = event_loop_->monotonic_now();
632 RecordFetchResult(start, end, got_new, &f);
633 if (!got_new) {
634 VLOG(2) << "No new data on "
635 << configuration::CleanedChannelToString(
636 f.fetcher->channel());
637 break;
638 }
639 f.written = false;
640 }
641
642 // TODO(james): Write tests to exercise this logic.
643 if (f.fetcher->context().monotonic_event_time >= t) {
644 break;
645 }
646 if (f.writer != nullptr) {
Austin Schuhcdd90272021-03-15 12:46:16 -0700647 // Only check if the boot UUID has changed if this is data from another
648 // node. Our UUID can't change without restarting the application.
649 if (our_node_index != f.data_node_index) {
650 // And update our boot UUID if the UUID has changed.
651 if (node_state_[f.data_node_index].SetBootUUID(
652 f.fetcher->context().remote_boot_uuid)) {
653 MaybeWriteHeader(f.data_node_index);
654 }
655 }
656
Austin Schuhb06f03b2021-02-17 22:00:37 -0800657 // Write!
658 const auto start = event_loop_->monotonic_now();
659 flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
660 max_header_size_);
661 fbb.ForceDefaults(true);
662
663 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
664 f.channel_index, f.log_type));
665 const auto end = event_loop_->monotonic_now();
666 RecordCreateMessageTime(start, end, &f);
667
668 VLOG(2) << "Writing data as node "
669 << FlatbufferToJson(event_loop_->node()) << " for channel "
670 << configuration::CleanedChannelToString(f.fetcher->channel())
671 << " to " << f.writer->filename() << " data "
672 << FlatbufferToJson(
673 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
674 fbb.GetBufferPointer()));
675
676 max_header_size_ = std::max(max_header_size_,
677 fbb.GetSize() - f.fetcher->context().size);
678 CHECK(node_state_[f.data_node_index].header_valid)
679 << ": Can't write data before the header on channel "
680 << configuration::CleanedChannelToString(f.fetcher->channel());
681 f.writer->QueueSizedFlatbuffer(&fbb);
682 }
683
684 if (f.timestamp_writer != nullptr) {
685 // And now handle timestamps.
686 const auto start = event_loop_->monotonic_now();
687 flatbuffers::FlatBufferBuilder fbb;
688 fbb.ForceDefaults(true);
689
690 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
691 f.channel_index,
692 LogType::kLogDeliveryTimeOnly));
693 const auto end = event_loop_->monotonic_now();
694 RecordCreateMessageTime(start, end, &f);
695
696 VLOG(2) << "Writing timestamps as node "
697 << FlatbufferToJson(event_loop_->node()) << " for channel "
698 << configuration::CleanedChannelToString(f.fetcher->channel())
699 << " to " << f.timestamp_writer->filename() << " timestamp "
700 << FlatbufferToJson(
701 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
702 fbb.GetBufferPointer()));
703
704 CHECK(node_state_[f.timestamp_node_index].header_valid)
705 << ": Can't write data before the header on channel "
706 << configuration::CleanedChannelToString(f.fetcher->channel());
707 f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
708 }
709
710 if (f.contents_writer != nullptr) {
711 const auto start = event_loop_->monotonic_now();
712 // And now handle the special message contents channel. Copy the
713 // message into a FlatBufferBuilder and save it to disk.
714 // TODO(austin): We can be more efficient here when we start to
715 // care...
716 flatbuffers::FlatBufferBuilder fbb;
717 fbb.ForceDefaults(true);
718
719 const RemoteMessage *msg =
720 flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
721
722 CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
Austin Schuhcdd90272021-03-15 12:46:16 -0700723 if (node_state_[f.contents_node_index].SetBootUUID(
724 UUID::FromVector(msg->boot_uuid()))) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800725 MaybeWriteHeader(f.contents_node_index);
726 }
727
728 logger::MessageHeader::Builder message_header_builder(fbb);
729
730 // TODO(austin): This needs to check the channel_index and confirm
731 // that it should be logged before squirreling away the timestamp to
732 // disk. We don't want to log irrelevant timestamps.
733
734 // Note: this must match the same order as MessageBridgeServer and
735 // PackMessage. We want identical headers to have identical
736 // on-the-wire formats to make comparing them easier.
737
738 // Translate from the channel index that the event loop uses to the
739 // channel index in the log file.
740 message_header_builder.add_channel_index(
741 event_loop_to_logged_channel_index_[msg->channel_index()]);
742
743 message_header_builder.add_queue_index(msg->queue_index());
744 message_header_builder.add_monotonic_sent_time(
745 msg->monotonic_sent_time());
746 message_header_builder.add_realtime_sent_time(
747 msg->realtime_sent_time());
748
749 message_header_builder.add_monotonic_remote_time(
750 msg->monotonic_remote_time());
751 message_header_builder.add_realtime_remote_time(
752 msg->realtime_remote_time());
753 message_header_builder.add_remote_queue_index(
754 msg->remote_queue_index());
755
756 message_header_builder.add_monotonic_timestamp_time(
757 f.fetcher->context()
758 .monotonic_event_time.time_since_epoch()
759 .count());
760
761 fbb.FinishSizePrefixed(message_header_builder.Finish());
762 const auto end = event_loop_->monotonic_now();
763 RecordCreateMessageTime(start, end, &f);
764
765 CHECK(node_state_[f.contents_node_index].header_valid)
766 << ": Can't write data before the header on channel "
767 << configuration::CleanedChannelToString(f.fetcher->channel());
768 f.contents_writer->QueueSizedFlatbuffer(&fbb);
769 }
770
771 f.written = true;
772 }
773 }
774 last_synchronized_time_ = t;
775}
776
777void Logger::DoLogData(const monotonic_clock::time_point end_time) {
778 // We want to guarantee that messages aren't out of order by more than
779 // max_out_of_order_duration. To do this, we need sync points. Every write
780 // cycle should be a sync point.
781
782 do {
783 // Move the sync point up by at most polling_period. This forces one sync
784 // per iteration, even if it is small.
785 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
786
787 on_logged_period_();
788
789 // If we missed cycles, we could be pretty far behind. Spin until we are
790 // caught up.
791 } while (last_synchronized_time_ + polling_period_ < end_time);
792}
793
794void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
795 aos::monotonic_clock::time_point end,
796 bool got_new, FetcherStruct *fetcher) {
797 const auto duration = end - start;
798 if (!got_new) {
799 ++total_nop_fetch_count_;
800 total_nop_fetch_time_ += duration;
801 return;
802 }
803 ++total_message_fetch_count_;
804 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
805 total_message_fetch_time_ += duration;
806 if (duration > max_message_fetch_time_) {
807 max_message_fetch_time_ = duration;
808 max_message_fetch_time_channel_ = fetcher->channel_index;
809 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
810 }
811}
812
813void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
814 aos::monotonic_clock::time_point end,
815 FetcherStruct *fetcher) {
816 const auto duration = end - start;
817 total_copy_time_ += duration;
818 ++total_copy_count_;
819 total_copy_bytes_ += fetcher->fetcher->context().size;
820 if (duration > max_copy_time_) {
821 max_copy_time_ = duration;
822 max_copy_time_channel_ = fetcher->channel_index;
823 max_copy_time_size_ = fetcher->fetcher->context().size;
824 }
825}
826
827} // namespace logger
828} // namespace aos