blob: abae9a1ac3c6ede59178e3bb04da72b19928b27d [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_writer.h"
2
Austin Schuh6bb8a822021-03-31 23:04:39 -07003#include <dirent.h>
4
Austin Schuhb06f03b2021-02-17 22:00:37 -08005#include <functional>
6#include <map>
7#include <vector>
8
9#include "aos/configuration.h"
10#include "aos/events/event_loop.h"
11#include "aos/network/message_bridge_server_generated.h"
12#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080013#include "aos/network/timestamp_channel.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080014
15namespace aos {
16namespace logger {
17namespace {
18using message_bridge::RemoteMessage;
Austin Schuhbd06ae42021-03-31 22:48:21 -070019namespace chrono = std::chrono;
Austin Schuhb06f03b2021-02-17 22:00:37 -080020} // namespace
21
22Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
23 std::function<bool(const Channel *)> should_log)
24 : event_loop_(event_loop),
25 configuration_(configuration),
26 name_(network::GetHostname()),
27 timer_handler_(event_loop_->AddTimer(
Austin Schuh30586902021-03-30 22:54:08 -070028 [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
Austin Schuhb06f03b2021-02-17 22:00:37 -080029 server_statistics_fetcher_(
30 configuration::MultiNode(event_loop_->configuration())
31 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
32 "/aos")
33 : aos::Fetcher<message_bridge::ServerStatistics>()) {
34 VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
35
Austin Schuhb06f03b2021-02-17 22:00:37 -080036 std::map<const Channel *, const Node *> timestamp_logger_channels;
37
Austin Schuh61e973f2021-02-21 21:43:56 -080038 message_bridge::ChannelTimestampFinder finder(event_loop_);
39 for (const Channel *channel : *event_loop_->configuration()->channels()) {
40 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080041 continue;
42 }
Austin Schuh61e973f2021-02-21 21:43:56 -080043 if (!channel->has_destination_nodes()) {
44 continue;
45 }
46 for (const Connection *connection : *channel->destination_nodes()) {
47 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
48 connection, event_loop_->node())) {
49 const Node *other_node = configuration::GetNode(
50 event_loop_->configuration(), connection->name()->string_view());
51
52 VLOG(1) << "Timestamps are logged from "
53 << FlatbufferToJson(other_node);
54 timestamp_logger_channels.insert(
55 std::make_pair(finder.ForChannel(channel, connection), other_node));
56 }
57 }
Austin Schuhb06f03b2021-02-17 22:00:37 -080058 }
59
60 const size_t our_node_index =
61 configuration::GetNodeIndex(configuration_, event_loop_->node());
62
63 for (size_t channel_index = 0;
64 channel_index < configuration_->channels()->size(); ++channel_index) {
65 const Channel *const config_channel =
66 configuration_->channels()->Get(channel_index);
67 // The MakeRawFetcher method needs a channel which is in the event loop
68 // configuration() object, not the configuration_ object. Go look that up
69 // from the config.
70 const Channel *channel = aos::configuration::GetChannel(
71 event_loop_->configuration(), config_channel->name()->string_view(),
72 config_channel->type()->string_view(), "", event_loop_->node());
73 CHECK(channel != nullptr)
74 << ": Failed to look up channel "
75 << aos::configuration::CleanedChannelToString(config_channel);
76 if (!should_log(channel)) {
77 continue;
78 }
79
80 FetcherStruct fs;
81 fs.channel_index = channel_index;
82 fs.channel = channel;
83
84 const bool is_local =
85 configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
86
87 const bool is_readable =
88 configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
89 const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
90 channel, event_loop_->node());
91 const bool log_message = is_logged && is_readable;
92
93 bool log_delivery_times = false;
94 if (event_loop_->node() != nullptr) {
95 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
96 channel, event_loop_->node(), event_loop_->node());
97 }
98
99 // Now, detect a RemoteMessage timestamp logger where we should just log the
100 // contents to a file directly.
101 const bool log_contents = timestamp_logger_channels.find(channel) !=
102 timestamp_logger_channels.end();
103
104 if (log_message || log_delivery_times || log_contents) {
105 fs.fetcher = event_loop->MakeRawFetcher(channel);
106 VLOG(1) << "Logging channel "
107 << configuration::CleanedChannelToString(channel);
108
109 if (log_delivery_times) {
110 VLOG(1) << " Delivery times";
111 fs.wants_timestamp_writer = true;
112 fs.timestamp_node_index = our_node_index;
113 }
114 if (log_message) {
115 VLOG(1) << " Data";
116 fs.wants_writer = true;
117 if (!is_local) {
118 const Node *source_node = configuration::GetNode(
119 configuration_, channel->source_node()->string_view());
120 fs.data_node_index =
121 configuration::GetNodeIndex(configuration_, source_node);
122 fs.log_type = LogType::kLogRemoteMessage;
123 } else {
124 fs.data_node_index = our_node_index;
125 }
126 }
127 if (log_contents) {
128 VLOG(1) << "Timestamp logger channel "
129 << configuration::CleanedChannelToString(channel);
130 fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
131 fs.wants_contents_writer = true;
132 fs.contents_node_index =
133 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
134 }
135 fetchers_.emplace_back(std::move(fs));
136 }
137 }
138
139 // When we are logging remote timestamps, we need to be able to translate from
140 // the channel index that the event loop uses to the channel index in the
141 // config in the log file.
142 event_loop_to_logged_channel_index_.resize(
143 event_loop->configuration()->channels()->size(), -1);
144 for (size_t event_loop_channel_index = 0;
145 event_loop_channel_index <
146 event_loop->configuration()->channels()->size();
147 ++event_loop_channel_index) {
148 const Channel *event_loop_channel =
149 event_loop->configuration()->channels()->Get(event_loop_channel_index);
150
151 const Channel *logged_channel = aos::configuration::GetChannel(
152 configuration_, event_loop_channel->name()->string_view(),
153 event_loop_channel->type()->string_view(), "",
154 configuration::GetNode(configuration_, event_loop_->node()));
155
156 if (logged_channel != nullptr) {
157 event_loop_to_logged_channel_index_[event_loop_channel_index] =
158 configuration::ChannelIndex(configuration_, logged_channel);
159 }
160 }
161}
162
163Logger::~Logger() {
164 if (log_namer_) {
165 // If we are replaying a log file, or in simulation, we want to force the
166 // last bit of data to be logged. The easiest way to deal with this is to
167 // poll everything as we go to destroy the class, ie, shut down the logger,
168 // and write it to disk.
169 StopLogging(event_loop_->monotonic_now());
170 }
171}
172
Austin Schuh6bb8a822021-03-31 23:04:39 -0700173bool Logger::RenameLogBase(std::string new_base_name) {
174 if (new_base_name == log_namer_->base_name()) {
175 return true;
176 }
177 std::string current_directory = std::string(log_namer_->base_name());
178 std::string new_directory = new_base_name;
179
180 auto current_path_split = current_directory.rfind("/");
181 auto new_path_split = new_directory.rfind("/");
182
183 CHECK(new_base_name.substr(new_path_split) ==
184 current_directory.substr(current_path_split))
185 << "Rename of file base from " << current_directory << " to "
186 << new_directory << " is not supported.";
187
188 current_directory.resize(current_path_split);
189 new_directory.resize(new_path_split);
190 DIR *dir = opendir(current_directory.c_str());
191 if (dir) {
192 closedir(dir);
193 const int result = rename(current_directory.c_str(), new_directory.c_str());
194 if (result != 0) {
195 PLOG(ERROR) << "Unable to rename " << current_directory << " to "
196 << new_directory;
197 return false;
198 }
199 } else {
200 // Handle if directory was already renamed.
201 dir = opendir(new_directory.c_str());
202 if (!dir) {
203 LOG(ERROR) << "Old directory " << current_directory
204 << " missing and new directory " << new_directory
205 << " not present.";
206 return false;
207 }
208 closedir(dir);
209 }
210
211 log_namer_->set_base_name(new_base_name);
212 Rotate();
213 return true;
214}
215
Austin Schuhb06f03b2021-02-17 22:00:37 -0800216void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
Austin Schuh34f9e482021-03-31 22:54:18 -0700217 std::optional<UUID> log_start_uuid) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800218 CHECK(!log_namer_) << ": Already logging";
219 log_namer_ = std::move(log_namer);
220
221 std::string config_sha256;
222 if (separate_config_) {
223 flatbuffers::FlatBufferBuilder fbb;
224 flatbuffers::Offset<aos::Configuration> configuration_offset =
225 CopyFlatBuffer(configuration_, &fbb);
226 LogFileHeader::Builder log_file_header_builder(fbb);
227 log_file_header_builder.add_configuration(configuration_offset);
228 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
229 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
230 fbb.Release());
231 config_sha256 = Sha256(config_header.span());
232 LOG(INFO) << "Config sha256 of " << config_sha256;
233 log_namer_->WriteConfiguration(&config_header, config_sha256);
234 }
235
236 log_event_uuid_ = UUID::Random();
237 log_start_uuid_ = log_start_uuid;
238 VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
239
240 // We want to do as much work as possible before the initial Fetch. Time
241 // between that and actually starting to log opens up the possibility of
242 // falling off the end of the queue during that time.
243
244 for (FetcherStruct &f : fetchers_) {
245 if (f.wants_writer) {
246 f.writer = log_namer_->MakeWriter(f.channel);
247 }
248 if (f.wants_timestamp_writer) {
249 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
250 }
251 if (f.wants_contents_writer) {
252 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
253 f.channel, CHECK_NOTNULL(f.timestamp_node));
254 }
255 }
256
257 CHECK(node_state_.empty());
258 node_state_.resize(configuration::MultiNode(configuration_)
259 ? configuration_->nodes()->size()
260 : 1u);
261
262 for (const Node *node : log_namer_->nodes()) {
263 const int node_index = configuration::GetNodeIndex(configuration_, node);
264
265 node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
266 }
267
Austin Schuha42ee962021-03-31 22:49:30 -0700268 const aos::monotonic_clock::time_point beginning_time =
269 event_loop_->monotonic_now();
270
Austin Schuhb06f03b2021-02-17 22:00:37 -0800271 // Grab data from each channel right before we declare the log file started
272 // so we can capture the latest message on each channel. This lets us have
273 // non periodic messages with configuration that now get logged.
274 for (FetcherStruct &f : fetchers_) {
275 const auto start = event_loop_->monotonic_now();
276 const bool got_new = f.fetcher->Fetch();
277 const auto end = event_loop_->monotonic_now();
278 RecordFetchResult(start, end, got_new, &f);
279
280 // If there is a message, we want to write it.
281 f.written = f.fetcher->context().data == nullptr;
282 }
283
284 // Clear out any old timestamps in case we are re-starting logging.
285 for (size_t i = 0; i < node_state_.size(); ++i) {
286 SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
287 monotonic_clock::min_time, realtime_clock::min_time);
288 }
289
Austin Schuha42ee962021-03-31 22:49:30 -0700290 const aos::monotonic_clock::time_point fetch_time =
291 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800292 WriteHeader();
Austin Schuha42ee962021-03-31 22:49:30 -0700293 const aos::monotonic_clock::time_point header_time =
294 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800295
296 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
Austin Schuha42ee962021-03-31 22:49:30 -0700297 << " start_time " << last_synchronized_time_ << ", took "
298 << chrono::duration<double>(fetch_time - beginning_time).count()
299 << " to fetch, "
300 << chrono::duration<double>(header_time - fetch_time).count()
301 << " to write headers, boot uuid " << event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800302
303 // Force logging up until the start of the log file now, so the messages at
304 // the start are always ordered before the rest of the messages.
305 // Note: this ship may have already sailed, but we don't have to make it
306 // worse.
307 // TODO(austin): Test...
Austin Schuh855f8932021-03-19 22:01:32 -0700308 //
309 // This is safe to call here since we have set last_synchronized_time_ as the
310 // same time as in the header, and all the data before it should be logged
311 // without ordering concerns.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800312 LogUntil(last_synchronized_time_);
313
314 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
315 polling_period_);
316}
317
318std::unique_ptr<LogNamer> Logger::StopLogging(
319 aos::monotonic_clock::time_point end_time) {
320 CHECK(log_namer_) << ": Not logging right now";
321
322 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh30586902021-03-30 22:54:08 -0700323 // Folks like to use the on_logged_period_ callback to trigger stop and
324 // start events. We can't have those then recurse and try to stop again.
325 // Rather than making everything reentrant, let's just instead block the
326 // callback here.
327 DoLogData(end_time, false);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800328 }
329 timer_handler_->Disable();
330
331 for (FetcherStruct &f : fetchers_) {
332 f.writer = nullptr;
333 f.timestamp_writer = nullptr;
334 f.contents_writer = nullptr;
335 }
336 node_state_.clear();
337
338 log_event_uuid_ = UUID::Zero();
Austin Schuh34f9e482021-03-31 22:54:18 -0700339 log_start_uuid_ = std::nullopt;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800340
341 return std::move(log_namer_);
342}
343
344void Logger::WriteHeader() {
345 if (configuration::MultiNode(configuration_)) {
346 server_statistics_fetcher_.Fetch();
347 }
348
349 aos::monotonic_clock::time_point monotonic_start_time =
350 event_loop_->monotonic_now();
351 aos::realtime_clock::time_point realtime_start_time =
352 event_loop_->realtime_now();
353
354 // We need to pick a point in time to declare the log file "started". This
355 // starts here. It needs to be after everything is fetched so that the
356 // fetchers are all pointed at the most recent message before the start
357 // time.
358 last_synchronized_time_ = monotonic_start_time;
359
360 for (const Node *node : log_namer_->nodes()) {
361 const int node_index = configuration::GetNodeIndex(configuration_, node);
362 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
363 realtime_start_time);
364 MaybeWriteHeader(node_index, node);
365 }
366}
367
368void Logger::MaybeWriteHeader(int node_index) {
369 if (configuration::MultiNode(configuration_)) {
370 return MaybeWriteHeader(node_index,
371 configuration_->nodes()->Get(node_index));
372 } else {
373 return MaybeWriteHeader(node_index, nullptr);
374 }
375}
376
377void Logger::MaybeWriteHeader(int node_index, const Node *node) {
378 // This function is responsible for writing the header when the header both
379 // has valid data, and when it needs to be written.
380 if (node_state_[node_index].header_written &&
381 node_state_[node_index].header_valid) {
382 // The header has been written and is valid, nothing to do.
383 return;
384 }
385 if (!node_state_[node_index].has_source_node_boot_uuid) {
386 // Can't write a header if we don't have the boot UUID.
387 return;
388 }
389
390 // WriteHeader writes the first header in a log file. We want to do this only
391 // once.
392 //
393 // Rotate rewrites the same header with a new part ID, but keeps the same part
394 // UUID. We don't want that when things reboot, because that implies that
395 // parts go together across a reboot.
396 //
397 // Reboot resets the parts UUID. So, once we've written a header the first
398 // time, we want to use Reboot to rotate the log and reset the parts UUID.
399 //
400 // header_valid is cleared whenever the remote reboots.
401 if (node_state_[node_index].header_written) {
402 log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
403 } else {
404 log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
405
406 node_state_[node_index].header_written = true;
407 }
408 node_state_[node_index].header_valid = true;
409}
410
411void Logger::WriteMissingTimestamps() {
412 if (configuration::MultiNode(configuration_)) {
413 server_statistics_fetcher_.Fetch();
414 } else {
415 return;
416 }
417
418 if (server_statistics_fetcher_.get() == nullptr) {
419 return;
420 }
421
422 for (const Node *node : log_namer_->nodes()) {
423 const int node_index = configuration::GetNodeIndex(configuration_, node);
424 if (MaybeUpdateTimestamp(
425 node, node_index,
426 server_statistics_fetcher_.context().monotonic_event_time,
427 server_statistics_fetcher_.context().realtime_event_time)) {
428 CHECK(node_state_[node_index].header_written);
429 CHECK(node_state_[node_index].header_valid);
430 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
431 } else {
432 MaybeWriteHeader(node_index, node);
433 }
434 }
435}
436
437void Logger::SetStartTime(
438 size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
439 aos::realtime_clock::time_point realtime_start_time,
440 aos::monotonic_clock::time_point logger_monotonic_start_time,
441 aos::realtime_clock::time_point logger_realtime_start_time) {
442 node_state_[node_index].monotonic_start_time = monotonic_start_time;
443 node_state_[node_index].realtime_start_time = realtime_start_time;
444 node_state_[node_index]
445 .log_file_header.mutable_message()
446 ->mutate_monotonic_start_time(
447 std::chrono::duration_cast<std::chrono::nanoseconds>(
448 monotonic_start_time.time_since_epoch())
449 .count());
450
451 // Add logger start times if they are available in the log file header.
452 if (node_state_[node_index]
453 .log_file_header.mutable_message()
454 ->has_logger_monotonic_start_time()) {
455 node_state_[node_index]
456 .log_file_header.mutable_message()
457 ->mutate_logger_monotonic_start_time(
458 std::chrono::duration_cast<std::chrono::nanoseconds>(
459 logger_monotonic_start_time.time_since_epoch())
460 .count());
461 }
462
463 if (node_state_[node_index]
464 .log_file_header.mutable_message()
465 ->has_logger_realtime_start_time()) {
466 node_state_[node_index]
467 .log_file_header.mutable_message()
468 ->mutate_logger_realtime_start_time(
469 std::chrono::duration_cast<std::chrono::nanoseconds>(
470 logger_realtime_start_time.time_since_epoch())
471 .count());
472 }
473
474 if (node_state_[node_index]
475 .log_file_header.mutable_message()
476 ->has_realtime_start_time()) {
477 node_state_[node_index]
478 .log_file_header.mutable_message()
479 ->mutate_realtime_start_time(
480 std::chrono::duration_cast<std::chrono::nanoseconds>(
481 realtime_start_time.time_since_epoch())
482 .count());
483 }
484}
485
486bool Logger::MaybeUpdateTimestamp(
487 const Node *node, int node_index,
488 aos::monotonic_clock::time_point monotonic_start_time,
489 aos::realtime_clock::time_point realtime_start_time) {
490 // Bail early if the start times are already set.
491 if (node_state_[node_index].monotonic_start_time !=
492 monotonic_clock::min_time) {
493 return false;
494 }
495 if (event_loop_->node() == node ||
496 !configuration::MultiNode(configuration_)) {
497 // There are no offsets to compute for ourself, so always succeed.
498 SetStartTime(node_index, monotonic_start_time, realtime_start_time,
499 monotonic_start_time, realtime_start_time);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800500 node_state_[node_index].SetBootUUID(event_loop_->boot_uuid());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800501 return true;
502 } else if (server_statistics_fetcher_.get() != nullptr) {
503 // We must be a remote node now. Look for the connection and see if it is
504 // connected.
505
506 for (const message_bridge::ServerConnection *connection :
507 *server_statistics_fetcher_->connections()) {
508 if (connection->node()->name()->string_view() !=
509 node->name()->string_view()) {
510 continue;
511 }
512
513 if (connection->state() != message_bridge::State::CONNECTED) {
514 VLOG(1) << node->name()->string_view()
515 << " is not connected, can't start it yet.";
516 break;
517 }
518
Austin Schuhb06f03b2021-02-17 22:00:37 -0800519 if (!connection->has_monotonic_offset()) {
520 VLOG(1) << "Missing monotonic offset for setting start time for node "
521 << aos::FlatbufferToJson(node);
522 break;
523 }
524
525 // Found it and it is connected. Compensate and go.
526 SetStartTime(node_index,
527 monotonic_start_time +
528 std::chrono::nanoseconds(connection->monotonic_offset()),
529 realtime_start_time, monotonic_start_time,
530 realtime_start_time);
531 return true;
532 }
533 }
534 return false;
535}
536
537aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
538 const Node *node, std::string_view config_sha256) {
539 // Now write the header with this timestamp in it.
540 flatbuffers::FlatBufferBuilder fbb;
541 fbb.ForceDefaults(true);
542
543 flatbuffers::Offset<aos::Configuration> configuration_offset;
544 if (!separate_config_) {
545 configuration_offset = CopyFlatBuffer(configuration_, &fbb);
546 } else {
547 CHECK(!config_sha256.empty());
548 }
549
550 const flatbuffers::Offset<flatbuffers::String> name_offset =
551 fbb.CreateString(name_);
552
553 CHECK(log_event_uuid_ != UUID::Zero());
554 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800555 log_event_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800556
557 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800558 logger_instance_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800559
560 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
Austin Schuh34f9e482021-03-31 22:54:18 -0700561 if (log_start_uuid_) {
562 log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800563 }
564
565 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
566 if (!config_sha256.empty()) {
567 config_sha256_offset = fbb.CreateString(config_sha256);
568 }
569
570 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800571 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800572
573 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800574 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800575
576 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
577 fbb.CreateString("00000000-0000-4000-8000-000000000000");
578
579 flatbuffers::Offset<Node> node_offset;
580 flatbuffers::Offset<Node> logger_node_offset;
581
582 if (configuration::MultiNode(configuration_)) {
583 node_offset = RecursiveCopyFlatBuffer(node, &fbb);
584 logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
585 }
586
587 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
588
589 log_file_header_builder.add_name(name_offset);
590
591 // Only add the node if we are running in a multinode configuration.
592 if (node != nullptr) {
593 log_file_header_builder.add_node(node_offset);
594 log_file_header_builder.add_logger_node(logger_node_offset);
595 }
596
597 if (!configuration_offset.IsNull()) {
598 log_file_header_builder.add_configuration(configuration_offset);
599 }
600 // The worst case theoretical out of order is the polling period times 2.
601 // One message could get logged right after the boundary, but be for right
602 // before the next boundary. And the reverse could happen for another
603 // message. Report back 3x to be extra safe, and because the cost isn't
604 // huge on the read side.
605 log_file_header_builder.add_max_out_of_order_duration(
606 std::chrono::nanoseconds(3 * polling_period_).count());
607
608 log_file_header_builder.add_monotonic_start_time(
609 std::chrono::duration_cast<std::chrono::nanoseconds>(
610 monotonic_clock::min_time.time_since_epoch())
611 .count());
612 if (node == event_loop_->node()) {
613 log_file_header_builder.add_realtime_start_time(
614 std::chrono::duration_cast<std::chrono::nanoseconds>(
615 realtime_clock::min_time.time_since_epoch())
616 .count());
617 } else {
618 log_file_header_builder.add_logger_monotonic_start_time(
619 std::chrono::duration_cast<std::chrono::nanoseconds>(
620 monotonic_clock::min_time.time_since_epoch())
621 .count());
622 log_file_header_builder.add_logger_realtime_start_time(
623 std::chrono::duration_cast<std::chrono::nanoseconds>(
624 realtime_clock::min_time.time_since_epoch())
625 .count());
626 }
627
628 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
629 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
630 if (!log_start_uuid_offset.IsNull()) {
631 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
632 }
633 log_file_header_builder.add_logger_node_boot_uuid(
634 logger_node_boot_uuid_offset);
635 log_file_header_builder.add_source_node_boot_uuid(
636 source_node_boot_uuid_offset);
637
638 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
639 log_file_header_builder.add_parts_index(0);
640
641 log_file_header_builder.add_configuration_sha256(0);
642
643 if (!config_sha256_offset.IsNull()) {
644 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
645 }
646
647 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
648 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
649 fbb.Release());
650
651 CHECK(result.Verify()) << ": Built a corrupted header.";
652
653 return result;
654}
655
656void Logger::ResetStatisics() {
657 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
658 max_message_fetch_time_channel_ = -1;
659 max_message_fetch_time_size_ = -1;
660 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
661 total_message_fetch_count_ = 0;
662 total_message_fetch_bytes_ = 0;
663 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
664 total_nop_fetch_count_ = 0;
665 max_copy_time_ = std::chrono::nanoseconds::zero();
666 max_copy_time_channel_ = -1;
667 max_copy_time_size_ = -1;
668 total_copy_time_ = std::chrono::nanoseconds::zero();
669 total_copy_count_ = 0;
670 total_copy_bytes_ = 0;
671}
672
673void Logger::Rotate() {
674 for (const Node *node : log_namer_->nodes()) {
675 const int node_index = configuration::GetNodeIndex(configuration_, node);
676 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
677 }
678}
679
680void Logger::LogUntil(monotonic_clock::time_point t) {
681 // Grab the latest ServerStatistics message. This will always have the
682 // oppertunity to be >= to the current time, so it will always represent any
683 // reboots which may have happened.
684 WriteMissingTimestamps();
685
Austin Schuhcdd90272021-03-15 12:46:16 -0700686 int our_node_index = aos::configuration::GetNodeIndex(
687 event_loop_->configuration(), event_loop_->node());
688
Austin Schuhb06f03b2021-02-17 22:00:37 -0800689 // Write each channel to disk, one at a time.
690 for (FetcherStruct &f : fetchers_) {
691 while (true) {
692 if (f.written) {
693 const auto start = event_loop_->monotonic_now();
694 const bool got_new = f.fetcher->FetchNext();
695 const auto end = event_loop_->monotonic_now();
696 RecordFetchResult(start, end, got_new, &f);
697 if (!got_new) {
698 VLOG(2) << "No new data on "
699 << configuration::CleanedChannelToString(
700 f.fetcher->channel());
701 break;
702 }
703 f.written = false;
704 }
705
706 // TODO(james): Write tests to exercise this logic.
707 if (f.fetcher->context().monotonic_event_time >= t) {
708 break;
709 }
710 if (f.writer != nullptr) {
Austin Schuhcdd90272021-03-15 12:46:16 -0700711 // Only check if the boot UUID has changed if this is data from another
712 // node. Our UUID can't change without restarting the application.
713 if (our_node_index != f.data_node_index) {
714 // And update our boot UUID if the UUID has changed.
715 if (node_state_[f.data_node_index].SetBootUUID(
Austin Schuha9012be2021-07-21 15:19:11 -0700716 f.fetcher->context().source_boot_uuid)) {
Austin Schuhcdd90272021-03-15 12:46:16 -0700717 MaybeWriteHeader(f.data_node_index);
718 }
719 }
720
Austin Schuhb06f03b2021-02-17 22:00:37 -0800721 // Write!
722 const auto start = event_loop_->monotonic_now();
723 flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
724 max_header_size_);
725 fbb.ForceDefaults(true);
726
727 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
728 f.channel_index, f.log_type));
729 const auto end = event_loop_->monotonic_now();
730 RecordCreateMessageTime(start, end, &f);
731
732 VLOG(2) << "Writing data as node "
733 << FlatbufferToJson(event_loop_->node()) << " for channel "
734 << configuration::CleanedChannelToString(f.fetcher->channel())
735 << " to " << f.writer->filename() << " data "
736 << FlatbufferToJson(
737 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
738 fbb.GetBufferPointer()));
739
740 max_header_size_ = std::max(max_header_size_,
741 fbb.GetSize() - f.fetcher->context().size);
742 CHECK(node_state_[f.data_node_index].header_valid)
743 << ": Can't write data before the header on channel "
744 << configuration::CleanedChannelToString(f.fetcher->channel());
Austin Schuhbd06ae42021-03-31 22:48:21 -0700745 f.writer->QueueSizedFlatbuffer(&fbb, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800746 }
747
748 if (f.timestamp_writer != nullptr) {
749 // And now handle timestamps.
750 const auto start = event_loop_->monotonic_now();
751 flatbuffers::FlatBufferBuilder fbb;
752 fbb.ForceDefaults(true);
753
754 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
755 f.channel_index,
756 LogType::kLogDeliveryTimeOnly));
757 const auto end = event_loop_->monotonic_now();
758 RecordCreateMessageTime(start, end, &f);
759
760 VLOG(2) << "Writing timestamps as node "
761 << FlatbufferToJson(event_loop_->node()) << " for channel "
762 << configuration::CleanedChannelToString(f.fetcher->channel())
763 << " to " << f.timestamp_writer->filename() << " timestamp "
764 << FlatbufferToJson(
765 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
766 fbb.GetBufferPointer()));
767
768 CHECK(node_state_[f.timestamp_node_index].header_valid)
769 << ": Can't write data before the header on channel "
770 << configuration::CleanedChannelToString(f.fetcher->channel());
Austin Schuhbd06ae42021-03-31 22:48:21 -0700771 f.timestamp_writer->QueueSizedFlatbuffer(&fbb, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800772 }
773
774 if (f.contents_writer != nullptr) {
775 const auto start = event_loop_->monotonic_now();
776 // And now handle the special message contents channel. Copy the
777 // message into a FlatBufferBuilder and save it to disk.
778 // TODO(austin): We can be more efficient here when we start to
779 // care...
780 flatbuffers::FlatBufferBuilder fbb;
781 fbb.ForceDefaults(true);
782
783 const RemoteMessage *msg =
784 flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
785
786 CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
Austin Schuhcdd90272021-03-15 12:46:16 -0700787 if (node_state_[f.contents_node_index].SetBootUUID(
788 UUID::FromVector(msg->boot_uuid()))) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800789 MaybeWriteHeader(f.contents_node_index);
790 }
791
792 logger::MessageHeader::Builder message_header_builder(fbb);
793
794 // TODO(austin): This needs to check the channel_index and confirm
795 // that it should be logged before squirreling away the timestamp to
796 // disk. We don't want to log irrelevant timestamps.
797
798 // Note: this must match the same order as MessageBridgeServer and
799 // PackMessage. We want identical headers to have identical
800 // on-the-wire formats to make comparing them easier.
801
802 // Translate from the channel index that the event loop uses to the
803 // channel index in the log file.
804 message_header_builder.add_channel_index(
805 event_loop_to_logged_channel_index_[msg->channel_index()]);
806
807 message_header_builder.add_queue_index(msg->queue_index());
808 message_header_builder.add_monotonic_sent_time(
809 msg->monotonic_sent_time());
810 message_header_builder.add_realtime_sent_time(
811 msg->realtime_sent_time());
812
813 message_header_builder.add_monotonic_remote_time(
814 msg->monotonic_remote_time());
815 message_header_builder.add_realtime_remote_time(
816 msg->realtime_remote_time());
817 message_header_builder.add_remote_queue_index(
818 msg->remote_queue_index());
819
820 message_header_builder.add_monotonic_timestamp_time(
821 f.fetcher->context()
822 .monotonic_event_time.time_since_epoch()
823 .count());
824
825 fbb.FinishSizePrefixed(message_header_builder.Finish());
826 const auto end = event_loop_->monotonic_now();
827 RecordCreateMessageTime(start, end, &f);
828
829 CHECK(node_state_[f.contents_node_index].header_valid)
830 << ": Can't write data before the header on channel "
831 << configuration::CleanedChannelToString(f.fetcher->channel());
Austin Schuhbd06ae42021-03-31 22:48:21 -0700832 f.contents_writer->QueueSizedFlatbuffer(&fbb, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800833 }
834
835 f.written = true;
836 }
837 }
838 last_synchronized_time_ = t;
839}
840
Austin Schuh30586902021-03-30 22:54:08 -0700841void Logger::DoLogData(const monotonic_clock::time_point end_time,
842 bool run_on_logged) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800843 // We want to guarantee that messages aren't out of order by more than
844 // max_out_of_order_duration. To do this, we need sync points. Every write
845 // cycle should be a sync point.
846
847 do {
848 // Move the sync point up by at most polling_period. This forces one sync
849 // per iteration, even if it is small.
850 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
851
Austin Schuh30586902021-03-30 22:54:08 -0700852 if (run_on_logged) {
853 on_logged_period_();
854 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800855
856 // If we missed cycles, we could be pretty far behind. Spin until we are
857 // caught up.
858 } while (last_synchronized_time_ + polling_period_ < end_time);
859}
860
861void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
862 aos::monotonic_clock::time_point end,
863 bool got_new, FetcherStruct *fetcher) {
864 const auto duration = end - start;
865 if (!got_new) {
866 ++total_nop_fetch_count_;
867 total_nop_fetch_time_ += duration;
868 return;
869 }
870 ++total_message_fetch_count_;
871 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
872 total_message_fetch_time_ += duration;
873 if (duration > max_message_fetch_time_) {
874 max_message_fetch_time_ = duration;
875 max_message_fetch_time_channel_ = fetcher->channel_index;
876 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
877 }
878}
879
880void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
881 aos::monotonic_clock::time_point end,
882 FetcherStruct *fetcher) {
883 const auto duration = end - start;
884 total_copy_time_ += duration;
885 ++total_copy_count_;
886 total_copy_bytes_ += fetcher->fetcher->context().size;
887 if (duration > max_copy_time_) {
888 max_copy_time_ = duration;
889 max_copy_time_channel_ = fetcher->channel_index;
890 max_copy_time_size_ = fetcher->fetcher->context().size;
891 }
892}
893
894} // namespace logger
895} // namespace aos