blob: 580a12606cf48bbe4fae3bc82a162582aeb974d0 [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_writer.h"
2
Austin Schuh6bb8a822021-03-31 23:04:39 -07003#include <dirent.h>
4
Austin Schuhb06f03b2021-02-17 22:00:37 -08005#include <functional>
6#include <map>
7#include <vector>
8
9#include "aos/configuration.h"
10#include "aos/events/event_loop.h"
11#include "aos/network/message_bridge_server_generated.h"
12#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080013#include "aos/network/timestamp_channel.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080014
15namespace aos {
16namespace logger {
17namespace {
18using message_bridge::RemoteMessage;
Austin Schuhbd06ae42021-03-31 22:48:21 -070019namespace chrono = std::chrono;
Austin Schuhb06f03b2021-02-17 22:00:37 -080020} // namespace
21
// Constructs a Logger bound to `event_loop`.
//
// `configuration` is the config that will be written into the log header; it
// may differ from event_loop->configuration() (e.g. a logged subset), so
// channel lookups below translate between the two.  `should_log` filters
// which channels are recorded.
//
// NOTE(review): nothing is written until StartLogging() is called; this
// constructor only discovers channels and builds fetchers.
Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
               std::function<bool(const Channel *)> should_log)
    : event_loop_(event_loop),
      configuration_(configuration),
      name_(network::GetHostname()),
      // Periodic timer which drives polling; `true` means the
      // on_logged_period_ callback runs after each poll.
      timer_handler_(event_loop_->AddTimer(
          [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
      // ServerStatistics only exists on multi-node systems; single-node gets a
      // default (never-valid) fetcher.
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());

  // Map from each RemoteMessage timestamp channel to the remote node whose
  // delivery timestamps flow over it.  Filled in by the scan below, consumed
  // when building FetcherStructs.
  std::map<const Channel *, const Node *> timestamp_logger_channels;

  message_bridge::ChannelTimestampFinder finder(event_loop_);
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    // Only channels we send can have timestamps reflected back to us.
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
      continue;
    }
    if (!channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        const Node *other_node = configuration::GetNode(
            event_loop_->configuration(), connection->name()->string_view());

        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_channels.insert(
            std::make_pair(finder.ForChannel(channel, connection), other_node));
      }
    }
  }

  const size_t our_node_index =
      configuration::GetNodeIndex(configuration_, event_loop_->node());

  for (size_t channel_index = 0;
       channel_index < configuration_->channels()->size(); ++channel_index) {
    const Channel *const config_channel =
        configuration_->channels()->Get(channel_index);
    // The MakeRawFetcher method needs a channel which is in the event loop
    // configuration() object, not the configuration_ object.  Go look that up
    // from the config.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());
    CHECK(channel != nullptr)
        << ": Failed to look up channel "
        << aos::configuration::CleanedChannelToString(config_channel);
    if (!should_log(channel)) {
      continue;
    }

    FetcherStruct fs;
    fs.channel_index = channel_index;
    fs.channel = channel;

    // Classify the channel: locally produced data, readable data, and whether
    // logging it is configured.
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
        channel, event_loop_->node());
    const bool log_message = is_logged && is_readable;

    bool log_delivery_times = false;
    if (event_loop_->node() != nullptr) {
      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
          channel, event_loop_->node(), event_loop_->node());
    }

    // Now, detect a RemoteMessage timestamp logger where we should just log
    // the contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();

    // Only make a fetcher (and remember the channel) if at least one kind of
    // logging applies to it.
    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << "  Delivery times";
        fs.wants_timestamp_writer = true;
        fs.timestamp_node_index = our_node_index;
      }
      if (log_message) {
        VLOG(1) << "  Data";
        fs.wants_writer = true;
        if (!is_local) {
          // Remote data is attributed to (and logged as coming from) its
          // source node.
          const Node *source_node = configuration::GetNode(
              configuration_, channel->source_node()->string_view());
          fs.data_node_index =
              configuration::GetNodeIndex(configuration_, source_node);
          fs.log_type = LogType::kLogRemoteMessage;
        } else {
          fs.data_node_index = our_node_index;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
        fs.wants_contents_writer = true;
        fs.contents_node_index =
            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
      }
      fetchers_.emplace_back(std::move(fs));
    }
  }

  // When we are logging remote timestamps, we need to be able to translate
  // from the channel index that the event loop uses to the channel index in
  // the config in the log file.  -1 marks event-loop channels with no logged
  // counterpart.
  event_loop_to_logged_channel_index_.resize(
      event_loop->configuration()->channels()->size(), -1);
  for (size_t event_loop_channel_index = 0;
       event_loop_channel_index <
       event_loop->configuration()->channels()->size();
       ++event_loop_channel_index) {
    const Channel *event_loop_channel =
        event_loop->configuration()->channels()->Get(event_loop_channel_index);

    const Channel *logged_channel = aos::configuration::GetChannel(
        configuration_, event_loop_channel->name()->string_view(),
        event_loop_channel->type()->string_view(), "",
        configuration::GetNode(configuration_, event_loop_->node()));

    if (logged_channel != nullptr) {
      event_loop_to_logged_channel_index_[event_loop_channel_index] =
          configuration::ChannelIndex(configuration_, logged_channel);
    }
  }
}
162
163Logger::~Logger() {
164 if (log_namer_) {
165 // If we are replaying a log file, or in simulation, we want to force the
166 // last bit of data to be logged. The easiest way to deal with this is to
167 // poll everything as we go to destroy the class, ie, shut down the logger,
168 // and write it to disk.
169 StopLogging(event_loop_->monotonic_now());
170 }
171}
172
Austin Schuh6bb8a822021-03-31 23:04:39 -0700173bool Logger::RenameLogBase(std::string new_base_name) {
174 if (new_base_name == log_namer_->base_name()) {
175 return true;
176 }
177 std::string current_directory = std::string(log_namer_->base_name());
178 std::string new_directory = new_base_name;
179
180 auto current_path_split = current_directory.rfind("/");
181 auto new_path_split = new_directory.rfind("/");
182
183 CHECK(new_base_name.substr(new_path_split) ==
184 current_directory.substr(current_path_split))
185 << "Rename of file base from " << current_directory << " to "
186 << new_directory << " is not supported.";
187
188 current_directory.resize(current_path_split);
189 new_directory.resize(new_path_split);
190 DIR *dir = opendir(current_directory.c_str());
191 if (dir) {
192 closedir(dir);
193 const int result = rename(current_directory.c_str(), new_directory.c_str());
194 if (result != 0) {
195 PLOG(ERROR) << "Unable to rename " << current_directory << " to "
196 << new_directory;
197 return false;
198 }
199 } else {
200 // Handle if directory was already renamed.
201 dir = opendir(new_directory.c_str());
202 if (!dir) {
203 LOG(ERROR) << "Old directory " << current_directory
204 << " missing and new directory " << new_directory
205 << " not present.";
206 return false;
207 }
208 closedir(dir);
209 }
210
211 log_namer_->set_base_name(new_base_name);
212 Rotate();
213 return true;
214}
215
// Begins logging through `log_namer`.  `log_start_uuid` (if provided) is
// recorded in the header so multiple log files from one start event can be
// associated.  Must not be called while already logging.
//
// The ordering here is deliberate: writers are created and the header
// template installed *before* the initial Fetch() pass, so the time window in
// which messages could fall off the back of the queues is minimized.
void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
                          std::optional<UUID> log_start_uuid) {
  CHECK(!log_namer_) << ": Already logging";
  log_namer_ = std::move(log_namer);

  // When configs are stored separately from the log, write out a small header
  // containing just the config, named by its sha256, and reference the hash
  // from the main header instead of embedding the full config.
  std::string config_sha256;
  if (separate_config_) {
    flatbuffers::FlatBufferBuilder fbb;
    flatbuffers::Offset<aos::Configuration> configuration_offset =
        CopyFlatBuffer(configuration_, &fbb);
    LogFileHeader::Builder log_file_header_builder(fbb);
    log_file_header_builder.add_configuration(configuration_offset);
    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
        fbb.Release());
    config_sha256 = Sha256(config_header.span());
    LOG(INFO) << "Config sha256 of " << config_sha256;
    log_namer_->WriteConfiguration(&config_header, config_sha256);
  }

  log_event_uuid_ = UUID::Random();
  log_start_uuid_ = log_start_uuid;
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());

  // We want to do as much work as possible before the initial Fetch.  Time
  // between that and actually starting to log opens up the possibility of
  // falling off the end of the queue during that time.

  for (FetcherStruct &f : fetchers_) {
    if (f.wants_writer) {
      f.writer = log_namer_->MakeWriter(f.channel);
    }
    if (f.wants_timestamp_writer) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
    }
    if (f.wants_contents_writer) {
      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
          f.channel, CHECK_NOTNULL(f.timestamp_node));
    }
  }

  log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));

  const aos::monotonic_clock::time_point beginning_time =
      event_loop_->monotonic_now();

  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel.  This lets us have
  // non periodic messages with configuration that now get logged.
  for (FetcherStruct &f : fetchers_) {
    const auto start = event_loop_->monotonic_now();
    const bool got_new = f.fetcher->Fetch();
    const auto end = event_loop_->monotonic_now();
    RecordFetchResult(start, end, got_new, &f);

    // If there is a message, we want to write it.
    // (written == true means "nothing pending", so a channel with no data is
    // marked already-written and skipped by LogUntil.)
    f.written = f.fetcher->context().data == nullptr;
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
    log_namer_->SetStartTimes(
        i, monotonic_clock::min_time, realtime_clock::min_time,
        monotonic_clock::min_time, realtime_clock::min_time);
  }

  const aos::monotonic_clock::time_point fetch_time =
      event_loop_->monotonic_now();
  WriteHeader();
  const aos::monotonic_clock::time_point header_time =
      event_loop_->monotonic_now();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
            << " start_time " << last_synchronized_time_ << ", took "
            << chrono::duration<double>(fetch_time - beginning_time).count()
            << " to fetch, "
            << chrono::duration<double>(header_time - fetch_time).count()
            << " to write headers, boot uuid " << event_loop_->boot_uuid();

  // Force logging up until the start of the log file now, so the messages at
  // the start are always ordered before the rest of the messages.
  // Note: this ship may have already sailed, but we don't have to make it
  // worse.
  // TODO(austin): Test...
  //
  // This is safe to call here since we have set last_synchronized_time_ as the
  // same time as in the header, and all the data before it should be logged
  // without ordering concerns.
  LogUntil(last_synchronized_time_);

  // Kick off the periodic polling which drives all subsequent writes.
  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}
309
310std::unique_ptr<LogNamer> Logger::StopLogging(
311 aos::monotonic_clock::time_point end_time) {
312 CHECK(log_namer_) << ": Not logging right now";
313
314 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh30586902021-03-30 22:54:08 -0700315 // Folks like to use the on_logged_period_ callback to trigger stop and
316 // start events. We can't have those then recurse and try to stop again.
317 // Rather than making everything reentrant, let's just instead block the
318 // callback here.
319 DoLogData(end_time, false);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800320 }
321 timer_handler_->Disable();
322
323 for (FetcherStruct &f : fetchers_) {
324 f.writer = nullptr;
325 f.timestamp_writer = nullptr;
326 f.contents_writer = nullptr;
327 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800328
329 log_event_uuid_ = UUID::Zero();
Austin Schuh34f9e482021-03-31 22:54:18 -0700330 log_start_uuid_ = std::nullopt;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800331
332 return std::move(log_namer_);
333}
334
335void Logger::WriteHeader() {
336 if (configuration::MultiNode(configuration_)) {
337 server_statistics_fetcher_.Fetch();
338 }
339
Austin Schuh73340842021-07-30 22:32:06 -0700340 const aos::monotonic_clock::time_point monotonic_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800341 event_loop_->monotonic_now();
Austin Schuh73340842021-07-30 22:32:06 -0700342 const aos::realtime_clock::time_point realtime_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800343 event_loop_->realtime_now();
344
345 // We need to pick a point in time to declare the log file "started". This
346 // starts here. It needs to be after everything is fetched so that the
347 // fetchers are all pointed at the most recent message before the start
348 // time.
349 last_synchronized_time_ = monotonic_start_time;
350
351 for (const Node *node : log_namer_->nodes()) {
352 const int node_index = configuration::GetNodeIndex(configuration_, node);
353 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
354 realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800355 }
356}
357
Austin Schuhb06f03b2021-02-17 22:00:37 -0800358void Logger::WriteMissingTimestamps() {
359 if (configuration::MultiNode(configuration_)) {
360 server_statistics_fetcher_.Fetch();
361 } else {
362 return;
363 }
364
365 if (server_statistics_fetcher_.get() == nullptr) {
366 return;
367 }
368
369 for (const Node *node : log_namer_->nodes()) {
370 const int node_index = configuration::GetNodeIndex(configuration_, node);
371 if (MaybeUpdateTimestamp(
372 node, node_index,
373 server_statistics_fetcher_.context().monotonic_event_time,
374 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh73340842021-07-30 22:32:06 -0700375 VLOG(1) << "Rotating because timestamps changed";
376 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800377 }
378 }
379}
380
// Sets the log start times for `node` if they aren't set yet.
//
// For our own node (or a single-node config) the times are used directly.
// For a remote node we require a CONNECTED message-bridge connection with a
// valid monotonic offset so the remote's start time can be compensated onto
// its own clock.  Returns true iff the start times were set by this call.
bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (log_namer_->monotonic_start_time(node_index) !=
      monotonic_clock::min_time) {
    return false;
  }
  if (event_loop_->node() == node ||
      !configuration::MultiNode(configuration_)) {
    // There are no offsets to compute for ourself, so always succeed.
    log_namer_->SetStartTimes(node_index, monotonic_start_time,
                              realtime_start_time, monotonic_start_time,
                              realtime_start_time);
    return true;
  } else if (server_statistics_fetcher_.get() != nullptr) {
    // We must be a remote node now.  Look for the connection and see if it is
    // connected.

    for (const message_bridge::ServerConnection *connection :
         *server_statistics_fetcher_->connections()) {
      if (connection->node()->name()->string_view() !=
          node->name()->string_view()) {
        continue;
      }

      if (connection->state() != message_bridge::State::CONNECTED) {
        VLOG(1) << node->name()->string_view()
                << " is not connected, can't start it yet.";
        break;
      }

      if (!connection->has_monotonic_offset()) {
        VLOG(1) << "Missing monotonic offset for setting start time for node "
                << aos::FlatbufferToJson(node);
        break;
      }

      // Found it and it is connected.  Compensate and go.
      // The first (monotonic, realtime) pair is the start time on the remote
      // node's clock; the second pair is on our clock.
      log_namer_->SetStartTimes(
          node_index,
          monotonic_start_time +
              std::chrono::nanoseconds(connection->monotonic_offset()),
          realtime_start_time, monotonic_start_time, realtime_start_time);
      return true;
    }
  }
  return false;
}
431
// Builds the LogFileHeader flatbuffer used as the template for every file
// this logger writes.  `config_sha256` must be non-empty iff the config is
// stored separately (separate_config_); otherwise the full config is embedded.
//
// Note the flatbuffer discipline here: all nested offsets (strings, config,
// node) are created *before* the LogFileHeader::Builder is constructed, as
// required by the FlatBufferBuilder API.
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    std::string_view config_sha256) {
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (!separate_config_) {
    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
  } else {
    CHECK(!config_sha256.empty());
  }

  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  CHECK(log_event_uuid_ != UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      log_event_uuid_.PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      logger_instance_uuid_.PackString(&fbb);

  // Optional: only present when StartLogging was given a start UUID.
  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (log_start_uuid_) {
    log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
  }

  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  if (!config_sha256.empty()) {
    config_sha256_offset = fbb.CreateString(config_sha256);
  }

  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  flatbuffers::Offset<Node> logger_node_offset;

  if (configuration::MultiNode(configuration_)) {
    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (configuration::MultiNode(configuration_)) {
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary.  And the reverse could happen for another
  // message.  Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::nanoseconds(3 * polling_period_).count());

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  // Sanity-check the flatbuffer before anything downstream trusts it.
  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}
513
514void Logger::ResetStatisics() {
515 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
516 max_message_fetch_time_channel_ = -1;
517 max_message_fetch_time_size_ = -1;
518 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
519 total_message_fetch_count_ = 0;
520 total_message_fetch_bytes_ = 0;
521 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
522 total_nop_fetch_count_ = 0;
523 max_copy_time_ = std::chrono::nanoseconds::zero();
524 max_copy_time_channel_ = -1;
525 max_copy_time_size_ = -1;
526 total_copy_time_ = std::chrono::nanoseconds::zero();
527 total_copy_count_ = 0;
528 total_copy_bytes_ = 0;
529}
530
531void Logger::Rotate() {
532 for (const Node *node : log_namer_->nodes()) {
Austin Schuh73340842021-07-30 22:32:06 -0700533 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800534 }
535}
536
// Drains every fetcher and writes all messages with event time < t, then
// advances last_synchronized_time_ to t.  This is the core write path; each
// FetcherStruct may produce up to three streams: raw data, delivery
// timestamps, and forwarded RemoteMessage contents.
void Logger::LogUntil(monotonic_clock::time_point t) {
  // Grab the latest ServerStatistics message.  This will always have the
  // oppertunity to be >= to the current time, so it will always represent any
  // reboots which may have happened.
  WriteMissingTimestamps();

  int our_node_index = aos::configuration::GetNodeIndex(
      event_loop_->configuration(), event_loop_->node());

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      // f.written == true means the current message (if any) is already on
      // disk, so pull the next one.
      if (f.written) {
        const auto start = event_loop_->monotonic_now();
        const bool got_new = f.fetcher->FetchNext();
        const auto end = event_loop_->monotonic_now();
        RecordFetchResult(start, end, got_new, &f);
        if (!got_new) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        }
        f.written = false;
      }

      // TODO(james): Write tests to exercise this logic.
      // Stop at the sync point; messages at/after t belong to the next cycle.
      if (f.fetcher->context().monotonic_event_time >= t) {
        break;
      }
      if (f.writer != nullptr) {
        // Attribute remote data to the boot of its source node; local data to
        // our own boot.
        const UUID source_node_boot_uuid =
            our_node_index != f.data_node_index
                ? f.fetcher->context().source_boot_uuid
                : event_loop_->boot_uuid();
        // Write!
        const auto start = event_loop_->monotonic_now();
        // Pre-size the builder from the payload plus the largest header seen
        // so far to avoid reallocation.
        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                           max_header_size_);
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index, f.log_type));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing data as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.writer->filename() << " data "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        max_header_size_ = std::max(max_header_size_,
                                    fbb.GetSize() - f.fetcher->context().size);
        f.writer->QueueSizedFlatbuffer(&fbb, source_node_boot_uuid, end);
      }

      if (f.timestamp_writer != nullptr) {
        // And now handle timestamps.
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index,
                                           LogType::kLogDeliveryTimeOnly));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing timestamps as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.timestamp_writer->filename() << " timestamp "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        // TODO(austin): How do I track remote timestamp boot UUIDs?  I need to
        // update the uuid list in the header when one changes and track
        // timestamps.
        f.timestamp_writer->QueueSizedFlatbuffer(&fbb, event_loop_->boot_uuid(),
                                                 end);
      }

      if (f.contents_writer != nullptr) {
        const auto start = event_loop_->monotonic_now();
        // And now handle the special message contents channel.  Copy the
        // message into a FlatBufferBuilder and save it to disk.
        // TODO(austin): We can be more efficient here when we start to
        // care...
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        const RemoteMessage *msg =
            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);

        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);

        logger::MessageHeader::Builder message_header_builder(fbb);

        // TODO(austin): This needs to check the channel_index and confirm
        // that it should be logged before squirreling away the timestamp to
        // disk.  We don't want to log irrelevant timestamps.

        // Note: this must match the same order as MessageBridgeServer and
        // PackMessage.  We want identical headers to have identical
        // on-the-wire formats to make comparing them easier.

        // Translate from the channel index that the event loop uses to the
        // channel index in the log file.
        message_header_builder.add_channel_index(
            event_loop_to_logged_channel_index_[msg->channel_index()]);

        message_header_builder.add_queue_index(msg->queue_index());
        message_header_builder.add_monotonic_sent_time(
            msg->monotonic_sent_time());
        message_header_builder.add_realtime_sent_time(
            msg->realtime_sent_time());

        message_header_builder.add_monotonic_remote_time(
            msg->monotonic_remote_time());
        message_header_builder.add_realtime_remote_time(
            msg->realtime_remote_time());
        message_header_builder.add_remote_queue_index(
            msg->remote_queue_index());

        // When the timestamp message itself arrived here.
        message_header_builder.add_monotonic_timestamp_time(
            f.fetcher->context()
                .monotonic_event_time.time_since_epoch()
                .count());

        fbb.FinishSizePrefixed(message_header_builder.Finish());
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        f.contents_writer->QueueSizedFlatbuffer(
            &fbb, UUID::FromVector(msg->boot_uuid()), end);
      }

      f.written = true;
    }
  }
  last_synchronized_time_ = t;
}
683
Austin Schuh30586902021-03-30 22:54:08 -0700684void Logger::DoLogData(const monotonic_clock::time_point end_time,
685 bool run_on_logged) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800686 // We want to guarantee that messages aren't out of order by more than
687 // max_out_of_order_duration. To do this, we need sync points. Every write
688 // cycle should be a sync point.
689
690 do {
691 // Move the sync point up by at most polling_period. This forces one sync
692 // per iteration, even if it is small.
693 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
694
Austin Schuh30586902021-03-30 22:54:08 -0700695 if (run_on_logged) {
696 on_logged_period_();
697 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800698
699 // If we missed cycles, we could be pretty far behind. Spin until we are
700 // caught up.
701 } while (last_synchronized_time_ + polling_period_ < end_time);
702}
703
704void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
705 aos::monotonic_clock::time_point end,
706 bool got_new, FetcherStruct *fetcher) {
707 const auto duration = end - start;
708 if (!got_new) {
709 ++total_nop_fetch_count_;
710 total_nop_fetch_time_ += duration;
711 return;
712 }
713 ++total_message_fetch_count_;
714 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
715 total_message_fetch_time_ += duration;
716 if (duration > max_message_fetch_time_) {
717 max_message_fetch_time_ = duration;
718 max_message_fetch_time_channel_ = fetcher->channel_index;
719 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
720 }
721}
722
723void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
724 aos::monotonic_clock::time_point end,
725 FetcherStruct *fetcher) {
726 const auto duration = end - start;
727 total_copy_time_ += duration;
728 ++total_copy_count_;
729 total_copy_bytes_ += fetcher->fetcher->context().size;
730 if (duration > max_copy_time_) {
731 max_copy_time_ = duration;
732 max_copy_time_channel_ = fetcher->channel_index;
733 max_copy_time_size_ = fetcher->fetcher->context().size;
734 }
735}
736
737} // namespace logger
738} // namespace aos