blob: bc04f83cec7da45b35ead64043ec9a19ee609db3 [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_writer.h"
2
Austin Schuh6bb8a822021-03-31 23:04:39 -07003#include <dirent.h>
4
Austin Schuhb06f03b2021-02-17 22:00:37 -08005#include <functional>
6#include <map>
7#include <vector>
8
9#include "aos/configuration.h"
10#include "aos/events/event_loop.h"
11#include "aos/network/message_bridge_server_generated.h"
12#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080013#include "aos/network/timestamp_channel.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080014
15namespace aos {
16namespace logger {
17namespace {
18using message_bridge::RemoteMessage;
Austin Schuhbd06ae42021-03-31 22:48:21 -070019namespace chrono = std::chrono;
Austin Schuhb06f03b2021-02-17 22:00:37 -080020} // namespace
21
22Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
23 std::function<bool(const Channel *)> should_log)
24 : event_loop_(event_loop),
25 configuration_(configuration),
26 name_(network::GetHostname()),
27 timer_handler_(event_loop_->AddTimer(
Austin Schuh30586902021-03-30 22:54:08 -070028 [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
Austin Schuhb06f03b2021-02-17 22:00:37 -080029 server_statistics_fetcher_(
30 configuration::MultiNode(event_loop_->configuration())
31 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
32 "/aos")
33 : aos::Fetcher<message_bridge::ServerStatistics>()) {
34 VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
35
Austin Schuhb06f03b2021-02-17 22:00:37 -080036 std::map<const Channel *, const Node *> timestamp_logger_channels;
37
Austin Schuh61e973f2021-02-21 21:43:56 -080038 message_bridge::ChannelTimestampFinder finder(event_loop_);
39 for (const Channel *channel : *event_loop_->configuration()->channels()) {
40 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080041 continue;
42 }
Austin Schuh61e973f2021-02-21 21:43:56 -080043 if (!channel->has_destination_nodes()) {
44 continue;
45 }
46 for (const Connection *connection : *channel->destination_nodes()) {
47 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
48 connection, event_loop_->node())) {
49 const Node *other_node = configuration::GetNode(
50 event_loop_->configuration(), connection->name()->string_view());
51
52 VLOG(1) << "Timestamps are logged from "
53 << FlatbufferToJson(other_node);
54 timestamp_logger_channels.insert(
55 std::make_pair(finder.ForChannel(channel, connection), other_node));
56 }
57 }
Austin Schuhb06f03b2021-02-17 22:00:37 -080058 }
59
60 const size_t our_node_index =
61 configuration::GetNodeIndex(configuration_, event_loop_->node());
62
63 for (size_t channel_index = 0;
64 channel_index < configuration_->channels()->size(); ++channel_index) {
65 const Channel *const config_channel =
66 configuration_->channels()->Get(channel_index);
67 // The MakeRawFetcher method needs a channel which is in the event loop
68 // configuration() object, not the configuration_ object. Go look that up
69 // from the config.
70 const Channel *channel = aos::configuration::GetChannel(
71 event_loop_->configuration(), config_channel->name()->string_view(),
72 config_channel->type()->string_view(), "", event_loop_->node());
73 CHECK(channel != nullptr)
74 << ": Failed to look up channel "
75 << aos::configuration::CleanedChannelToString(config_channel);
76 if (!should_log(channel)) {
77 continue;
78 }
79
80 FetcherStruct fs;
81 fs.channel_index = channel_index;
82 fs.channel = channel;
83
84 const bool is_local =
85 configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
86
87 const bool is_readable =
88 configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
89 const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
90 channel, event_loop_->node());
91 const bool log_message = is_logged && is_readable;
92
93 bool log_delivery_times = false;
94 if (event_loop_->node() != nullptr) {
Austin Schuh72211ae2021-08-05 14:02:30 -070095 const aos::Connection *connection =
96 configuration::ConnectionToNode(channel, event_loop_->node());
97
Austin Schuhb06f03b2021-02-17 22:00:37 -080098 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
Austin Schuh72211ae2021-08-05 14:02:30 -070099 connection, event_loop_->node());
100
101 CHECK_EQ(log_delivery_times,
102 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
103 channel, event_loop_->node(), event_loop_->node()));
104
105 if (connection) {
106 fs.reliable_forwarding = (connection->time_to_live() == 0);
107 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800108 }
109
110 // Now, detect a RemoteMessage timestamp logger where we should just log the
111 // contents to a file directly.
112 const bool log_contents = timestamp_logger_channels.find(channel) !=
113 timestamp_logger_channels.end();
114
115 if (log_message || log_delivery_times || log_contents) {
116 fs.fetcher = event_loop->MakeRawFetcher(channel);
117 VLOG(1) << "Logging channel "
118 << configuration::CleanedChannelToString(channel);
119
120 if (log_delivery_times) {
121 VLOG(1) << " Delivery times";
122 fs.wants_timestamp_writer = true;
123 fs.timestamp_node_index = our_node_index;
124 }
Austin Schuhe46492f2021-07-31 19:49:41 -0700125 // Both the timestamp and data writers want data_node_index so it knows
126 // what the source node is.
127 if (log_message || log_delivery_times) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800128 if (!is_local) {
129 const Node *source_node = configuration::GetNode(
130 configuration_, channel->source_node()->string_view());
131 fs.data_node_index =
132 configuration::GetNodeIndex(configuration_, source_node);
Austin Schuhe46492f2021-07-31 19:49:41 -0700133 }
134 }
135 if (log_message) {
136 VLOG(1) << " Data";
137 fs.wants_writer = true;
138 if (!is_local) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800139 fs.log_type = LogType::kLogRemoteMessage;
140 } else {
141 fs.data_node_index = our_node_index;
142 }
143 }
144 if (log_contents) {
145 VLOG(1) << "Timestamp logger channel "
146 << configuration::CleanedChannelToString(channel);
147 fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
148 fs.wants_contents_writer = true;
149 fs.contents_node_index =
150 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
151 }
152 fetchers_.emplace_back(std::move(fs));
153 }
154 }
155
156 // When we are logging remote timestamps, we need to be able to translate from
157 // the channel index that the event loop uses to the channel index in the
158 // config in the log file.
159 event_loop_to_logged_channel_index_.resize(
160 event_loop->configuration()->channels()->size(), -1);
161 for (size_t event_loop_channel_index = 0;
162 event_loop_channel_index <
163 event_loop->configuration()->channels()->size();
164 ++event_loop_channel_index) {
165 const Channel *event_loop_channel =
166 event_loop->configuration()->channels()->Get(event_loop_channel_index);
167
168 const Channel *logged_channel = aos::configuration::GetChannel(
169 configuration_, event_loop_channel->name()->string_view(),
170 event_loop_channel->type()->string_view(), "",
171 configuration::GetNode(configuration_, event_loop_->node()));
172
173 if (logged_channel != nullptr) {
174 event_loop_to_logged_channel_index_[event_loop_channel_index] =
175 configuration::ChannelIndex(configuration_, logged_channel);
176 }
177 }
178}
179
180Logger::~Logger() {
181 if (log_namer_) {
182 // If we are replaying a log file, or in simulation, we want to force the
183 // last bit of data to be logged. The easiest way to deal with this is to
184 // poll everything as we go to destroy the class, ie, shut down the logger,
185 // and write it to disk.
186 StopLogging(event_loop_->monotonic_now());
187 }
188}
189
Austin Schuh6bb8a822021-03-31 23:04:39 -0700190bool Logger::RenameLogBase(std::string new_base_name) {
191 if (new_base_name == log_namer_->base_name()) {
192 return true;
193 }
194 std::string current_directory = std::string(log_namer_->base_name());
195 std::string new_directory = new_base_name;
196
197 auto current_path_split = current_directory.rfind("/");
198 auto new_path_split = new_directory.rfind("/");
199
200 CHECK(new_base_name.substr(new_path_split) ==
201 current_directory.substr(current_path_split))
202 << "Rename of file base from " << current_directory << " to "
203 << new_directory << " is not supported.";
204
205 current_directory.resize(current_path_split);
206 new_directory.resize(new_path_split);
207 DIR *dir = opendir(current_directory.c_str());
208 if (dir) {
209 closedir(dir);
210 const int result = rename(current_directory.c_str(), new_directory.c_str());
211 if (result != 0) {
212 PLOG(ERROR) << "Unable to rename " << current_directory << " to "
213 << new_directory;
214 return false;
215 }
216 } else {
217 // Handle if directory was already renamed.
218 dir = opendir(new_directory.c_str());
219 if (!dir) {
220 LOG(ERROR) << "Old directory " << current_directory
221 << " missing and new directory " << new_directory
222 << " not present.";
223 return false;
224 }
225 closedir(dir);
226 }
227
228 log_namer_->set_base_name(new_base_name);
229 Rotate();
230 return true;
231}
232
Austin Schuhb06f03b2021-02-17 22:00:37 -0800233void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
Austin Schuh34f9e482021-03-31 22:54:18 -0700234 std::optional<UUID> log_start_uuid) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800235 CHECK(!log_namer_) << ": Already logging";
236 log_namer_ = std::move(log_namer);
237
238 std::string config_sha256;
239 if (separate_config_) {
240 flatbuffers::FlatBufferBuilder fbb;
241 flatbuffers::Offset<aos::Configuration> configuration_offset =
242 CopyFlatBuffer(configuration_, &fbb);
243 LogFileHeader::Builder log_file_header_builder(fbb);
244 log_file_header_builder.add_configuration(configuration_offset);
245 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
246 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
247 fbb.Release());
248 config_sha256 = Sha256(config_header.span());
249 LOG(INFO) << "Config sha256 of " << config_sha256;
250 log_namer_->WriteConfiguration(&config_header, config_sha256);
251 }
252
253 log_event_uuid_ = UUID::Random();
254 log_start_uuid_ = log_start_uuid;
255 VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
256
257 // We want to do as much work as possible before the initial Fetch. Time
258 // between that and actually starting to log opens up the possibility of
259 // falling off the end of the queue during that time.
260
261 for (FetcherStruct &f : fetchers_) {
262 if (f.wants_writer) {
263 f.writer = log_namer_->MakeWriter(f.channel);
264 }
265 if (f.wants_timestamp_writer) {
266 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
267 }
268 if (f.wants_contents_writer) {
269 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
270 f.channel, CHECK_NOTNULL(f.timestamp_node));
271 }
272 }
273
Austin Schuh73340842021-07-30 22:32:06 -0700274 log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
Austin Schuhb06f03b2021-02-17 22:00:37 -0800275
Austin Schuha42ee962021-03-31 22:49:30 -0700276 const aos::monotonic_clock::time_point beginning_time =
277 event_loop_->monotonic_now();
278
Austin Schuhb06f03b2021-02-17 22:00:37 -0800279 // Grab data from each channel right before we declare the log file started
280 // so we can capture the latest message on each channel. This lets us have
281 // non periodic messages with configuration that now get logged.
282 for (FetcherStruct &f : fetchers_) {
283 const auto start = event_loop_->monotonic_now();
284 const bool got_new = f.fetcher->Fetch();
285 const auto end = event_loop_->monotonic_now();
286 RecordFetchResult(start, end, got_new, &f);
287
288 // If there is a message, we want to write it.
289 f.written = f.fetcher->context().data == nullptr;
290 }
291
292 // Clear out any old timestamps in case we are re-starting logging.
Austin Schuh572924a2021-07-30 22:32:12 -0700293 for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
Austin Schuh73340842021-07-30 22:32:06 -0700294 log_namer_->SetStartTimes(
295 i, monotonic_clock::min_time, realtime_clock::min_time,
296 monotonic_clock::min_time, realtime_clock::min_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800297 }
298
Austin Schuha42ee962021-03-31 22:49:30 -0700299 const aos::monotonic_clock::time_point fetch_time =
300 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800301 WriteHeader();
Austin Schuha42ee962021-03-31 22:49:30 -0700302 const aos::monotonic_clock::time_point header_time =
303 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800304
305 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
Austin Schuha42ee962021-03-31 22:49:30 -0700306 << " start_time " << last_synchronized_time_ << ", took "
307 << chrono::duration<double>(fetch_time - beginning_time).count()
308 << " to fetch, "
309 << chrono::duration<double>(header_time - fetch_time).count()
310 << " to write headers, boot uuid " << event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800311
312 // Force logging up until the start of the log file now, so the messages at
313 // the start are always ordered before the rest of the messages.
314 // Note: this ship may have already sailed, but we don't have to make it
315 // worse.
316 // TODO(austin): Test...
Austin Schuh855f8932021-03-19 22:01:32 -0700317 //
318 // This is safe to call here since we have set last_synchronized_time_ as the
319 // same time as in the header, and all the data before it should be logged
320 // without ordering concerns.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800321 LogUntil(last_synchronized_time_);
322
323 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
324 polling_period_);
325}
326
327std::unique_ptr<LogNamer> Logger::StopLogging(
328 aos::monotonic_clock::time_point end_time) {
329 CHECK(log_namer_) << ": Not logging right now";
330
331 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh30586902021-03-30 22:54:08 -0700332 // Folks like to use the on_logged_period_ callback to trigger stop and
333 // start events. We can't have those then recurse and try to stop again.
334 // Rather than making everything reentrant, let's just instead block the
335 // callback here.
336 DoLogData(end_time, false);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800337 }
338 timer_handler_->Disable();
339
340 for (FetcherStruct &f : fetchers_) {
341 f.writer = nullptr;
342 f.timestamp_writer = nullptr;
343 f.contents_writer = nullptr;
344 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800345
346 log_event_uuid_ = UUID::Zero();
Austin Schuh34f9e482021-03-31 22:54:18 -0700347 log_start_uuid_ = std::nullopt;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800348
349 return std::move(log_namer_);
350}
351
352void Logger::WriteHeader() {
353 if (configuration::MultiNode(configuration_)) {
354 server_statistics_fetcher_.Fetch();
355 }
356
Austin Schuh73340842021-07-30 22:32:06 -0700357 const aos::monotonic_clock::time_point monotonic_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800358 event_loop_->monotonic_now();
Austin Schuh73340842021-07-30 22:32:06 -0700359 const aos::realtime_clock::time_point realtime_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800360 event_loop_->realtime_now();
361
362 // We need to pick a point in time to declare the log file "started". This
363 // starts here. It needs to be after everything is fetched so that the
364 // fetchers are all pointed at the most recent message before the start
365 // time.
366 last_synchronized_time_ = monotonic_start_time;
367
368 for (const Node *node : log_namer_->nodes()) {
369 const int node_index = configuration::GetNodeIndex(configuration_, node);
370 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
371 realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800372 }
373}
374
Austin Schuhb06f03b2021-02-17 22:00:37 -0800375void Logger::WriteMissingTimestamps() {
376 if (configuration::MultiNode(configuration_)) {
377 server_statistics_fetcher_.Fetch();
378 } else {
379 return;
380 }
381
382 if (server_statistics_fetcher_.get() == nullptr) {
383 return;
384 }
385
386 for (const Node *node : log_namer_->nodes()) {
387 const int node_index = configuration::GetNodeIndex(configuration_, node);
388 if (MaybeUpdateTimestamp(
389 node, node_index,
390 server_statistics_fetcher_.context().monotonic_event_time,
391 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh73340842021-07-30 22:32:06 -0700392 VLOG(1) << "Rotating because timestamps changed";
393 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800394 }
395 }
396}
397
Austin Schuhb06f03b2021-02-17 22:00:37 -0800398bool Logger::MaybeUpdateTimestamp(
399 const Node *node, int node_index,
400 aos::monotonic_clock::time_point monotonic_start_time,
401 aos::realtime_clock::time_point realtime_start_time) {
402 // Bail early if the start times are already set.
Austin Schuh73340842021-07-30 22:32:06 -0700403 if (log_namer_->monotonic_start_time(node_index) !=
Austin Schuhb06f03b2021-02-17 22:00:37 -0800404 monotonic_clock::min_time) {
405 return false;
406 }
407 if (event_loop_->node() == node ||
408 !configuration::MultiNode(configuration_)) {
409 // There are no offsets to compute for ourself, so always succeed.
Austin Schuh73340842021-07-30 22:32:06 -0700410 log_namer_->SetStartTimes(node_index, monotonic_start_time,
411 realtime_start_time, monotonic_start_time,
412 realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800413 return true;
414 } else if (server_statistics_fetcher_.get() != nullptr) {
415 // We must be a remote node now. Look for the connection and see if it is
416 // connected.
417
418 for (const message_bridge::ServerConnection *connection :
419 *server_statistics_fetcher_->connections()) {
420 if (connection->node()->name()->string_view() !=
421 node->name()->string_view()) {
422 continue;
423 }
424
425 if (connection->state() != message_bridge::State::CONNECTED) {
426 VLOG(1) << node->name()->string_view()
427 << " is not connected, can't start it yet.";
428 break;
429 }
430
Austin Schuhb06f03b2021-02-17 22:00:37 -0800431 if (!connection->has_monotonic_offset()) {
432 VLOG(1) << "Missing monotonic offset for setting start time for node "
433 << aos::FlatbufferToJson(node);
434 break;
435 }
436
437 // Found it and it is connected. Compensate and go.
Austin Schuh73340842021-07-30 22:32:06 -0700438 log_namer_->SetStartTimes(
439 node_index,
440 monotonic_start_time +
441 std::chrono::nanoseconds(connection->monotonic_offset()),
442 realtime_start_time, monotonic_start_time, realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800443 return true;
444 }
445 }
446 return false;
447}
448
449aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
Austin Schuh73340842021-07-30 22:32:06 -0700450 std::string_view config_sha256) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800451 flatbuffers::FlatBufferBuilder fbb;
452 fbb.ForceDefaults(true);
453
454 flatbuffers::Offset<aos::Configuration> configuration_offset;
455 if (!separate_config_) {
456 configuration_offset = CopyFlatBuffer(configuration_, &fbb);
457 } else {
458 CHECK(!config_sha256.empty());
459 }
460
461 const flatbuffers::Offset<flatbuffers::String> name_offset =
462 fbb.CreateString(name_);
463
464 CHECK(log_event_uuid_ != UUID::Zero());
465 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800466 log_event_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800467
468 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800469 logger_instance_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800470
471 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
Austin Schuh34f9e482021-03-31 22:54:18 -0700472 if (log_start_uuid_) {
473 log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800474 }
475
476 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
477 if (!config_sha256.empty()) {
478 config_sha256_offset = fbb.CreateString(config_sha256);
479 }
480
481 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800482 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800483
Austin Schuhb06f03b2021-02-17 22:00:37 -0800484 flatbuffers::Offset<Node> logger_node_offset;
485
486 if (configuration::MultiNode(configuration_)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800487 logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
488 }
489
490 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
491
492 log_file_header_builder.add_name(name_offset);
493
494 // Only add the node if we are running in a multinode configuration.
Austin Schuh73340842021-07-30 22:32:06 -0700495 if (configuration::MultiNode(configuration_)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800496 log_file_header_builder.add_logger_node(logger_node_offset);
497 }
498
499 if (!configuration_offset.IsNull()) {
500 log_file_header_builder.add_configuration(configuration_offset);
501 }
502 // The worst case theoretical out of order is the polling period times 2.
503 // One message could get logged right after the boundary, but be for right
504 // before the next boundary. And the reverse could happen for another
505 // message. Report back 3x to be extra safe, and because the cost isn't
506 // huge on the read side.
507 log_file_header_builder.add_max_out_of_order_duration(
508 std::chrono::nanoseconds(3 * polling_period_).count());
509
Austin Schuhb06f03b2021-02-17 22:00:37 -0800510 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
511 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
512 if (!log_start_uuid_offset.IsNull()) {
513 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
514 }
515 log_file_header_builder.add_logger_node_boot_uuid(
516 logger_node_boot_uuid_offset);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800517
518 if (!config_sha256_offset.IsNull()) {
519 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
520 }
521
522 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
523 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
524 fbb.Release());
525
526 CHECK(result.Verify()) << ": Built a corrupted header.";
527
528 return result;
529}
530
531void Logger::ResetStatisics() {
532 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
533 max_message_fetch_time_channel_ = -1;
534 max_message_fetch_time_size_ = -1;
535 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
536 total_message_fetch_count_ = 0;
537 total_message_fetch_bytes_ = 0;
538 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
539 total_nop_fetch_count_ = 0;
540 max_copy_time_ = std::chrono::nanoseconds::zero();
541 max_copy_time_channel_ = -1;
542 max_copy_time_size_ = -1;
543 total_copy_time_ = std::chrono::nanoseconds::zero();
544 total_copy_count_ = 0;
545 total_copy_bytes_ = 0;
546}
547
548void Logger::Rotate() {
549 for (const Node *node : log_namer_->nodes()) {
Austin Schuh73340842021-07-30 22:32:06 -0700550 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800551 }
552}
553
554void Logger::LogUntil(monotonic_clock::time_point t) {
555 // Grab the latest ServerStatistics message. This will always have the
556 // oppertunity to be >= to the current time, so it will always represent any
557 // reboots which may have happened.
558 WriteMissingTimestamps();
559
Austin Schuhcdd90272021-03-15 12:46:16 -0700560 int our_node_index = aos::configuration::GetNodeIndex(
561 event_loop_->configuration(), event_loop_->node());
562
Austin Schuhb06f03b2021-02-17 22:00:37 -0800563 // Write each channel to disk, one at a time.
564 for (FetcherStruct &f : fetchers_) {
565 while (true) {
566 if (f.written) {
567 const auto start = event_loop_->monotonic_now();
568 const bool got_new = f.fetcher->FetchNext();
569 const auto end = event_loop_->monotonic_now();
570 RecordFetchResult(start, end, got_new, &f);
571 if (!got_new) {
572 VLOG(2) << "No new data on "
573 << configuration::CleanedChannelToString(
574 f.fetcher->channel());
575 break;
576 }
577 f.written = false;
578 }
579
580 // TODO(james): Write tests to exercise this logic.
581 if (f.fetcher->context().monotonic_event_time >= t) {
582 break;
583 }
584 if (f.writer != nullptr) {
Austin Schuh572924a2021-07-30 22:32:12 -0700585 const UUID source_node_boot_uuid =
586 our_node_index != f.data_node_index
587 ? f.fetcher->context().source_boot_uuid
588 : event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800589 // Write!
590 const auto start = event_loop_->monotonic_now();
591 flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
592 max_header_size_);
593 fbb.ForceDefaults(true);
594
595 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
596 f.channel_index, f.log_type));
597 const auto end = event_loop_->monotonic_now();
598 RecordCreateMessageTime(start, end, &f);
599
600 VLOG(2) << "Writing data as node "
601 << FlatbufferToJson(event_loop_->node()) << " for channel "
602 << configuration::CleanedChannelToString(f.fetcher->channel())
603 << " to " << f.writer->filename() << " data "
604 << FlatbufferToJson(
605 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
606 fbb.GetBufferPointer()));
607
608 max_header_size_ = std::max(max_header_size_,
609 fbb.GetSize() - f.fetcher->context().size);
Austin Schuhe46492f2021-07-31 19:49:41 -0700610 f.writer->QueueMessage(&fbb, source_node_boot_uuid, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800611 }
612
613 if (f.timestamp_writer != nullptr) {
614 // And now handle timestamps.
615 const auto start = event_loop_->monotonic_now();
616 flatbuffers::FlatBufferBuilder fbb;
617 fbb.ForceDefaults(true);
618
619 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
620 f.channel_index,
621 LogType::kLogDeliveryTimeOnly));
622 const auto end = event_loop_->monotonic_now();
623 RecordCreateMessageTime(start, end, &f);
624
625 VLOG(2) << "Writing timestamps as node "
626 << FlatbufferToJson(event_loop_->node()) << " for channel "
627 << configuration::CleanedChannelToString(f.fetcher->channel())
628 << " to " << f.timestamp_writer->filename() << " timestamp "
629 << FlatbufferToJson(
630 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
631 fbb.GetBufferPointer()));
632
Austin Schuhe46492f2021-07-31 19:49:41 -0700633 // Tell our writer that we know something about the remote boot.
Austin Schuh72211ae2021-08-05 14:02:30 -0700634 f.timestamp_writer->UpdateRemote(
635 f.data_node_index, f.fetcher->context().source_boot_uuid,
636 f.fetcher->context().monotonic_remote_time,
637 f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
Austin Schuhe46492f2021-07-31 19:49:41 -0700638 f.timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800639 }
640
641 if (f.contents_writer != nullptr) {
642 const auto start = event_loop_->monotonic_now();
643 // And now handle the special message contents channel. Copy the
644 // message into a FlatBufferBuilder and save it to disk.
645 // TODO(austin): We can be more efficient here when we start to
646 // care...
647 flatbuffers::FlatBufferBuilder fbb;
648 fbb.ForceDefaults(true);
649
650 const RemoteMessage *msg =
651 flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
652
653 CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800654
655 logger::MessageHeader::Builder message_header_builder(fbb);
656
657 // TODO(austin): This needs to check the channel_index and confirm
658 // that it should be logged before squirreling away the timestamp to
659 // disk. We don't want to log irrelevant timestamps.
660
661 // Note: this must match the same order as MessageBridgeServer and
662 // PackMessage. We want identical headers to have identical
663 // on-the-wire formats to make comparing them easier.
664
665 // Translate from the channel index that the event loop uses to the
666 // channel index in the log file.
667 message_header_builder.add_channel_index(
668 event_loop_to_logged_channel_index_[msg->channel_index()]);
669
670 message_header_builder.add_queue_index(msg->queue_index());
671 message_header_builder.add_monotonic_sent_time(
672 msg->monotonic_sent_time());
673 message_header_builder.add_realtime_sent_time(
674 msg->realtime_sent_time());
675
676 message_header_builder.add_monotonic_remote_time(
677 msg->monotonic_remote_time());
678 message_header_builder.add_realtime_remote_time(
679 msg->realtime_remote_time());
680 message_header_builder.add_remote_queue_index(
681 msg->remote_queue_index());
682
683 message_header_builder.add_monotonic_timestamp_time(
684 f.fetcher->context()
685 .monotonic_event_time.time_since_epoch()
686 .count());
687
688 fbb.FinishSizePrefixed(message_header_builder.Finish());
689 const auto end = event_loop_->monotonic_now();
690 RecordCreateMessageTime(start, end, &f);
691
Austin Schuhe46492f2021-07-31 19:49:41 -0700692 f.contents_writer->QueueMessage(
Austin Schuh572924a2021-07-30 22:32:12 -0700693 &fbb, UUID::FromVector(msg->boot_uuid()), end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800694 }
695
696 f.written = true;
697 }
698 }
699 last_synchronized_time_ = t;
700}
701
Austin Schuh30586902021-03-30 22:54:08 -0700702void Logger::DoLogData(const monotonic_clock::time_point end_time,
703 bool run_on_logged) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800704 // We want to guarantee that messages aren't out of order by more than
705 // max_out_of_order_duration. To do this, we need sync points. Every write
706 // cycle should be a sync point.
707
708 do {
709 // Move the sync point up by at most polling_period. This forces one sync
710 // per iteration, even if it is small.
711 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
712
Austin Schuh30586902021-03-30 22:54:08 -0700713 if (run_on_logged) {
714 on_logged_period_();
715 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800716
717 // If we missed cycles, we could be pretty far behind. Spin until we are
718 // caught up.
719 } while (last_synchronized_time_ + polling_period_ < end_time);
720}
721
722void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
723 aos::monotonic_clock::time_point end,
724 bool got_new, FetcherStruct *fetcher) {
725 const auto duration = end - start;
726 if (!got_new) {
727 ++total_nop_fetch_count_;
728 total_nop_fetch_time_ += duration;
729 return;
730 }
731 ++total_message_fetch_count_;
732 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
733 total_message_fetch_time_ += duration;
734 if (duration > max_message_fetch_time_) {
735 max_message_fetch_time_ = duration;
736 max_message_fetch_time_channel_ = fetcher->channel_index;
737 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
738 }
739}
740
741void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
742 aos::monotonic_clock::time_point end,
743 FetcherStruct *fetcher) {
744 const auto duration = end - start;
745 total_copy_time_ += duration;
746 ++total_copy_count_;
747 total_copy_bytes_ += fetcher->fetcher->context().size;
748 if (duration > max_copy_time_) {
749 max_copy_time_ = duration;
750 max_copy_time_channel_ = fetcher->channel_index;
751 max_copy_time_size_ = fetcher->fetcher->context().size;
752 }
753}
754
755} // namespace logger
756} // namespace aos