#include "aos/events/logging/log_writer.h"

#include <functional>
#include <map>
#include <vector>

#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/team_number.h"
#include "aos/network/timestamp_channel.h"

namespace aos {
namespace logger {
namespace {
using message_bridge::RemoteMessage;
namespace chrono = std::chrono;
}  // namespace

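// A minimal usage sketch showing how this class is driven: construct a Logger
// on an existing EventLoop, hand StartLogging() a LogNamer, and stop it before
// tearing the event loop down.  This is not code from this file; the
// LocalLogNamer / MultiNodeLogNamer implementations referred to by log_namer
// are assumed to live in log_namer.h.
//
//   aos::logger::Logger logger(event_loop, event_loop->configuration(),
//                              [](const aos::Channel *) { return true; });
//   logger.StartLogging(std::move(log_namer), "");
//   ...  // Run the event loop.
//   logger.StopLogging(event_loop->monotonic_now());
//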
Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
               std::function<bool(const Channel *)> should_log)
    : event_loop_(event_loop),
      configuration_(configuration),
      name_(network::GetHostname()),
      timer_handler_(event_loop_->AddTimer(
          [this]() { DoLogData(event_loop_->monotonic_now()); })),
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());

  std::map<const Channel *, const Node *> timestamp_logger_channels;

  message_bridge::ChannelTimestampFinder finder(event_loop_);
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
      continue;
    }
    if (!channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        const Node *other_node = configuration::GetNode(
            event_loop_->configuration(), connection->name()->string_view());

        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_channels.insert(
            std::make_pair(finder.ForChannel(channel, connection), other_node));
      }
    }
  }

  const size_t our_node_index =
      configuration::GetNodeIndex(configuration_, event_loop_->node());

  for (size_t channel_index = 0;
       channel_index < configuration_->channels()->size(); ++channel_index) {
    const Channel *const config_channel =
        configuration_->channels()->Get(channel_index);
    // The MakeRawFetcher method needs a channel which is in the event loop
    // configuration() object, not the configuration_ object.  Go look that up
    // from the config.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());
    CHECK(channel != nullptr)
        << ": Failed to look up channel "
        << aos::configuration::CleanedChannelToString(config_channel);
    if (!should_log(channel)) {
      continue;
    }

    FetcherStruct fs;
    fs.channel_index = channel_index;
    fs.channel = channel;

    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
        channel, event_loop_->node());
    const bool log_message = is_logged && is_readable;

    bool log_delivery_times = false;
    if (event_loop_->node() != nullptr) {
      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
          channel, event_loop_->node(), event_loop_->node());
    }

    // Now, detect a RemoteMessage timestamp logger where we should just log the
    // contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << "  Delivery times";
        fs.wants_timestamp_writer = true;
        fs.timestamp_node_index = our_node_index;
      }
      if (log_message) {
        VLOG(1) << "  Data";
        fs.wants_writer = true;
        if (!is_local) {
          const Node *source_node = configuration::GetNode(
              configuration_, channel->source_node()->string_view());
          fs.data_node_index =
              configuration::GetNodeIndex(configuration_, source_node);
          fs.log_type = LogType::kLogRemoteMessage;
        } else {
          fs.data_node_index = our_node_index;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
        fs.wants_contents_writer = true;
        fs.contents_node_index =
            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
      }
      fetchers_.emplace_back(std::move(fs));
    }
  }

  // When we are logging remote timestamps, we need to be able to translate from
  // the channel index that the event loop uses to the channel index in the
  // config in the log file.
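  // (As an example of the mapping: a channel that exists only in the event
  // loop configuration, and not in the logged configuration, keeps the -1
  // placeholder set below; this scenario is illustrative rather than something
  // asserted elsewhere in this file.)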
  event_loop_to_logged_channel_index_.resize(
      event_loop->configuration()->channels()->size(), -1);
  for (size_t event_loop_channel_index = 0;
       event_loop_channel_index <
       event_loop->configuration()->channels()->size();
       ++event_loop_channel_index) {
    const Channel *event_loop_channel =
        event_loop->configuration()->channels()->Get(event_loop_channel_index);

    const Channel *logged_channel = aos::configuration::GetChannel(
        configuration_, event_loop_channel->name()->string_view(),
        event_loop_channel->type()->string_view(), "",
        configuration::GetNode(configuration_, event_loop_->node()));

    if (logged_channel != nullptr) {
      event_loop_to_logged_channel_index_[event_loop_channel_index] =
          configuration::ChannelIndex(configuration_, logged_channel);
    }
  }
}

Logger::~Logger() {
  if (log_namer_) {
    // If we are replaying a log file or running in simulation, we want to
    // force the last bit of data to be logged.  The easiest way to deal with
    // this is to poll everything as we destroy the class, i.e. shut down the
    // logger, and write it to disk.
    StopLogging(event_loop_->monotonic_now());
  }
}

void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
                          std::string_view log_start_uuid) {
  CHECK(!log_namer_) << ": Already logging";
  log_namer_ = std::move(log_namer);

  std::string config_sha256;
  if (separate_config_) {
    flatbuffers::FlatBufferBuilder fbb;
    flatbuffers::Offset<aos::Configuration> configuration_offset =
        CopyFlatBuffer(configuration_, &fbb);
    LogFileHeader::Builder log_file_header_builder(fbb);
    log_file_header_builder.add_configuration(configuration_offset);
    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
        fbb.Release());
    config_sha256 = Sha256(config_header.span());
    LOG(INFO) << "Config sha256 of " << config_sha256;
    log_namer_->WriteConfiguration(&config_header, config_sha256);
  }

  log_event_uuid_ = UUID::Random();
  log_start_uuid_ = log_start_uuid;
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());

  // We want to do as much work as possible before the initial Fetch.  Time
  // between that and actually starting to log opens up the possibility of
  // falling off the end of the queue during that time.
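  // (For instance, a high-rate channel with a shallow queue could wrap between
  // the Fetch() below and the first LogUntil() call, dropping messages; this
  // is an illustrative failure mode, not one observed here.)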

  for (FetcherStruct &f : fetchers_) {
    if (f.wants_writer) {
      f.writer = log_namer_->MakeWriter(f.channel);
    }
    if (f.wants_timestamp_writer) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
    }
    if (f.wants_contents_writer) {
      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
          f.channel, CHECK_NOTNULL(f.timestamp_node));
    }
  }

  CHECK(node_state_.empty());
  node_state_.resize(configuration::MultiNode(configuration_)
                         ? configuration_->nodes()->size()
                         : 1u);

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);

    node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
  }

  const aos::monotonic_clock::time_point beginning_time =
      event_loop_->monotonic_now();

  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel.  This lets
  // infrequently published messages, such as configuration messages, make it
  // into the log.
  for (FetcherStruct &f : fetchers_) {
    const auto start = event_loop_->monotonic_now();
    const bool got_new = f.fetcher->Fetch();
    const auto end = event_loop_->monotonic_now();
    RecordFetchResult(start, end, got_new, &f);

    // If the fetch produced a message, leave it marked as unwritten so it gets
    // logged below; otherwise there is nothing to write for this channel yet.
    f.written = f.fetcher->context().data == nullptr;
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < node_state_.size(); ++i) {
    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
                 monotonic_clock::min_time, realtime_clock::min_time);
  }

  const aos::monotonic_clock::time_point fetch_time =
      event_loop_->monotonic_now();
  WriteHeader();
  const aos::monotonic_clock::time_point header_time =
      event_loop_->monotonic_now();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
            << " start_time " << last_synchronized_time_ << ", took "
            << chrono::duration<double>(fetch_time - beginning_time).count()
            << " to fetch, "
            << chrono::duration<double>(header_time - fetch_time).count()
            << " to write headers, boot uuid " << event_loop_->boot_uuid();

  // Force logging up until the start of the log file now, so the messages at
  // the start are always ordered before the rest of the messages.
  // Note: this ship may have already sailed, but we don't have to make it
  // worse.
  // TODO(austin): Test...
  //
  // This is safe to call here since we have set last_synchronized_time_ as the
  // same time as in the header, and all the data before it should be logged
  // without ordering concerns.
  LogUntil(last_synchronized_time_);

  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}

std::unique_ptr<LogNamer> Logger::StopLogging(
    aos::monotonic_clock::time_point end_time) {
  CHECK(log_namer_) << ": Not logging right now";

  if (end_time != aos::monotonic_clock::min_time) {
    DoLogData(end_time);
  }
  timer_handler_->Disable();

  for (FetcherStruct &f : fetchers_) {
    f.writer = nullptr;
    f.timestamp_writer = nullptr;
    f.contents_writer = nullptr;
  }
  node_state_.clear();

  log_event_uuid_ = UUID::Zero();
  log_start_uuid_ = std::string();

  return std::move(log_namer_);
}

void Logger::WriteHeader() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  }

  aos::monotonic_clock::time_point monotonic_start_time =
      event_loop_->monotonic_now();
  aos::realtime_clock::time_point realtime_start_time =
      event_loop_->realtime_now();

  // We need to pick a point in time to declare the log file "started"; that
  // point is now.  It needs to be after everything is fetched so that the
  // fetchers are all pointed at the most recent message before the start
  // time.
  last_synchronized_time_ = monotonic_start_time;

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                         realtime_start_time);
    MaybeWriteHeader(node_index, node);
  }
}

void Logger::MaybeWriteHeader(int node_index) {
  if (configuration::MultiNode(configuration_)) {
    return MaybeWriteHeader(node_index,
                            configuration_->nodes()->Get(node_index));
  } else {
    return MaybeWriteHeader(node_index, nullptr);
  }
}

void Logger::MaybeWriteHeader(int node_index, const Node *node) {
  // This function writes the header once it both has valid data and still
  // needs to be written.
  if (node_state_[node_index].header_written &&
      node_state_[node_index].header_valid) {
    // The header has been written and is valid, nothing to do.
    return;
  }
  if (!node_state_[node_index].has_source_node_boot_uuid) {
    // Can't write a header if we don't have the boot UUID.
    return;
  }

  // WriteHeader writes the first header in a log file.  We want to do this only
  // once.
  //
  // Rotate rewrites the same header with a new part ID, but keeps the same part
  // UUID.  We don't want that when things reboot, because that implies that
  // parts go together across a reboot.
  //
  // Reboot resets the parts UUID.  So, once we've written a header the first
  // time, we want to use Reboot to rotate the log and reset the parts UUID.
  //
  // header_valid is cleared whenever the remote reboots.
  if (node_state_[node_index].header_written) {
    log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
  } else {
    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);

    node_state_[node_index].header_written = true;
  }
  node_state_[node_index].header_valid = true;
}

void Logger::WriteMissingTimestamps() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  } else {
    return;
  }

  if (server_statistics_fetcher_.get() == nullptr) {
    return;
  }

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    if (MaybeUpdateTimestamp(
            node, node_index,
            server_statistics_fetcher_.context().monotonic_event_time,
            server_statistics_fetcher_.context().realtime_event_time)) {
      CHECK(node_state_[node_index].header_written);
      CHECK(node_state_[node_index].header_valid);
      log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
    } else {
      MaybeWriteHeader(node_index, node);
    }
  }
}

void Logger::SetStartTime(
    size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time,
    aos::monotonic_clock::time_point logger_monotonic_start_time,
    aos::realtime_clock::time_point logger_realtime_start_time) {
  node_state_[node_index].monotonic_start_time = monotonic_start_time;
  node_state_[node_index].realtime_start_time = realtime_start_time;
  node_state_[node_index]
      .log_file_header.mutable_message()
      ->mutate_monotonic_start_time(
          std::chrono::duration_cast<std::chrono::nanoseconds>(
              monotonic_start_time.time_since_epoch())
              .count());

  // Add logger start times if they are available in the log file header.
  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_logger_monotonic_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_logger_monotonic_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                logger_monotonic_start_time.time_since_epoch())
                .count());
  }

  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_logger_realtime_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_logger_realtime_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                logger_realtime_start_time.time_since_epoch())
                .count());
  }

  if (node_state_[node_index]
          .log_file_header.mutable_message()
          ->has_realtime_start_time()) {
    node_state_[node_index]
        .log_file_header.mutable_message()
        ->mutate_realtime_start_time(
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                realtime_start_time.time_since_epoch())
                .count());
  }
}

bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_state_[node_index].monotonic_start_time !=
      monotonic_clock::min_time) {
    return false;
  }
  if (event_loop_->node() == node ||
      !configuration::MultiNode(configuration_)) {
    // There are no offsets to compute for ourselves, so always succeed.
    SetStartTime(node_index, monotonic_start_time, realtime_start_time,
                 monotonic_start_time, realtime_start_time);
    node_state_[node_index].SetBootUUID(event_loop_->boot_uuid());
    return true;
  } else if (server_statistics_fetcher_.get() != nullptr) {
    // We must be a remote node now.  Look for the connection and see if it is
    // connected.

    for (const message_bridge::ServerConnection *connection :
         *server_statistics_fetcher_->connections()) {
      if (connection->node()->name()->string_view() !=
          node->name()->string_view()) {
        continue;
      }

      if (connection->state() != message_bridge::State::CONNECTED) {
        VLOG(1) << node->name()->string_view()
                << " is not connected, can't start it yet.";
        break;
      }

      if (!connection->has_monotonic_offset()) {
        VLOG(1) << "Missing monotonic offset for setting start time for node "
                << aos::FlatbufferToJson(node);
        break;
      }

      // Found it and it is connected.  Compensate and go.
      SetStartTime(node_index,
                   monotonic_start_time +
                       std::chrono::nanoseconds(connection->monotonic_offset()),
                   realtime_start_time, monotonic_start_time,
                   realtime_start_time);
      return true;
    }
  }
  return false;
}

aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    const Node *node, std::string_view config_sha256) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (!separate_config_) {
    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
  } else {
    CHECK(!config_sha256.empty());
  }

  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  CHECK(log_event_uuid_ != UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      log_event_uuid_.PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      logger_instance_uuid_.PackString(&fbb);

  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (!log_start_uuid_.empty()) {
    log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
  }

  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  if (!config_sha256.empty()) {
    config_sha256_offset = fbb.CreateString(config_sha256);
  }

  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      fbb.CreateString("00000000-0000-4000-8000-000000000000");

  flatbuffers::Offset<Node> node_offset;
  flatbuffers::Offset<Node> logger_node_offset;

  if (configuration::MultiNode(configuration_)) {
    node_offset = RecursiveCopyFlatBuffer(node, &fbb);
    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary.  And the reverse could happen for another
  // message.  Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
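  // (Worked example: assuming the usual 100 ms polling period, a message can
  // theoretically be up to ~200 ms out of order, and the header then
  // advertises 300 ms.  The 100 ms figure is an assumption for illustration;
  // polling_period_ is configurable.)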
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::nanoseconds(3 * polling_period_).count());

  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_clock::min_time.time_since_epoch())
          .count());
  if (node == event_loop_->node()) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  } else {
    log_file_header_builder.add_logger_monotonic_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            monotonic_clock::min_time.time_since_epoch())
            .count());
    log_file_header_builder.add_logger_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            realtime_clock::min_time.time_since_epoch())
            .count());
  }

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);
  log_file_header_builder.add_source_node_boot_uuid(
      source_node_boot_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(0);

  log_file_header_builder.add_configuration_sha256(0);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}

void Logger::ResetStatisics() {
  max_message_fetch_time_ = std::chrono::nanoseconds::zero();
  max_message_fetch_time_channel_ = -1;
  max_message_fetch_time_size_ = -1;
  total_message_fetch_time_ = std::chrono::nanoseconds::zero();
  total_message_fetch_count_ = 0;
  total_message_fetch_bytes_ = 0;
  total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
  total_nop_fetch_count_ = 0;
  max_copy_time_ = std::chrono::nanoseconds::zero();
  max_copy_time_channel_ = -1;
  max_copy_time_size_ = -1;
  total_copy_time_ = std::chrono::nanoseconds::zero();
  total_copy_count_ = 0;
  total_copy_bytes_ = 0;
}

void Logger::Rotate() {
  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
  }
}

void Logger::LogUntil(monotonic_clock::time_point t) {
  // Grab the latest ServerStatistics message.  It always has the opportunity
  // to be >= the current time, so it will always reflect any reboots which may
  // have happened.
  WriteMissingTimestamps();

  int our_node_index = aos::configuration::GetNodeIndex(
      event_loop_->configuration(), event_loop_->node());

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      if (f.written) {
        const auto start = event_loop_->monotonic_now();
        const bool got_new = f.fetcher->FetchNext();
        const auto end = event_loop_->monotonic_now();
        RecordFetchResult(start, end, got_new, &f);
        if (!got_new) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        }
        f.written = false;
      }

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time >= t) {
        break;
      }
      if (f.writer != nullptr) {
        // Only check if the boot UUID has changed if this is data from another
        // node.  Our UUID can't change without restarting the application.
        if (our_node_index != f.data_node_index) {
          // And update our boot UUID if the UUID has changed.
          if (node_state_[f.data_node_index].SetBootUUID(
                  f.fetcher->context().remote_boot_uuid)) {
            MaybeWriteHeader(f.data_node_index);
          }
        }

        // Write!
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                           max_header_size_);
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index, f.log_type));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing data as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.writer->filename() << " data "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        max_header_size_ = std::max(max_header_size_,
                                    fbb.GetSize() - f.fetcher->context().size);
        CHECK(node_state_[f.data_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.writer->QueueSizedFlatbuffer(&fbb, end);
      }

      if (f.timestamp_writer != nullptr) {
        // And now handle timestamps.
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index,
                                           LogType::kLogDeliveryTimeOnly));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing timestamps as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.timestamp_writer->filename() << " timestamp "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        CHECK(node_state_[f.timestamp_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.timestamp_writer->QueueSizedFlatbuffer(&fbb, end);
      }

      if (f.contents_writer != nullptr) {
        const auto start = event_loop_->monotonic_now();
        // And now handle the special message contents channel.  Copy the
        // message into a FlatBufferBuilder and save it to disk.
        // TODO(austin): We can be more efficient here when we start to
        // care...
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        const RemoteMessage *msg =
            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);

        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
        if (node_state_[f.contents_node_index].SetBootUUID(
                UUID::FromVector(msg->boot_uuid()))) {
          MaybeWriteHeader(f.contents_node_index);
        }

        logger::MessageHeader::Builder message_header_builder(fbb);

        // TODO(austin): This needs to check the channel_index and confirm
        // that it should be logged before squirreling away the timestamp to
        // disk.  We don't want to log irrelevant timestamps.

        // Note: this must match the same order as MessageBridgeServer and
        // PackMessage.  We want identical headers to have identical
        // on-the-wire formats to make comparing them easier.

        // Translate from the channel index that the event loop uses to the
        // channel index in the log file.
        message_header_builder.add_channel_index(
            event_loop_to_logged_channel_index_[msg->channel_index()]);

        message_header_builder.add_queue_index(msg->queue_index());
        message_header_builder.add_monotonic_sent_time(
            msg->monotonic_sent_time());
        message_header_builder.add_realtime_sent_time(
            msg->realtime_sent_time());

        message_header_builder.add_monotonic_remote_time(
            msg->monotonic_remote_time());
        message_header_builder.add_realtime_remote_time(
            msg->realtime_remote_time());
        message_header_builder.add_remote_queue_index(
            msg->remote_queue_index());

        message_header_builder.add_monotonic_timestamp_time(
            f.fetcher->context()
                .monotonic_event_time.time_since_epoch()
                .count());

        fbb.FinishSizePrefixed(message_header_builder.Finish());
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        CHECK(node_state_[f.contents_node_index].header_valid)
            << ": Can't write data before the header on channel "
            << configuration::CleanedChannelToString(f.fetcher->channel());
        f.contents_writer->QueueSizedFlatbuffer(&fbb, end);
      }

      f.written = true;
    }
  }
  last_synchronized_time_ = t;
}

void Logger::DoLogData(const monotonic_clock::time_point end_time) {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration.  To do this, we need sync points.  Every write
  // cycle should be a sync point.
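  // (For example, if last_synchronized_time_ is 350 ms behind end_time when
  // this runs, and assuming a 100 ms polling period, the loop below makes
  // three passes, advancing the sync point by at most one polling period per
  // pass, and exits once the sync point is within one polling period of
  // end_time.)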

  do {
    // Move the sync point up by at most polling_period.  This forces one sync
    // per iteration, even if it is small.
    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));

    on_logged_period_();

    // If we missed cycles, we could be pretty far behind.  Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < end_time);
}

void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
                               aos::monotonic_clock::time_point end,
                               bool got_new, FetcherStruct *fetcher) {
  const auto duration = end - start;
  if (!got_new) {
    ++total_nop_fetch_count_;
    total_nop_fetch_time_ += duration;
    return;
  }
  ++total_message_fetch_count_;
  total_message_fetch_bytes_ += fetcher->fetcher->context().size;
  total_message_fetch_time_ += duration;
  if (duration > max_message_fetch_time_) {
    max_message_fetch_time_ = duration;
    max_message_fetch_time_channel_ = fetcher->channel_index;
    max_message_fetch_time_size_ = fetcher->fetcher->context().size;
  }
}

void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
                                     aos::monotonic_clock::time_point end,
                                     FetcherStruct *fetcher) {
  const auto duration = end - start;
  total_copy_time_ += duration;
  ++total_copy_count_;
  total_copy_bytes_ += fetcher->fetcher->context().size;
  if (duration > max_copy_time_) {
    max_copy_time_ = duration;
    max_copy_time_channel_ = fetcher->channel_index;
    max_copy_time_size_ = fetcher->fetcher->context().size;
  }
}

}  // namespace logger
}  // namespace aos