blob: 2780922eade9ae7654acb3665a06d09b67edb361 [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_writer.h"
2
3#include <functional>
4#include <map>
5#include <vector>
6
7#include "aos/configuration.h"
8#include "aos/events/event_loop.h"
9#include "aos/network/message_bridge_server_generated.h"
10#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080011#include "aos/network/timestamp_channel.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080012
13namespace aos {
14namespace logger {
15namespace {
16using message_bridge::RemoteMessage;
17} // namespace
18
// Constructs a Logger which polls every channel approved by should_log and
// queues the data up to be written out once StartLogging is called.
//
// event_loop: used to make fetchers/timers and identifies the node we log on.
// configuration: the configuration describing what gets logged.  This may
//   differ from the event loop's configuration, so channel indices are
//   translated between the two at the end of this constructor.
// should_log: per-channel filter; channels it rejects are skipped entirely.
Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
               std::function<bool(const Channel *)> should_log)
    : event_loop_(event_loop),
      configuration_(configuration),
      name_(network::GetHostname()),
      // Periodic timer which drives polling of all the fetchers.
      timer_handler_(event_loop_->AddTimer(
          [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
      // Only multi-node configurations publish ServerStatistics; otherwise
      // leave the fetcher default-constructed (unused).
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());

  // Maps each RemoteMessage timestamp channel to the remote node whose
  // delivery timestamps it carries; filled in by the loop below.
  std::map<const Channel *, const Node *> timestamp_logger_channels;

  message_bridge::ChannelTimestampFinder finder(event_loop_);
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    // Only channels sent from this node and forwarded somewhere can have
    // delivery timestamps logged here.
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
      continue;
    }
    if (!channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        const Node *other_node = configuration::GetNode(
            event_loop_->configuration(), connection->name()->string_view());

        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_channels.insert(
            std::make_pair(finder.ForChannel(channel, connection), other_node));
      }
    }
  }

  const size_t our_node_index =
      configuration::GetNodeIndex(configuration_, event_loop_->node());

  // Build a FetcherStruct for every channel that will be logged, recording
  // which kinds of output (data, delivery timestamps, timestamp contents) it
  // needs.  Writers themselves are created later in StartLogging.
  for (size_t channel_index = 0;
       channel_index < configuration_->channels()->size(); ++channel_index) {
    const Channel *const config_channel =
        configuration_->channels()->Get(channel_index);
    // The MakeRawFetcher method needs a channel which is in the event loop
    // configuration() object, not the configuration_ object.  Go look that up
    // from the config.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());
    CHECK(channel != nullptr)
        << ": Failed to look up channel "
        << aos::configuration::CleanedChannelToString(config_channel);
    if (!should_log(channel)) {
      continue;
    }

    FetcherStruct fs;
    fs.channel_index = channel_index;
    fs.channel = channel;

    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
        channel, event_loop_->node());
    const bool log_message = is_logged && is_readable;

    bool log_delivery_times = false;
    if (event_loop_->node() != nullptr) {
      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
          channel, event_loop_->node(), event_loop_->node());
    }

    // Now, detect a RemoteMessage timestamp logger where we should just log
    // the contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.wants_timestamp_writer = true;
        fs.timestamp_node_index = our_node_index;
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.wants_writer = true;
        if (!is_local) {
          // Remote data is attributed to the node which produced it, and
          // logged in the remote-message format.
          const Node *source_node = configuration::GetNode(
              configuration_, channel->source_node()->string_view());
          fs.data_node_index =
              configuration::GetNodeIndex(configuration_, source_node);
          fs.log_type = LogType::kLogRemoteMessage;
        } else {
          fs.data_node_index = our_node_index;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
        fs.wants_contents_writer = true;
        fs.contents_node_index =
            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
      }
      fetchers_.emplace_back(std::move(fs));
    }
  }

  // When we are logging remote timestamps, we need to be able to translate
  // from the channel index that the event loop uses to the channel index in
  // the config in the log file.  -1 marks event-loop channels with no logged
  // counterpart.
  event_loop_to_logged_channel_index_.resize(
      event_loop->configuration()->channels()->size(), -1);
  for (size_t event_loop_channel_index = 0;
       event_loop_channel_index <
       event_loop->configuration()->channels()->size();
       ++event_loop_channel_index) {
    const Channel *event_loop_channel =
        event_loop->configuration()->channels()->Get(event_loop_channel_index);

    const Channel *logged_channel = aos::configuration::GetChannel(
        configuration_, event_loop_channel->name()->string_view(),
        event_loop_channel->type()->string_view(), "",
        configuration::GetNode(configuration_, event_loop_->node()));

    if (logged_channel != nullptr) {
      event_loop_to_logged_channel_index_[event_loop_channel_index] =
          configuration::ChannelIndex(configuration_, logged_channel);
    }
  }
}
159
160Logger::~Logger() {
161 if (log_namer_) {
162 // If we are replaying a log file, or in simulation, we want to force the
163 // last bit of data to be logged. The easiest way to deal with this is to
164 // poll everything as we go to destroy the class, ie, shut down the logger,
165 // and write it to disk.
166 StopLogging(event_loop_->monotonic_now());
167 }
168}
169
// Starts logging through the provided LogNamer.  CHECK-fails if logging is
// already in progress.
//
// log_namer: owns naming and writing of the log files.
// log_start_uuid: UUID tying together all log files from one start event.
void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
                          std::string_view log_start_uuid) {
  CHECK(!log_namer_) << ": Already logging";
  log_namer_ = std::move(log_namer);

  // When the config is stored separately, serialize and hash it now so the
  // per-part headers can reference it by sha256 instead of embedding it.
  std::string config_sha256;
  if (separate_config_) {
    flatbuffers::FlatBufferBuilder fbb;
    flatbuffers::Offset<aos::Configuration> configuration_offset =
        CopyFlatBuffer(configuration_, &fbb);
    LogFileHeader::Builder log_file_header_builder(fbb);
    log_file_header_builder.add_configuration(configuration_offset);
    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
        fbb.Release());
    config_sha256 = Sha256(config_header.span());
    LOG(INFO) << "Config sha256 of " << config_sha256;
    log_namer_->WriteConfiguration(&config_header, config_sha256);
  }

  log_event_uuid_ = UUID::Random();
  log_start_uuid_ = log_start_uuid;
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());

  // We want to do as much work as possible before the initial Fetch. Time
  // between that and actually starting to log opens up the possibility of
  // falling off the end of the queue during that time.

  // Create the writers each fetcher asked for in the constructor.
  for (FetcherStruct &f : fetchers_) {
    if (f.wants_writer) {
      f.writer = log_namer_->MakeWriter(f.channel);
    }
    if (f.wants_timestamp_writer) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
    }
    if (f.wants_contents_writer) {
      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
          f.channel, CHECK_NOTNULL(f.timestamp_node));
    }
  }

  CHECK(node_state_.empty());
  // Single-node configs still get one node_state_ slot.
  node_state_.resize(configuration::MultiNode(configuration_)
                         ? configuration_->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);

    node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
  }

  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel. This lets us have
  // non periodic messages with configuration that now get logged.
  for (FetcherStruct &f : fetchers_) {
    const auto start = event_loop_->monotonic_now();
    const bool got_new = f.fetcher->Fetch();
    const auto end = event_loop_->monotonic_now();
    RecordFetchResult(start, end, got_new, &f);

    // If there is a message, we want to write it.  written == true means
    // "nothing pending", so an empty context marks the fetcher as done.
    f.written = f.fetcher->context().data == nullptr;
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < node_state_.size(); ++i) {
    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
                 monotonic_clock::min_time, realtime_clock::min_time);
  }

  WriteHeader();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
            << " start_time " << last_synchronized_time_ << " boot uuid "
            << event_loop_->boot_uuid();

  // Force logging up until the start of the log file now, so the messages at
  // the start are always ordered before the rest of the messages.
  // Note: this ship may have already sailed, but we don't have to make it
  // worse.
  // TODO(austin): Test...
  //
  // This is safe to call here since we have set last_synchronized_time_ as the
  // same time as in the header, and all the data before it should be logged
  // without ordering concerns.
  LogUntil(last_synchronized_time_);

  // Kick off the periodic polling which keeps the log up to date.
  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}
261
262std::unique_ptr<LogNamer> Logger::StopLogging(
263 aos::monotonic_clock::time_point end_time) {
264 CHECK(log_namer_) << ": Not logging right now";
265
266 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh30586902021-03-30 22:54:08 -0700267 // Folks like to use the on_logged_period_ callback to trigger stop and
268 // start events. We can't have those then recurse and try to stop again.
269 // Rather than making everything reentrant, let's just instead block the
270 // callback here.
271 DoLogData(end_time, false);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800272 }
273 timer_handler_->Disable();
274
275 for (FetcherStruct &f : fetchers_) {
276 f.writer = nullptr;
277 f.timestamp_writer = nullptr;
278 f.contents_writer = nullptr;
279 }
280 node_state_.clear();
281
282 log_event_uuid_ = UUID::Zero();
283 log_start_uuid_ = std::string();
284
285 return std::move(log_namer_);
286}
287
288void Logger::WriteHeader() {
289 if (configuration::MultiNode(configuration_)) {
290 server_statistics_fetcher_.Fetch();
291 }
292
293 aos::monotonic_clock::time_point monotonic_start_time =
294 event_loop_->monotonic_now();
295 aos::realtime_clock::time_point realtime_start_time =
296 event_loop_->realtime_now();
297
298 // We need to pick a point in time to declare the log file "started". This
299 // starts here. It needs to be after everything is fetched so that the
300 // fetchers are all pointed at the most recent message before the start
301 // time.
302 last_synchronized_time_ = monotonic_start_time;
303
304 for (const Node *node : log_namer_->nodes()) {
305 const int node_index = configuration::GetNodeIndex(configuration_, node);
306 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
307 realtime_start_time);
308 MaybeWriteHeader(node_index, node);
309 }
310}
311
312void Logger::MaybeWriteHeader(int node_index) {
313 if (configuration::MultiNode(configuration_)) {
314 return MaybeWriteHeader(node_index,
315 configuration_->nodes()->Get(node_index));
316 } else {
317 return MaybeWriteHeader(node_index, nullptr);
318 }
319}
320
321void Logger::MaybeWriteHeader(int node_index, const Node *node) {
322 // This function is responsible for writing the header when the header both
323 // has valid data, and when it needs to be written.
324 if (node_state_[node_index].header_written &&
325 node_state_[node_index].header_valid) {
326 // The header has been written and is valid, nothing to do.
327 return;
328 }
329 if (!node_state_[node_index].has_source_node_boot_uuid) {
330 // Can't write a header if we don't have the boot UUID.
331 return;
332 }
333
334 // WriteHeader writes the first header in a log file. We want to do this only
335 // once.
336 //
337 // Rotate rewrites the same header with a new part ID, but keeps the same part
338 // UUID. We don't want that when things reboot, because that implies that
339 // parts go together across a reboot.
340 //
341 // Reboot resets the parts UUID. So, once we've written a header the first
342 // time, we want to use Reboot to rotate the log and reset the parts UUID.
343 //
344 // header_valid is cleared whenever the remote reboots.
345 if (node_state_[node_index].header_written) {
346 log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
347 } else {
348 log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
349
350 node_state_[node_index].header_written = true;
351 }
352 node_state_[node_index].header_valid = true;
353}
354
355void Logger::WriteMissingTimestamps() {
356 if (configuration::MultiNode(configuration_)) {
357 server_statistics_fetcher_.Fetch();
358 } else {
359 return;
360 }
361
362 if (server_statistics_fetcher_.get() == nullptr) {
363 return;
364 }
365
366 for (const Node *node : log_namer_->nodes()) {
367 const int node_index = configuration::GetNodeIndex(configuration_, node);
368 if (MaybeUpdateTimestamp(
369 node, node_index,
370 server_statistics_fetcher_.context().monotonic_event_time,
371 server_statistics_fetcher_.context().realtime_event_time)) {
372 CHECK(node_state_[node_index].header_written);
373 CHECK(node_state_[node_index].header_valid);
374 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
375 } else {
376 MaybeWriteHeader(node_index, node);
377 }
378 }
379}
380
381void Logger::SetStartTime(
382 size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
383 aos::realtime_clock::time_point realtime_start_time,
384 aos::monotonic_clock::time_point logger_monotonic_start_time,
385 aos::realtime_clock::time_point logger_realtime_start_time) {
386 node_state_[node_index].monotonic_start_time = monotonic_start_time;
387 node_state_[node_index].realtime_start_time = realtime_start_time;
388 node_state_[node_index]
389 .log_file_header.mutable_message()
390 ->mutate_monotonic_start_time(
391 std::chrono::duration_cast<std::chrono::nanoseconds>(
392 monotonic_start_time.time_since_epoch())
393 .count());
394
395 // Add logger start times if they are available in the log file header.
396 if (node_state_[node_index]
397 .log_file_header.mutable_message()
398 ->has_logger_monotonic_start_time()) {
399 node_state_[node_index]
400 .log_file_header.mutable_message()
401 ->mutate_logger_monotonic_start_time(
402 std::chrono::duration_cast<std::chrono::nanoseconds>(
403 logger_monotonic_start_time.time_since_epoch())
404 .count());
405 }
406
407 if (node_state_[node_index]
408 .log_file_header.mutable_message()
409 ->has_logger_realtime_start_time()) {
410 node_state_[node_index]
411 .log_file_header.mutable_message()
412 ->mutate_logger_realtime_start_time(
413 std::chrono::duration_cast<std::chrono::nanoseconds>(
414 logger_realtime_start_time.time_since_epoch())
415 .count());
416 }
417
418 if (node_state_[node_index]
419 .log_file_header.mutable_message()
420 ->has_realtime_start_time()) {
421 node_state_[node_index]
422 .log_file_header.mutable_message()
423 ->mutate_realtime_start_time(
424 std::chrono::duration_cast<std::chrono::nanoseconds>(
425 realtime_start_time.time_since_epoch())
426 .count());
427 }
428}
429
// Tries to set the start time for node_index.  Returns true when the start
// time was newly set; false when it was already set, or when a remote node
// is not yet connected / has no clock offset so we must wait.
bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (event_loop_->node() == node ||
      !configuration::MultiNode(configuration_)) {
    // There are no offsets to compute for ourself, so always succeed.
    SetStartTime(node_index, monotonic_start_time, realtime_start_time,
                 monotonic_start_time, realtime_start_time);
    node_state_[node_index].SetBootUUID(event_loop_->boot_uuid());
    return true;
  } else if (server_statistics_fetcher_.get() != nullptr) {
    // We must be a remote node now.  Look for the connection and see if it is
    // connected.

    for (const message_bridge::ServerConnection *connection :
         *server_statistics_fetcher_->connections()) {
      // Skip connections for other nodes.
      if (connection->node()->name()->string_view() !=
          node->name()->string_view()) {
        continue;
      }

      if (connection->state() != message_bridge::State::CONNECTED) {
        VLOG(1) << node->name()->string_view()
                << " is not connected, can't start it yet.";
        break;
      }

      // Without a clock offset we can't translate our start time into the
      // remote node's clock.
      if (!connection->has_monotonic_offset()) {
        VLOG(1) << "Missing monotonic offset for setting start time for node "
                << aos::FlatbufferToJson(node);
        break;
      }

      // Found it and it is connected.  Compensate and go.
      SetStartTime(node_index,
                   monotonic_start_time +
                       std::chrono::nanoseconds(connection->monotonic_offset()),
                   realtime_start_time, monotonic_start_time,
                   realtime_start_time);
      return true;
    }
  }
  return false;
}
480
// Builds the LogFileHeader flatbuffer for the given node.  Start times are
// filled in with min_time placeholders here and mutated in place later by
// SetStartTime once they are known.
//
// node: the node this header describes (nullptr for single-node logs).
// config_sha256: hash of the separately-written config; empty when the config
//   is embedded in the header instead.
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node, std::string_view config_sha256) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  // Either embed the configuration or require a sha256 referencing it.
  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (!separate_config_) {
    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
  } else {
    CHECK(!config_sha256.empty());
  }

  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  CHECK(log_event_uuid_ != UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      log_event_uuid_.PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      logger_instance_uuid_.PackString(&fbb);

  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (!log_start_uuid_.empty()) {
    log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
  }

  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  if (!config_sha256.empty()) {
    config_sha256_offset = fbb.CreateString(config_sha256);
  }

  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  // The source node's boot UUID starts out as our own; remote reboots update
  // the header later via SetBootUUID/Reboot.
  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  // Placeholder parts UUID (version-4 shaped); reset on rotation/reboot.
  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      fbb.CreateString("00000000-0000-4000-8000-000000000000");

  flatbuffers::Offset<Node> node_offset;
  flatbuffers::Offset<Node> logger_node_offset;

  if (configuration::MultiNode(configuration_)) {
    node_offset = RecursiveCopyFlatBuffer(node, &fbb);
    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary. And the reverse could happen for another
  // message. Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::nanoseconds(3 * polling_period_).count());

  // Start times default to min_time; SetStartTime mutates them in place once
  // the real values are known.
  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  } else {
    // Remote headers carry the logging node's start times separately.
    log_file_header_builder.add_logger_monotonic_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            monotonic_clock::min_time.time_since_epoch())
            .count());
    log_file_header_builder.add_logger_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);
  log_file_header_builder.add_source_node_boot_uuid(
      source_node_boot_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(0);

  // Reserve the field so it can be overwritten by the real offset below.
  log_file_header_builder.add_configuration_sha256(0);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}
599
600void Logger::ResetStatisics() {
601 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
602 max_message_fetch_time_channel_ = -1;
603 max_message_fetch_time_size_ = -1;
604 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
605 total_message_fetch_count_ = 0;
606 total_message_fetch_bytes_ = 0;
607 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
608 total_nop_fetch_count_ = 0;
609 max_copy_time_ = std::chrono::nanoseconds::zero();
610 max_copy_time_channel_ = -1;
611 max_copy_time_size_ = -1;
612 total_copy_time_ = std::chrono::nanoseconds::zero();
613 total_copy_count_ = 0;
614 total_copy_bytes_ = 0;
615}
616
617void Logger::Rotate() {
618 for (const Node *node : log_namer_->nodes()) {
619 const int node_index = configuration::GetNodeIndex(configuration_, node);
620 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
621 }
622}
623
// Drains every fetcher and writes all messages with event times strictly
// before t, then advances last_synchronized_time_ to t.  This is the core
// write loop; each FetcherStruct may produce data, delivery-timestamp, and/or
// timestamp-contents records.
void Logger::LogUntil(monotonic_clock::time_point t) {
  // Grab the latest ServerStatistics message.  This will always have the
  // oppertunity to be >= to the current time, so it will always represent any
  // reboots which may have happened.
  WriteMissingTimestamps();

  int our_node_index = aos::configuration::GetNodeIndex(
      event_loop_->configuration(), event_loop_->node());

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      // Fetch the next unwritten message, if any.
      if (f.written) {
        const auto start = event_loop_->monotonic_now();
        const bool got_new = f.fetcher->FetchNext();
        const auto end = event_loop_->monotonic_now();
        RecordFetchResult(start, end, got_new, &f);
        if (!got_new) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        }
        f.written = false;
      }

      // TODO(james): Write tests to exercise this logic.
      // Stop at the sync point; anything at or after t belongs to the next
      // cycle.
      if (f.fetcher->context().monotonic_event_time >= t) {
        break;
      }
      if (f.writer != nullptr) {
        // Only check if the boot UUID has changed if this is data from another
        // node.  Our UUID can't change without restarting the application.
        if (our_node_index != f.data_node_index) {
          // And update our boot UUID if the UUID has changed.
          if (node_state_[f.data_node_index].SetBootUUID(
                  f.fetcher->context().remote_boot_uuid)) {
            MaybeWriteHeader(f.data_node_index);
          }
        }

        // Write!
        const auto start = event_loop_->monotonic_now();
        // Pre-size the builder using the running max header overhead to avoid
        // reallocation while packing.
        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                           max_header_size_);
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index, f.log_type));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing data as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.writer->filename() << " data "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        max_header_size_ = std::max(max_header_size_,
                                    fbb.GetSize() - f.fetcher->context().size);
        CHECK(node_state_[f.data_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.writer->QueueSizedFlatbuffer(&fbb);
      }

      if (f.timestamp_writer != nullptr) {
        // And now handle timestamps.
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index,
                                           LogType::kLogDeliveryTimeOnly));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing timestamps as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.timestamp_writer->filename() << " timestamp "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        CHECK(node_state_[f.timestamp_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
      }

      if (f.contents_writer != nullptr) {
        const auto start = event_loop_->monotonic_now();
        // And now handle the special message contents channel.  Copy the
        // message into a FlatBufferBuilder and save it to disk.
        // TODO(austin): We can be more efficient here when we start to
        // care...
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        const RemoteMessage *msg =
            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);

        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
        // A changed boot UUID means the remote rebooted; refresh its header.
        if (node_state_[f.contents_node_index].SetBootUUID(
                UUID::FromVector(msg->boot_uuid()))) {
          MaybeWriteHeader(f.contents_node_index);
        }

        logger::MessageHeader::Builder message_header_builder(fbb);

        // TODO(austin): This needs to check the channel_index and confirm
        // that it should be logged before squirreling away the timestamp to
        // disk.  We don't want to log irrelevant timestamps.

        // Note: this must match the same order as MessageBridgeServer and
        // PackMessage.  We want identical headers to have identical
        // on-the-wire formats to make comparing them easier.

        // Translate from the channel index that the event loop uses to the
        // channel index in the log file.
        message_header_builder.add_channel_index(
            event_loop_to_logged_channel_index_[msg->channel_index()]);

        message_header_builder.add_queue_index(msg->queue_index());
        message_header_builder.add_monotonic_sent_time(
            msg->monotonic_sent_time());
        message_header_builder.add_realtime_sent_time(
            msg->realtime_sent_time());

        message_header_builder.add_monotonic_remote_time(
            msg->monotonic_remote_time());
        message_header_builder.add_realtime_remote_time(
            msg->realtime_remote_time());
        message_header_builder.add_remote_queue_index(
            msg->remote_queue_index());

        // Record when the timestamp itself was observed locally.
        message_header_builder.add_monotonic_timestamp_time(
            f.fetcher->context()
                .monotonic_event_time.time_since_epoch()
                .count());

        fbb.FinishSizePrefixed(message_header_builder.Finish());
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        CHECK(node_state_[f.contents_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.contents_writer->QueueSizedFlatbuffer(&fbb);
      }

      f.written = true;
    }
  }
  last_synchronized_time_ = t;
}
784
Austin Schuh30586902021-03-30 22:54:08 -0700785void Logger::DoLogData(const monotonic_clock::time_point end_time,
786 bool run_on_logged) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800787 // We want to guarantee that messages aren't out of order by more than
788 // max_out_of_order_duration. To do this, we need sync points. Every write
789 // cycle should be a sync point.
790
791 do {
792 // Move the sync point up by at most polling_period. This forces one sync
793 // per iteration, even if it is small.
794 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
795
Austin Schuh30586902021-03-30 22:54:08 -0700796 if (run_on_logged) {
797 on_logged_period_();
798 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800799
800 // If we missed cycles, we could be pretty far behind. Spin until we are
801 // caught up.
802 } while (last_synchronized_time_ + polling_period_ < end_time);
803}
804
805void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
806 aos::monotonic_clock::time_point end,
807 bool got_new, FetcherStruct *fetcher) {
808 const auto duration = end - start;
809 if (!got_new) {
810 ++total_nop_fetch_count_;
811 total_nop_fetch_time_ += duration;
812 return;
813 }
814 ++total_message_fetch_count_;
815 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
816 total_message_fetch_time_ += duration;
817 if (duration > max_message_fetch_time_) {
818 max_message_fetch_time_ = duration;
819 max_message_fetch_time_channel_ = fetcher->channel_index;
820 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
821 }
822}
823
824void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
825 aos::monotonic_clock::time_point end,
826 FetcherStruct *fetcher) {
827 const auto duration = end - start;
828 total_copy_time_ += duration;
829 ++total_copy_count_;
830 total_copy_bytes_ += fetcher->fetcher->context().size;
831 if (duration > max_copy_time_) {
832 max_copy_time_ = duration;
833 max_copy_time_channel_ = fetcher->channel_index;
834 max_copy_time_size_ = fetcher->fetcher->context().size;
835 }
836}
837
838} // namespace logger
839} // namespace aos