#include "aos/events/logging/log_writer.h"

#include <functional>
#include <map>
#include <vector>

#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/team_number.h"

namespace aos {
namespace logger {
namespace {
using message_bridge::RemoteMessage;
}  // namespace

Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
               std::function<bool(const Channel *)> should_log)
    : event_loop_(event_loop),
      configuration_(configuration),
      name_(network::GetHostname()),
      timer_handler_(event_loop_->AddTimer(
          [this]() { DoLogData(event_loop_->monotonic_now()); })),
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());

  // Find all the nodes which are logging timestamps on our node. This may
  // over-estimate if should_log is specified.
  std::vector<const Node *> timestamp_logger_nodes =
      configuration::TimestampNodes(configuration_, event_loop_->node());

  std::map<const Channel *, const Node *> timestamp_logger_channels;

  // Now that we have all the nodes accumulated, make remote timestamp loggers
  // for them.
  for (const Node *node : timestamp_logger_nodes) {
    // Note: since we are doing a find using the event loop channel, we need to
    // make sure this channel pointer is part of the event loop configuration,
    // not configuration_. This only matters when configuration_ !=
    // event_loop->configuration().
    const Channel *channel = configuration::GetChannel(
        event_loop->configuration(),
        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
        RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
        event_loop_->node());

    CHECK(channel != nullptr)
        << ": Remote timestamps are logged on "
        << event_loop_->node()->name()->string_view()
        << " but can't find channel /aos/remote_timestamps/"
        << node->name()->string_view();
    if (!should_log(channel)) {
      continue;
    }
    timestamp_logger_channels.insert(std::make_pair(channel, node));
  }

  const size_t our_node_index =
      configuration::GetNodeIndex(configuration_, event_loop_->node());

  for (size_t channel_index = 0;
       channel_index < configuration_->channels()->size(); ++channel_index) {
    const Channel *const config_channel =
        configuration_->channels()->Get(channel_index);
    // The MakeRawFetcher method needs a channel which is in the event loop
    // configuration() object, not the configuration_ object. Go look that up
    // from the config.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());
    CHECK(channel != nullptr)
        << ": Failed to look up channel "
        << aos::configuration::CleanedChannelToString(config_channel);
    if (!should_log(channel)) {
      continue;
    }

    FetcherStruct fs;
    fs.channel_index = channel_index;
    fs.channel = channel;

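    // Decide what to record for this channel:
    //  * the message data itself, if the channel is logged and readable on
    //    this node,
    //  * delivery timestamps, if forwarding timestamps for it are logged on
    //    this node,
    //  * raw RemoteMessage contents, if this is one of the remote timestamp
    //    channels found above.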
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
        channel, event_loop_->node());
    const bool log_message = is_logged && is_readable;

    bool log_delivery_times = false;
    if (event_loop_->node() != nullptr) {
      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
          channel, event_loop_->node(), event_loop_->node());
    }

    // Now, detect a RemoteMessage timestamp logger where we should just log
    // the contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.wants_timestamp_writer = true;
        fs.timestamp_node_index = our_node_index;
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.wants_writer = true;
        if (!is_local) {
          const Node *source_node = configuration::GetNode(
              configuration_, channel->source_node()->string_view());
          fs.data_node_index =
              configuration::GetNodeIndex(configuration_, source_node);
          fs.log_type = LogType::kLogRemoteMessage;
        } else {
          fs.data_node_index = our_node_index;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
        fs.wants_contents_writer = true;
        fs.contents_node_index =
            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
      }
      fetchers_.emplace_back(std::move(fs));
    }
  }

  // When we are logging remote timestamps, we need to be able to translate
  // from the channel index that the event loop uses to the channel index in
  // the config in the log file.
  event_loop_to_logged_channel_index_.resize(
      event_loop->configuration()->channels()->size(), -1);
  for (size_t event_loop_channel_index = 0;
       event_loop_channel_index <
       event_loop->configuration()->channels()->size();
       ++event_loop_channel_index) {
    const Channel *event_loop_channel =
        event_loop->configuration()->channels()->Get(event_loop_channel_index);

    const Channel *logged_channel = aos::configuration::GetChannel(
        configuration_, event_loop_channel->name()->string_view(),
        event_loop_channel->type()->string_view(), "",
        configuration::GetNode(configuration_, event_loop_->node()));

    if (logged_channel != nullptr) {
      event_loop_to_logged_channel_index_[event_loop_channel_index] =
          configuration::ChannelIndex(configuration_, logged_channel);
    }
  }
}

Logger::~Logger() {
  if (log_namer_) {
    // If we are replaying a log file, or are in simulation, we want to force
    // the last bit of data to be logged. The easiest way to deal with this is
    // to poll everything as we go to destroy the class, i.e. shut down the
    // logger, and write it to disk.
    StopLogging(event_loop_->monotonic_now());
  }
}

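// A minimal usage sketch (assuming a concrete LogNamer implementation, such
// as LocalLogNamer from log_namer.h; the exact constructor arguments may
// differ):
//
//   Logger logger(event_loop, event_loop->configuration(),
//                 [](const Channel *) { return true; });
//   logger.StartLogging(
//       std::make_unique<LocalLogNamer>("/tmp/log", event_loop->node()));
//   ...
//   logger.StopLogging(event_loop->monotonic_now());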
void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
                          std::string_view log_start_uuid) {
  CHECK(!log_namer_) << ": Already logging";
  log_namer_ = std::move(log_namer);

  std::string config_sha256;
  if (separate_config_) {
    flatbuffers::FlatBufferBuilder fbb;
    flatbuffers::Offset<aos::Configuration> configuration_offset =
        CopyFlatBuffer(configuration_, &fbb);
    LogFileHeader::Builder log_file_header_builder(fbb);
    log_file_header_builder.add_configuration(configuration_offset);
    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
        fbb.Release());
    config_sha256 = Sha256(config_header.span());
    LOG(INFO) << "Config sha256 of " << config_sha256;
    log_namer_->WriteConfiguration(&config_header, config_sha256);
  }

  log_event_uuid_ = UUID::Random();
  log_start_uuid_ = log_start_uuid;
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());

  // We want to do as much work as possible before the initial Fetch. Time
  // between that and actually starting to log opens up the possibility of
  // falling off the end of the queue during that time.

  for (FetcherStruct &f : fetchers_) {
    if (f.wants_writer) {
      f.writer = log_namer_->MakeWriter(f.channel);
    }
    if (f.wants_timestamp_writer) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
    }
    if (f.wants_contents_writer) {
      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
          f.channel, CHECK_NOTNULL(f.timestamp_node));
    }
  }

  CHECK(node_state_.empty());
  node_state_.resize(configuration::MultiNode(configuration_)
                         ? configuration_->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);

    node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
  }

  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel. This lets us log
  // non-periodic messages, such as configuration messages, which would
  // otherwise be missed.
  for (FetcherStruct &f : fetchers_) {
    const auto start = event_loop_->monotonic_now();
    const bool got_new = f.fetcher->Fetch();
    const auto end = event_loop_->monotonic_now();
    RecordFetchResult(start, end, got_new, &f);

    // If the fetcher is pointing at a message, mark it as not yet written so
    // the first LogUntil pass records it; otherwise there is nothing to write.
    f.written = f.fetcher->context().data == nullptr;
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < node_state_.size(); ++i) {
    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
                 monotonic_clock::min_time, realtime_clock::min_time);
  }

  WriteHeader();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
            << " start_time " << last_synchronized_time_;

  // Force logging up until the start of the log file now, so the messages at
  // the start are always ordered before the rest of the messages.
  // Note: this ship may have already sailed, but we don't have to make it
  // worse.
  // TODO(austin): Test...
  LogUntil(last_synchronized_time_);

  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}

std::unique_ptr<LogNamer> Logger::StopLogging(
    aos::monotonic_clock::time_point end_time) {
  CHECK(log_namer_) << ": Not logging right now";

  if (end_time != aos::monotonic_clock::min_time) {
    LogUntil(end_time);
  }
  timer_handler_->Disable();

  for (FetcherStruct &f : fetchers_) {
    f.writer = nullptr;
    f.timestamp_writer = nullptr;
    f.contents_writer = nullptr;
  }
  node_state_.clear();

  log_event_uuid_ = UUID::Zero();
  log_start_uuid_ = std::string();

  return std::move(log_namer_);
}

void Logger::WriteHeader() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  }

  aos::monotonic_clock::time_point monotonic_start_time =
      event_loop_->monotonic_now();
  aos::realtime_clock::time_point realtime_start_time =
      event_loop_->realtime_now();

  // We need to pick a point in time to declare the log file "started". This
  // starts here. It needs to be after everything is fetched so that the
  // fetchers are all pointed at the most recent message before the start
  // time.
  last_synchronized_time_ = monotonic_start_time;

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                         realtime_start_time);
    MaybeWriteHeader(node_index, node);
  }
}

void Logger::MaybeWriteHeader(int node_index) {
  if (configuration::MultiNode(configuration_)) {
    return MaybeWriteHeader(node_index,
                            configuration_->nodes()->Get(node_index));
  } else {
    return MaybeWriteHeader(node_index, nullptr);
  }
}

void Logger::MaybeWriteHeader(int node_index, const Node *node) {
  // This function is responsible for writing the header when it both has
  // valid data and still needs to be written.
  if (node_state_[node_index].header_written &&
      node_state_[node_index].header_valid) {
    // The header has been written and is valid, nothing to do.
    return;
  }
  if (!node_state_[node_index].has_source_node_boot_uuid) {
    // Can't write a header if we don't have the boot UUID.
    return;
  }

  // WriteHeader writes the first header in a log file. We want to do this
  // only once.
  //
  // Rotate rewrites the same header with a new part ID, but keeps the same
  // part UUID. We don't want that when things reboot, because that implies
  // that parts go together across a reboot.
  //
  // Reboot resets the parts UUID. So, once we've written a header the first
  // time, we want to use Reboot to rotate the log and reset the parts UUID.
  //
  // header_valid is cleared whenever the remote reboots.
  if (node_state_[node_index].header_written) {
    log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
  } else {
    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);

    node_state_[node_index].header_written = true;
  }
  node_state_[node_index].header_valid = true;
}

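// Fetches the latest ServerStatistics and, for each node being logged, either
// rotates that node's log when its start time has just been determined, or
// retries writing its header (e.g. once the remote boot UUID becomes known).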
void Logger::WriteMissingTimestamps() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  } else {
    return;
  }

  if (server_statistics_fetcher_.get() == nullptr) {
    return;
  }

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    if (MaybeUpdateTimestamp(
            node, node_index,
            server_statistics_fetcher_.context().monotonic_event_time,
            server_statistics_fetcher_.context().realtime_event_time)) {
      CHECK(node_state_[node_index].header_written);
      CHECK(node_state_[node_index].header_valid);
      log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
    } else {
      MaybeWriteHeader(node_index, node);
    }
  }
}

void Logger::SetStartTime(
    size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time,
    aos::monotonic_clock::time_point logger_monotonic_start_time,
    aos::realtime_clock::time_point logger_realtime_start_time) {
  node_state_[node_index].monotonic_start_time = monotonic_start_time;
  node_state_[node_index].realtime_start_time = realtime_start_time;
  node_state_[node_index]
      .log_file_header.mutable_message()
      ->mutate_monotonic_start_time(
          std::chrono::duration_cast<std::chrono::nanoseconds>(
              monotonic_start_time.time_since_epoch())
              .count());

  // Add logger start times if they are available in the log file header.
  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_logger_monotonic_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_logger_monotonic_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                logger_monotonic_start_time.time_since_epoch())
                .count());
  }

  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_logger_realtime_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_logger_realtime_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                logger_realtime_start_time.time_since_epoch())
                .count());
  }

  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_realtime_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_realtime_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                realtime_start_time.time_since_epoch())
                .count());
  }
}

bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (event_loop_->node() == node ||
      !configuration::MultiNode(configuration_)) {
    // There are no offsets to compute for ourselves, so always succeed.
    SetStartTime(node_index, monotonic_start_time, realtime_start_time,
                 monotonic_start_time, realtime_start_time);
    node_state_[node_index].SetBootUUID(event_loop_->boot_uuid().string_view());
    return true;
  } else if (server_statistics_fetcher_.get() != nullptr) {
    // We must be a remote node now. Look for the connection and see if it is
    // connected.

    for (const message_bridge::ServerConnection *connection :
         *server_statistics_fetcher_->connections()) {
      if (connection->node()->name()->string_view() !=
          node->name()->string_view()) {
        continue;
      }

      if (connection->state() != message_bridge::State::CONNECTED) {
        VLOG(1) << node->name()->string_view()
                << " is not connected, can't start it yet.";
        break;
      }

      // Update the boot UUID as soon as we know we are connected.
      if (!connection->has_boot_uuid()) {
        VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
        break;
      }

      if (!node_state_[node_index].has_source_node_boot_uuid ||
          node_state_[node_index].source_node_boot_uuid !=
              connection->boot_uuid()->string_view()) {
        node_state_[node_index].SetBootUUID(
            connection->boot_uuid()->string_view());
      }

      if (!connection->has_monotonic_offset()) {
        VLOG(1) << "Missing monotonic offset for setting start time for node "
                << aos::FlatbufferToJson(node);
        break;
      }

      // Found it and it is connected. Compensate and go.
      SetStartTime(node_index,
                   monotonic_start_time +
                       std::chrono::nanoseconds(connection->monotonic_offset()),
                   realtime_start_time, monotonic_start_time,
                   realtime_start_time);
      return true;
    }
  }
  return false;
}

aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node, std::string_view config_sha256) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (!separate_config_) {
    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
  } else {
    CHECK(!config_sha256.empty());
  }

  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  CHECK(log_event_uuid_ != UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      fbb.CreateString(log_event_uuid_.string_view());

  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      fbb.CreateString(logger_instance_uuid_.string_view());

  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (!log_start_uuid_.empty()) {
    log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
  }

  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  if (!config_sha256.empty()) {
    config_sha256_offset = fbb.CreateString(config_sha256);
  }

  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      fbb.CreateString(event_loop_->boot_uuid().string_view());

  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
      fbb.CreateString(event_loop_->boot_uuid().string_view());

  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      fbb.CreateString("00000000-0000-4000-8000-000000000000");

  flatbuffers::Offset<Node> node_offset;
  flatbuffers::Offset<Node> logger_node_offset;

  if (configuration::MultiNode(configuration_)) {
    node_offset = RecursiveCopyFlatBuffer(node, &fbb);
    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary. And the reverse could happen for another
  // message. Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::nanoseconds(3 * polling_period_).count());

  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  } else {
    log_file_header_builder.add_logger_monotonic_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            monotonic_clock::min_time.time_since_epoch())
            .count());
    log_file_header_builder.add_logger_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);
  log_file_header_builder.add_source_node_boot_uuid(
      source_node_boot_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(0);

  log_file_header_builder.add_configuration_sha256(0);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}

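// Clears the accumulated fetch/copy statistics (max and total durations,
// counts, and byte totals) so a fresh measurement interval can begin.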
void Logger::ResetStatisics() {
  max_message_fetch_time_ = std::chrono::nanoseconds::zero();
  max_message_fetch_time_channel_ = -1;
  max_message_fetch_time_size_ = -1;
  total_message_fetch_time_ = std::chrono::nanoseconds::zero();
  total_message_fetch_count_ = 0;
  total_message_fetch_bytes_ = 0;
  total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
  total_nop_fetch_count_ = 0;
  max_copy_time_ = std::chrono::nanoseconds::zero();
  max_copy_time_channel_ = -1;
  max_copy_time_size_ = -1;
  total_copy_time_ = std::chrono::nanoseconds::zero();
  total_copy_count_ = 0;
  total_copy_bytes_ = 0;
}

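// Rotates the log for every node: each header is rewritten with a new part
// index while keeping the same parts UUID.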
void Logger::Rotate() {
  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
  }
}

void Logger::LogUntil(monotonic_clock::time_point t) {
  // Grab the latest ServerStatistics message. This will always have the
  // opportunity to be >= the current time, so it will always reflect any
  // reboots which may have happened.
  WriteMissingTimestamps();

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      if (f.written) {
        const auto start = event_loop_->monotonic_now();
        const bool got_new = f.fetcher->FetchNext();
        const auto end = event_loop_->monotonic_now();
        RecordFetchResult(start, end, got_new, &f);
        if (!got_new) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        }
        f.written = false;
      }

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time >= t) {
        break;
      }
      if (f.writer != nullptr) {
        // Write!
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                           max_header_size_);
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index, f.log_type));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing data as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.writer->filename() << " data "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        max_header_size_ = std::max(max_header_size_,
                                    fbb.GetSize() - f.fetcher->context().size);
        CHECK(node_state_[f.data_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.writer->QueueSizedFlatbuffer(&fbb);
      }

      if (f.timestamp_writer != nullptr) {
        // And now handle timestamps.
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index,
                                           LogType::kLogDeliveryTimeOnly));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing timestamps as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.timestamp_writer->filename() << " timestamp "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        CHECK(node_state_[f.timestamp_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
      }

      if (f.contents_writer != nullptr) {
        const auto start = event_loop_->monotonic_now();
        // And now handle the special message contents channel. Copy the
        // message into a FlatBufferBuilder and save it to disk.
        // TODO(austin): We can be more efficient here when we start to
        // care...
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        const RemoteMessage *msg =
            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);

        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
        if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
            node_state_[f.contents_node_index].source_node_boot_uuid !=
                msg->boot_uuid()->string_view()) {
          node_state_[f.contents_node_index].SetBootUUID(
              msg->boot_uuid()->string_view());

          MaybeWriteHeader(f.contents_node_index);
        }

        logger::MessageHeader::Builder message_header_builder(fbb);

        // TODO(austin): This needs to check the channel_index and confirm
        // that it should be logged before squirreling away the timestamp to
        // disk. We don't want to log irrelevant timestamps.

        // Note: this must match the same order as MessageBridgeServer and
        // PackMessage. We want identical headers to have identical
        // on-the-wire formats to make comparing them easier.

        // Translate from the channel index that the event loop uses to the
        // channel index in the log file.
        message_header_builder.add_channel_index(
            event_loop_to_logged_channel_index_[msg->channel_index()]);

        message_header_builder.add_queue_index(msg->queue_index());
        message_header_builder.add_monotonic_sent_time(
            msg->monotonic_sent_time());
        message_header_builder.add_realtime_sent_time(
            msg->realtime_sent_time());

        message_header_builder.add_monotonic_remote_time(
            msg->monotonic_remote_time());
        message_header_builder.add_realtime_remote_time(
            msg->realtime_remote_time());
        message_header_builder.add_remote_queue_index(
            msg->remote_queue_index());

        message_header_builder.add_monotonic_timestamp_time(
            f.fetcher->context()
                .monotonic_event_time.time_since_epoch()
                .count());

        fbb.FinishSizePrefixed(message_header_builder.Finish());
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        CHECK(node_state_[f.contents_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.contents_writer->QueueSizedFlatbuffer(&fbb);
      }

      f.written = true;
    }
  }
  last_synchronized_time_ = t;
}

void Logger::DoLogData(const monotonic_clock::time_point end_time) {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration. To do this, we need sync points. Every write
  // cycle should be a sync point.

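  // For example, if the timer fires 3 * polling_period_ late, the loop below
  // advances last_synchronized_time_ one polling_period_ at a time, producing
  // several small sync points instead of one large one.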
  do {
    // Move the sync point up by at most polling_period. This forces one sync
    // per iteration, even if it is small.
    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));

    on_logged_period_();

    // If we missed cycles, we could be pretty far behind. Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < end_time);
}

void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
                               aos::monotonic_clock::time_point end,
                               bool got_new, FetcherStruct *fetcher) {
  const auto duration = end - start;
  if (!got_new) {
    ++total_nop_fetch_count_;
    total_nop_fetch_time_ += duration;
    return;
  }
  ++total_message_fetch_count_;
  total_message_fetch_bytes_ += fetcher->fetcher->context().size;
  total_message_fetch_time_ += duration;
  if (duration > max_message_fetch_time_) {
    max_message_fetch_time_ = duration;
    max_message_fetch_time_channel_ = fetcher->channel_index;
    max_message_fetch_time_size_ = fetcher->fetcher->context().size;
  }
}

void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
                                     aos::monotonic_clock::time_point end,
                                     FetcherStruct *fetcher) {
  const auto duration = end - start;
  total_copy_time_ += duration;
  ++total_copy_count_;
  total_copy_bytes_ += fetcher->fetcher->context().size;
  if (duration > max_copy_time_) {
    max_copy_time_ = duration;
    max_copy_time_channel_ = fetcher->channel_index;
    max_copy_time_size_ = fetcher->fetcher->context().size;
  }
}

}  // namespace logger
}  // namespace aos