blob: 3e790b63eb9f2afa121cb9920d35b156e018f4e5 [file] [log] [blame]
James Kuszmaul38735e82019-12-07 16:42:06 -08001#include "aos/events/logging/logger.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -08002
3#include <fcntl.h>
Austin Schuh4c4e0092019-12-22 16:18:03 -08004#include <limits.h>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8#include <vector>
9
Austin Schuhe309d2a2019-11-29 13:25:21 -080010#include "absl/types/span.h"
11#include "aos/events/event_loop.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080012#include "aos/events/logging/logger_generated.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080013#include "aos/flatbuffer_merge.h"
Austin Schuh288479d2019-12-18 19:47:52 -080014#include "aos/network/team_number.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080015#include "aos/time/time.h"
16#include "flatbuffers/flatbuffers.h"
17
Austin Schuh15649d62019-12-28 16:36:38 -080018DEFINE_bool(skip_missing_forwarding_entries, false,
19 "If true, drop any forwarding entries with missing data. If "
20 "false, CHECK.");
Austin Schuhe309d2a2019-11-29 13:25:21 -080021
22namespace aos {
23namespace logger {
24
25namespace chrono = std::chrono;
26
Austin Schuhe309d2a2019-11-29 13:25:21 -080027Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
28 std::chrono::milliseconds polling_period)
29 : event_loop_(event_loop),
30 writer_(writer),
31 timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
32 polling_period_(polling_period) {
33 for (const Channel *channel : *event_loop_->configuration()->channels()) {
34 FetcherStruct fs;
Austin Schuh15649d62019-12-28 16:36:38 -080035 const bool is_readable =
36 configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
37 const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
38 channel, event_loop_->node()) &&
39 is_readable;
40
41 const bool log_delivery_times =
42 (event_loop_->node() == nullptr)
43 ? false
44 : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
45 channel, event_loop_->node(), event_loop_->node());
46
47 if (log_message || log_delivery_times) {
48 fs.fetcher = event_loop->MakeRawFetcher(channel);
49 VLOG(1) << "Logging channel "
50 << configuration::CleanedChannelToString(channel);
51
52 if (log_delivery_times) {
53 if (log_message) {
54 VLOG(1) << " Logging message and delivery times";
55 fs.log_type = LogType::kLogMessageAndDeliveryTime;
56 } else {
57 VLOG(1) << " Logging delivery times only";
58 fs.log_type = LogType::kLogDeliveryTimeOnly;
59 }
60 } else {
61 // We don't have a particularly great use case right now for logging a
62 // forwarded message, but either not logging the delivery times, or
63 // logging them on another node. Fail rather than produce bad results.
64 CHECK(configuration::ChannelIsSendableOnNode(channel,
65 event_loop_->node()))
66 << ": Logger only knows how to log remote messages with "
67 "forwarding timestamps.";
68 VLOG(1) << " Logging message only";
69 fs.log_type = LogType::kLogMessage;
70 }
71 }
72
Austin Schuhe309d2a2019-11-29 13:25:21 -080073 fs.written = false;
74 fetchers_.emplace_back(std::move(fs));
75 }
76
77 // When things start, we want to log the header, then the most recent messages
78 // available on each fetcher to capture the previous state, then start
79 // polling.
80 event_loop_->OnRun([this, polling_period]() {
81 // Grab data from each channel right before we declare the log file started
82 // so we can capture the latest message on each channel. This lets us have
83 // non periodic messages with configuration that now get logged.
84 for (FetcherStruct &f : fetchers_) {
Austin Schuh15649d62019-12-28 16:36:38 -080085 if (f.fetcher.get() != nullptr) {
86 f.written = !f.fetcher->Fetch();
87 }
Austin Schuhe309d2a2019-11-29 13:25:21 -080088 }
89
90 // We need to pick a point in time to declare the log file "started". This
91 // starts here. It needs to be after everything is fetched so that the
92 // fetchers are all pointed at the most recent message before the start
93 // time.
94 const monotonic_clock::time_point monotonic_now =
95 event_loop_->monotonic_now();
96 const realtime_clock::time_point realtime_now = event_loop_->realtime_now();
97 last_synchronized_time_ = monotonic_now;
98
99 {
100 // Now write the header with this timestamp in it.
101 flatbuffers::FlatBufferBuilder fbb;
102 fbb.ForceDefaults(1);
103
104 flatbuffers::Offset<aos::Configuration> configuration_offset =
105 CopyFlatBuffer(event_loop_->configuration(), &fbb);
106
Austin Schuh288479d2019-12-18 19:47:52 -0800107 flatbuffers::Offset<flatbuffers::String> string_offset =
108 fbb.CreateString(network::GetHostname());
109
Austin Schuhfd960622020-01-01 13:22:55 -0800110 flatbuffers::Offset<Node> node_offset;
111 if (event_loop_->node() != nullptr) {
112 node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
113 }
Austin Schuh15649d62019-12-28 16:36:38 -0800114 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
115
Austin Schuhe309d2a2019-11-29 13:25:21 -0800116 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
117
Austin Schuh288479d2019-12-18 19:47:52 -0800118 log_file_header_builder.add_name(string_offset);
119
Austin Schuhfd960622020-01-01 13:22:55 -0800120 // Only add the node if we are running in a multinode configuration.
121 if (event_loop_->node() != nullptr) {
122 log_file_header_builder.add_node(node_offset);
123 }
Austin Schuh15649d62019-12-28 16:36:38 -0800124
Austin Schuhe309d2a2019-11-29 13:25:21 -0800125 log_file_header_builder.add_configuration(configuration_offset);
126 // The worst case theoretical out of order is the polling period times 2.
127 // One message could get logged right after the boundary, but be for right
128 // before the next boundary. And the reverse could happen for another
129 // message. Report back 3x to be extra safe, and because the cost isn't
130 // huge on the read side.
131 log_file_header_builder.add_max_out_of_order_duration(
132 std::chrono::duration_cast<std::chrono::nanoseconds>(3 *
133 polling_period)
134 .count());
135
Austin Schuh629c9172019-12-23 20:34:43 -0800136 log_file_header_builder.add_monotonic_start_time(
Austin Schuhe309d2a2019-11-29 13:25:21 -0800137 std::chrono::duration_cast<std::chrono::nanoseconds>(
138 monotonic_now.time_since_epoch())
139 .count());
Austin Schuh629c9172019-12-23 20:34:43 -0800140 log_file_header_builder.add_realtime_start_time(
Austin Schuhe309d2a2019-11-29 13:25:21 -0800141 std::chrono::duration_cast<std::chrono::nanoseconds>(
142 realtime_now.time_since_epoch())
143 .count());
144
145 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
146 writer_->QueueSizedFlatbuffer(&fbb);
147 }
148
149 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
150 polling_period);
151 });
152}
153
154void Logger::DoLogData() {
155 // We want to guarentee that messages aren't out of order by more than
156 // max_out_of_order_duration. To do this, we need sync points. Every write
157 // cycle should be a sync point.
158 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
159
160 do {
161 // Move the sync point up by at most polling_period. This forces one sync
162 // per iteration, even if it is small.
163 last_synchronized_time_ =
164 std::min(last_synchronized_time_ + polling_period_, monotonic_now);
165 size_t channel_index = 0;
166 // Write each channel to disk, one at a time.
167 for (FetcherStruct &f : fetchers_) {
Austin Schuh15649d62019-12-28 16:36:38 -0800168 // Skip any channels which we aren't supposed to log.
169 if (f.fetcher.get() != nullptr) {
170 while (true) {
171 if (f.written) {
172 if (!f.fetcher->FetchNext()) {
173 VLOG(2) << "No new data on "
174 << configuration::CleanedChannelToString(
175 f.fetcher->channel());
176 break;
177 } else {
178 f.written = false;
179 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800180 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800181
Austin Schuh15649d62019-12-28 16:36:38 -0800182 CHECK(!f.written);
183
184 // TODO(james): Write tests to exercise this logic.
185 if (f.fetcher->context().monotonic_event_time <
186 last_synchronized_time_) {
187 // Write!
188 flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
189 max_header_size_);
190 fbb.ForceDefaults(1);
191
192 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
193 channel_index, f.log_type));
194
195 VLOG(2) << "Writing data for channel "
196 << configuration::CleanedChannelToString(
197 f.fetcher->channel());
198
199 max_header_size_ = std::max(
200 max_header_size_, fbb.GetSize() - f.fetcher->context().size);
201 writer_->QueueSizedFlatbuffer(&fbb);
202
203 f.written = true;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800204 } else {
Austin Schuh15649d62019-12-28 16:36:38 -0800205 break;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800206 }
207 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800208 }
209
210 ++channel_index;
211 }
212
213 CHECK_EQ(channel_index, fetchers_.size());
214
215 // If we missed cycles, we could be pretty far behind. Spin until we are
216 // caught up.
217 } while (last_synchronized_time_ + polling_period_ < monotonic_now);
218
219 writer_->Flush();
220}
221
Austin Schuh05b70472020-01-01 17:11:17 -0800222LogReader::LogReader(std::string_view filename)
223 : sorted_message_reader_(filename) {
Austin Schuhe309d2a2019-11-29 13:25:21 -0800224 channels_.resize(configuration()->channels()->size());
Austin Schuhe309d2a2019-11-29 13:25:21 -0800225}
226
James Kuszmaul7daef362019-12-31 18:28:17 -0800227LogReader::~LogReader() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800228 Deregister();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800229}
230
Austin Schuh15649d62019-12-28 16:36:38 -0800231const Configuration *LogReader::configuration() const {
Austin Schuh05b70472020-01-01 17:11:17 -0800232 return sorted_message_reader_.configuration();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800233}
234
Austin Schuh05b70472020-01-01 17:11:17 -0800235const Node *LogReader::node() const { return sorted_message_reader_.node(); }
Austin Schuh15649d62019-12-28 16:36:38 -0800236
Austin Schuhe309d2a2019-11-29 13:25:21 -0800237monotonic_clock::time_point LogReader::monotonic_start_time() {
Austin Schuh05b70472020-01-01 17:11:17 -0800238 return sorted_message_reader_.monotonic_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800239}
240
241realtime_clock::time_point LogReader::realtime_start_time() {
Austin Schuh05b70472020-01-01 17:11:17 -0800242 return sorted_message_reader_.realtime_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800243}
244
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800245void LogReader::Register() {
246 event_loop_factory_unique_ptr_ =
247 std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
248 Register(event_loop_factory_unique_ptr_.get());
249}
250
Austin Schuh92547522019-12-28 14:33:43 -0800251void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
Austin Schuh92547522019-12-28 14:33:43 -0800252 event_loop_factory_ = event_loop_factory;
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800253 event_loop_unique_ptr_ = event_loop_factory_->MakeEventLoop("log_reader");
Austin Schuh92547522019-12-28 14:33:43 -0800254 // We don't run timing reports when trying to print out logged data, because
255 // otherwise we would end up printing out the timing reports themselves...
256 // This is only really relevant when we are replaying into a simulation.
257 event_loop_unique_ptr_->SkipTimingReport();
258
259 Register(event_loop_unique_ptr_.get());
260 event_loop_factory_->RunFor(monotonic_start_time() -
261 event_loop_factory_->monotonic_now());
262}
263
Austin Schuhe309d2a2019-11-29 13:25:21 -0800264void LogReader::Register(EventLoop *event_loop) {
265 event_loop_ = event_loop;
266
Austin Schuh39788ff2019-12-01 18:22:57 -0800267 // Otherwise we replay the timing report and try to resend it...
268 event_loop_->SkipTimingReport();
269
Austin Schuhe309d2a2019-11-29 13:25:21 -0800270 for (size_t i = 0; i < channels_.size(); ++i) {
271 CHECK_EQ(configuration()->channels()->Get(i)->name(),
272 event_loop_->configuration()->channels()->Get(i)->name());
273 CHECK_EQ(configuration()->channels()->Get(i)->type(),
274 event_loop_->configuration()->channels()->Get(i)->type());
275
Austin Schuh05b70472020-01-01 17:11:17 -0800276 channels_[i] = event_loop_->MakeRawSender(
Austin Schuhe309d2a2019-11-29 13:25:21 -0800277 event_loop_->configuration()->channels()->Get(i));
278 }
279
280 timer_handler_ = event_loop_->AddTimer([this]() {
Austin Schuh05b70472020-01-01 17:11:17 -0800281 monotonic_clock::time_point channel_timestamp;
282 int channel_index;
283 FlatbufferVector<MessageHeader> channel_data =
284 FlatbufferVector<MessageHeader>::Empty();
285
286 std::tie(channel_timestamp, channel_index, channel_data) =
287 sorted_message_reader_.PopOldestChannel();
288
Austin Schuhe309d2a2019-11-29 13:25:21 -0800289 const monotonic_clock::time_point monotonic_now =
Austin Schuhad154822019-12-27 15:45:13 -0800290 event_loop_->context().monotonic_event_time;
Austin Schuh05b70472020-01-01 17:11:17 -0800291 CHECK(monotonic_now == channel_timestamp)
Austin Schuhe309d2a2019-11-29 13:25:21 -0800292 << ": Now " << monotonic_now.time_since_epoch().count()
Austin Schuh05b70472020-01-01 17:11:17 -0800293 << " trying to send " << channel_timestamp.time_since_epoch().count();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800294
Austin Schuh05b70472020-01-01 17:11:17 -0800295 if (channel_timestamp > monotonic_start_time() ||
Austin Schuh15649d62019-12-28 16:36:38 -0800296 event_loop_factory_ != nullptr) {
297 if (!FLAGS_skip_missing_forwarding_entries ||
Austin Schuh05b70472020-01-01 17:11:17 -0800298 channel_data.message().data() != nullptr) {
299 CHECK(channel_data.message().data() != nullptr)
300 << ": Got a message without data. Forwarding entry which was "
301 "not "
Austin Schuh15649d62019-12-28 16:36:38 -0800302 "matched? Use --skip_missing_forwarding_entries to ignore "
303 "this.";
Austin Schuh92547522019-12-28 14:33:43 -0800304
Austin Schuh15649d62019-12-28 16:36:38 -0800305 // If we have access to the factory, use it to fix the realtime time.
306 if (event_loop_factory_ != nullptr) {
307 event_loop_factory_->SetRealtimeOffset(
Austin Schuh05b70472020-01-01 17:11:17 -0800308 monotonic_clock::time_point(chrono::nanoseconds(
309 channel_data.message().monotonic_sent_time())),
310 realtime_clock::time_point(chrono::nanoseconds(
311 channel_data.message().realtime_sent_time())));
Austin Schuh15649d62019-12-28 16:36:38 -0800312 }
313
Austin Schuh05b70472020-01-01 17:11:17 -0800314 channels_[channel_index]->Send(
315 channel_data.message().data()->Data(),
316 channel_data.message().data()->size(),
317 monotonic_clock::time_point(chrono::nanoseconds(
318 channel_data.message().monotonic_remote_time())),
319 realtime_clock::time_point(chrono::nanoseconds(
320 channel_data.message().realtime_remote_time())),
321 channel_data.message().remote_queue_index());
Austin Schuh92547522019-12-28 14:33:43 -0800322 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800323 } else {
324 LOG(WARNING) << "Not sending data from before the start of the log file. "
Austin Schuh05b70472020-01-01 17:11:17 -0800325 << channel_timestamp.time_since_epoch().count() << " start "
Austin Schuhe309d2a2019-11-29 13:25:21 -0800326 << monotonic_start_time().time_since_epoch().count() << " "
Austin Schuh05b70472020-01-01 17:11:17 -0800327 << FlatbufferToJson(channel_data);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800328 }
329
Austin Schuh05b70472020-01-01 17:11:17 -0800330 if (sorted_message_reader_.active_channel_count() > 0u) {
331 timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800332 }
333 });
334
Austin Schuh05b70472020-01-01 17:11:17 -0800335 if (sorted_message_reader_.active_channel_count() > 0u) {
336 event_loop_->OnRun([this]() {
337 timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
338 });
Austin Schuhe309d2a2019-11-29 13:25:21 -0800339 }
340}
341
342void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800343 // Make sure that things get destroyed in the correct order, rather than
344 // relying on getting the order correct in the class definition.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800345 for (size_t i = 0; i < channels_.size(); ++i) {
Austin Schuh05b70472020-01-01 17:11:17 -0800346 channels_[i].reset();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800347 }
Austin Schuh92547522019-12-28 14:33:43 -0800348
Austin Schuh92547522019-12-28 14:33:43 -0800349 event_loop_unique_ptr_.reset();
350 event_loop_ = nullptr;
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800351 event_loop_factory_unique_ptr_.reset();
352 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800353}
354
Austin Schuhe309d2a2019-11-29 13:25:21 -0800355} // namespace logger
356} // namespace aos