blob: 1e0f7c8ca67e67e443d76ea7d7cef48965887aa3 [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_writer.h"
2
Austin Schuh6bb8a822021-03-31 23:04:39 -07003#include <dirent.h>
4
Austin Schuhb06f03b2021-02-17 22:00:37 -08005#include <functional>
6#include <map>
7#include <vector>
8
9#include "aos/configuration.h"
10#include "aos/events/event_loop.h"
11#include "aos/network/message_bridge_server_generated.h"
12#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080013#include "aos/network/timestamp_channel.h"
Austin Schuhb0e439d2023-05-15 10:55:40 -070014#include "aos/sha256.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080015
16namespace aos {
17namespace logger {
18namespace {
19using message_bridge::RemoteMessage;
Austin Schuhbd06ae42021-03-31 22:48:21 -070020namespace chrono = std::chrono;
Austin Schuhb06f03b2021-02-17 22:00:37 -080021} // namespace
22
23Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
24 std::function<bool(const Channel *)> should_log)
25 : event_loop_(event_loop),
26 configuration_(configuration),
Austin Schuh5b728b72021-06-16 14:57:15 -070027 node_(configuration::GetNode(configuration_, event_loop->node())),
28 node_index_(configuration::GetNodeIndex(configuration_, node_)),
Austin Schuhb06f03b2021-02-17 22:00:37 -080029 name_(network::GetHostname()),
Austin Schuh2d612c82023-07-17 13:37:48 -070030 timer_handler_(event_loop_->AddTimer([this]() {
31 DoLogData(event_loop_->monotonic_now() - logging_delay_, true);
32 })),
Austin Schuhb06f03b2021-02-17 22:00:37 -080033 server_statistics_fetcher_(
34 configuration::MultiNode(event_loop_->configuration())
35 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
36 "/aos")
37 : aos::Fetcher<message_bridge::ServerStatistics>()) {
Austin Schuh58646e22021-08-23 23:51:46 -070038 timer_handler_->set_name("channel_poll");
Austin Schuh5b728b72021-06-16 14:57:15 -070039 VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080040
Naman Gupta41d70c22022-11-21 15:29:52 -080041 // When we are logging remote timestamps, we need to be able to translate
42 // from the channel index that the event loop uses to the channel index in
43 // the config in the log file.
Austin Schuh01f3b392022-01-25 20:03:09 -080044 event_loop_to_logged_channel_index_.resize(
45 event_loop->configuration()->channels()->size(), -1);
46 for (size_t event_loop_channel_index = 0;
47 event_loop_channel_index <
48 event_loop->configuration()->channels()->size();
49 ++event_loop_channel_index) {
50 const Channel *event_loop_channel =
51 event_loop->configuration()->channels()->Get(event_loop_channel_index);
52
53 const Channel *logged_channel = aos::configuration::GetChannel(
54 configuration_, event_loop_channel->name()->string_view(),
55 event_loop_channel->type()->string_view(), "", node_);
56
57 if (logged_channel != nullptr) {
58 event_loop_to_logged_channel_index_[event_loop_channel_index] =
59 configuration::ChannelIndex(configuration_, logged_channel);
60 }
61 }
62
63 // Map to match source channels with the timestamp logger, if the contents
64 // should be reliable, and a list of all channels logged on it to be treated
65 // as reliable.
66 std::map<const Channel *, std::tuple<const Node *, bool, std::vector<bool>>>
67 timestamp_logger_channels;
Austin Schuhb06f03b2021-02-17 22:00:37 -080068
Austin Schuh61e973f2021-02-21 21:43:56 -080069 message_bridge::ChannelTimestampFinder finder(event_loop_);
70 for (const Channel *channel : *event_loop_->configuration()->channels()) {
71 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080072 continue;
73 }
Austin Schuh61e973f2021-02-21 21:43:56 -080074 if (!channel->has_destination_nodes()) {
75 continue;
76 }
Austin Schuh01f3b392022-01-25 20:03:09 -080077 const size_t channel_index =
78 configuration::ChannelIndex(event_loop_->configuration(), channel);
79
Austin Schuh61e973f2021-02-21 21:43:56 -080080 for (const Connection *connection : *channel->destination_nodes()) {
81 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
82 connection, event_loop_->node())) {
83 const Node *other_node = configuration::GetNode(
Austin Schuh5b728b72021-06-16 14:57:15 -070084 configuration_, connection->name()->string_view());
Austin Schuh61e973f2021-02-21 21:43:56 -080085
86 VLOG(1) << "Timestamps are logged from "
87 << FlatbufferToJson(other_node);
Austin Schuh01f3b392022-01-25 20:03:09 -080088 // True if each channel's remote timestamps are split into a separate
89 // RemoteMessage channel.
90 const bool is_split =
91 finder.SplitChannelForChannel(channel, connection) != nullptr;
92
93 const Channel *const timestamp_logger_channel =
94 finder.ForChannel(channel, connection);
95
96 auto it = timestamp_logger_channels.find(timestamp_logger_channel);
97 if (it != timestamp_logger_channels.end()) {
98 CHECK(!is_split);
99 CHECK_LT(channel_index, std::get<2>(it->second).size());
Brian Smartt796cca02022-04-12 15:07:21 -0700100 std::get<2>(it->second)[channel_index] =
101 (connection->time_to_live() == 0);
Austin Schuh01f3b392022-01-25 20:03:09 -0800102 } else {
103 if (is_split) {
104 timestamp_logger_channels.insert(std::make_pair(
105 timestamp_logger_channel,
106 std::make_tuple(other_node, (connection->time_to_live() == 0),
107 std::vector<bool>())));
108 } else {
109 std::vector<bool> channel_reliable_contents(
110 event_loop->configuration()->channels()->size(), false);
111 channel_reliable_contents[channel_index] =
112 (connection->time_to_live() == 0);
113
114 timestamp_logger_channels.insert(std::make_pair(
115 timestamp_logger_channel,
116 std::make_tuple(other_node, false,
117 std::move(channel_reliable_contents))));
118 }
119 }
Austin Schuh61e973f2021-02-21 21:43:56 -0800120 }
121 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800122 }
123
Austin Schuhb06f03b2021-02-17 22:00:37 -0800124 for (size_t channel_index = 0;
125 channel_index < configuration_->channels()->size(); ++channel_index) {
126 const Channel *const config_channel =
127 configuration_->channels()->Get(channel_index);
128 // The MakeRawFetcher method needs a channel which is in the event loop
129 // configuration() object, not the configuration_ object. Go look that up
130 // from the config.
131 const Channel *channel = aos::configuration::GetChannel(
132 event_loop_->configuration(), config_channel->name()->string_view(),
133 config_channel->type()->string_view(), "", event_loop_->node());
134 CHECK(channel != nullptr)
135 << ": Failed to look up channel "
136 << aos::configuration::CleanedChannelToString(config_channel);
Austin Schuh5b728b72021-06-16 14:57:15 -0700137 if (!should_log(config_channel)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800138 continue;
139 }
140
141 FetcherStruct fs;
142 fs.channel_index = channel_index;
143 fs.channel = channel;
144
145 const bool is_local =
Austin Schuh5b728b72021-06-16 14:57:15 -0700146 configuration::ChannelIsSendableOnNode(config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800147
148 const bool is_readable =
Austin Schuh5b728b72021-06-16 14:57:15 -0700149 configuration::ChannelIsReadableOnNode(config_channel, node_);
Austin Schuh01f3b392022-01-25 20:03:09 -0800150 const bool is_logged =
151 configuration::ChannelMessageIsLoggedOnNode(config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800152 const bool log_message = is_logged && is_readable;
153
154 bool log_delivery_times = false;
Austin Schuh5b728b72021-06-16 14:57:15 -0700155 if (configuration::MultiNode(configuration_)) {
Austin Schuh72211ae2021-08-05 14:02:30 -0700156 const aos::Connection *connection =
Austin Schuh5b728b72021-06-16 14:57:15 -0700157 configuration::ConnectionToNode(config_channel, node_);
Austin Schuh72211ae2021-08-05 14:02:30 -0700158
Austin Schuhb06f03b2021-02-17 22:00:37 -0800159 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
Austin Schuh72211ae2021-08-05 14:02:30 -0700160 connection, event_loop_->node());
161
162 CHECK_EQ(log_delivery_times,
163 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
Austin Schuh5b728b72021-06-16 14:57:15 -0700164 config_channel, node_, node_));
Austin Schuh72211ae2021-08-05 14:02:30 -0700165
166 if (connection) {
167 fs.reliable_forwarding = (connection->time_to_live() == 0);
168 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800169 }
170
Austin Schuh01f3b392022-01-25 20:03:09 -0800171 // Now, detect a RemoteMessage timestamp logger where we should just log
172 // the contents to a file directly.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800173 const bool log_contents = timestamp_logger_channels.find(channel) !=
174 timestamp_logger_channels.end();
175
176 if (log_message || log_delivery_times || log_contents) {
177 fs.fetcher = event_loop->MakeRawFetcher(channel);
178 VLOG(1) << "Logging channel "
179 << configuration::CleanedChannelToString(channel);
180
181 if (log_delivery_times) {
182 VLOG(1) << " Delivery times";
183 fs.wants_timestamp_writer = true;
Austin Schuh5b728b72021-06-16 14:57:15 -0700184 fs.timestamp_node_index = static_cast<int>(node_index_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800185 }
Austin Schuhe46492f2021-07-31 19:49:41 -0700186 // Both the timestamp and data writers want data_node_index so it knows
187 // what the source node is.
188 if (log_message || log_delivery_times) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800189 if (!is_local) {
190 const Node *source_node = configuration::GetNode(
191 configuration_, channel->source_node()->string_view());
192 fs.data_node_index =
193 configuration::GetNodeIndex(configuration_, source_node);
Austin Schuhe46492f2021-07-31 19:49:41 -0700194 }
195 }
196 if (log_message) {
197 VLOG(1) << " Data";
198 fs.wants_writer = true;
199 if (!is_local) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800200 fs.log_type = LogType::kLogRemoteMessage;
201 } else {
Austin Schuh5b728b72021-06-16 14:57:15 -0700202 fs.data_node_index = static_cast<int>(node_index_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800203 }
204 }
205 if (log_contents) {
206 VLOG(1) << "Timestamp logger channel "
207 << configuration::CleanedChannelToString(channel);
Austin Schuh01f3b392022-01-25 20:03:09 -0800208 auto timestamp_logger_channel_info =
209 timestamp_logger_channels.find(channel);
210 CHECK(timestamp_logger_channel_info != timestamp_logger_channels.end());
211 fs.timestamp_node = std::get<0>(timestamp_logger_channel_info->second);
212 fs.reliable_contents =
213 std::get<1>(timestamp_logger_channel_info->second);
214 fs.channel_reliable_contents =
215 std::get<2>(timestamp_logger_channel_info->second);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800216 fs.wants_contents_writer = true;
217 fs.contents_node_index =
218 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
219 }
220 fetchers_.emplace_back(std::move(fs));
221 }
222 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800223}
224
225Logger::~Logger() {
226 if (log_namer_) {
227 // If we are replaying a log file, or in simulation, we want to force the
228 // last bit of data to be logged. The easiest way to deal with this is to
Austin Schuh01f3b392022-01-25 20:03:09 -0800229 // poll everything as we go to destroy the class, ie, shut down the
230 // logger, and write it to disk.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800231 StopLogging(event_loop_->monotonic_now());
232 }
233}
234
Austin Schuh3b2b5b52023-07-05 11:36:46 -0700235aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> PackConfiguration(
236 const Configuration *const configuration) {
237 flatbuffers::FlatBufferBuilder fbb;
238 flatbuffers::Offset<aos::Configuration> configuration_offset =
239 CopyFlatBuffer(configuration, &fbb);
240 LogFileHeader::Builder log_file_header_builder(fbb);
241 log_file_header_builder.add_configuration(configuration_offset);
242 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
243 return fbb.Release();
244}
245
Brian Smartt796cca02022-04-12 15:07:21 -0700246std::string Logger::WriteConfiguration(LogNamer *log_namer) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800247 std::string config_sha256;
Brian Smartt03c00da2022-02-24 10:25:00 -0800248
Austin Schuhb06f03b2021-02-17 22:00:37 -0800249 if (separate_config_) {
Austin Schuh3b2b5b52023-07-05 11:36:46 -0700250 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header =
251 PackConfiguration(configuration_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800252 config_sha256 = Sha256(config_header.span());
Alexei Strots6bd1e642023-07-20 19:36:27 -0700253 VLOG(1) << "Config sha256 of " << config_sha256;
Brian Smartt03c00da2022-02-24 10:25:00 -0800254 log_namer->WriteConfiguration(&config_header, config_sha256);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800255 }
256
Brian Smartt03c00da2022-02-24 10:25:00 -0800257 return config_sha256;
258}
259
260void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
261 std::optional<UUID> log_start_uuid) {
262 CHECK(!log_namer_) << ": Already logging";
263
264 VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);
265
266 auto config_sha256 = WriteConfiguration(log_namer.get());
267
268 log_namer_ = std::move(log_namer);
269
Austin Schuhb06f03b2021-02-17 22:00:37 -0800270 log_event_uuid_ = UUID::Random();
271 log_start_uuid_ = log_start_uuid;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700272 log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
Austin Schuhb06f03b2021-02-17 22:00:37 -0800273
274 // We want to do as much work as possible before the initial Fetch. Time
275 // between that and actually starting to log opens up the possibility of
276 // falling off the end of the queue during that time.
277
278 for (FetcherStruct &f : fetchers_) {
279 if (f.wants_writer) {
280 f.writer = log_namer_->MakeWriter(f.channel);
281 }
282 if (f.wants_timestamp_writer) {
283 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
284 }
285 if (f.wants_contents_writer) {
286 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
287 f.channel, CHECK_NOTNULL(f.timestamp_node));
288 }
289 }
290
Austin Schuha42ee962021-03-31 22:49:30 -0700291 const aos::monotonic_clock::time_point beginning_time =
292 event_loop_->monotonic_now();
293
Austin Schuh2d612c82023-07-17 13:37:48 -0700294 log_until_time_ = beginning_time;
295
Austin Schuhb06f03b2021-02-17 22:00:37 -0800296 // Grab data from each channel right before we declare the log file started
297 // so we can capture the latest message on each channel. This lets us have
298 // non periodic messages with configuration that now get logged.
299 for (FetcherStruct &f : fetchers_) {
300 const auto start = event_loop_->monotonic_now();
301 const bool got_new = f.fetcher->Fetch();
302 const auto end = event_loop_->monotonic_now();
303 RecordFetchResult(start, end, got_new, &f);
304
305 // If there is a message, we want to write it.
306 f.written = f.fetcher->context().data == nullptr;
307 }
308
309 // Clear out any old timestamps in case we are re-starting logging.
Austin Schuh572924a2021-07-30 22:32:12 -0700310 for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
Austin Schuh58646e22021-08-23 23:51:46 -0700311 log_namer_->ClearStartTimes();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800312 }
313
Austin Schuha42ee962021-03-31 22:49:30 -0700314 const aos::monotonic_clock::time_point fetch_time =
315 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800316 WriteHeader();
Austin Schuha42ee962021-03-31 22:49:30 -0700317 const aos::monotonic_clock::time_point header_time =
318 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800319
Brian Smartt796cca02022-04-12 15:07:21 -0700320 VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " start_time "
321 << last_synchronized_time_ << ", took "
Brian Smartt03c00da2022-02-24 10:25:00 -0800322 << chrono::duration<double>(fetch_time - beginning_time).count()
323 << " to fetch, "
324 << chrono::duration<double>(header_time - fetch_time).count()
325 << " to write headers, boot uuid " << event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800326
327 // Force logging up until the start of the log file now, so the messages at
328 // the start are always ordered before the rest of the messages.
329 // Note: this ship may have already sailed, but we don't have to make it
330 // worse.
331 // TODO(austin): Test...
Austin Schuh855f8932021-03-19 22:01:32 -0700332 //
Naman Gupta41d70c22022-11-21 15:29:52 -0800333 // This is safe to call here since we have set last_synchronized_time_ as
334 // the same time as in the header, and all the data before it should be
335 // logged without ordering concerns.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800336 LogUntil(last_synchronized_time_);
337
Philipp Schradera6712522023-07-05 20:25:11 -0700338 timer_handler_->Schedule(event_loop_->monotonic_now() + polling_period_,
339 polling_period_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800340}
341
Brian Smartt796cca02022-04-12 15:07:21 -0700342std::unique_ptr<LogNamer> Logger::RestartLogging(
Austin Schuh2d612c82023-07-17 13:37:48 -0700343 std::unique_ptr<LogNamer> log_namer, std::optional<UUID> log_start_uuid,
344 std::optional<monotonic_clock::time_point> end_time) {
Brian Smartt03c00da2022-02-24 10:25:00 -0800345 CHECK(log_namer_) << ": Unexpected restart while not logging";
346
347 VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
348
Austin Schuh2d612c82023-07-17 13:37:48 -0700349 // Grab a representative time on both the RT and monotonic clock.
350 // Average a monotonic clock before and after to reduce the error.
351 const aos::monotonic_clock::time_point monotonic_now1 =
352 event_loop_->monotonic_now();
353 const aos::realtime_clock::time_point realtime_now =
354 event_loop_->realtime_now();
355 const aos::monotonic_clock::time_point monotonic_now2 =
356 event_loop_->monotonic_now();
Austin Schuh08dba8f2023-05-01 08:29:30 -0700357
Austin Schuh2d612c82023-07-17 13:37:48 -0700358 // Log until the provided end time.
359 if (end_time) {
360 CHECK_LE(*end_time, monotonic_now1) << ": Can't log into the future.";
361 // DoLogData is a bit fragile.
362 if (*end_time > last_synchronized_time_) {
363 DoLogData(*end_time, false);
Austin Schuh08dba8f2023-05-01 08:29:30 -0700364 }
365 }
366
Austin Schuh2d612c82023-07-17 13:37:48 -0700367 // We are now synchronized up to last_synchronized_time_. We only have record
368 // of messages from before last_synchronized_time_, so it is a safe start
369 // time.
Brian Smartt03c00da2022-02-24 10:25:00 -0800370
371 std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
372 log_namer_ = std::move(log_namer);
373
Austin Schuh2d612c82023-07-17 13:37:48 -0700374 // Our start time is now how far we logged until before.
375 const aos::monotonic_clock::time_point monotonic_start_time =
376 last_synchronized_time_;
Austin Schuh41f8df92022-04-15 11:45:52 -0700377 const aos::realtime_clock::time_point realtime_start_time =
Austin Schuh2d612c82023-07-17 13:37:48 -0700378 (realtime_now + (last_synchronized_time_.time_since_epoch() -
379 ((monotonic_now1.time_since_epoch() +
380 monotonic_now2.time_since_epoch()) /
381 2)));
Brian Smartt03c00da2022-02-24 10:25:00 -0800382
383 auto config_sha256 = WriteConfiguration(log_namer_.get());
384
385 log_event_uuid_ = UUID::Random();
386 log_start_uuid_ = log_start_uuid;
387
388 log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
389
390 // Note that WriteHeader updates last_synchronized_time_ to be the
391 // current time when it is called, which is then the "start time"
392 // of the new (restarted) log. This timestamp will be after
Naman Gupta41d70c22022-11-21 15:29:52 -0800393 // the timestamp of the last message fetched on each channel, but is
394 // carefully picked per the comment above to not violate
395 // max_out_of_order_duration.
Austin Schuh41f8df92022-04-15 11:45:52 -0700396 WriteHeader(monotonic_start_time, realtime_start_time);
Brian Smartt03c00da2022-02-24 10:25:00 -0800397
398 const aos::monotonic_clock::time_point header_time =
399 event_loop_->monotonic_now();
400
Austin Schuh08dba8f2023-05-01 08:29:30 -0700401 // Close out the old writers to free up memory to be used by the new writers.
402 old_log_namer->Close();
403
Brian Smartt03c00da2022-02-24 10:25:00 -0800404 for (FetcherStruct &f : fetchers_) {
Brian Smartt03c00da2022-02-24 10:25:00 -0800405 // Create writers from the new namer
Brian Smartt03c00da2022-02-24 10:25:00 -0800406
407 if (f.wants_writer) {
Austin Schuh08dba8f2023-05-01 08:29:30 -0700408 f.writer = log_namer_->MakeWriter(f.channel);
Brian Smartt03c00da2022-02-24 10:25:00 -0800409 }
410 if (f.wants_timestamp_writer) {
Austin Schuh08dba8f2023-05-01 08:29:30 -0700411 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
Brian Smartt03c00da2022-02-24 10:25:00 -0800412 }
413 if (f.wants_contents_writer) {
Austin Schuh08dba8f2023-05-01 08:29:30 -0700414 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
Brian Smartt03c00da2022-02-24 10:25:00 -0800415 f.channel, CHECK_NOTNULL(f.timestamp_node));
416 }
417
Austin Schuh08dba8f2023-05-01 08:29:30 -0700418 // Mark each channel with data as not written. That triggers each channel
419 // to be re-logged.
420 f.written = f.fetcher->context().data == nullptr;
Brian Smartt03c00da2022-02-24 10:25:00 -0800421 }
422
Austin Schuh08dba8f2023-05-01 08:29:30 -0700423 // And now make sure to log everything up to the start time in 1 big go so we
424 // make sure we have it before we let the world start logging normally again.
425 LogUntil(monotonic_start_time);
426
Brian Smartt03c00da2022-02-24 10:25:00 -0800427 const aos::monotonic_clock::time_point channel_time =
428 event_loop_->monotonic_now();
429
Brian Smartt796cca02022-04-12 15:07:21 -0700430 VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " restart_time "
431 << last_synchronized_time_ << ", took "
Austin Schuh2d612c82023-07-17 13:37:48 -0700432 << chrono::duration<double>(header_time - monotonic_now1).count()
Brian Smartt03c00da2022-02-24 10:25:00 -0800433 << " to prepare and write header, "
434 << chrono::duration<double>(channel_time - header_time).count()
Brian Smartt796cca02022-04-12 15:07:21 -0700435 << " to write initial channel messages, boot uuid "
436 << event_loop_->boot_uuid();
Brian Smartt03c00da2022-02-24 10:25:00 -0800437
438 return old_log_namer;
439}
440
Austin Schuhb06f03b2021-02-17 22:00:37 -0800441std::unique_ptr<LogNamer> Logger::StopLogging(
442 aos::monotonic_clock::time_point end_time) {
443 CHECK(log_namer_) << ": Not logging right now";
444
445 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh30586902021-03-30 22:54:08 -0700446 // Folks like to use the on_logged_period_ callback to trigger stop and
447 // start events. We can't have those then recurse and try to stop again.
448 // Rather than making everything reentrant, let's just instead block the
449 // callback here.
450 DoLogData(end_time, false);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800451 }
452 timer_handler_->Disable();
453
454 for (FetcherStruct &f : fetchers_) {
455 f.writer = nullptr;
456 f.timestamp_writer = nullptr;
457 f.contents_writer = nullptr;
458 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800459
460 log_event_uuid_ = UUID::Zero();
Austin Schuh34f9e482021-03-31 22:54:18 -0700461 log_start_uuid_ = std::nullopt;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800462
Alexei Strotsbc776d52023-04-15 23:46:45 -0700463 log_namer_->Close();
464
Austin Schuhb06f03b2021-02-17 22:00:37 -0800465 return std::move(log_namer_);
466}
467
Austin Schuh41f8df92022-04-15 11:45:52 -0700468void Logger::WriteHeader(aos::monotonic_clock::time_point monotonic_start_time,
469 aos::realtime_clock::time_point realtime_start_time) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800470 if (configuration::MultiNode(configuration_)) {
471 server_statistics_fetcher_.Fetch();
472 }
473
Austin Schuh41f8df92022-04-15 11:45:52 -0700474 if (monotonic_start_time == aos::monotonic_clock::min_time) {
475 monotonic_start_time = event_loop_->monotonic_now();
476 realtime_start_time = event_loop_->realtime_now();
477 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800478
479 // We need to pick a point in time to declare the log file "started". This
480 // starts here. It needs to be after everything is fetched so that the
481 // fetchers are all pointed at the most recent message before the start
482 // time.
483 last_synchronized_time_ = monotonic_start_time;
484
485 for (const Node *node : log_namer_->nodes()) {
486 const int node_index = configuration::GetNodeIndex(configuration_, node);
487 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
488 realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800489 }
490}
491
Austin Schuhb06f03b2021-02-17 22:00:37 -0800492void Logger::WriteMissingTimestamps() {
493 if (configuration::MultiNode(configuration_)) {
494 server_statistics_fetcher_.Fetch();
495 } else {
496 return;
497 }
498
499 if (server_statistics_fetcher_.get() == nullptr) {
500 return;
501 }
502
503 for (const Node *node : log_namer_->nodes()) {
504 const int node_index = configuration::GetNodeIndex(configuration_, node);
505 if (MaybeUpdateTimestamp(
506 node, node_index,
507 server_statistics_fetcher_.context().monotonic_event_time,
508 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh58646e22021-08-23 23:51:46 -0700509 VLOG(1) << "Timestamps changed on " << aos::FlatbufferToJson(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800510 }
511 }
512}
513
Austin Schuhb06f03b2021-02-17 22:00:37 -0800514bool Logger::MaybeUpdateTimestamp(
515 const Node *node, int node_index,
516 aos::monotonic_clock::time_point monotonic_start_time,
517 aos::realtime_clock::time_point realtime_start_time) {
518 // Bail early if the start times are already set.
Austin Schuh58646e22021-08-23 23:51:46 -0700519 if (node_ == node || !configuration::MultiNode(configuration_)) {
520 if (log_namer_->monotonic_start_time(node_index,
521 event_loop_->boot_uuid()) !=
522 monotonic_clock::min_time) {
523 return false;
524 }
Brian Smartt03c00da2022-02-24 10:25:00 -0800525
Austin Schuhb06f03b2021-02-17 22:00:37 -0800526 // There are no offsets to compute for ourself, so always succeed.
Austin Schuh58646e22021-08-23 23:51:46 -0700527 log_namer_->SetStartTimes(node_index, event_loop_->boot_uuid(),
528 monotonic_start_time, realtime_start_time,
529 monotonic_start_time, realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800530 return true;
531 } else if (server_statistics_fetcher_.get() != nullptr) {
532 // We must be a remote node now. Look for the connection and see if it is
533 // connected.
James Kuszmaul17607fb2021-10-15 20:00:32 -0700534 CHECK(server_statistics_fetcher_->has_connections());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800535
536 for (const message_bridge::ServerConnection *connection :
537 *server_statistics_fetcher_->connections()) {
538 if (connection->node()->name()->string_view() !=
539 node->name()->string_view()) {
540 continue;
541 }
542
543 if (connection->state() != message_bridge::State::CONNECTED) {
544 VLOG(1) << node->name()->string_view()
545 << " is not connected, can't start it yet.";
546 break;
547 }
548
Austin Schuhb06f03b2021-02-17 22:00:37 -0800549 if (!connection->has_monotonic_offset()) {
550 VLOG(1) << "Missing monotonic offset for setting start time for node "
551 << aos::FlatbufferToJson(node);
552 break;
553 }
554
James Kuszmaul17607fb2021-10-15 20:00:32 -0700555 CHECK(connection->has_boot_uuid());
Austin Schuh58646e22021-08-23 23:51:46 -0700556 const UUID boot_uuid =
557 UUID::FromString(connection->boot_uuid()->string_view());
558
559 if (log_namer_->monotonic_start_time(node_index, boot_uuid) !=
560 monotonic_clock::min_time) {
561 break;
562 }
563
564 VLOG(1) << "Updating start time for "
565 << aos::FlatbufferToJson(connection);
566
Austin Schuhb06f03b2021-02-17 22:00:37 -0800567 // Found it and it is connected. Compensate and go.
Austin Schuh73340842021-07-30 22:32:06 -0700568 log_namer_->SetStartTimes(
Austin Schuh58646e22021-08-23 23:51:46 -0700569 node_index, boot_uuid,
Austin Schuh73340842021-07-30 22:32:06 -0700570 monotonic_start_time +
571 std::chrono::nanoseconds(connection->monotonic_offset()),
572 realtime_start_time, monotonic_start_time, realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800573 return true;
574 }
575 }
576 return false;
577}
578
579aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
Austin Schuh73340842021-07-30 22:32:06 -0700580 std::string_view config_sha256) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800581 flatbuffers::FlatBufferBuilder fbb;
582 fbb.ForceDefaults(true);
583
584 flatbuffers::Offset<aos::Configuration> configuration_offset;
585 if (!separate_config_) {
586 configuration_offset = CopyFlatBuffer(configuration_, &fbb);
587 } else {
588 CHECK(!config_sha256.empty());
589 }
590
591 const flatbuffers::Offset<flatbuffers::String> name_offset =
592 fbb.CreateString(name_);
593
Austin Schuhfa712682022-05-11 16:43:42 -0700594 const flatbuffers::Offset<flatbuffers::String> logger_sha1_offset =
595 logger_sha1_.empty() ? 0 : fbb.CreateString(logger_sha1_);
596 const flatbuffers::Offset<flatbuffers::String> logger_version_offset =
597 logger_version_.empty() ? 0 : fbb.CreateString(logger_version_);
598
Austin Schuhb06f03b2021-02-17 22:00:37 -0800599 CHECK(log_event_uuid_ != UUID::Zero());
600 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800601 log_event_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800602
603 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800604 logger_instance_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800605
606 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
Austin Schuh34f9e482021-03-31 22:54:18 -0700607 if (log_start_uuid_) {
608 log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800609 }
610
611 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
612 if (!config_sha256.empty()) {
613 config_sha256_offset = fbb.CreateString(config_sha256);
614 }
615
616 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800617 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800618
Austin Schuhb06f03b2021-02-17 22:00:37 -0800619 flatbuffers::Offset<Node> logger_node_offset;
620
621 if (configuration::MultiNode(configuration_)) {
Austin Schuh5b728b72021-06-16 14:57:15 -0700622 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800623 }
624
625 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
626
627 log_file_header_builder.add_name(name_offset);
Austin Schuhfa712682022-05-11 16:43:42 -0700628 if (!logger_sha1_offset.IsNull()) {
629 log_file_header_builder.add_logger_sha1(logger_sha1_offset);
630 }
631 if (!logger_version_offset.IsNull()) {
632 log_file_header_builder.add_logger_version(logger_version_offset);
633 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800634
635 // Only add the node if we are running in a multinode configuration.
Austin Schuh73340842021-07-30 22:32:06 -0700636 if (configuration::MultiNode(configuration_)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800637 log_file_header_builder.add_logger_node(logger_node_offset);
638 }
639
640 if (!configuration_offset.IsNull()) {
641 log_file_header_builder.add_configuration(configuration_offset);
642 }
643 // The worst case theoretical out of order is the polling period times 2.
644 // One message could get logged right after the boundary, but be for right
645 // before the next boundary. And the reverse could happen for another
646 // message. Report back 3x to be extra safe, and because the cost isn't
647 // huge on the read side.
648 log_file_header_builder.add_max_out_of_order_duration(
649 std::chrono::nanoseconds(3 * polling_period_).count());
650
Austin Schuhb06f03b2021-02-17 22:00:37 -0800651 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
652 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
653 if (!log_start_uuid_offset.IsNull()) {
654 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
655 }
656 log_file_header_builder.add_logger_node_boot_uuid(
657 logger_node_boot_uuid_offset);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800658
659 if (!config_sha256_offset.IsNull()) {
660 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
661 }
662
663 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
664 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
665 fbb.Release());
666
667 CHECK(result.Verify()) << ": Built a corrupted header.";
668
669 return result;
670}
671
672void Logger::ResetStatisics() {
673 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
674 max_message_fetch_time_channel_ = -1;
675 max_message_fetch_time_size_ = -1;
676 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
677 total_message_fetch_count_ = 0;
678 total_message_fetch_bytes_ = 0;
679 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
680 total_nop_fetch_count_ = 0;
681 max_copy_time_ = std::chrono::nanoseconds::zero();
682 max_copy_time_channel_ = -1;
683 max_copy_time_size_ = -1;
684 total_copy_time_ = std::chrono::nanoseconds::zero();
685 total_copy_count_ = 0;
686 total_copy_bytes_ = 0;
687}
688
689void Logger::Rotate() {
690 for (const Node *node : log_namer_->nodes()) {
Austin Schuh73340842021-07-30 22:32:06 -0700691 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800692 }
693}
694
Brian Smartt03c00da2022-02-24 10:25:00 -0800695void Logger::WriteData(NewDataWriter *writer, const FetcherStruct &f) {
696 if (writer != nullptr) {
697 const UUID source_node_boot_uuid =
698 static_cast<int>(node_index_) != f.data_node_index
699 ? f.fetcher->context().source_boot_uuid
700 : event_loop_->boot_uuid();
701 // Write!
702 const auto start = event_loop_->monotonic_now();
Brian Smartt03c00da2022-02-24 10:25:00 -0800703
Naman Gupta41d70c22022-11-21 15:29:52 -0800704 ContextDataCopier coppier(f.fetcher->context(), f.channel_index, f.log_type,
705 event_loop_);
Brian Smartt03c00da2022-02-24 10:25:00 -0800706
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700707 aos::monotonic_clock::time_point message_time =
708 static_cast<int>(node_index_) != f.data_node_index
709 ? f.fetcher->context().monotonic_remote_time
710 : f.fetcher->context().monotonic_event_time;
711 writer->CopyMessage(&coppier, source_node_boot_uuid, start, message_time);
Austin Schuh48d10d62022-10-16 22:19:23 -0700712 RecordCreateMessageTime(start, coppier.end_time(), f);
Brian Smartt03c00da2022-02-24 10:25:00 -0800713
Brian Smartt796cca02022-04-12 15:07:21 -0700714 VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
715 << " for channel "
Brian Smartt03c00da2022-02-24 10:25:00 -0800716 << configuration::CleanedChannelToString(f.fetcher->channel())
Alexei Strotsbc082d82023-05-03 08:43:42 -0700717 << " to " << writer->name();
Brian Smartt03c00da2022-02-24 10:25:00 -0800718 }
719}
720
Brian Smartt796cca02022-04-12 15:07:21 -0700721void Logger::WriteTimestamps(NewDataWriter *timestamp_writer,
722 const FetcherStruct &f) {
Brian Smartt03c00da2022-02-24 10:25:00 -0800723 if (timestamp_writer != nullptr) {
724 // And now handle timestamps.
Brian Smartt03c00da2022-02-24 10:25:00 -0800725
726 // Tell our writer that we know something about the remote boot.
727 timestamp_writer->UpdateRemote(
728 f.data_node_index, f.fetcher->context().source_boot_uuid,
729 f.fetcher->context().monotonic_remote_time,
730 f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
Austin Schuh48d10d62022-10-16 22:19:23 -0700731
732 const auto start = event_loop_->monotonic_now();
733 ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
Naman Gupta41d70c22022-11-21 15:29:52 -0800734 LogType::kLogDeliveryTimeOnly, event_loop_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700735
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700736 timestamp_writer->CopyMessage(&coppier, event_loop_->boot_uuid(), start,
737 f.fetcher->context().monotonic_event_time);
Austin Schuh48d10d62022-10-16 22:19:23 -0700738 RecordCreateMessageTime(start, coppier.end_time(), f);
Brian Smartt03c00da2022-02-24 10:25:00 -0800739
Brian Smartt796cca02022-04-12 15:07:21 -0700740 VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
741 << " for channel "
Brian Smartt03c00da2022-02-24 10:25:00 -0800742 << configuration::CleanedChannelToString(f.fetcher->channel())
Alexei Strotsbc082d82023-05-03 08:43:42 -0700743 << " to " << timestamp_writer->name() << " timestamp";
Brian Smartt03c00da2022-02-24 10:25:00 -0800744 }
745}
746
Brian Smartt796cca02022-04-12 15:07:21 -0700747void Logger::WriteContent(NewDataWriter *contents_writer,
748 const FetcherStruct &f) {
Brian Smartt03c00da2022-02-24 10:25:00 -0800749 if (contents_writer != nullptr) {
750 const auto start = event_loop_->monotonic_now();
751 // And now handle the special message contents channel. Copy the
752 // message into a FlatBufferBuilder and save it to disk.
Brian Smartt03c00da2022-02-24 10:25:00 -0800753 const RemoteMessage *msg =
754 flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
755
756 CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
Brian Smartt03c00da2022-02-24 10:25:00 -0800757 // Translate from the channel index that the event loop uses to the
758 // channel index in the log file.
Austin Schuhf2d0e682022-10-16 14:20:58 -0700759 const int channel_index =
760 event_loop_to_logged_channel_index_[msg->channel_index()];
Brian Smartt03c00da2022-02-24 10:25:00 -0800761
762 const aos::monotonic_clock::time_point monotonic_timestamp_time =
763 f.fetcher->context().monotonic_event_time;
Brian Smartt03c00da2022-02-24 10:25:00 -0800764
Brian Smartt03c00da2022-02-24 10:25:00 -0800765 // Timestamps tell us information about what happened too!
766 // Capture any reboots so UpdateRemote is properly recorded.
767 contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
768
769 // Start with recording info about the data flowing from our node to the
770 // remote.
771 const bool reliable =
772 f.channel_reliable_contents.size() != 0u
773 ? f.channel_reliable_contents[msg->channel_index()]
774 : f.reliable_contents;
775
Brian Smartt796cca02022-04-12 15:07:21 -0700776 contents_writer->UpdateRemote(
777 node_index_, event_loop_->boot_uuid(),
Brian Smartt03c00da2022-02-24 10:25:00 -0800778 monotonic_clock::time_point(
779 chrono::nanoseconds(msg->monotonic_remote_time())),
780 monotonic_clock::time_point(
781 chrono::nanoseconds(msg->monotonic_sent_time())),
782 reliable, monotonic_timestamp_time);
783
Austin Schuh48d10d62022-10-16 22:19:23 -0700784 RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
Naman Gupta41d70c22022-11-21 15:29:52 -0800785 event_loop_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700786
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700787 contents_writer->CopyMessage(
788 &coppier, UUID::FromVector(msg->boot_uuid()), start,
789 monotonic_clock::time_point(
790 chrono::nanoseconds(msg->monotonic_sent_time())));
Austin Schuh48d10d62022-10-16 22:19:23 -0700791
792 RecordCreateMessageTime(start, coppier.end_time(), f);
Brian Smartt03c00da2022-02-24 10:25:00 -0800793 }
794}
795
796void Logger::WriteFetchedRecord(FetcherStruct &f) {
797 WriteData(f.writer, f);
798 WriteTimestamps(f.timestamp_writer, f);
799 WriteContent(f.contents_writer, f);
800}
801
Austin Schuh08dba8f2023-05-01 08:29:30 -0700802std::pair<bool, monotonic_clock::time_point> Logger::LogUntil(
803 monotonic_clock::time_point t) {
804 bool wrote_messages = false;
805 monotonic_clock::time_point newest_record = monotonic_clock::min_time;
Brian Smartt03c00da2022-02-24 10:25:00 -0800806
Austin Schuh2d612c82023-07-17 13:37:48 -0700807 log_until_time_ = t;
808
Austin Schuhb06f03b2021-02-17 22:00:37 -0800809 // Grab the latest ServerStatistics message. This will always have the
810 // oppertunity to be >= to the current time, so it will always represent any
811 // reboots which may have happened.
812 WriteMissingTimestamps();
813
814 // Write each channel to disk, one at a time.
815 for (FetcherStruct &f : fetchers_) {
Austin Schuh08dba8f2023-05-01 08:29:30 -0700816 if (f.fetcher->context().data != nullptr) {
817 newest_record =
818 std::max(newest_record, f.fetcher->context().monotonic_event_time);
819 }
820
Austin Schuhb06f03b2021-02-17 22:00:37 -0800821 while (true) {
822 if (f.written) {
823 const auto start = event_loop_->monotonic_now();
Austin Schuh2d612c82023-07-17 13:37:48 -0700824 const bool got_new =
825 f.fetcher->FetchNextIf(std::ref(fetch_next_if_fn_));
Austin Schuhb06f03b2021-02-17 22:00:37 -0800826 const auto end = event_loop_->monotonic_now();
827 RecordFetchResult(start, end, got_new, &f);
828 if (!got_new) {
829 VLOG(2) << "No new data on "
830 << configuration::CleanedChannelToString(
831 f.fetcher->channel());
832 break;
833 }
Austin Schuh08dba8f2023-05-01 08:29:30 -0700834 newest_record =
835 std::max(newest_record, f.fetcher->context().monotonic_event_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800836 f.written = false;
837 }
838
839 // TODO(james): Write tests to exercise this logic.
Austin Schuh2d612c82023-07-17 13:37:48 -0700840 CHECK_LE(f.fetcher->context().monotonic_event_time, t);
841
842 // At startup, we can end up grabbing a message at the current time.
843 // Ignore it.
844 if (f.fetcher->context().monotonic_event_time == t) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800845 break;
846 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800847
Brian Smartt03c00da2022-02-24 10:25:00 -0800848 WriteFetchedRecord(f);
Austin Schuh08dba8f2023-05-01 08:29:30 -0700849 wrote_messages = true;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800850
851 f.written = true;
852 }
853 }
854 last_synchronized_time_ = t;
Brian Smartt03c00da2022-02-24 10:25:00 -0800855
Austin Schuh08dba8f2023-05-01 08:29:30 -0700856 return std::make_pair(wrote_messages, newest_record);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800857}
858
Austin Schuh30586902021-03-30 22:54:08 -0700859void Logger::DoLogData(const monotonic_clock::time_point end_time,
860 bool run_on_logged) {
Austin Schuh2d612c82023-07-17 13:37:48 -0700861 if (end_time < last_synchronized_time_) return;
862
863 DCHECK(is_started());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800864 // We want to guarantee that messages aren't out of order by more than
865 // max_out_of_order_duration. To do this, we need sync points. Every write
866 // cycle should be a sync point.
867
868 do {
869 // Move the sync point up by at most polling_period. This forces one sync
870 // per iteration, even if it is small.
871 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
872
Austin Schuh30586902021-03-30 22:54:08 -0700873 if (run_on_logged) {
Austin Schuh2f864452023-07-17 14:53:08 -0700874 on_logged_period_(last_synchronized_time_);
Austin Schuh30586902021-03-30 22:54:08 -0700875 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800876
877 // If we missed cycles, we could be pretty far behind. Spin until we are
878 // caught up.
879 } while (last_synchronized_time_ + polling_period_ < end_time);
880}
881
882void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
883 aos::monotonic_clock::time_point end,
884 bool got_new, FetcherStruct *fetcher) {
885 const auto duration = end - start;
886 if (!got_new) {
887 ++total_nop_fetch_count_;
888 total_nop_fetch_time_ += duration;
889 return;
890 }
891 ++total_message_fetch_count_;
892 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
893 total_message_fetch_time_ += duration;
894 if (duration > max_message_fetch_time_) {
895 max_message_fetch_time_ = duration;
896 max_message_fetch_time_channel_ = fetcher->channel_index;
897 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
898 }
899}
900
901void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
902 aos::monotonic_clock::time_point end,
Brian Smartt03c00da2022-02-24 10:25:00 -0800903 const FetcherStruct &fetcher) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800904 const auto duration = end - start;
905 total_copy_time_ += duration;
906 ++total_copy_count_;
Brian Smartt03c00da2022-02-24 10:25:00 -0800907 total_copy_bytes_ += fetcher.fetcher->context().size;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800908 if (duration > max_copy_time_) {
909 max_copy_time_ = duration;
Brian Smartt03c00da2022-02-24 10:25:00 -0800910 max_copy_time_channel_ = fetcher.channel_index;
911 max_copy_time_size_ = fetcher.fetcher->context().size;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800912 }
913}
914
915} // namespace logger
916} // namespace aos