#include "aos/events/logging/logger.h"

#include <fcntl.h>
#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <vector>

#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"

DEFINE_bool(skip_missing_forwarding_entries, false,
            "If true, drop any forwarding entries with missing data.  If "
            "false, CHECK.");

namespace aos {
namespace logger {

namespace chrono = std::chrono;

Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
             event_loop, polling_period) {}

Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : event_loop_(event_loop),
      log_namer_(std::move(log_namer)),
      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
      polling_period_(polling_period) {
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
  int channel_index = 0;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    FetcherStruct fs;
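    // Classify this channel relative to the node we are logging on: whether
    // we can send on it locally, whether it is readable here, and whether the
    // configuration wants its data and/or its delivery timestamps logged.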
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
                                 channel, event_loop_->node()) &&
                             is_readable;

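    // Delivery timestamps only exist for forwarded channels in a multi-node
    // configuration; with no node (single-node mode) there is nothing to log.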
    const bool log_delivery_times =
        (event_loop_->node() == nullptr)
            ? false
            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                  channel, event_loop_->node(), event_loop_->node());

    if (log_message || log_delivery_times) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << "  Delivery times";
        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
      }
      if (log_message) {
        VLOG(1) << "  Data";
        fs.writer = log_namer_->MakeWriter(channel);
        if (!is_local) {
          fs.log_type = LogType::kLogRemoteMessage;
        }
      }
      fs.channel_index = channel_index;
      fs.written = false;
      fetchers_.emplace_back(std::move(fs));
    }
    ++channel_index;
  }

  // When things start, we want to log the header, then the most recent
  // messages available on each fetcher to capture the previous state, then
  // start polling.
  event_loop_->OnRun([this, polling_period]() {
    // Grab data from each channel right before we declare the log file
    // started so we can capture the latest message on each channel.  This
    // lets non-periodic messages, such as configuration, get logged too.
    for (FetcherStruct &f : fetchers_) {
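      // Fetch() pulls the latest message if the channel has ever had one; if
      // it returns false there is nothing to log yet, so mark the channel as
      // already written until new data shows up.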
      f.written = !f.fetcher->Fetch();
    }

    // We need to pick a point in time to declare the log file "started".
    // That point is now.  It needs to be after everything is fetched so that
    // the fetchers are all pointed at the most recent message before the
    // start time.
    monotonic_start_time_ = event_loop_->monotonic_now();
    realtime_start_time_ = event_loop_->realtime_now();
    last_synchronized_time_ = monotonic_start_time_;

    LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());

    WriteHeader();

    timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
                          polling_period);
  });
}

void Logger::WriteHeader() {
  for (const Node *node : log_namer_->nodes()) {
    WriteHeader(node);
  }
}

void Logger::WriteHeader(const Node *node) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(1);

  flatbuffers::Offset<aos::Configuration> configuration_offset =
      CopyFlatBuffer(event_loop_->configuration(), &fbb);

  flatbuffers::Offset<flatbuffers::String> string_offset =
      fbb.CreateString(network::GetHostname());

  flatbuffers::Offset<Node> node_offset;
  if (node != nullptr) {
    node_offset = CopyFlatBuffer(node, &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(string_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
  }

  log_file_header_builder.add_configuration(configuration_offset);
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary.  And the reverse could happen for another
  // message.  Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
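  // For example, a 100 ms polling period gets reported as a 300 ms
  // max_out_of_order_duration.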
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
          .count());

  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_start_time_.time_since_epoch())
          .count());
  log_file_header_builder.add_realtime_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          realtime_start_time_.time_since_epoch())
          .count());

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  log_namer_->WriteHeader(&fbb, node);
}

void Logger::Rotate(DetachedBufferWriter *writer) {
  Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
}

void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
  // Force data up until now to be written.
  DoLogData();

  // Swap the writer out, and re-write the header.
  log_namer_ = std::move(log_namer);

  // And then update the writers.
  for (FetcherStruct &f : fetchers_) {
    const Channel *channel =
        event_loop_->configuration()->channels()->Get(f.channel_index);
    if (f.timestamp_writer != nullptr) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
    }
    if (f.writer != nullptr) {
      f.writer = log_namer_->MakeWriter(channel);
    }
  }

  WriteHeader();
}

void Logger::DoLogData() {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration.  To do this, we need sync points.  Every write
  // cycle should be a sync point.
  const monotonic_clock::time_point monotonic_now =
      event_loop_->monotonic_now();

  do {
    // Move the sync point up by at most polling_period.  This forces one sync
    // per iteration, even if it is small.
    last_synchronized_time_ =
        std::min(last_synchronized_time_ + polling_period_, monotonic_now);
    // Write each channel to disk, one at a time.
    for (FetcherStruct &f : fetchers_) {
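      // Drain this channel: keep fetching and writing until we either run
      // out of new messages or reach one newer than the current sync point.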
      while (true) {
        if (f.written) {
          if (!f.fetcher->FetchNext()) {
            VLOG(2) << "No new data on "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel());
            break;
          } else {
            f.written = false;
          }
        }

        CHECK(!f.written);

        // TODO(james): Write tests to exercise this logic.
        if (f.fetcher->context().monotonic_event_time <
            last_synchronized_time_) {
          if (f.writer != nullptr) {
            // Write!
            flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                               max_header_size_);
            fbb.ForceDefaults(1);

            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                               f.channel_index, f.log_type));

            VLOG(1) << "Writing data as node "
                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel())
                    << " to " << f.writer->filename() << " data "
                    << FlatbufferToJson(
                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                               fbb.GetBufferPointer()));

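            // Remember the largest header overhead we've seen so the next
            // builder can be pre-sized to hold the message plus its header.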
            max_header_size_ = std::max(
                max_header_size_, fbb.GetSize() - f.fetcher->context().size);
            f.writer->QueueSizedFlatbuffer(&fbb);
          }

          if (f.timestamp_writer != nullptr) {
            // And now handle timestamps.
            flatbuffers::FlatBufferBuilder fbb;
            fbb.ForceDefaults(1);

            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                               f.channel_index,
                                               LogType::kLogDeliveryTimeOnly));

            VLOG(1) << "Writing timestamps as node "
                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel())
                    << " to " << f.timestamp_writer->filename()
                    << " timestamp "
                    << FlatbufferToJson(
                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                               fbb.GetBufferPointer()));

            f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
          }

          f.written = true;
        } else {
          break;
        }
      }
    }

    // If we missed cycles, we could be pretty far behind.  Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
}

LogReader::LogReader(std::string_view filename,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::string>{std::string(filename)},
                replay_configuration) {}

LogReader::LogReader(const std::vector<std::string> &filenames,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::vector<std::string>>{filenames},
                replay_configuration) {}

LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                     const Configuration *replay_configuration)
    : filenames_(filenames),
      log_file_header_(ReadHeader(filenames[0][0])),
      replay_configuration_(replay_configuration) {
  MakeRemappedConfig();

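  // Single-node logs get one State keyed by nullptr here; in the multi-node
  // case the per-node States are created later in Register().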
  if (!configuration::MultiNode(configuration())) {
    auto it = channel_mergers_.insert(std::make_pair(nullptr, State{}));
    State *state = &(it.first->second);

    state->channel_merger = std::make_unique<ChannelMerger>(filenames);
  }
}

LogReader::~LogReader() { Deregister(); }

const Configuration *LogReader::logged_configuration() const {
  return log_file_header_.message().configuration();
}

const Configuration *LogReader::configuration() const {
  return remapped_configuration_;
}

std::vector<const Node *> LogReader::Nodes() const {
  // Because the Node pointer will only be valid if it actually points to
  // memory owned by remapped_configuration_, we need to wait for
  // remapped_configuration_ to be populated before accessing it.
  //
  // Also note that whenever a map is changed, the nodes in here are
  // invalidated.
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register before the node() pointer will be valid.";
  return configuration::GetNodes(remapped_configuration_);
}

monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
  auto it = channel_mergers_.find(node);
  CHECK(it != channel_mergers_.end())
      << ": Unknown node " << FlatbufferToJson(node);
  return it->second.channel_merger->monotonic_start_time();
}

realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
  auto it = channel_mergers_.find(node);
  CHECK(it != channel_mergers_.end())
      << ": Unknown node " << FlatbufferToJson(node);
  return it->second.channel_merger->realtime_start_time();
}

void LogReader::Register() {
  event_loop_factory_unique_ptr_ =
      std::make_unique<SimulatedEventLoopFactory>(configuration());
  Register(event_loop_factory_unique_ptr_.get());
}

void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;
  // We want to start the log file at the last start time of the log files
  // from all the nodes.  Compute how long each node's simulation needs to run
  // to move time to this point.
  monotonic_clock::duration run_time = monotonic_clock::duration(0);

  for (const Node *node : configuration::GetNodes(configuration())) {
    auto it = channel_mergers_.insert(std::make_pair(node, State{}));

    State *state = &(it.first->second);

    state->channel_merger = std::make_unique<ChannelMerger>(filenames_);

    state->node_event_loop_factory =
        event_loop_factory_->GetNodeEventLoopFactory(node);
    state->event_loop_unique_ptr =
        event_loop_factory->MakeEventLoop("log_reader", node);

    Register(state->event_loop_unique_ptr.get());

    const monotonic_clock::duration startup_time =
        state->channel_merger->monotonic_start_time() -
        state->event_loop->monotonic_now();
    if (startup_time > run_time) {
      run_time = startup_time;
    }
  }

  // Forwarding is tracked per channel.  If it is enabled, we want to turn it
  // off.  Otherwise messages replayed will get forwarded across to the other
  // nodes, and also replayed on the other nodes.  This may not satisfy all
  // our users, but it'll start the discussion.
  if (configuration::MultiNode(event_loop_factory_->configuration())) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      const Channel *channel = logged_configuration()->channels()->Get(i);
      const Node *node = configuration::GetNode(
          configuration(), channel->source_node()->string_view());

      auto state_pair = channel_mergers_.find(node);
      CHECK(state_pair != channel_mergers_.end());
      State *state = &(state_pair->second);

      const Channel *remapped_channel =
          RemapChannel(state->event_loop, channel);

      event_loop_factory_->DisableForwarding(remapped_channel);
    }
  }

  event_loop_factory_->RunFor(run_time);
}

void LogReader::Register(EventLoop *event_loop) {
  auto state_pair = channel_mergers_.find(event_loop->node());
  CHECK(state_pair != channel_mergers_.end());
  State *state = &(state_pair->second);

  state->event_loop = event_loop;

  // We don't run timing reports when trying to print out logged data, because
  // otherwise we would end up printing out the timing reports themselves...
  // This is only really relevant when we are replaying into a simulation.
  event_loop->SkipTimingReport();
  event_loop->SkipAosLog();

  state->channel_merger->SetNode(event_loop->node());

  state->channels.resize(logged_configuration()->channels()->size());

  for (size_t i = 0; i < state->channels.size(); ++i) {
    const Channel *channel =
        RemapChannel(event_loop, logged_configuration()->channels()->Get(i));

    state->channels[i] = event_loop->MakeRawSender(channel);
  }

  state->timer_handler = event_loop->AddTimer([this, state]() {
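    // Once this node's log has no more messages, count it out; when every
    // node has run dry, stop the simulation.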
    if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
      --live_nodes_;
      if (live_nodes_ == 0) {
        event_loop_factory_->Exit();
      }
      return;
    }
    TimestampMerger::DeliveryTimestamp channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    std::tie(channel_timestamp, channel_index, channel_data) =
        state->channel_merger->PopOldest();

    const monotonic_clock::time_point monotonic_now =
        state->event_loop->context().monotonic_event_time;
    CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
        << ": Now " << monotonic_now.time_since_epoch().count()
        << " trying to send "
        << channel_timestamp.monotonic_event_time.time_since_epoch().count();

    if (channel_timestamp.monotonic_event_time >
            state->channel_merger->monotonic_start_time() ||
        event_loop_factory_ != nullptr) {
      if (!FLAGS_skip_missing_forwarding_entries ||
          channel_data.message().data() != nullptr) {
        CHECK(channel_data.message().data() != nullptr)
            << ": Got a message without data.  Forwarding entry which was "
               "not matched?  Use --skip_missing_forwarding_entries to ignore "
               "this.";

        // If we have access to the factory, use it to fix the realtime time.
        if (state->node_event_loop_factory != nullptr) {
          state->node_event_loop_factory->SetRealtimeOffset(
              channel_timestamp.monotonic_event_time,
              channel_timestamp.realtime_event_time);
        }

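        // Send with the remote timestamps and queue index so the replayed
        // message keeps the forwarding metadata it was logged with.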
        state->channels[channel_index]->Send(
            channel_data.message().data()->Data(),
            channel_data.message().data()->size(),
            channel_timestamp.monotonic_remote_time,
            channel_timestamp.realtime_remote_time,
            channel_timestamp.remote_queue_index);
      }
    } else {
      LOG(WARNING)
          << "Not sending data from before the start of the log file. "
          << channel_timestamp.monotonic_event_time.time_since_epoch().count()
          << " start " << monotonic_start_time().time_since_epoch().count()
          << " " << FlatbufferToJson(channel_data);
    }

    const monotonic_clock::time_point next_time =
        state->channel_merger->OldestMessage();
    if (next_time != monotonic_clock::max_time) {
      state->timer_handler->Setup(next_time);
    } else {
      // Set up a timer just after now to shut down.  If we don't do this,
      // then the senders waiting on the message we just read will never get
      // called.
      if (event_loop_factory_ != nullptr) {
        state->timer_handler->Setup(monotonic_now +
                                    event_loop_factory_->send_delay() +
                                    std::chrono::nanoseconds(1));
      }
    }
  });

  ++live_nodes_;

  if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
    event_loop->OnRun([state]() {
      state->timer_handler->Setup(state->channel_merger->OldestMessage());
    });
  }
}

void LogReader::Deregister() {
  // Make sure that things get destroyed in the correct order, rather than
  // relying on getting the order correct in the class definition.
  for (const Node *node : Nodes()) {
    auto state_pair = channel_mergers_.find(node);
    CHECK(state_pair != channel_mergers_.end());
    State *state = &(state_pair->second);
    for (size_t i = 0; i < state->channels.size(); ++i) {
      state->channels[i].reset();
    }
    state->event_loop_unique_ptr.reset();
    state->event_loop = nullptr;
    state->node_event_loop_factory = nullptr;
  }

  event_loop_factory_unique_ptr_.reset();
  event_loop_factory_ = nullptr;
}

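// Renames a logged channel before replay so it does not collide with a live
// channel of the same name and type.  Illustrative example (hypothetical
// arguments): RemapLoggedChannel("/aos", "aos.timing.Report", "/original")
// replays the logged messages on "/original/aos" instead of "/aos".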
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
                                   std::string_view add_prefix) {
  for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
    const Channel *const channel = logged_configuration()->channels()->Get(ii);
    if (channel->name()->str() == name &&
        channel->type()->string_view() == type) {
      CHECK_EQ(0u, remapped_channels_.count(ii))
          << "Already remapped channel "
          << configuration::CleanedChannelToString(channel);
      remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
      VLOG(1) << "Remapping channel "
              << configuration::CleanedChannelToString(channel)
              << " to have name " << remapped_channels_[ii];
      MakeRemappedConfig();
      return;
    }
  }
  LOG(FATAL) << "Unable to locate channel with name " << name << " and type "
             << type;
}

void LogReader::MakeRemappedConfig() {
  for (std::pair<const Node *const, State> &state : channel_mergers_) {
    CHECK(!state.second.event_loop)
        << ": Can't change the mapping after the events are scheduled.";
  }

  // If no remapping occurred and we are using the original config, then there
  // is nothing interesting to do here.
  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
    remapped_configuration_ = logged_configuration();
    return;
  }
  // Config to copy Channel definitions from.  Use the specified
  // replay_configuration_ if it has been provided.
  const Configuration *const base_config = replay_configuration_ == nullptr
                                               ? logged_configuration()
                                               : replay_configuration_;
  // The remapped config will be identical to the base_config, except that it
  // will have a bunch of extra channels in the channel list, which are exact
  // copies of the remapped channels, but with different names.
  // Because the flatbuffers API is a pain to work with, this requires a bit
  // of a song-and-dance to get copied over.
  // The order of operations is to:
  // 1) Make a flatbuffer builder for a config that will just contain a list
  //    of the new channels that we want to add.
  // 2) For each channel that we are remapping:
  //    a) Make a buffer/builder and construct into it a Channel table that
  //       only contains the new name for the channel.
  //    b) Merge the new channel with just the name into the channel that we
  //       are trying to copy, built in the flatbuffer builder made in 1.
  //       This gives us the new channel definition that we need.
  // 3) Using this list of offsets, build the Configuration of just new
  //    Channels.
  // 4) Merge the Configuration with the new Channels into the base_config.
  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
  //    chance to sanitize the config.

  // This is the builder that we use for the config containing all the new
  // channels.
  flatbuffers::FlatBufferBuilder new_config_fbb;
  new_config_fbb.ForceDefaults(1);
  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
  for (auto &pair : remapped_channels_) {
    // This is the builder that we use for creating the Channel with just the
    // new name.
    flatbuffers::FlatBufferBuilder new_name_fbb;
    new_name_fbb.ForceDefaults(1);
    const flatbuffers::Offset<flatbuffers::String> name_offset =
        new_name_fbb.CreateString(pair.second);
    ChannelBuilder new_name_builder(new_name_fbb);
    new_name_builder.add_name(name_offset);
    new_name_fbb.Finish(new_name_builder.Finish());
    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
    // Retrieve the channel that we want to copy, confirming that it is
    // actually present in base_config.
    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
        base_config, logged_configuration()->channels()->Get(pair.first), "",
        nullptr));
    // Actually create the new channel and put it into the vector of Offsets
    // that we will use to create the new Configuration.
    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
        reinterpret_cast<const flatbuffers::Table *>(base_channel),
        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
        &new_config_fbb));
  }
  // Create the Configuration containing the new channels that we want to add.
  const auto new_name_vector_offsets =
      new_config_fbb.CreateVector(channel_offsets);
  ConfigurationBuilder new_config_builder(new_config_fbb);
  new_config_builder.add_channels(new_name_vector_offsets);
  new_config_fbb.Finish(new_config_builder.Finish());
  const FlatbufferDetachedBuffer<Configuration> new_name_config =
      new_config_fbb.Release();
  // Merge the new channels configuration into the base_config, giving us the
  // remapped configuration.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          MergeFlatBuffers<Configuration>(base_config,
                                          &new_name_config.message()));
  // Call MergeConfiguration to deal with sanitizing the config.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          configuration::MergeConfiguration(*remapped_configuration_buffer_));

  remapped_configuration_ = &remapped_configuration_buffer_->message();
}

const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
                                       const Channel *channel) {
  std::string_view channel_name = channel->name()->string_view();
  std::string_view channel_type = channel->type()->string_view();
  const int channel_index =
      configuration::ChannelIndex(logged_configuration(), channel);
  // If the channel is remapped, find the correct channel name to use.
  if (remapped_channels_.count(channel_index) > 0) {
    VLOG(2) << "Got remapped channel on "
            << configuration::CleanedChannelToString(channel);
    channel_name = remapped_channels_[channel_index];
  }

  VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
  const Channel *remapped_channel = configuration::GetChannel(
      event_loop->configuration(), channel_name, channel_type,
      event_loop->name(), event_loop->node());

  CHECK(remapped_channel != nullptr)
      << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
      << channel_type << "\"} because it is not in the provided configuration.";

  return remapped_channel;
}

}  // namespace logger
}  // namespace aos