blob: fffc886d92e7c71a389cc5035e3b1498b784ad6c [file] [log] [blame]
#include "aos/events/logging/log_writer.h"

#include <dirent.h>

#include <algorithm>
#include <chrono>
#include <cstdio>
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/team_number.h"
#include "aos/network/timestamp_channel.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080014
15namespace aos {
16namespace logger {
17namespace {
18using message_bridge::RemoteMessage;
Austin Schuhbd06ae42021-03-31 22:48:21 -070019namespace chrono = std::chrono;
Austin Schuhb06f03b2021-02-17 22:00:37 -080020} // namespace
21
22Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
23 std::function<bool(const Channel *)> should_log)
24 : event_loop_(event_loop),
25 configuration_(configuration),
Austin Schuh5b728b72021-06-16 14:57:15 -070026 node_(configuration::GetNode(configuration_, event_loop->node())),
27 node_index_(configuration::GetNodeIndex(configuration_, node_)),
Austin Schuhb06f03b2021-02-17 22:00:37 -080028 name_(network::GetHostname()),
29 timer_handler_(event_loop_->AddTimer(
Austin Schuh30586902021-03-30 22:54:08 -070030 [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
Austin Schuhb06f03b2021-02-17 22:00:37 -080031 server_statistics_fetcher_(
32 configuration::MultiNode(event_loop_->configuration())
33 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
34 "/aos")
35 : aos::Fetcher<message_bridge::ServerStatistics>()) {
Austin Schuh5b728b72021-06-16 14:57:15 -070036 VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080037
Austin Schuhb06f03b2021-02-17 22:00:37 -080038 std::map<const Channel *, const Node *> timestamp_logger_channels;
39
Austin Schuh61e973f2021-02-21 21:43:56 -080040 message_bridge::ChannelTimestampFinder finder(event_loop_);
41 for (const Channel *channel : *event_loop_->configuration()->channels()) {
42 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080043 continue;
44 }
Austin Schuh61e973f2021-02-21 21:43:56 -080045 if (!channel->has_destination_nodes()) {
46 continue;
47 }
48 for (const Connection *connection : *channel->destination_nodes()) {
49 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
50 connection, event_loop_->node())) {
51 const Node *other_node = configuration::GetNode(
Austin Schuh5b728b72021-06-16 14:57:15 -070052 configuration_, connection->name()->string_view());
Austin Schuh61e973f2021-02-21 21:43:56 -080053
54 VLOG(1) << "Timestamps are logged from "
55 << FlatbufferToJson(other_node);
56 timestamp_logger_channels.insert(
57 std::make_pair(finder.ForChannel(channel, connection), other_node));
58 }
59 }
Austin Schuhb06f03b2021-02-17 22:00:37 -080060 }
61
Austin Schuhb06f03b2021-02-17 22:00:37 -080062 for (size_t channel_index = 0;
63 channel_index < configuration_->channels()->size(); ++channel_index) {
64 const Channel *const config_channel =
65 configuration_->channels()->Get(channel_index);
66 // The MakeRawFetcher method needs a channel which is in the event loop
67 // configuration() object, not the configuration_ object. Go look that up
68 // from the config.
69 const Channel *channel = aos::configuration::GetChannel(
70 event_loop_->configuration(), config_channel->name()->string_view(),
71 config_channel->type()->string_view(), "", event_loop_->node());
72 CHECK(channel != nullptr)
73 << ": Failed to look up channel "
74 << aos::configuration::CleanedChannelToString(config_channel);
Austin Schuh5b728b72021-06-16 14:57:15 -070075 if (!should_log(config_channel)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080076 continue;
77 }
78
79 FetcherStruct fs;
80 fs.channel_index = channel_index;
81 fs.channel = channel;
82
83 const bool is_local =
Austin Schuh5b728b72021-06-16 14:57:15 -070084 configuration::ChannelIsSendableOnNode(config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080085
86 const bool is_readable =
Austin Schuh5b728b72021-06-16 14:57:15 -070087 configuration::ChannelIsReadableOnNode(config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080088 const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
Austin Schuh5b728b72021-06-16 14:57:15 -070089 config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080090 const bool log_message = is_logged && is_readable;
91
92 bool log_delivery_times = false;
Austin Schuh5b728b72021-06-16 14:57:15 -070093 if (configuration::MultiNode(configuration_)) {
Austin Schuh72211ae2021-08-05 14:02:30 -070094 const aos::Connection *connection =
Austin Schuh5b728b72021-06-16 14:57:15 -070095 configuration::ConnectionToNode(config_channel, node_);
Austin Schuh72211ae2021-08-05 14:02:30 -070096
Austin Schuhb06f03b2021-02-17 22:00:37 -080097 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
Austin Schuh72211ae2021-08-05 14:02:30 -070098 connection, event_loop_->node());
99
100 CHECK_EQ(log_delivery_times,
101 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
Austin Schuh5b728b72021-06-16 14:57:15 -0700102 config_channel, node_, node_));
Austin Schuh72211ae2021-08-05 14:02:30 -0700103
104 if (connection) {
105 fs.reliable_forwarding = (connection->time_to_live() == 0);
106 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800107 }
108
109 // Now, detect a RemoteMessage timestamp logger where we should just log the
110 // contents to a file directly.
111 const bool log_contents = timestamp_logger_channels.find(channel) !=
112 timestamp_logger_channels.end();
113
114 if (log_message || log_delivery_times || log_contents) {
115 fs.fetcher = event_loop->MakeRawFetcher(channel);
116 VLOG(1) << "Logging channel "
117 << configuration::CleanedChannelToString(channel);
118
119 if (log_delivery_times) {
120 VLOG(1) << " Delivery times";
121 fs.wants_timestamp_writer = true;
Austin Schuh5b728b72021-06-16 14:57:15 -0700122 fs.timestamp_node_index = static_cast<int>(node_index_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800123 }
Austin Schuhe46492f2021-07-31 19:49:41 -0700124 // Both the timestamp and data writers want data_node_index so it knows
125 // what the source node is.
126 if (log_message || log_delivery_times) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800127 if (!is_local) {
128 const Node *source_node = configuration::GetNode(
129 configuration_, channel->source_node()->string_view());
130 fs.data_node_index =
131 configuration::GetNodeIndex(configuration_, source_node);
Austin Schuhe46492f2021-07-31 19:49:41 -0700132 }
133 }
134 if (log_message) {
135 VLOG(1) << " Data";
136 fs.wants_writer = true;
137 if (!is_local) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800138 fs.log_type = LogType::kLogRemoteMessage;
139 } else {
Austin Schuh5b728b72021-06-16 14:57:15 -0700140 fs.data_node_index = static_cast<int>(node_index_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800141 }
142 }
143 if (log_contents) {
144 VLOG(1) << "Timestamp logger channel "
145 << configuration::CleanedChannelToString(channel);
146 fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
147 fs.wants_contents_writer = true;
148 fs.contents_node_index =
149 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
150 }
151 fetchers_.emplace_back(std::move(fs));
152 }
153 }
154
155 // When we are logging remote timestamps, we need to be able to translate from
156 // the channel index that the event loop uses to the channel index in the
157 // config in the log file.
158 event_loop_to_logged_channel_index_.resize(
159 event_loop->configuration()->channels()->size(), -1);
160 for (size_t event_loop_channel_index = 0;
161 event_loop_channel_index <
162 event_loop->configuration()->channels()->size();
163 ++event_loop_channel_index) {
164 const Channel *event_loop_channel =
165 event_loop->configuration()->channels()->Get(event_loop_channel_index);
166
167 const Channel *logged_channel = aos::configuration::GetChannel(
168 configuration_, event_loop_channel->name()->string_view(),
Austin Schuh5b728b72021-06-16 14:57:15 -0700169 event_loop_channel->type()->string_view(), "", node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800170
171 if (logged_channel != nullptr) {
172 event_loop_to_logged_channel_index_[event_loop_channel_index] =
173 configuration::ChannelIndex(configuration_, logged_channel);
174 }
175 }
176}
177
178Logger::~Logger() {
179 if (log_namer_) {
180 // If we are replaying a log file, or in simulation, we want to force the
181 // last bit of data to be logged. The easiest way to deal with this is to
182 // poll everything as we go to destroy the class, ie, shut down the logger,
183 // and write it to disk.
184 StopLogging(event_loop_->monotonic_now());
185 }
186}
187
Austin Schuh6bb8a822021-03-31 23:04:39 -0700188bool Logger::RenameLogBase(std::string new_base_name) {
189 if (new_base_name == log_namer_->base_name()) {
190 return true;
191 }
192 std::string current_directory = std::string(log_namer_->base_name());
193 std::string new_directory = new_base_name;
194
195 auto current_path_split = current_directory.rfind("/");
196 auto new_path_split = new_directory.rfind("/");
197
198 CHECK(new_base_name.substr(new_path_split) ==
199 current_directory.substr(current_path_split))
200 << "Rename of file base from " << current_directory << " to "
201 << new_directory << " is not supported.";
202
203 current_directory.resize(current_path_split);
204 new_directory.resize(new_path_split);
205 DIR *dir = opendir(current_directory.c_str());
206 if (dir) {
207 closedir(dir);
208 const int result = rename(current_directory.c_str(), new_directory.c_str());
209 if (result != 0) {
210 PLOG(ERROR) << "Unable to rename " << current_directory << " to "
211 << new_directory;
212 return false;
213 }
214 } else {
215 // Handle if directory was already renamed.
216 dir = opendir(new_directory.c_str());
217 if (!dir) {
218 LOG(ERROR) << "Old directory " << current_directory
219 << " missing and new directory " << new_directory
220 << " not present.";
221 return false;
222 }
223 closedir(dir);
224 }
225
226 log_namer_->set_base_name(new_base_name);
227 Rotate();
228 return true;
229}
230
Austin Schuhb06f03b2021-02-17 22:00:37 -0800231void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
Austin Schuh34f9e482021-03-31 22:54:18 -0700232 std::optional<UUID> log_start_uuid) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800233 CHECK(!log_namer_) << ": Already logging";
234 log_namer_ = std::move(log_namer);
235
236 std::string config_sha256;
237 if (separate_config_) {
238 flatbuffers::FlatBufferBuilder fbb;
239 flatbuffers::Offset<aos::Configuration> configuration_offset =
240 CopyFlatBuffer(configuration_, &fbb);
241 LogFileHeader::Builder log_file_header_builder(fbb);
242 log_file_header_builder.add_configuration(configuration_offset);
243 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
244 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
245 fbb.Release());
246 config_sha256 = Sha256(config_header.span());
247 LOG(INFO) << "Config sha256 of " << config_sha256;
248 log_namer_->WriteConfiguration(&config_header, config_sha256);
249 }
250
251 log_event_uuid_ = UUID::Random();
252 log_start_uuid_ = log_start_uuid;
Austin Schuh5b728b72021-06-16 14:57:15 -0700253 VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800254
255 // We want to do as much work as possible before the initial Fetch. Time
256 // between that and actually starting to log opens up the possibility of
257 // falling off the end of the queue during that time.
258
259 for (FetcherStruct &f : fetchers_) {
260 if (f.wants_writer) {
261 f.writer = log_namer_->MakeWriter(f.channel);
262 }
263 if (f.wants_timestamp_writer) {
264 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
265 }
266 if (f.wants_contents_writer) {
267 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
268 f.channel, CHECK_NOTNULL(f.timestamp_node));
269 }
270 }
271
Austin Schuh73340842021-07-30 22:32:06 -0700272 log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
Austin Schuhb06f03b2021-02-17 22:00:37 -0800273
Austin Schuha42ee962021-03-31 22:49:30 -0700274 const aos::monotonic_clock::time_point beginning_time =
275 event_loop_->monotonic_now();
276
Austin Schuhb06f03b2021-02-17 22:00:37 -0800277 // Grab data from each channel right before we declare the log file started
278 // so we can capture the latest message on each channel. This lets us have
279 // non periodic messages with configuration that now get logged.
280 for (FetcherStruct &f : fetchers_) {
281 const auto start = event_loop_->monotonic_now();
282 const bool got_new = f.fetcher->Fetch();
283 const auto end = event_loop_->monotonic_now();
284 RecordFetchResult(start, end, got_new, &f);
285
286 // If there is a message, we want to write it.
287 f.written = f.fetcher->context().data == nullptr;
288 }
289
290 // Clear out any old timestamps in case we are re-starting logging.
Austin Schuh572924a2021-07-30 22:32:12 -0700291 for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
Austin Schuh73340842021-07-30 22:32:06 -0700292 log_namer_->SetStartTimes(
293 i, monotonic_clock::min_time, realtime_clock::min_time,
294 monotonic_clock::min_time, realtime_clock::min_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800295 }
296
Austin Schuha42ee962021-03-31 22:49:30 -0700297 const aos::monotonic_clock::time_point fetch_time =
298 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800299 WriteHeader();
Austin Schuha42ee962021-03-31 22:49:30 -0700300 const aos::monotonic_clock::time_point header_time =
301 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800302
Austin Schuh5b728b72021-06-16 14:57:15 -0700303 LOG(INFO) << "Logging node as " << FlatbufferToJson(node_)
Austin Schuha42ee962021-03-31 22:49:30 -0700304 << " start_time " << last_synchronized_time_ << ", took "
305 << chrono::duration<double>(fetch_time - beginning_time).count()
306 << " to fetch, "
307 << chrono::duration<double>(header_time - fetch_time).count()
308 << " to write headers, boot uuid " << event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800309
310 // Force logging up until the start of the log file now, so the messages at
311 // the start are always ordered before the rest of the messages.
312 // Note: this ship may have already sailed, but we don't have to make it
313 // worse.
314 // TODO(austin): Test...
Austin Schuh855f8932021-03-19 22:01:32 -0700315 //
316 // This is safe to call here since we have set last_synchronized_time_ as the
317 // same time as in the header, and all the data before it should be logged
318 // without ordering concerns.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800319 LogUntil(last_synchronized_time_);
320
321 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
322 polling_period_);
323}
324
325std::unique_ptr<LogNamer> Logger::StopLogging(
326 aos::monotonic_clock::time_point end_time) {
327 CHECK(log_namer_) << ": Not logging right now";
328
329 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh30586902021-03-30 22:54:08 -0700330 // Folks like to use the on_logged_period_ callback to trigger stop and
331 // start events. We can't have those then recurse and try to stop again.
332 // Rather than making everything reentrant, let's just instead block the
333 // callback here.
334 DoLogData(end_time, false);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800335 }
336 timer_handler_->Disable();
337
338 for (FetcherStruct &f : fetchers_) {
339 f.writer = nullptr;
340 f.timestamp_writer = nullptr;
341 f.contents_writer = nullptr;
342 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800343
344 log_event_uuid_ = UUID::Zero();
Austin Schuh34f9e482021-03-31 22:54:18 -0700345 log_start_uuid_ = std::nullopt;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800346
347 return std::move(log_namer_);
348}
349
350void Logger::WriteHeader() {
351 if (configuration::MultiNode(configuration_)) {
352 server_statistics_fetcher_.Fetch();
353 }
354
Austin Schuh73340842021-07-30 22:32:06 -0700355 const aos::monotonic_clock::time_point monotonic_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800356 event_loop_->monotonic_now();
Austin Schuh73340842021-07-30 22:32:06 -0700357 const aos::realtime_clock::time_point realtime_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800358 event_loop_->realtime_now();
359
360 // We need to pick a point in time to declare the log file "started". This
361 // starts here. It needs to be after everything is fetched so that the
362 // fetchers are all pointed at the most recent message before the start
363 // time.
364 last_synchronized_time_ = monotonic_start_time;
365
366 for (const Node *node : log_namer_->nodes()) {
367 const int node_index = configuration::GetNodeIndex(configuration_, node);
368 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
369 realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800370 }
371}
372
Austin Schuhb06f03b2021-02-17 22:00:37 -0800373void Logger::WriteMissingTimestamps() {
374 if (configuration::MultiNode(configuration_)) {
375 server_statistics_fetcher_.Fetch();
376 } else {
377 return;
378 }
379
380 if (server_statistics_fetcher_.get() == nullptr) {
381 return;
382 }
383
384 for (const Node *node : log_namer_->nodes()) {
385 const int node_index = configuration::GetNodeIndex(configuration_, node);
386 if (MaybeUpdateTimestamp(
387 node, node_index,
388 server_statistics_fetcher_.context().monotonic_event_time,
389 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh73340842021-07-30 22:32:06 -0700390 VLOG(1) << "Rotating because timestamps changed";
391 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800392 }
393 }
394}
395
Austin Schuhb06f03b2021-02-17 22:00:37 -0800396bool Logger::MaybeUpdateTimestamp(
397 const Node *node, int node_index,
398 aos::monotonic_clock::time_point monotonic_start_time,
399 aos::realtime_clock::time_point realtime_start_time) {
400 // Bail early if the start times are already set.
Austin Schuh73340842021-07-30 22:32:06 -0700401 if (log_namer_->monotonic_start_time(node_index) !=
Austin Schuhb06f03b2021-02-17 22:00:37 -0800402 monotonic_clock::min_time) {
403 return false;
404 }
Austin Schuh5b728b72021-06-16 14:57:15 -0700405 if (node_ == node ||
Austin Schuhb06f03b2021-02-17 22:00:37 -0800406 !configuration::MultiNode(configuration_)) {
407 // There are no offsets to compute for ourself, so always succeed.
Austin Schuh73340842021-07-30 22:32:06 -0700408 log_namer_->SetStartTimes(node_index, monotonic_start_time,
409 realtime_start_time, monotonic_start_time,
410 realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800411 return true;
412 } else if (server_statistics_fetcher_.get() != nullptr) {
413 // We must be a remote node now. Look for the connection and see if it is
414 // connected.
415
416 for (const message_bridge::ServerConnection *connection :
417 *server_statistics_fetcher_->connections()) {
418 if (connection->node()->name()->string_view() !=
419 node->name()->string_view()) {
420 continue;
421 }
422
423 if (connection->state() != message_bridge::State::CONNECTED) {
424 VLOG(1) << node->name()->string_view()
425 << " is not connected, can't start it yet.";
426 break;
427 }
428
Austin Schuhb06f03b2021-02-17 22:00:37 -0800429 if (!connection->has_monotonic_offset()) {
430 VLOG(1) << "Missing monotonic offset for setting start time for node "
431 << aos::FlatbufferToJson(node);
432 break;
433 }
434
435 // Found it and it is connected. Compensate and go.
Austin Schuh73340842021-07-30 22:32:06 -0700436 log_namer_->SetStartTimes(
437 node_index,
438 monotonic_start_time +
439 std::chrono::nanoseconds(connection->monotonic_offset()),
440 realtime_start_time, monotonic_start_time, realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800441 return true;
442 }
443 }
444 return false;
445}
446
447aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
Austin Schuh73340842021-07-30 22:32:06 -0700448 std::string_view config_sha256) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800449 flatbuffers::FlatBufferBuilder fbb;
450 fbb.ForceDefaults(true);
451
452 flatbuffers::Offset<aos::Configuration> configuration_offset;
453 if (!separate_config_) {
454 configuration_offset = CopyFlatBuffer(configuration_, &fbb);
455 } else {
456 CHECK(!config_sha256.empty());
457 }
458
459 const flatbuffers::Offset<flatbuffers::String> name_offset =
460 fbb.CreateString(name_);
461
462 CHECK(log_event_uuid_ != UUID::Zero());
463 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800464 log_event_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800465
466 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800467 logger_instance_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800468
469 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
Austin Schuh34f9e482021-03-31 22:54:18 -0700470 if (log_start_uuid_) {
471 log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800472 }
473
474 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
475 if (!config_sha256.empty()) {
476 config_sha256_offset = fbb.CreateString(config_sha256);
477 }
478
479 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800480 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800481
Austin Schuhb06f03b2021-02-17 22:00:37 -0800482 flatbuffers::Offset<Node> logger_node_offset;
483
484 if (configuration::MultiNode(configuration_)) {
Austin Schuh5b728b72021-06-16 14:57:15 -0700485 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800486 }
487
488 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
489
490 log_file_header_builder.add_name(name_offset);
491
492 // Only add the node if we are running in a multinode configuration.
Austin Schuh73340842021-07-30 22:32:06 -0700493 if (configuration::MultiNode(configuration_)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800494 log_file_header_builder.add_logger_node(logger_node_offset);
495 }
496
497 if (!configuration_offset.IsNull()) {
498 log_file_header_builder.add_configuration(configuration_offset);
499 }
500 // The worst case theoretical out of order is the polling period times 2.
501 // One message could get logged right after the boundary, but be for right
502 // before the next boundary. And the reverse could happen for another
503 // message. Report back 3x to be extra safe, and because the cost isn't
504 // huge on the read side.
505 log_file_header_builder.add_max_out_of_order_duration(
506 std::chrono::nanoseconds(3 * polling_period_).count());
507
Austin Schuhb06f03b2021-02-17 22:00:37 -0800508 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
509 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
510 if (!log_start_uuid_offset.IsNull()) {
511 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
512 }
513 log_file_header_builder.add_logger_node_boot_uuid(
514 logger_node_boot_uuid_offset);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800515
516 if (!config_sha256_offset.IsNull()) {
517 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
518 }
519
520 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
521 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
522 fbb.Release());
523
524 CHECK(result.Verify()) << ": Built a corrupted header.";
525
526 return result;
527}
528
529void Logger::ResetStatisics() {
530 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
531 max_message_fetch_time_channel_ = -1;
532 max_message_fetch_time_size_ = -1;
533 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
534 total_message_fetch_count_ = 0;
535 total_message_fetch_bytes_ = 0;
536 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
537 total_nop_fetch_count_ = 0;
538 max_copy_time_ = std::chrono::nanoseconds::zero();
539 max_copy_time_channel_ = -1;
540 max_copy_time_size_ = -1;
541 total_copy_time_ = std::chrono::nanoseconds::zero();
542 total_copy_count_ = 0;
543 total_copy_bytes_ = 0;
544}
545
546void Logger::Rotate() {
547 for (const Node *node : log_namer_->nodes()) {
Austin Schuh73340842021-07-30 22:32:06 -0700548 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800549 }
550}
551
552void Logger::LogUntil(monotonic_clock::time_point t) {
553 // Grab the latest ServerStatistics message. This will always have the
554 // oppertunity to be >= to the current time, so it will always represent any
555 // reboots which may have happened.
556 WriteMissingTimestamps();
557
558 // Write each channel to disk, one at a time.
559 for (FetcherStruct &f : fetchers_) {
560 while (true) {
561 if (f.written) {
562 const auto start = event_loop_->monotonic_now();
563 const bool got_new = f.fetcher->FetchNext();
564 const auto end = event_loop_->monotonic_now();
565 RecordFetchResult(start, end, got_new, &f);
566 if (!got_new) {
567 VLOG(2) << "No new data on "
568 << configuration::CleanedChannelToString(
569 f.fetcher->channel());
570 break;
571 }
572 f.written = false;
573 }
574
575 // TODO(james): Write tests to exercise this logic.
576 if (f.fetcher->context().monotonic_event_time >= t) {
577 break;
578 }
579 if (f.writer != nullptr) {
Austin Schuh572924a2021-07-30 22:32:12 -0700580 const UUID source_node_boot_uuid =
Austin Schuh5b728b72021-06-16 14:57:15 -0700581 static_cast<int>(node_index_) != f.data_node_index
Austin Schuh572924a2021-07-30 22:32:12 -0700582 ? f.fetcher->context().source_boot_uuid
583 : event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800584 // Write!
585 const auto start = event_loop_->monotonic_now();
586 flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
587 max_header_size_);
588 fbb.ForceDefaults(true);
589
590 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
591 f.channel_index, f.log_type));
592 const auto end = event_loop_->monotonic_now();
593 RecordCreateMessageTime(start, end, &f);
594
595 VLOG(2) << "Writing data as node "
Austin Schuh5b728b72021-06-16 14:57:15 -0700596 << FlatbufferToJson(node_) << " for channel "
Austin Schuhb06f03b2021-02-17 22:00:37 -0800597 << configuration::CleanedChannelToString(f.fetcher->channel())
598 << " to " << f.writer->filename() << " data "
599 << FlatbufferToJson(
600 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
601 fbb.GetBufferPointer()));
602
603 max_header_size_ = std::max(max_header_size_,
604 fbb.GetSize() - f.fetcher->context().size);
Austin Schuhe46492f2021-07-31 19:49:41 -0700605 f.writer->QueueMessage(&fbb, source_node_boot_uuid, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800606 }
607
608 if (f.timestamp_writer != nullptr) {
609 // And now handle timestamps.
610 const auto start = event_loop_->monotonic_now();
611 flatbuffers::FlatBufferBuilder fbb;
612 fbb.ForceDefaults(true);
613
614 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
615 f.channel_index,
616 LogType::kLogDeliveryTimeOnly));
617 const auto end = event_loop_->monotonic_now();
618 RecordCreateMessageTime(start, end, &f);
619
620 VLOG(2) << "Writing timestamps as node "
Austin Schuh5b728b72021-06-16 14:57:15 -0700621 << FlatbufferToJson(node_) << " for channel "
Austin Schuhb06f03b2021-02-17 22:00:37 -0800622 << configuration::CleanedChannelToString(f.fetcher->channel())
623 << " to " << f.timestamp_writer->filename() << " timestamp "
624 << FlatbufferToJson(
625 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
626 fbb.GetBufferPointer()));
627
Austin Schuhe46492f2021-07-31 19:49:41 -0700628 // Tell our writer that we know something about the remote boot.
Austin Schuh72211ae2021-08-05 14:02:30 -0700629 f.timestamp_writer->UpdateRemote(
630 f.data_node_index, f.fetcher->context().source_boot_uuid,
631 f.fetcher->context().monotonic_remote_time,
632 f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
Austin Schuhe46492f2021-07-31 19:49:41 -0700633 f.timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800634 }
635
636 if (f.contents_writer != nullptr) {
637 const auto start = event_loop_->monotonic_now();
638 // And now handle the special message contents channel. Copy the
639 // message into a FlatBufferBuilder and save it to disk.
640 // TODO(austin): We can be more efficient here when we start to
641 // care...
642 flatbuffers::FlatBufferBuilder fbb;
643 fbb.ForceDefaults(true);
644
645 const RemoteMessage *msg =
646 flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
647
648 CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800649
650 logger::MessageHeader::Builder message_header_builder(fbb);
651
652 // TODO(austin): This needs to check the channel_index and confirm
653 // that it should be logged before squirreling away the timestamp to
654 // disk. We don't want to log irrelevant timestamps.
655
656 // Note: this must match the same order as MessageBridgeServer and
657 // PackMessage. We want identical headers to have identical
658 // on-the-wire formats to make comparing them easier.
659
660 // Translate from the channel index that the event loop uses to the
661 // channel index in the log file.
662 message_header_builder.add_channel_index(
663 event_loop_to_logged_channel_index_[msg->channel_index()]);
664
665 message_header_builder.add_queue_index(msg->queue_index());
666 message_header_builder.add_monotonic_sent_time(
667 msg->monotonic_sent_time());
668 message_header_builder.add_realtime_sent_time(
669 msg->realtime_sent_time());
670
671 message_header_builder.add_monotonic_remote_time(
672 msg->monotonic_remote_time());
673 message_header_builder.add_realtime_remote_time(
674 msg->realtime_remote_time());
675 message_header_builder.add_remote_queue_index(
676 msg->remote_queue_index());
677
678 message_header_builder.add_monotonic_timestamp_time(
679 f.fetcher->context()
680 .monotonic_event_time.time_since_epoch()
681 .count());
682
683 fbb.FinishSizePrefixed(message_header_builder.Finish());
684 const auto end = event_loop_->monotonic_now();
685 RecordCreateMessageTime(start, end, &f);
686
Austin Schuhe46492f2021-07-31 19:49:41 -0700687 f.contents_writer->QueueMessage(
Austin Schuh572924a2021-07-30 22:32:12 -0700688 &fbb, UUID::FromVector(msg->boot_uuid()), end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800689 }
690
691 f.written = true;
692 }
693 }
694 last_synchronized_time_ = t;
695}
696
Austin Schuh30586902021-03-30 22:54:08 -0700697void Logger::DoLogData(const monotonic_clock::time_point end_time,
698 bool run_on_logged) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800699 // We want to guarantee that messages aren't out of order by more than
700 // max_out_of_order_duration. To do this, we need sync points. Every write
701 // cycle should be a sync point.
702
703 do {
704 // Move the sync point up by at most polling_period. This forces one sync
705 // per iteration, even if it is small.
706 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
707
Austin Schuh30586902021-03-30 22:54:08 -0700708 if (run_on_logged) {
709 on_logged_period_();
710 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800711
712 // If we missed cycles, we could be pretty far behind. Spin until we are
713 // caught up.
714 } while (last_synchronized_time_ + polling_period_ < end_time);
715}
716
717void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
718 aos::monotonic_clock::time_point end,
719 bool got_new, FetcherStruct *fetcher) {
720 const auto duration = end - start;
721 if (!got_new) {
722 ++total_nop_fetch_count_;
723 total_nop_fetch_time_ += duration;
724 return;
725 }
726 ++total_message_fetch_count_;
727 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
728 total_message_fetch_time_ += duration;
729 if (duration > max_message_fetch_time_) {
730 max_message_fetch_time_ = duration;
731 max_message_fetch_time_channel_ = fetcher->channel_index;
732 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
733 }
734}
735
736void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
737 aos::monotonic_clock::time_point end,
738 FetcherStruct *fetcher) {
739 const auto duration = end - start;
740 total_copy_time_ += duration;
741 ++total_copy_count_;
742 total_copy_bytes_ += fetcher->fetcher->context().size;
743 if (duration > max_copy_time_) {
744 max_copy_time_ = duration;
745 max_copy_time_channel_ = fetcher->channel_index;
746 max_copy_time_size_ = fetcher->fetcher->context().size;
747 }
748}
749
750} // namespace logger
751} // namespace aos