blob: c7f64ae2c0e593933cc49c6c570d59516a3a2443 [file] [log] [blame]
James Kuszmaul38735e82019-12-07 16:42:06 -08001#include "aos/events/logging/logger.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -08002
3#include <fcntl.h>
Austin Schuh4c4e0092019-12-22 16:18:03 -08004#include <limits.h>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8#include <vector>
9
Austin Schuhe309d2a2019-11-29 13:25:21 -080010#include "absl/types/span.h"
11#include "aos/events/event_loop.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080012#include "aos/events/logging/logger_generated.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080013#include "aos/flatbuffer_merge.h"
Austin Schuh288479d2019-12-18 19:47:52 -080014#include "aos/network/team_number.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080015#include "aos/time/time.h"
16#include "flatbuffers/flatbuffers.h"
17
// Replay behavior for forwarded messages whose payload was never matched in
// the log (e.g. a timestamp entry without the corresponding data).  Checked
// inside LogReader::Register()'s replay timer.
DEFINE_bool(skip_missing_forwarding_entries, false,
            "If true, drop any forwarding entries with missing data. If "
            "false, CHECK.");
Austin Schuhe309d2a2019-11-29 13:25:21 -080021
22namespace aos {
23namespace logger {
24
25namespace chrono = std::chrono;
26
// Convenience constructor: wraps the single DetachedBufferWriter in a
// LocalLogNamer for this event loop's node, then delegates to the
// LogNamer-based constructor below.
Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
             event_loop, polling_period) {}
31
32Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
33 std::chrono::milliseconds polling_period)
Austin Schuhe309d2a2019-11-29 13:25:21 -080034 : event_loop_(event_loop),
Austin Schuh6f3babe2020-01-26 20:34:50 -080035 log_namer_(std::move(log_namer)),
Austin Schuhe309d2a2019-11-29 13:25:21 -080036 timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
37 polling_period_(polling_period) {
Austin Schuh6f3babe2020-01-26 20:34:50 -080038 VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
39 int channel_index = 0;
Austin Schuhe309d2a2019-11-29 13:25:21 -080040 for (const Channel *channel : *event_loop_->configuration()->channels()) {
41 FetcherStruct fs;
Austin Schuh6f3babe2020-01-26 20:34:50 -080042 const bool is_local =
43 configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
44
Austin Schuh15649d62019-12-28 16:36:38 -080045 const bool is_readable =
46 configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
47 const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
48 channel, event_loop_->node()) &&
49 is_readable;
50
51 const bool log_delivery_times =
52 (event_loop_->node() == nullptr)
53 ? false
54 : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
55 channel, event_loop_->node(), event_loop_->node());
56
57 if (log_message || log_delivery_times) {
58 fs.fetcher = event_loop->MakeRawFetcher(channel);
59 VLOG(1) << "Logging channel "
60 << configuration::CleanedChannelToString(channel);
61
62 if (log_delivery_times) {
Austin Schuh6f3babe2020-01-26 20:34:50 -080063 VLOG(1) << " Delivery times";
64 fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
Austin Schuh15649d62019-12-28 16:36:38 -080065 }
Austin Schuh6f3babe2020-01-26 20:34:50 -080066 if (log_message) {
67 VLOG(1) << " Data";
68 fs.writer = log_namer_->MakeWriter(channel);
69 if (!is_local) {
70 fs.log_type = LogType::kLogRemoteMessage;
71 }
72 }
73 fs.channel_index = channel_index;
74 fs.written = false;
75 fetchers_.emplace_back(std::move(fs));
Austin Schuh15649d62019-12-28 16:36:38 -080076 }
Austin Schuh6f3babe2020-01-26 20:34:50 -080077 ++channel_index;
Austin Schuhe309d2a2019-11-29 13:25:21 -080078 }
79
80 // When things start, we want to log the header, then the most recent messages
81 // available on each fetcher to capture the previous state, then start
82 // polling.
83 event_loop_->OnRun([this, polling_period]() {
84 // Grab data from each channel right before we declare the log file started
85 // so we can capture the latest message on each channel. This lets us have
86 // non periodic messages with configuration that now get logged.
87 for (FetcherStruct &f : fetchers_) {
Austin Schuh6f3babe2020-01-26 20:34:50 -080088 f.written = !f.fetcher->Fetch();
Austin Schuhe309d2a2019-11-29 13:25:21 -080089 }
90
91 // We need to pick a point in time to declare the log file "started". This
92 // starts here. It needs to be after everything is fetched so that the
93 // fetchers are all pointed at the most recent message before the start
94 // time.
Austin Schuhfa895892020-01-07 20:07:41 -080095 monotonic_start_time_ = event_loop_->monotonic_now();
96 realtime_start_time_ = event_loop_->realtime_now();
97 last_synchronized_time_ = monotonic_start_time_;
Austin Schuhe309d2a2019-11-29 13:25:21 -080098
Austin Schuhcde938c2020-02-02 17:30:07 -080099 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
100 << " start_time " << monotonic_start_time_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800101
Austin Schuhfa895892020-01-07 20:07:41 -0800102 WriteHeader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800103
104 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
105 polling_period);
106 });
107}
108
Austin Schuhcde938c2020-02-02 17:30:07 -0800109// TODO(austin): Set the remote start time to the first time we see a remote
110// message when we are logging those messages separate? Need to signal what to
111// do, or how to get a good timestamp.
112
Austin Schuhfa895892020-01-07 20:07:41 -0800113void Logger::WriteHeader() {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800114 for (const Node *node : log_namer_->nodes()) {
115 WriteHeader(node);
116 }
117}
118void Logger::WriteHeader(const Node *node) {
Austin Schuhfa895892020-01-07 20:07:41 -0800119 // Now write the header with this timestamp in it.
120 flatbuffers::FlatBufferBuilder fbb;
121 fbb.ForceDefaults(1);
122
123 flatbuffers::Offset<aos::Configuration> configuration_offset =
124 CopyFlatBuffer(event_loop_->configuration(), &fbb);
125
126 flatbuffers::Offset<flatbuffers::String> string_offset =
127 fbb.CreateString(network::GetHostname());
128
129 flatbuffers::Offset<Node> node_offset;
130 if (event_loop_->node() != nullptr) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800131 node_offset = CopyFlatBuffer(node, &fbb);
Austin Schuhfa895892020-01-07 20:07:41 -0800132 }
133
134 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
135
136 log_file_header_builder.add_name(string_offset);
137
138 // Only add the node if we are running in a multinode configuration.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800139 if (node != nullptr) {
Austin Schuhfa895892020-01-07 20:07:41 -0800140 log_file_header_builder.add_node(node_offset);
141 }
142
143 log_file_header_builder.add_configuration(configuration_offset);
144 // The worst case theoretical out of order is the polling period times 2.
145 // One message could get logged right after the boundary, but be for right
146 // before the next boundary. And the reverse could happen for another
147 // message. Report back 3x to be extra safe, and because the cost isn't
148 // huge on the read side.
149 log_file_header_builder.add_max_out_of_order_duration(
150 std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
151 .count());
152
153 log_file_header_builder.add_monotonic_start_time(
154 std::chrono::duration_cast<std::chrono::nanoseconds>(
155 monotonic_start_time_.time_since_epoch())
156 .count());
157 log_file_header_builder.add_realtime_start_time(
158 std::chrono::duration_cast<std::chrono::nanoseconds>(
159 realtime_start_time_.time_since_epoch())
160 .count());
161
162 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
Austin Schuh6f3babe2020-01-26 20:34:50 -0800163 log_namer_->WriteHeader(&fbb, node);
Austin Schuhfa895892020-01-07 20:07:41 -0800164}
165
166void Logger::Rotate(DetachedBufferWriter *writer) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800167 Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
168}
169
170void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
Austin Schuhfa895892020-01-07 20:07:41 -0800171 // Force data up until now to be written.
172 DoLogData();
173
174 // Swap the writer out, and re-write the header.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800175 log_namer_ = std::move(log_namer);
176
177 // And then update the writers.
178 for (FetcherStruct &f : fetchers_) {
179 const Channel *channel =
180 event_loop_->configuration()->channels()->Get(f.channel_index);
181 if (f.timestamp_writer != nullptr) {
182 f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
183 }
184 if (f.writer != nullptr) {
185 f.writer = log_namer_->MakeWriter(channel);
186 }
187 }
188
Austin Schuhfa895892020-01-07 20:07:41 -0800189 WriteHeader();
190}
191
// Timer callback: drains every fetcher up to a synchronized cut-off time,
// writing data and/or delivery timestamps.  The cut-off advances by at most
// one polling period per pass so the out-of-order bound promised in the
// header holds.
void Logger::DoLogData() {
  // We want to guarentee that messages aren't out of order by more than
  // max_out_of_order_duration. To do this, we need sync points. Every write
  // cycle should be a sync point.
  const monotonic_clock::time_point monotonic_now =
      event_loop_->monotonic_now();

  do {
    // Move the sync point up by at most polling_period. This forces one sync
    // per iteration, even if it is small.
    last_synchronized_time_ =
        std::min(last_synchronized_time_ + polling_period_, monotonic_now);
    // Write each channel to disk, one at a time.
    for (FetcherStruct &f : fetchers_) {
      while (true) {
        // Advance to the next unwritten message, if any.
        if (f.written) {
          if (!f.fetcher->FetchNext()) {
            VLOG(2) << "No new data on "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel());
            break;
          } else {
            f.written = false;
          }
        }

        CHECK(!f.written);

        // TODO(james): Write tests to exercise this logic.
        // Only write messages from before the sync point; later ones wait
        // for the next cycle.
        if (f.fetcher->context().monotonic_event_time <
            last_synchronized_time_) {
          if (f.writer != nullptr) {
            // Write!
            // Pre-size the builder using the largest header we've seen so
            // far to avoid reallocation.
            flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                               max_header_size_);
            fbb.ForceDefaults(1);

            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                               f.channel_index, f.log_type));

            VLOG(2) << "Writing data as node "
                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel())
                    << " to " << f.writer->filename() << " data "
                    << FlatbufferToJson(
                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                               fbb.GetBufferPointer()));

            // Track the observed header overhead for future pre-sizing.
            max_header_size_ = std::max(
                max_header_size_, fbb.GetSize() - f.fetcher->context().size);
            f.writer->QueueSizedFlatbuffer(&fbb);
          }

          if (f.timestamp_writer != nullptr) {
            // And now handle timestamps.
            flatbuffers::FlatBufferBuilder fbb;
            fbb.ForceDefaults(1);

            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                               f.channel_index,
                                               LogType::kLogDeliveryTimeOnly));

            VLOG(2) << "Writing timestamps as node "
                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel())
                    << " to " << f.timestamp_writer->filename() << " timestamp "
                    << FlatbufferToJson(
                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                               fbb.GetBufferPointer()));

            f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
          }

          f.written = true;
        } else {
          break;
        }
      }
    }

    // If we missed cycles, we could be pretty far behind. Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
}
278
// Convenience overload: replay a single log file.
LogReader::LogReader(std::string_view filename,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::string>{std::string(filename)},
                replay_configuration) {}
283
// Convenience overload: replay one sorted set of log file parts.
LogReader::LogReader(const std::vector<std::string> &filenames,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::vector<std::string>>{filenames},
                replay_configuration) {}
288
// Main constructor.  Reads the header out of the first file to recover the
// logged configuration, builds the (possibly remapped) replay configuration,
// and — for single-node configs — creates the one ChannelMerger up front
// under the nullptr node key.
// NOTE(review): assumes filenames is non-empty and filenames[0] is non-empty;
// an empty input would fault in ReadHeader — confirm callers guarantee this.
LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                     const Configuration *replay_configuration)
    : filenames_(filenames),
      log_file_header_(ReadHeader(filenames[0][0])),
      replay_configuration_(replay_configuration) {
  MakeRemappedConfig();

  if (!configuration::MultiNode(configuration())) {
    auto it = channel_mergers_.insert(std::make_pair(nullptr, State{}));
    State *state = &(it.first->second);

    state->channel_merger = std::make_unique<ChannelMerger>(filenames);
  }
}
303
Austin Schuhfa895892020-01-07 20:07:41 -0800304LogReader::~LogReader() { Deregister(); }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800305
// The configuration that was recorded into the log file being replayed.
const Configuration *LogReader::logged_configuration() const {
  return log_file_header_.message().configuration();
}
309
// The configuration replay actually runs against: the logged (or replay)
// config with any remapped channels merged in.  Built by MakeRemappedConfig().
const Configuration *LogReader::configuration() const {
  return remapped_configuration_;
}
313
// Returns the nodes from the remapped configuration.
std::vector<const Node *> LogReader::Nodes() const {
  // Because the Node pointer will only be valid if it actually points to memory
  // owned by remapped_configuration_, we need to wait for the
  // remapped_configuration_ to be populated before accessing it.
  //
  // Also, note, that when ever a map is changed, the nodes in here are
  // invalidated.
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register before the node() pointer will be valid.";
  return configuration::GetNodes(remapped_configuration_);
}
Austin Schuh15649d62019-12-28 16:36:38 -0800325
Austin Schuh6f3babe2020-01-26 20:34:50 -0800326monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
327 auto it = channel_mergers_.find(node);
328 CHECK(it != channel_mergers_.end())
329 << ": Unknown node " << FlatbufferToJson(node);
330 return it->second.channel_merger->monotonic_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800331}
332
Austin Schuh6f3babe2020-01-26 20:34:50 -0800333realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
334 auto it = channel_mergers_.find(node);
335 CHECK(it != channel_mergers_.end())
336 << ": Unknown node " << FlatbufferToJson(node);
337 return it->second.channel_merger->realtime_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800338}
339
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800340void LogReader::Register() {
341 event_loop_factory_unique_ptr_ =
Austin Schuhac0771c2020-01-07 18:36:30 -0800342 std::make_unique<SimulatedEventLoopFactory>(configuration());
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800343 Register(event_loop_factory_unique_ptr_.get());
344}
345
// Registers replay against a simulated event loop factory: one State (merger,
// node factory, event loop) per node, then aligns every node's simulated
// clocks to a common boot, disables forwarding for logged channels, and runs
// the simulation forward to the latest per-node log start time.
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;

  for (const Node *node : configuration::GetNodes(configuration())) {
    auto it = channel_mergers_.insert(std::make_pair(node, State{}));

    State *state = &(it.first->second);

    state->channel_merger = std::make_unique<ChannelMerger>(filenames_);

    state->node_event_loop_factory =
        event_loop_factory_->GetNodeEventLoopFactory(node);
    state->event_loop_unique_ptr =
        event_loop_factory->MakeEventLoop("log_reader", node);

    Register(state->event_loop_unique_ptr.get());
  }

  // Basic idea is that we want to
  // 1) Find the node which booted first.
  // 2) Setup the clocks so that each clock is at the time it would be at when
  //    the first node booted.

  realtime_clock::time_point earliest_boot_time = realtime_clock::max_time;
  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
    State *state = &(state_pair.second);

    // "Boot time" = realtime start minus monotonic elapsed since boot.
    const realtime_clock::time_point boot_time =
        state->channel_merger->realtime_start_time() -
        state->channel_merger->monotonic_start_time().time_since_epoch();

    if (boot_time < earliest_boot_time) {
      earliest_boot_time = boot_time;
    }
  }

  // We want to start the log file at the last start time of the log files from
  // all the nodes. Compute how long each node's simulation needs to run to
  // move time to this point.
  monotonic_clock::duration run_time = monotonic_clock::duration(0);

  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
    State *state = &(state_pair.second);

    const realtime_clock::time_point boot_time =
        state->channel_merger->realtime_start_time() -
        state->channel_merger->monotonic_start_time().time_since_epoch();

    // And start each node's clocks so the realtime clocks line up for the start
    // times. This will let us start using it, but isn't good enough.
    state->node_event_loop_factory->SetMonotonicNow(
        monotonic_clock::time_point(earliest_boot_time - boot_time));
    state->node_event_loop_factory->SetRealtimeOffset(
        state->channel_merger->monotonic_start_time(),
        state->channel_merger->realtime_start_time());
    run_time =
        std::max(run_time, state->channel_merger->monotonic_start_time() -
                               state->node_event_loop_factory->monotonic_now());
  }

  // Forwarding is tracked per channel. If it is enabled, we want to turn it
  // off. Otherwise messages replayed will get forwarded across to the other
  // nodes, and also replayed on the other nodes. This may not satisfy all our
  // users, but it'll start the discussion.
  if (configuration::MultiNode(event_loop_factory_->configuration())) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      const Channel *channel = logged_configuration()->channels()->Get(i);
      const Node *node = configuration::GetNode(
          configuration(), channel->source_node()->string_view());

      auto state_pair = channel_mergers_.find(node);
      CHECK(state_pair != channel_mergers_.end());
      State *state = &(state_pair->second);

      const Channel *remapped_channel =
          RemapChannel(state->event_loop, channel);

      event_loop_factory_->DisableForwarding(remapped_channel);
    }
  }

  // While we are starting the system up, we might be relying on matching data
  // to timestamps on log files where the timestamp log file starts before the
  // data. In this case, it is reasonable to expect missing data.
  ignore_missing_data_ = true;
  event_loop_factory_->RunFor(run_time);
  // Now that we are running for real, missing data means that the log file is
  // corrupted or went wrong.
  ignore_missing_data_ = false;
}
436
// Registers a single node's event loop for replay: creates a raw sender per
// logged channel and a timer that pops the oldest message from the merger,
// re-sends it at its original event time, and reschedules itself for the
// next message (or winds down when the node's log is exhausted).
void LogReader::Register(EventLoop *event_loop) {
  auto state_pair = channel_mergers_.find(event_loop->node());
  CHECK(state_pair != channel_mergers_.end());
  State *state = &(state_pair->second);

  state->event_loop = event_loop;

  // We don't run timing reports when trying to print out logged data, because
  // otherwise we would end up printing out the timing reports themselves...
  // This is only really relevant when we are replaying into a simulation.
  event_loop->SkipTimingReport();
  event_loop->SkipAosLog();

  state->channel_merger->SetNode(event_loop->node());

  state->channels.resize(logged_configuration()->channels()->size());

  for (size_t i = 0; i < state->channels.size(); ++i) {
    const Channel *channel =
        RemapChannel(event_loop, logged_configuration()->channels()->Get(i));

    state->channels[i] = event_loop->MakeRawSender(channel);
  }

  state->timer_handler = event_loop->AddTimer([this, state]() {
    // max_time from the merger means this node's log is fully replayed.
    if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
      --live_nodes_;
      if (live_nodes_ == 0) {
        event_loop_factory_->Exit();
      }
      return;
    }
    TimestampMerger::DeliveryTimestamp channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    std::tie(channel_timestamp, channel_index, channel_data) =
        state->channel_merger->PopOldest();

    // The timer must fire exactly at the logged event time.
    const monotonic_clock::time_point monotonic_now =
        state->event_loop->context().monotonic_event_time;
    CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
        << ": Now " << monotonic_now.time_since_epoch().count()
        << " trying to send "
        << channel_timestamp.monotonic_event_time.time_since_epoch().count();

    if (channel_timestamp.monotonic_event_time >
            state->channel_merger->monotonic_start_time() ||
        event_loop_factory_ != nullptr) {
      // Skip messages with no payload only when explicitly allowed (startup,
      // or the --skip_missing_forwarding_entries flag); otherwise CHECK.
      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries) ||
          channel_data.message().data() != nullptr) {
        CHECK(channel_data.message().data() != nullptr)
            << ": Got a message without data. Forwarding entry which was "
               "not matched? Use --skip_missing_forwarding_entries to ignore "
               "this.";

        // If we have access to the factory, use it to fix the realtime time.
        if (state->node_event_loop_factory != nullptr) {
          state->node_event_loop_factory->SetRealtimeOffset(
              channel_timestamp.monotonic_event_time,
              channel_timestamp.realtime_event_time);
        }

        state->channels[channel_index]->Send(
            channel_data.message().data()->Data(),
            channel_data.message().data()->size(),
            channel_timestamp.monotonic_remote_time,
            channel_timestamp.realtime_remote_time,
            channel_timestamp.remote_queue_index);
      }
    } else {
      LOG(WARNING)
          << "Not sending data from before the start of the log file. "
          << channel_timestamp.monotonic_event_time.time_since_epoch().count()
          << " start " << monotonic_start_time().time_since_epoch().count()
          << " " << FlatbufferToJson(channel_data);
    }

    const monotonic_clock::time_point next_time =
        state->channel_merger->OldestMessage();
    if (next_time != monotonic_clock::max_time) {
      state->timer_handler->Setup(next_time);
    } else {
      // Set a timer up immediately after now to die. If we don't do this, then
      // the senders waiting on the message we just read will never get called.
      if (event_loop_factory_ != nullptr) {
        state->timer_handler->Setup(monotonic_now +
                                    event_loop_factory_->send_delay() +
                                    std::chrono::nanoseconds(1));
      }
    }
  });

  ++live_nodes_;

  if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
    event_loop->OnRun([state]() {
      state->timer_handler->Setup(state->channel_merger->OldestMessage());
    });
  }
}
539
540void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800541 // Make sure that things get destroyed in the correct order, rather than
542 // relying on getting the order correct in the class definition.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800543 for (const Node *node : Nodes()) {
544 auto state_pair = channel_mergers_.find(node);
545 CHECK(state_pair != channel_mergers_.end());
546 State *state = &(state_pair->second);
547 for (size_t i = 0; i < state->channels.size(); ++i) {
548 state->channels[i].reset();
549 }
550 state->event_loop_unique_ptr.reset();
551 state->event_loop = nullptr;
552 state->node_event_loop_factory = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800553 }
Austin Schuh92547522019-12-28 14:33:43 -0800554
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800555 event_loop_factory_unique_ptr_.reset();
556 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800557}
558
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800559void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
560 std::string_view add_prefix) {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800561 for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
562 const Channel *const channel = logged_configuration()->channels()->Get(ii);
563 if (channel->name()->str() == name &&
564 channel->type()->string_view() == type) {
565 CHECK_EQ(0u, remapped_channels_.count(ii))
566 << "Already remapped channel "
567 << configuration::CleanedChannelToString(channel);
568 remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
569 VLOG(1) << "Remapping channel "
570 << configuration::CleanedChannelToString(channel)
571 << " to have name " << remapped_channels_[ii];
Austin Schuh6331ef92020-01-07 18:28:09 -0800572 MakeRemappedConfig();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800573 return;
574 }
575 }
576 LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
577 << type;
578}
579
// Rebuilds remapped_configuration_ from the base config (replay config if
// provided, else the logged one) plus a renamed copy of every channel in
// remapped_channels_.  Must not be called once replay event loops exist.
void LogReader::MakeRemappedConfig() {
  for (std::pair<const Node *const, State> &state : channel_mergers_) {
    CHECK(!state.second.event_loop)
        << ": Can't change the mapping after the events are scheduled.";
  }

  // If no remapping occurred and we are using the original config, then there
  // is nothing interesting to do here.
  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
    remapped_configuration_ = logged_configuration();
    return;
  }
  // Config to copy Channel definitions from. Use the specified
  // replay_configuration_ if it has been provided.
  const Configuration *const base_config = replay_configuration_ == nullptr
                                               ? logged_configuration()
                                               : replay_configuration_;
  // The remapped config will be identical to the base_config, except that it
  // will have a bunch of extra channels in the channel list, which are exact
  // copies of the remapped channels, but with different names.
  // Because the flatbuffers API is a pain to work with, this requires a bit of
  // a song-and-dance to get copied over.
  // The order of operations is to:
  // 1) Make a flatbuffer builder for a config that will just contain a list of
  //    the new channels that we want to add.
  // 2) For each channel that we are remapping:
  //    a) Make a buffer/builder and construct into it a Channel table that only
  //       contains the new name for the channel.
  //    b) Merge the new channel with just the name into the channel that we are
  //       trying to copy, built in the flatbuffer builder made in 1. This gives
  //       us the new channel definition that we need.
  // 3) Using this list of offsets, build the Configuration of just new
  //    Channels.
  // 4) Merge the Configuration with the new Channels into the base_config.
  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
  //    chance to sanitize the config.

  // This is the builder that we use for the config containing all the new
  // channels.
  flatbuffers::FlatBufferBuilder new_config_fbb;
  new_config_fbb.ForceDefaults(1);
  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
  for (auto &pair : remapped_channels_) {
    // This is the builder that we use for creating the Channel with just the
    // new name.
    flatbuffers::FlatBufferBuilder new_name_fbb;
    new_name_fbb.ForceDefaults(1);
    const flatbuffers::Offset<flatbuffers::String> name_offset =
        new_name_fbb.CreateString(pair.second);
    ChannelBuilder new_name_builder(new_name_fbb);
    new_name_builder.add_name(name_offset);
    new_name_fbb.Finish(new_name_builder.Finish());
    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
    // Retrieve the channel that we want to copy, confirming that it is actually
    // present in base_config.
    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
        base_config, logged_configuration()->channels()->Get(pair.first), "",
        nullptr));
    // Actually create the new channel and put it into the vector of Offsets
    // that we will use to create the new Configuration.
    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
        reinterpret_cast<const flatbuffers::Table *>(base_channel),
        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
        &new_config_fbb));
  }
  // Create the Configuration containing the new channels that we want to add.
  const auto new_name_vector_offsets =
      new_config_fbb.CreateVector(channel_offsets);
  ConfigurationBuilder new_config_builder(new_config_fbb);
  new_config_builder.add_channels(new_name_vector_offsets);
  new_config_fbb.Finish(new_config_builder.Finish());
  const FlatbufferDetachedBuffer<Configuration> new_name_config =
      new_config_fbb.Release();
  // Merge the new channels configuration into the base_config, giving us the
  // remapped configuration.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          MergeFlatBuffers<Configuration>(base_config,
                                          &new_name_config.message()));
  // Call MergeConfiguration to deal with sanitizing the config.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          configuration::MergeConfiguration(*remapped_configuration_buffer_));

  remapped_configuration_ = &remapped_configuration_buffer_->message();
}
666
Austin Schuh6f3babe2020-01-26 20:34:50 -0800667const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
668 const Channel *channel) {
669 std::string_view channel_name = channel->name()->string_view();
670 std::string_view channel_type = channel->type()->string_view();
671 const int channel_index =
672 configuration::ChannelIndex(logged_configuration(), channel);
673 // If the channel is remapped, find the correct channel name to use.
674 if (remapped_channels_.count(channel_index) > 0) {
675 VLOG(2) << "Got remapped channel on "
676 << configuration::CleanedChannelToString(channel);
677 channel_name = remapped_channels_[channel_index];
678 }
679
680 VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
681 const Channel *remapped_channel = configuration::GetChannel(
682 event_loop->configuration(), channel_name, channel_type,
683 event_loop->name(), event_loop->node());
684
685 CHECK(remapped_channel != nullptr)
686 << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
687 << channel_type << "\"} because it is not in the provided configuration.";
688
689 return remapped_channel;
690}
691
Austin Schuhe309d2a2019-11-29 13:25:21 -0800692} // namespace logger
693} // namespace aos