blob: 4016a65f55cfd4e5508ac481acaf5967b3ebc277 [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_writer.h"
2
Austin Schuh6bb8a822021-03-31 23:04:39 -07003#include <dirent.h>
4
Austin Schuhb06f03b2021-02-17 22:00:37 -08005#include <functional>
6#include <map>
7#include <vector>
8
9#include "aos/configuration.h"
10#include "aos/events/event_loop.h"
11#include "aos/network/message_bridge_server_generated.h"
12#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080013#include "aos/network/timestamp_channel.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080014
15namespace aos {
16namespace logger {
17namespace {
18using message_bridge::RemoteMessage;
Austin Schuhbd06ae42021-03-31 22:48:21 -070019namespace chrono = std::chrono;
Austin Schuhb06f03b2021-02-17 22:00:37 -080020} // namespace
21
22Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
23 std::function<bool(const Channel *)> should_log)
24 : event_loop_(event_loop),
25 configuration_(configuration),
26 name_(network::GetHostname()),
27 timer_handler_(event_loop_->AddTimer(
28 [this]() { DoLogData(event_loop_->monotonic_now()); })),
29 server_statistics_fetcher_(
30 configuration::MultiNode(event_loop_->configuration())
31 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
32 "/aos")
33 : aos::Fetcher<message_bridge::ServerStatistics>()) {
34 VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
35
Austin Schuhb06f03b2021-02-17 22:00:37 -080036 std::map<const Channel *, const Node *> timestamp_logger_channels;
37
Austin Schuh61e973f2021-02-21 21:43:56 -080038 message_bridge::ChannelTimestampFinder finder(event_loop_);
39 for (const Channel *channel : *event_loop_->configuration()->channels()) {
40 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080041 continue;
42 }
Austin Schuh61e973f2021-02-21 21:43:56 -080043 if (!channel->has_destination_nodes()) {
44 continue;
45 }
46 for (const Connection *connection : *channel->destination_nodes()) {
47 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
48 connection, event_loop_->node())) {
49 const Node *other_node = configuration::GetNode(
50 event_loop_->configuration(), connection->name()->string_view());
51
52 VLOG(1) << "Timestamps are logged from "
53 << FlatbufferToJson(other_node);
54 timestamp_logger_channels.insert(
55 std::make_pair(finder.ForChannel(channel, connection), other_node));
56 }
57 }
Austin Schuhb06f03b2021-02-17 22:00:37 -080058 }
59
60 const size_t our_node_index =
61 configuration::GetNodeIndex(configuration_, event_loop_->node());
62
63 for (size_t channel_index = 0;
64 channel_index < configuration_->channels()->size(); ++channel_index) {
65 const Channel *const config_channel =
66 configuration_->channels()->Get(channel_index);
67 // The MakeRawFetcher method needs a channel which is in the event loop
68 // configuration() object, not the configuration_ object. Go look that up
69 // from the config.
70 const Channel *channel = aos::configuration::GetChannel(
71 event_loop_->configuration(), config_channel->name()->string_view(),
72 config_channel->type()->string_view(), "", event_loop_->node());
73 CHECK(channel != nullptr)
74 << ": Failed to look up channel "
75 << aos::configuration::CleanedChannelToString(config_channel);
76 if (!should_log(channel)) {
77 continue;
78 }
79
80 FetcherStruct fs;
81 fs.channel_index = channel_index;
82 fs.channel = channel;
83
84 const bool is_local =
85 configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
86
87 const bool is_readable =
88 configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
89 const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
90 channel, event_loop_->node());
91 const bool log_message = is_logged && is_readable;
92
93 bool log_delivery_times = false;
94 if (event_loop_->node() != nullptr) {
95 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
96 channel, event_loop_->node(), event_loop_->node());
97 }
98
99 // Now, detect a RemoteMessage timestamp logger where we should just log the
100 // contents to a file directly.
101 const bool log_contents = timestamp_logger_channels.find(channel) !=
102 timestamp_logger_channels.end();
103
104 if (log_message || log_delivery_times || log_contents) {
105 fs.fetcher = event_loop->MakeRawFetcher(channel);
106 VLOG(1) << "Logging channel "
107 << configuration::CleanedChannelToString(channel);
108
109 if (log_delivery_times) {
110 VLOG(1) << " Delivery times";
111 fs.wants_timestamp_writer = true;
112 fs.timestamp_node_index = our_node_index;
113 }
114 if (log_message) {
115 VLOG(1) << " Data";
116 fs.wants_writer = true;
117 if (!is_local) {
118 const Node *source_node = configuration::GetNode(
119 configuration_, channel->source_node()->string_view());
120 fs.data_node_index =
121 configuration::GetNodeIndex(configuration_, source_node);
122 fs.log_type = LogType::kLogRemoteMessage;
123 } else {
124 fs.data_node_index = our_node_index;
125 }
126 }
127 if (log_contents) {
128 VLOG(1) << "Timestamp logger channel "
129 << configuration::CleanedChannelToString(channel);
130 fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
131 fs.wants_contents_writer = true;
132 fs.contents_node_index =
133 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
134 }
135 fetchers_.emplace_back(std::move(fs));
136 }
137 }
138
139 // When we are logging remote timestamps, we need to be able to translate from
140 // the channel index that the event loop uses to the channel index in the
141 // config in the log file.
142 event_loop_to_logged_channel_index_.resize(
143 event_loop->configuration()->channels()->size(), -1);
144 for (size_t event_loop_channel_index = 0;
145 event_loop_channel_index <
146 event_loop->configuration()->channels()->size();
147 ++event_loop_channel_index) {
148 const Channel *event_loop_channel =
149 event_loop->configuration()->channels()->Get(event_loop_channel_index);
150
151 const Channel *logged_channel = aos::configuration::GetChannel(
152 configuration_, event_loop_channel->name()->string_view(),
153 event_loop_channel->type()->string_view(), "",
154 configuration::GetNode(configuration_, event_loop_->node()));
155
156 if (logged_channel != nullptr) {
157 event_loop_to_logged_channel_index_[event_loop_channel_index] =
158 configuration::ChannelIndex(configuration_, logged_channel);
159 }
160 }
161}
162
163Logger::~Logger() {
164 if (log_namer_) {
165 // If we are replaying a log file, or in simulation, we want to force the
166 // last bit of data to be logged. The easiest way to deal with this is to
167 // poll everything as we go to destroy the class, ie, shut down the logger,
168 // and write it to disk.
169 StopLogging(event_loop_->monotonic_now());
170 }
171}
172
Austin Schuh6bb8a822021-03-31 23:04:39 -0700173bool Logger::RenameLogBase(std::string new_base_name) {
174 if (new_base_name == log_namer_->base_name()) {
175 return true;
176 }
177 std::string current_directory = std::string(log_namer_->base_name());
178 std::string new_directory = new_base_name;
179
180 auto current_path_split = current_directory.rfind("/");
181 auto new_path_split = new_directory.rfind("/");
182
183 CHECK(new_base_name.substr(new_path_split) ==
184 current_directory.substr(current_path_split))
185 << "Rename of file base from " << current_directory << " to "
186 << new_directory << " is not supported.";
187
188 current_directory.resize(current_path_split);
189 new_directory.resize(new_path_split);
190 DIR *dir = opendir(current_directory.c_str());
191 if (dir) {
192 closedir(dir);
193 const int result = rename(current_directory.c_str(), new_directory.c_str());
194 if (result != 0) {
195 PLOG(ERROR) << "Unable to rename " << current_directory << " to "
196 << new_directory;
197 return false;
198 }
199 } else {
200 // Handle if directory was already renamed.
201 dir = opendir(new_directory.c_str());
202 if (!dir) {
203 LOG(ERROR) << "Old directory " << current_directory
204 << " missing and new directory " << new_directory
205 << " not present.";
206 return false;
207 }
208 closedir(dir);
209 }
210
211 log_namer_->set_base_name(new_base_name);
212 Rotate();
213 return true;
214}
215
Austin Schuhb06f03b2021-02-17 22:00:37 -0800216void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
Austin Schuh34f9e482021-03-31 22:54:18 -0700217 std::optional<UUID> log_start_uuid) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800218 CHECK(!log_namer_) << ": Already logging";
219 log_namer_ = std::move(log_namer);
220
221 std::string config_sha256;
222 if (separate_config_) {
223 flatbuffers::FlatBufferBuilder fbb;
224 flatbuffers::Offset<aos::Configuration> configuration_offset =
225 CopyFlatBuffer(configuration_, &fbb);
226 LogFileHeader::Builder log_file_header_builder(fbb);
227 log_file_header_builder.add_configuration(configuration_offset);
228 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
229 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
230 fbb.Release());
231 config_sha256 = Sha256(config_header.span());
232 LOG(INFO) << "Config sha256 of " << config_sha256;
233 log_namer_->WriteConfiguration(&config_header, config_sha256);
234 }
235
236 log_event_uuid_ = UUID::Random();
237 log_start_uuid_ = log_start_uuid;
238 VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
239
240 // We want to do as much work as possible before the initial Fetch. Time
241 // between that and actually starting to log opens up the possibility of
242 // falling off the end of the queue during that time.
243
244 for (FetcherStruct &f : fetchers_) {
245 if (f.wants_writer) {
246 f.writer = log_namer_->MakeWriter(f.channel);
247 }
248 if (f.wants_timestamp_writer) {
249 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
250 }
251 if (f.wants_contents_writer) {
252 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
253 f.channel, CHECK_NOTNULL(f.timestamp_node));
254 }
255 }
256
257 CHECK(node_state_.empty());
258 node_state_.resize(configuration::MultiNode(configuration_)
259 ? configuration_->nodes()->size()
260 : 1u);
261
262 for (const Node *node : log_namer_->nodes()) {
263 const int node_index = configuration::GetNodeIndex(configuration_, node);
264
265 node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
266 }
267
Austin Schuha42ee962021-03-31 22:49:30 -0700268 const aos::monotonic_clock::time_point beginning_time =
269 event_loop_->monotonic_now();
270
Austin Schuhb06f03b2021-02-17 22:00:37 -0800271 // Grab data from each channel right before we declare the log file started
272 // so we can capture the latest message on each channel. This lets us have
273 // non periodic messages with configuration that now get logged.
274 for (FetcherStruct &f : fetchers_) {
275 const auto start = event_loop_->monotonic_now();
276 const bool got_new = f.fetcher->Fetch();
277 const auto end = event_loop_->monotonic_now();
278 RecordFetchResult(start, end, got_new, &f);
279
280 // If there is a message, we want to write it.
281 f.written = f.fetcher->context().data == nullptr;
282 }
283
284 // Clear out any old timestamps in case we are re-starting logging.
285 for (size_t i = 0; i < node_state_.size(); ++i) {
286 SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
287 monotonic_clock::min_time, realtime_clock::min_time);
288 }
289
Austin Schuha42ee962021-03-31 22:49:30 -0700290 const aos::monotonic_clock::time_point fetch_time =
291 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800292 WriteHeader();
Austin Schuha42ee962021-03-31 22:49:30 -0700293 const aos::monotonic_clock::time_point header_time =
294 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800295
296 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
Austin Schuha42ee962021-03-31 22:49:30 -0700297 << " start_time " << last_synchronized_time_ << ", took "
298 << chrono::duration<double>(fetch_time - beginning_time).count()
299 << " to fetch, "
300 << chrono::duration<double>(header_time - fetch_time).count()
301 << " to write headers, boot uuid " << event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800302
303 // Force logging up until the start of the log file now, so the messages at
304 // the start are always ordered before the rest of the messages.
305 // Note: this ship may have already sailed, but we don't have to make it
306 // worse.
307 // TODO(austin): Test...
Austin Schuh855f8932021-03-19 22:01:32 -0700308 //
309 // This is safe to call here since we have set last_synchronized_time_ as the
310 // same time as in the header, and all the data before it should be logged
311 // without ordering concerns.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800312 LogUntil(last_synchronized_time_);
313
314 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
315 polling_period_);
316}
317
318std::unique_ptr<LogNamer> Logger::StopLogging(
319 aos::monotonic_clock::time_point end_time) {
320 CHECK(log_namer_) << ": Not logging right now";
321
322 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh855f8932021-03-19 22:01:32 -0700323 DoLogData(end_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800324 }
325 timer_handler_->Disable();
326
327 for (FetcherStruct &f : fetchers_) {
328 f.writer = nullptr;
329 f.timestamp_writer = nullptr;
330 f.contents_writer = nullptr;
331 }
332 node_state_.clear();
333
334 log_event_uuid_ = UUID::Zero();
Austin Schuh34f9e482021-03-31 22:54:18 -0700335 log_start_uuid_ = std::nullopt;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800336
337 return std::move(log_namer_);
338}
339
340void Logger::WriteHeader() {
341 if (configuration::MultiNode(configuration_)) {
342 server_statistics_fetcher_.Fetch();
343 }
344
345 aos::monotonic_clock::time_point monotonic_start_time =
346 event_loop_->monotonic_now();
347 aos::realtime_clock::time_point realtime_start_time =
348 event_loop_->realtime_now();
349
350 // We need to pick a point in time to declare the log file "started". This
351 // starts here. It needs to be after everything is fetched so that the
352 // fetchers are all pointed at the most recent message before the start
353 // time.
354 last_synchronized_time_ = monotonic_start_time;
355
356 for (const Node *node : log_namer_->nodes()) {
357 const int node_index = configuration::GetNodeIndex(configuration_, node);
358 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
359 realtime_start_time);
360 MaybeWriteHeader(node_index, node);
361 }
362}
363
364void Logger::MaybeWriteHeader(int node_index) {
365 if (configuration::MultiNode(configuration_)) {
366 return MaybeWriteHeader(node_index,
367 configuration_->nodes()->Get(node_index));
368 } else {
369 return MaybeWriteHeader(node_index, nullptr);
370 }
371}
372
373void Logger::MaybeWriteHeader(int node_index, const Node *node) {
374 // This function is responsible for writing the header when the header both
375 // has valid data, and when it needs to be written.
376 if (node_state_[node_index].header_written &&
377 node_state_[node_index].header_valid) {
378 // The header has been written and is valid, nothing to do.
379 return;
380 }
381 if (!node_state_[node_index].has_source_node_boot_uuid) {
382 // Can't write a header if we don't have the boot UUID.
383 return;
384 }
385
386 // WriteHeader writes the first header in a log file. We want to do this only
387 // once.
388 //
389 // Rotate rewrites the same header with a new part ID, but keeps the same part
390 // UUID. We don't want that when things reboot, because that implies that
391 // parts go together across a reboot.
392 //
393 // Reboot resets the parts UUID. So, once we've written a header the first
394 // time, we want to use Reboot to rotate the log and reset the parts UUID.
395 //
396 // header_valid is cleared whenever the remote reboots.
397 if (node_state_[node_index].header_written) {
398 log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
399 } else {
400 log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
401
402 node_state_[node_index].header_written = true;
403 }
404 node_state_[node_index].header_valid = true;
405}
406
407void Logger::WriteMissingTimestamps() {
408 if (configuration::MultiNode(configuration_)) {
409 server_statistics_fetcher_.Fetch();
410 } else {
411 return;
412 }
413
414 if (server_statistics_fetcher_.get() == nullptr) {
415 return;
416 }
417
418 for (const Node *node : log_namer_->nodes()) {
419 const int node_index = configuration::GetNodeIndex(configuration_, node);
420 if (MaybeUpdateTimestamp(
421 node, node_index,
422 server_statistics_fetcher_.context().monotonic_event_time,
423 server_statistics_fetcher_.context().realtime_event_time)) {
424 CHECK(node_state_[node_index].header_written);
425 CHECK(node_state_[node_index].header_valid);
426 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
427 } else {
428 MaybeWriteHeader(node_index, node);
429 }
430 }
431}
432
433void Logger::SetStartTime(
434 size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
435 aos::realtime_clock::time_point realtime_start_time,
436 aos::monotonic_clock::time_point logger_monotonic_start_time,
437 aos::realtime_clock::time_point logger_realtime_start_time) {
438 node_state_[node_index].monotonic_start_time = monotonic_start_time;
439 node_state_[node_index].realtime_start_time = realtime_start_time;
440 node_state_[node_index]
441 .log_file_header.mutable_message()
442 ->mutate_monotonic_start_time(
443 std::chrono::duration_cast<std::chrono::nanoseconds>(
444 monotonic_start_time.time_since_epoch())
445 .count());
446
447 // Add logger start times if they are available in the log file header.
448 if (node_state_[node_index]
449 .log_file_header.mutable_message()
450 ->has_logger_monotonic_start_time()) {
451 node_state_[node_index]
452 .log_file_header.mutable_message()
453 ->mutate_logger_monotonic_start_time(
454 std::chrono::duration_cast<std::chrono::nanoseconds>(
455 logger_monotonic_start_time.time_since_epoch())
456 .count());
457 }
458
459 if (node_state_[node_index]
460 .log_file_header.mutable_message()
461 ->has_logger_realtime_start_time()) {
462 node_state_[node_index]
463 .log_file_header.mutable_message()
464 ->mutate_logger_realtime_start_time(
465 std::chrono::duration_cast<std::chrono::nanoseconds>(
466 logger_realtime_start_time.time_since_epoch())
467 .count());
468 }
469
470 if (node_state_[node_index]
471 .log_file_header.mutable_message()
472 ->has_realtime_start_time()) {
473 node_state_[node_index]
474 .log_file_header.mutable_message()
475 ->mutate_realtime_start_time(
476 std::chrono::duration_cast<std::chrono::nanoseconds>(
477 realtime_start_time.time_since_epoch())
478 .count());
479 }
480}
481
482bool Logger::MaybeUpdateTimestamp(
483 const Node *node, int node_index,
484 aos::monotonic_clock::time_point monotonic_start_time,
485 aos::realtime_clock::time_point realtime_start_time) {
486 // Bail early if the start times are already set.
487 if (node_state_[node_index].monotonic_start_time !=
488 monotonic_clock::min_time) {
489 return false;
490 }
491 if (event_loop_->node() == node ||
492 !configuration::MultiNode(configuration_)) {
493 // There are no offsets to compute for ourself, so always succeed.
494 SetStartTime(node_index, monotonic_start_time, realtime_start_time,
495 monotonic_start_time, realtime_start_time);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800496 node_state_[node_index].SetBootUUID(event_loop_->boot_uuid());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800497 return true;
498 } else if (server_statistics_fetcher_.get() != nullptr) {
499 // We must be a remote node now. Look for the connection and see if it is
500 // connected.
501
502 for (const message_bridge::ServerConnection *connection :
503 *server_statistics_fetcher_->connections()) {
504 if (connection->node()->name()->string_view() !=
505 node->name()->string_view()) {
506 continue;
507 }
508
509 if (connection->state() != message_bridge::State::CONNECTED) {
510 VLOG(1) << node->name()->string_view()
511 << " is not connected, can't start it yet.";
512 break;
513 }
514
Austin Schuhb06f03b2021-02-17 22:00:37 -0800515 if (!connection->has_monotonic_offset()) {
516 VLOG(1) << "Missing monotonic offset for setting start time for node "
517 << aos::FlatbufferToJson(node);
518 break;
519 }
520
521 // Found it and it is connected. Compensate and go.
522 SetStartTime(node_index,
523 monotonic_start_time +
524 std::chrono::nanoseconds(connection->monotonic_offset()),
525 realtime_start_time, monotonic_start_time,
526 realtime_start_time);
527 return true;
528 }
529 }
530 return false;
531}
532
533aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
534 const Node *node, std::string_view config_sha256) {
535 // Now write the header with this timestamp in it.
536 flatbuffers::FlatBufferBuilder fbb;
537 fbb.ForceDefaults(true);
538
539 flatbuffers::Offset<aos::Configuration> configuration_offset;
540 if (!separate_config_) {
541 configuration_offset = CopyFlatBuffer(configuration_, &fbb);
542 } else {
543 CHECK(!config_sha256.empty());
544 }
545
546 const flatbuffers::Offset<flatbuffers::String> name_offset =
547 fbb.CreateString(name_);
548
549 CHECK(log_event_uuid_ != UUID::Zero());
550 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800551 log_event_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800552
553 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800554 logger_instance_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800555
556 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
Austin Schuh34f9e482021-03-31 22:54:18 -0700557 if (log_start_uuid_) {
558 log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800559 }
560
561 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
562 if (!config_sha256.empty()) {
563 config_sha256_offset = fbb.CreateString(config_sha256);
564 }
565
566 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800567 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800568
569 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800570 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800571
572 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
573 fbb.CreateString("00000000-0000-4000-8000-000000000000");
574
575 flatbuffers::Offset<Node> node_offset;
576 flatbuffers::Offset<Node> logger_node_offset;
577
578 if (configuration::MultiNode(configuration_)) {
579 node_offset = RecursiveCopyFlatBuffer(node, &fbb);
580 logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
581 }
582
583 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
584
585 log_file_header_builder.add_name(name_offset);
586
587 // Only add the node if we are running in a multinode configuration.
588 if (node != nullptr) {
589 log_file_header_builder.add_node(node_offset);
590 log_file_header_builder.add_logger_node(logger_node_offset);
591 }
592
593 if (!configuration_offset.IsNull()) {
594 log_file_header_builder.add_configuration(configuration_offset);
595 }
596 // The worst case theoretical out of order is the polling period times 2.
597 // One message could get logged right after the boundary, but be for right
598 // before the next boundary. And the reverse could happen for another
599 // message. Report back 3x to be extra safe, and because the cost isn't
600 // huge on the read side.
601 log_file_header_builder.add_max_out_of_order_duration(
602 std::chrono::nanoseconds(3 * polling_period_).count());
603
604 log_file_header_builder.add_monotonic_start_time(
605 std::chrono::duration_cast<std::chrono::nanoseconds>(
606 monotonic_clock::min_time.time_since_epoch())
607 .count());
608 if (node == event_loop_->node()) {
609 log_file_header_builder.add_realtime_start_time(
610 std::chrono::duration_cast<std::chrono::nanoseconds>(
611 realtime_clock::min_time.time_since_epoch())
612 .count());
613 } else {
614 log_file_header_builder.add_logger_monotonic_start_time(
615 std::chrono::duration_cast<std::chrono::nanoseconds>(
616 monotonic_clock::min_time.time_since_epoch())
617 .count());
618 log_file_header_builder.add_logger_realtime_start_time(
619 std::chrono::duration_cast<std::chrono::nanoseconds>(
620 realtime_clock::min_time.time_since_epoch())
621 .count());
622 }
623
624 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
625 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
626 if (!log_start_uuid_offset.IsNull()) {
627 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
628 }
629 log_file_header_builder.add_logger_node_boot_uuid(
630 logger_node_boot_uuid_offset);
631 log_file_header_builder.add_source_node_boot_uuid(
632 source_node_boot_uuid_offset);
633
634 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
635 log_file_header_builder.add_parts_index(0);
636
637 log_file_header_builder.add_configuration_sha256(0);
638
639 if (!config_sha256_offset.IsNull()) {
640 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
641 }
642
643 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
644 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
645 fbb.Release());
646
647 CHECK(result.Verify()) << ": Built a corrupted header.";
648
649 return result;
650}
651
652void Logger::ResetStatisics() {
653 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
654 max_message_fetch_time_channel_ = -1;
655 max_message_fetch_time_size_ = -1;
656 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
657 total_message_fetch_count_ = 0;
658 total_message_fetch_bytes_ = 0;
659 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
660 total_nop_fetch_count_ = 0;
661 max_copy_time_ = std::chrono::nanoseconds::zero();
662 max_copy_time_channel_ = -1;
663 max_copy_time_size_ = -1;
664 total_copy_time_ = std::chrono::nanoseconds::zero();
665 total_copy_count_ = 0;
666 total_copy_bytes_ = 0;
667}
668
669void Logger::Rotate() {
670 for (const Node *node : log_namer_->nodes()) {
671 const int node_index = configuration::GetNodeIndex(configuration_, node);
672 log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
673 }
674}
675
676void Logger::LogUntil(monotonic_clock::time_point t) {
677 // Grab the latest ServerStatistics message. This will always have the
678 // oppertunity to be >= to the current time, so it will always represent any
679 // reboots which may have happened.
680 WriteMissingTimestamps();
681
Austin Schuhcdd90272021-03-15 12:46:16 -0700682 int our_node_index = aos::configuration::GetNodeIndex(
683 event_loop_->configuration(), event_loop_->node());
684
Austin Schuhb06f03b2021-02-17 22:00:37 -0800685 // Write each channel to disk, one at a time.
686 for (FetcherStruct &f : fetchers_) {
687 while (true) {
688 if (f.written) {
689 const auto start = event_loop_->monotonic_now();
690 const bool got_new = f.fetcher->FetchNext();
691 const auto end = event_loop_->monotonic_now();
692 RecordFetchResult(start, end, got_new, &f);
693 if (!got_new) {
694 VLOG(2) << "No new data on "
695 << configuration::CleanedChannelToString(
696 f.fetcher->channel());
697 break;
698 }
699 f.written = false;
700 }
701
702 // TODO(james): Write tests to exercise this logic.
703 if (f.fetcher->context().monotonic_event_time >= t) {
704 break;
705 }
706 if (f.writer != nullptr) {
Austin Schuhcdd90272021-03-15 12:46:16 -0700707 // Only check if the boot UUID has changed if this is data from another
708 // node. Our UUID can't change without restarting the application.
709 if (our_node_index != f.data_node_index) {
710 // And update our boot UUID if the UUID has changed.
711 if (node_state_[f.data_node_index].SetBootUUID(
712 f.fetcher->context().remote_boot_uuid)) {
713 MaybeWriteHeader(f.data_node_index);
714 }
715 }
716
Austin Schuhb06f03b2021-02-17 22:00:37 -0800717 // Write!
718 const auto start = event_loop_->monotonic_now();
719 flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
720 max_header_size_);
721 fbb.ForceDefaults(true);
722
723 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
724 f.channel_index, f.log_type));
725 const auto end = event_loop_->monotonic_now();
726 RecordCreateMessageTime(start, end, &f);
727
728 VLOG(2) << "Writing data as node "
729 << FlatbufferToJson(event_loop_->node()) << " for channel "
730 << configuration::CleanedChannelToString(f.fetcher->channel())
731 << " to " << f.writer->filename() << " data "
732 << FlatbufferToJson(
733 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
734 fbb.GetBufferPointer()));
735
736 max_header_size_ = std::max(max_header_size_,
737 fbb.GetSize() - f.fetcher->context().size);
738 CHECK(node_state_[f.data_node_index].header_valid)
739 << ": Can't write data before the header on channel "
740 << configuration::CleanedChannelToString(f.fetcher->channel());
Austin Schuhbd06ae42021-03-31 22:48:21 -0700741 f.writer->QueueSizedFlatbuffer(&fbb, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800742 }
743
744 if (f.timestamp_writer != nullptr) {
745 // And now handle timestamps.
746 const auto start = event_loop_->monotonic_now();
747 flatbuffers::FlatBufferBuilder fbb;
748 fbb.ForceDefaults(true);
749
750 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
751 f.channel_index,
752 LogType::kLogDeliveryTimeOnly));
753 const auto end = event_loop_->monotonic_now();
754 RecordCreateMessageTime(start, end, &f);
755
756 VLOG(2) << "Writing timestamps as node "
757 << FlatbufferToJson(event_loop_->node()) << " for channel "
758 << configuration::CleanedChannelToString(f.fetcher->channel())
759 << " to " << f.timestamp_writer->filename() << " timestamp "
760 << FlatbufferToJson(
761 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
762 fbb.GetBufferPointer()));
763
764 CHECK(node_state_[f.timestamp_node_index].header_valid)
765 << ": Can't write data before the header on channel "
766 << configuration::CleanedChannelToString(f.fetcher->channel());
Austin Schuhbd06ae42021-03-31 22:48:21 -0700767 f.timestamp_writer->QueueSizedFlatbuffer(&fbb, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800768 }
769
770 if (f.contents_writer != nullptr) {
771 const auto start = event_loop_->monotonic_now();
772 // And now handle the special message contents channel. Copy the
773 // message into a FlatBufferBuilder and save it to disk.
774 // TODO(austin): We can be more efficient here when we start to
775 // care...
776 flatbuffers::FlatBufferBuilder fbb;
777 fbb.ForceDefaults(true);
778
779 const RemoteMessage *msg =
780 flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
781
782 CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
Austin Schuhcdd90272021-03-15 12:46:16 -0700783 if (node_state_[f.contents_node_index].SetBootUUID(
784 UUID::FromVector(msg->boot_uuid()))) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800785 MaybeWriteHeader(f.contents_node_index);
786 }
787
788 logger::MessageHeader::Builder message_header_builder(fbb);
789
790 // TODO(austin): This needs to check the channel_index and confirm
791 // that it should be logged before squirreling away the timestamp to
792 // disk. We don't want to log irrelevant timestamps.
793
794 // Note: this must match the same order as MessageBridgeServer and
795 // PackMessage. We want identical headers to have identical
796 // on-the-wire formats to make comparing them easier.
797
798 // Translate from the channel index that the event loop uses to the
799 // channel index in the log file.
800 message_header_builder.add_channel_index(
801 event_loop_to_logged_channel_index_[msg->channel_index()]);
802
803 message_header_builder.add_queue_index(msg->queue_index());
804 message_header_builder.add_monotonic_sent_time(
805 msg->monotonic_sent_time());
806 message_header_builder.add_realtime_sent_time(
807 msg->realtime_sent_time());
808
809 message_header_builder.add_monotonic_remote_time(
810 msg->monotonic_remote_time());
811 message_header_builder.add_realtime_remote_time(
812 msg->realtime_remote_time());
813 message_header_builder.add_remote_queue_index(
814 msg->remote_queue_index());
815
816 message_header_builder.add_monotonic_timestamp_time(
817 f.fetcher->context()
818 .monotonic_event_time.time_since_epoch()
819 .count());
820
821 fbb.FinishSizePrefixed(message_header_builder.Finish());
822 const auto end = event_loop_->monotonic_now();
823 RecordCreateMessageTime(start, end, &f);
824
825 CHECK(node_state_[f.contents_node_index].header_valid)
826 << ": Can't write data before the header on channel "
827 << configuration::CleanedChannelToString(f.fetcher->channel());
Austin Schuhbd06ae42021-03-31 22:48:21 -0700828 f.contents_writer->QueueSizedFlatbuffer(&fbb, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800829 }
830
831 f.written = true;
832 }
833 }
834 last_synchronized_time_ = t;
835}
836
837void Logger::DoLogData(const monotonic_clock::time_point end_time) {
838 // We want to guarantee that messages aren't out of order by more than
839 // max_out_of_order_duration. To do this, we need sync points. Every write
840 // cycle should be a sync point.
841
842 do {
843 // Move the sync point up by at most polling_period. This forces one sync
844 // per iteration, even if it is small.
845 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
846
847 on_logged_period_();
848
849 // If we missed cycles, we could be pretty far behind. Spin until we are
850 // caught up.
851 } while (last_synchronized_time_ + polling_period_ < end_time);
852}
853
854void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
855 aos::monotonic_clock::time_point end,
856 bool got_new, FetcherStruct *fetcher) {
857 const auto duration = end - start;
858 if (!got_new) {
859 ++total_nop_fetch_count_;
860 total_nop_fetch_time_ += duration;
861 return;
862 }
863 ++total_message_fetch_count_;
864 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
865 total_message_fetch_time_ += duration;
866 if (duration > max_message_fetch_time_) {
867 max_message_fetch_time_ = duration;
868 max_message_fetch_time_channel_ = fetcher->channel_index;
869 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
870 }
871}
872
873void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
874 aos::monotonic_clock::time_point end,
875 FetcherStruct *fetcher) {
876 const auto duration = end - start;
877 total_copy_time_ += duration;
878 ++total_copy_count_;
879 total_copy_bytes_ += fetcher->fetcher->context().size;
880 if (duration > max_copy_time_) {
881 max_copy_time_ = duration;
882 max_copy_time_channel_ = fetcher->channel_index;
883 max_copy_time_size_ = fetcher->fetcher->context().size;
884 }
885}
886
887} // namespace logger
888} // namespace aos