Merge "Fix null termination of arguments list in starter"
diff --git a/WORKSPACE b/WORKSPACE
index b9031fb..42a5945 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -130,17 +130,17 @@
http_archive(
name = "platforms",
sha256 = "3c4057c53b64dd3f2c753e0a80bbb6ccb29fb437910200c911dd51454baf619b",
- url = "https://www.frc971.org/Build-Dependencies/platforms_10b4d2bdde25ea1e66c02c3f83a6d921000a7272.zip",
strip_prefix = "platforms-10b4d2bdde25ea1e66c02c3f83a6d921000a7272",
+ url = "https://www.frc971.org/Build-Dependencies/platforms_10b4d2bdde25ea1e66c02c3f83a6d921000a7272.zip",
)
http_archive(
name = "bazel_skylib",
+ sha256 = "1c531376ac7e5a180e0237938a2536de0c54d93f5c278634818e0efc952dd56c",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/bazel-skylib/releases/download/1.0.3/bazel-skylib-1.0.3.tar.gz",
"https://github.com/bazelbuild/bazel-skylib/releases/download/1.0.3/bazel-skylib-1.0.3.tar.gz",
],
- sha256 = "1c531376ac7e5a180e0237938a2536de0c54d93f5c278634818e0efc952dd56c",
)
http_archive(
diff --git a/aos/BUILD b/aos/BUILD
index 4ccfd0e..617b91c 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -1,5 +1,7 @@
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_py_library", "flatbuffer_ts_library")
+exports_files(["aos_dump_autocomplete.sh"])
+
filegroup(
name = "prime_binaries",
srcs = [
diff --git a/aos/events/epoll.cc b/aos/events/epoll.cc
index b9e7edb..3e2148d 100644
--- a/aos/events/epoll.cc
+++ b/aos/events/epoll.cc
@@ -72,39 +72,49 @@
void EPoll::Run() {
run_ = true;
+ // As long as run_ is true or we still have events to process, keep polling.
while (true) {
- // Pull a single event out. Infinite timeout if we are supposed to be
- // running, and 0 length timeout otherwise. This lets us flush the event
- // queue before quitting.
- struct epoll_event event;
- int num_events = epoll_wait(epoll_fd_, &event, 1, run_ ? -1 : 0);
- // Retry on EINTR and nothing else.
- if (num_events == -1) {
- if (errno == EINTR) {
- continue;
- }
- PCHECK(num_events != -1);
- }
- if (!run_) {
- // If we ran out of events, quit.
- if (num_events == 0) {
+ // If we ran out of events and Quit() was called, quit
+ if (!Poll(run_)) {
+ if (!run_) {
return;
}
}
- EventData *const event_data = static_cast<struct EventData *>(event.data.ptr);
- if (event.events & kInEvents) {
- CHECK(event_data->in_fn)
- << ": No handler registered for input events on " << event_data->fd;
- event_data->in_fn();
- }
- if (event.events & kOutEvents) {
- CHECK(event_data->out_fn)
- << ": No handler registered for output events on " << event_data->fd;
- event_data->out_fn();
- }
}
}
+bool EPoll::Poll(bool block) {
+ // Pull a single event out. Infinite timeout if we are supposed to be
+ // running, and 0 length timeout otherwise. This lets us flush the event
+ // queue before quitting.
+ struct epoll_event event;
+ int num_events = epoll_wait(epoll_fd_, &event, 1, block ? -1 : 0);
+ // Retry on EINTR and nothing else.
+ if (num_events == -1) {
+ if (errno == EINTR) {
+ return false;
+ }
+ PCHECK(num_events != -1);
+ }
+
+ if (num_events == 0) {
+ return false;
+ }
+
+ EventData *const event_data = static_cast<struct EventData *>(event.data.ptr);
+ if (event.events & kInEvents) {
+ CHECK(event_data->in_fn)
+ << ": No handler registered for input events on " << event_data->fd;
+ event_data->in_fn();
+ }
+ if (event.events & kOutEvents) {
+ CHECK(event_data->out_fn)
+ << ": No handler registered for output events on " << event_data->fd;
+ event_data->out_fn();
+ }
+ return true;
+}
+
void EPoll::Quit() { PCHECK(write(quit_signal_fd_, "q", 1) == 1); }
void EPoll::OnReadable(int fd, ::std::function<void()> function) {
diff --git a/aos/events/epoll.h b/aos/events/epoll.h
index 51a20d0..01438ae 100644
--- a/aos/events/epoll.h
+++ b/aos/events/epoll.h
@@ -59,6 +59,12 @@
// Runs until Quit() is called.
void Run();
+ // Consumes a single epoll event. Blocks indefinitely if block is true, or
+ // does not block at all. Returns true if an event was consumed, and false on
+ // any retryable error or if no events are available. Dies fatally on
+ // non-retryable errors.
+ bool Poll(bool block);
+
// Quits. Async safe.
void Quit();
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index ec60143..a73be9b 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -50,6 +50,11 @@
UpdateHeader(header, uuid_, part_number_);
data_writer_->QueueSpan(header->span());
}
+void LocalLogNamer::Reboot(
+ const Node * /*node*/,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> * /*header*/) {
+ LOG(FATAL) << "Can't reboot a single node.";
+}
DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
const Channel *channel) {
@@ -104,8 +109,23 @@
void MultiNodeLogNamer::Rotate(
const Node *node,
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+ DoRotate(node, header, false);
+}
+
+void MultiNodeLogNamer::Reboot(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+ DoRotate(node, header, true);
+}
+
+void MultiNodeLogNamer::DoRotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header, bool reboot) {
if (node == this->node()) {
if (data_writer_.writer) {
+ if (reboot) {
+ data_writer_.uuid = UUID::Random();
+ }
++data_writer_.part_number;
}
OpenDataWriter();
@@ -115,6 +135,9 @@
for (std::pair<const Channel *const, DataWriter> &data_writer :
data_writers_) {
if (node == data_writer.second.node) {
+ if (reboot) {
+ data_writer.second.uuid = UUID::Random();
+ }
++data_writer.second.part_number;
data_writer.second.rotate(data_writer.first, &data_writer.second);
UpdateHeader(header, data_writer.second.uuid,
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 5384ab2..5534131 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -62,6 +62,12 @@
const Node *node,
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+ // Reboots all log files for the provided node. The provided header will be
+ // modified and written per WriteHeader above. Resets any parts UUIDs.
+ virtual void Reboot(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+
// Returns all the nodes that data is being written for.
const std::vector<const Node *> &nodes() const { return nodes_; }
@@ -100,6 +106,10 @@
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
override;
+ void Reboot(const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+ override;
+
DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
DetachedBufferWriter *MakeForwardedTimestampWriter(
@@ -167,6 +177,10 @@
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
override;
+ void Reboot(const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+ override;
+
DetachedBufferWriter *MakeWriter(const Channel *channel) override;
DetachedBufferWriter *MakeForwardedTimestampWriter(const Channel *channel,
@@ -275,10 +289,17 @@
std::unique_ptr<DetachedBufferWriter> writer = nullptr;
const Node *node;
size_t part_number = 0;
- const UUID uuid = UUID::Random();
+ UUID uuid = UUID::Random();
std::function<void(const Channel *, DataWriter *)> rotate;
};
+ // Implements Rotate and Reboot, controlled by the 'reboot' flag. The only
+ // difference between the two is if DataWriter::uuid is reset or not.
+ void DoRotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ bool reboot);
+
// Opens up a writer for timestamps forwarded back.
void OpenForwardedTimestampWriter(const Channel *channel,
DataWriter *data_writer);
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 1998119..22fc513 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -81,9 +81,17 @@
aos::monotonic_clock::time_point monotonic_start_time;
aos::realtime_clock::time_point realtime_start_time;
+ aos::monotonic_clock::time_point logger_monotonic_start_time =
+ aos::monotonic_clock::min_time;
+ aos::realtime_clock::time_point logger_realtime_start_time =
+ aos::realtime_clock::min_time;
+
// Node to save.
std::string node;
+ // The boot UUID of the node which generated this data.
+ std::string source_boot_uuid;
+
// Pairs of the filename and the part index for sorting.
std::vector<std::pair<std::string, int>> parts;
};
@@ -91,6 +99,9 @@
// Struct to hold both the node, and the parts associated with it.
struct UnsortedLogPartsMap {
std::string logger_node;
+ // The boot UUID of the node this log file was created on.
+ std::string logger_boot_uuid;
+
aos::monotonic_clock::time_point monotonic_start_time =
aos::monotonic_clock::min_time;
aos::realtime_clock::time_point realtime_start_time =
@@ -133,6 +144,12 @@
chrono::nanoseconds(log_header->message().monotonic_start_time()));
const realtime_clock::time_point realtime_start_time(
chrono::nanoseconds(log_header->message().realtime_start_time()));
+ const monotonic_clock::time_point logger_monotonic_start_time(
+ chrono::nanoseconds(
+ log_header->message().logger_monotonic_start_time()));
+ const realtime_clock::time_point logger_realtime_start_time(
+ chrono::nanoseconds(
+ log_header->message().logger_realtime_start_time()));
const std::string_view node =
log_header->message().has_node()
@@ -144,6 +161,16 @@
? log_header->message().logger_node()->name()->string_view()
: "";
+ const std::string_view logger_boot_uuid =
+ log_header->message().has_logger_node_boot_uuid()
+ ? log_header->message().logger_node_boot_uuid()->string_view()
+ : "";
+
+ const std::string_view source_boot_uuid =
+ log_header->message().has_source_node_boot_uuid()
+ ? log_header->message().source_node_boot_uuid()->string_view()
+ : "";
+
// Looks like an old log. No UUID, index, and also single node. We have
// little to no multi-node log files in the wild without part UUIDs and
// indexes which we care much about.
@@ -200,8 +227,10 @@
.insert(std::make_pair(log_event_uuid, UnsortedLogPartsMap()))
.first;
log_it->second.logger_node = logger_node;
+ log_it->second.logger_boot_uuid = logger_boot_uuid;
} else {
CHECK_EQ(log_it->second.logger_node, logger_node);
+ CHECK_EQ(log_it->second.logger_boot_uuid, logger_boot_uuid);
}
if (node == log_it->second.logger_node) {
@@ -222,14 +251,25 @@
.first;
it->second.monotonic_start_time = monotonic_start_time;
it->second.realtime_start_time = realtime_start_time;
+ it->second.logger_monotonic_start_time = logger_monotonic_start_time;
+ it->second.logger_realtime_start_time = logger_realtime_start_time;
it->second.node = std::string(node);
+ it->second.source_boot_uuid = source_boot_uuid;
+ } else {
+ CHECK_EQ(it->second.source_boot_uuid, source_boot_uuid);
}
// First part might be min_time. If it is, try to put a better time on it.
if (it->second.monotonic_start_time == monotonic_clock::min_time) {
it->second.monotonic_start_time = monotonic_start_time;
+ it->second.logger_monotonic_start_time = logger_monotonic_start_time;
+ it->second.logger_realtime_start_time = logger_realtime_start_time;
} else if (monotonic_start_time != monotonic_clock::min_time) {
CHECK_EQ(it->second.monotonic_start_time, monotonic_start_time);
+ CHECK_EQ(it->second.logger_monotonic_start_time,
+ logger_monotonic_start_time);
+ CHECK_EQ(it->second.logger_realtime_start_time,
+ logger_realtime_start_time);
}
if (it->second.realtime_start_time == realtime_clock::min_time) {
it->second.realtime_start_time = realtime_start_time;
@@ -285,6 +325,7 @@
LogFile new_file;
new_file.log_event_uuid = logs.first;
new_file.logger_node = logs.second.logger_node;
+ new_file.logger_boot_uuid = logs.second.logger_boot_uuid;
new_file.monotonic_start_time = logs.second.monotonic_start_time;
new_file.realtime_start_time = logs.second.realtime_start_time;
new_file.corrupted = corrupted;
@@ -293,7 +334,12 @@
LogParts new_parts;
new_parts.monotonic_start_time = parts.second.monotonic_start_time;
new_parts.realtime_start_time = parts.second.realtime_start_time;
+ new_parts.logger_monotonic_start_time =
+ parts.second.logger_monotonic_start_time;
+ new_parts.logger_realtime_start_time =
+ parts.second.logger_realtime_start_time;
new_parts.log_event_uuid = logs.first;
+ new_parts.source_boot_uuid = parts.second.source_boot_uuid;
new_parts.parts_uuid = parts.first;
new_parts.node = std::move(parts.second.node);
@@ -341,38 +387,46 @@
}
std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
- stream << "{";
+ stream << "{\n";
if (!file.log_event_uuid.empty()) {
- stream << "\"log_event_uuid\": \"" << file.log_event_uuid << "\", ";
+ stream << " \"log_event_uuid\": \"" << file.log_event_uuid << "\",\n";
}
if (!file.logger_node.empty()) {
- stream << "\"logger_node\": \"" << file.logger_node << "\", ";
+ stream << " \"logger_node\": \"" << file.logger_node << "\",\n";
}
- stream << "\"monotonic_start_time\": " << file.monotonic_start_time
- << ", \"realtime_start_time\": " << file.realtime_start_time << ", [";
- stream << "\"parts\": [";
+ if (!file.logger_boot_uuid.empty()) {
+ stream << " \"logger_boot_uuid\": \"" << file.logger_boot_uuid << "\",\n";
+ }
+ stream << " \"monotonic_start_time\": " << file.monotonic_start_time
+ << ",\n \"realtime_start_time\": " << file.realtime_start_time
+ << ",\n";
+ stream << " \"parts\": [\n";
for (size_t i = 0; i < file.parts.size(); ++i) {
if (i != 0u) {
- stream << ", ";
+ stream << ",\n";
}
stream << file.parts[i];
}
- stream << "]}";
+ stream << " ]\n}";
return stream;
}
std::ostream &operator<<(std::ostream &stream, const LogParts &parts) {
- stream << "{";
+ stream << " {\n";
if (!parts.log_event_uuid.empty()) {
- stream << "\"log_event_uuid\": \"" << parts.log_event_uuid << "\", ";
+ stream << " \"log_event_uuid\": \"" << parts.log_event_uuid << "\",\n";
}
if (!parts.parts_uuid.empty()) {
- stream << "\"parts_uuid\": \"" << parts.parts_uuid << "\", ";
+ stream << " \"parts_uuid\": \"" << parts.parts_uuid << "\",\n";
}
if (!parts.node.empty()) {
- stream << "\"node\": \"" << parts.node << "\", ";
+ stream << " \"node\": \"" << parts.node << "\",\n";
}
- stream << "\"monotonic_start_time\": " << parts.monotonic_start_time
- << ", \"realtime_start_time\": " << parts.realtime_start_time << ", [";
+ if (!parts.source_boot_uuid.empty()) {
+ stream << " \"source_boot_uuid\": \"" << parts.source_boot_uuid << "\",\n";
+ }
+ stream << " \"monotonic_start_time\": " << parts.monotonic_start_time
+ << ",\n \"realtime_start_time\": " << parts.realtime_start_time
+ << ",\n \"parts\": [";
for (size_t i = 0; i < parts.parts.size(); ++i) {
if (i != 0u) {
@@ -381,7 +435,7 @@
stream << parts.parts[i];
}
- stream << "]}";
+ stream << "]\n }";
return stream;
}
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index b955a09..bfe1aa4 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -19,6 +19,12 @@
aos::monotonic_clock::time_point monotonic_start_time;
aos::realtime_clock::time_point realtime_start_time;
+ // Time on the logger node (if applicable) that this log file started.
+ aos::monotonic_clock::time_point logger_monotonic_start_time =
+ aos::monotonic_clock::min_time;
+ aos::realtime_clock::time_point logger_realtime_start_time =
+ aos::realtime_clock::min_time;
+
// UUIDs if available.
std::string log_event_uuid;
std::string parts_uuid;
@@ -26,6 +32,11 @@
// The node this represents, or empty if we are in a single node world.
std::string node;
+ // Boot UUID of the node which generated this data, if available. For local
+ // data and timestamps, this is the same as the logger_boot_uuid. For remote
+ // data, this is the boot_uuid of the remote node.
+ std::string source_boot_uuid;
+
// Pre-sorted list of parts.
std::vector<std::string> parts;
};
@@ -38,6 +49,8 @@
// The node the logger was running on (if available)
std::string logger_node;
+ // Boot UUID of the node running the logger.
+ std::string logger_boot_uuid;
// The start time on the logger node.
aos::monotonic_clock::time_point monotonic_start_time;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 3f85cc7..969f221 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -454,10 +454,16 @@
// time.
// TODO(austin): Does this work with startup when we don't know the remote
// start time too? Look at one of those logs to compare.
- if (monotonic_sent_time > parts_.monotonic_start_time) {
+ if (monotonic_sent_time >
+ parts_.monotonic_start_time + max_out_of_order_duration()) {
+ after_start_ = true;
+ }
+ if (after_start_) {
CHECK_GE(monotonic_sent_time,
newest_timestamp_ - max_out_of_order_duration())
- << ": Max out of order exceeded. " << parts_;
+ << ": Max out of order exceeded. " << parts_ << ", start time is "
+ << parts_.monotonic_start_time << " currently reading "
+ << filename();
}
return message;
}
@@ -581,7 +587,8 @@
return nullptr;
}
- CHECK_GE(messages_.begin()->timestamp, last_message_time_);
+ CHECK_GE(messages_.begin()->timestamp, last_message_time_)
+ << DebugString() << " reading " << parts_message_reader_.filename();
last_message_time_ = messages_.begin()->timestamp;
return &(*messages_.begin());
}
@@ -591,8 +598,16 @@
std::string LogPartsSorter::DebugString() const {
std::stringstream ss;
ss << "messages: [\n";
+ int count = 0;
+ bool no_dots = true;
for (const Message &m : messages_) {
- ss << m << "\n";
+ if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
+ ss << m << "\n";
+ } else if (no_dots) {
+ ss << "...\n";
+ no_dots = false;
+ }
+ ++count;
}
ss << "] <- " << parts_message_reader_.filename();
return ss.str();
@@ -834,12 +849,6 @@
}
Message TimestampMapper::MatchingMessageFor(const Message &message) {
- TimestampMapper *peer =
- CHECK_NOTNULL(nodes_data_[source_node_[message.channel_index]].peer);
- // The queue which will have the matching data, if available.
- std::deque<Message> *data_queue =
- &peer->nodes_data_[node()].channels[message.channel_index].messages;
-
// Figure out what queue index we are looking for.
CHECK(message.data.message().has_remote_queue_index());
const uint32_t remote_queue_index =
@@ -853,6 +862,23 @@
const realtime_clock::time_point realtime_remote_time(
std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
+ TimestampMapper *peer = nodes_data_[source_node_[message.channel_index]].peer;
+
+ // We only register the peers which we have data for. So, if we are being
+ // asked to pull a timestamp from a peer which doesn't exist, return an empty
+ // message.
+ if (peer == nullptr) {
+ return Message{
+ .channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ }
+
+ // The queue which will have the matching data, if available.
+ std::deque<Message> *data_queue =
+ &peer->nodes_data_[node()].channels[message.channel_index].messages;
+
peer->QueueUntil(monotonic_remote_time);
if (data_queue->empty()) {
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index fd600d3..61f16aa 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -314,6 +314,15 @@
bool done_ = false;
MessageReader message_reader_;
+ // True after we have seen a message after the start of the log. The
+  // guarantees on logging essentially are that all data from before the
+ // starting time of the log may be arbitrarily out of order, but once we get
+ // max_out_of_order_duration past the start, everything will remain within
+ // max_out_of_order_duration. We shouldn't see anything before the start
+ // after we've seen a message that is at least max_out_of_order_duration after
+ // the start.
+ bool after_start_ = false;
+
monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
};
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index a380451..9029ce1 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -1162,6 +1162,64 @@
realtime_clock::time_point(chrono::seconds(1000)));
}
+// Tests that when a peer isn't registered, we treat that as if there was no
+// data available.
+TEST_F(TimestampMapperTest, NoPeer) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ {
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_FALSE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_FALSE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_FALSE(output1[2].data.Verify());
+ }
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index a73d678..0338e62 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -35,6 +35,10 @@
DEFINE_bool(skip_order_validation, false,
"If true, ignore any out of orderness in replay");
+DEFINE_double(
+ time_estimation_buffer_seconds, 2.0,
+ "The time to buffer ahead in the log file to accurately reconstruct time.");
+
namespace aos {
namespace logger {
namespace {
@@ -50,6 +54,14 @@
return result.value();
}
+std::string LogFileVectorToString(std::vector<LogFile> log_files) {
+ std::stringstream ss;
+ for (const auto f : log_files) {
+ ss << f << "\n";
+ }
+ return ss.str();
+}
+
// Copies the channel, removing the schema as we go. If new_name is provided,
// it is used instead of the name inside the channel. If new_type is provided,
// it is used instead of the type in the channel.
@@ -118,8 +130,6 @@
std::function<bool(const Channel *)> should_log)
: event_loop_(event_loop),
configuration_(configuration),
- boot_uuid_(
- util::ReadFileToStringOrDie("/proc/sys/kernel/random/boot_id")),
name_(network::GetHostname()),
timer_handler_(event_loop_->AddTimer(
[this]() { DoLogData(event_loop_->monotonic_now()); })),
@@ -182,7 +192,6 @@
}
FetcherStruct fs;
- fs.node_index = our_node_index;
fs.channel_index = channel_index;
fs.channel = channel;
@@ -214,12 +223,19 @@
if (log_delivery_times) {
VLOG(1) << " Delivery times";
fs.wants_timestamp_writer = true;
+ fs.timestamp_node_index = our_node_index;
}
if (log_message) {
VLOG(1) << " Data";
fs.wants_writer = true;
if (!is_local) {
+ const Node *source_node = configuration::GetNode(
+ configuration_, channel->source_node()->string_view());
+ fs.data_node_index =
+ configuration::GetNodeIndex(configuration_, source_node);
fs.log_type = LogType::kLogRemoteMessage;
+ } else {
+ fs.data_node_index = our_node_index;
}
}
if (log_contents) {
@@ -227,7 +243,7 @@
<< configuration::CleanedChannelToString(channel);
fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
fs.wants_contents_writer = true;
- fs.node_index =
+ fs.contents_node_index =
configuration::GetNodeIndex(configuration_, fs.timestamp_node);
}
fetchers_.emplace_back(std::move(fs));
@@ -319,7 +335,8 @@
// Clear out any old timestamps in case we are re-starting logging.
for (size_t i = 0; i < node_state_.size(); ++i) {
- SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
+ SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
+ monotonic_clock::min_time, realtime_clock::min_time);
}
WriteHeader();
@@ -327,6 +344,13 @@
LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
<< " start_time " << last_synchronized_time_;
+ // Force logging up until the start of the log file now, so the messages at
+ // the start are always ordered before the rest of the messages.
+ // Note: this ship may have already sailed, but we don't have to make it
+ // worse.
+ // TODO(austin): Test...
+ LogUntil(last_synchronized_time_);
+
timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
polling_period_);
}
@@ -373,10 +397,53 @@
const int node_index = configuration::GetNodeIndex(configuration_, node);
MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
realtime_start_time);
- log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
+ MaybeWriteHeader(node_index, node);
}
}
+void Logger::MaybeWriteHeader(int node_index) {
+ if (configuration::MultiNode(configuration_)) {
+ return MaybeWriteHeader(node_index,
+ configuration_->nodes()->Get(node_index));
+ } else {
+ return MaybeWriteHeader(node_index, nullptr);
+ }
+}
+
+void Logger::MaybeWriteHeader(int node_index, const Node *node) {
+ // This function is responsible for writing the header when the header both
+ // has valid data, and when it needs to be written.
+ if (node_state_[node_index].header_written &&
+ node_state_[node_index].header_valid) {
+ // The header has been written and is valid, nothing to do.
+ return;
+ }
+ if (!node_state_[node_index].has_source_node_boot_uuid) {
+ // Can't write a header if we don't have the boot UUID.
+ return;
+ }
+
+ // WriteHeader writes the first header in a log file. We want to do this only
+ // once.
+ //
+ // Rotate rewrites the same header with a new part ID, but keeps the same part
+ // UUID. We don't want that when things reboot, because that implies that
+ // parts go together across a reboot.
+ //
+ // Reboot resets the parts UUID. So, once we've written a header the first
+ // time, we want to use Reboot to rotate the log and reset the parts UUID.
+ //
+ // header_valid is cleared whenever the remote reboots.
+ if (node_state_[node_index].header_written) {
+ log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
+ } else {
+ log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
+
+ node_state_[node_index].header_written = true;
+ }
+ node_state_[node_index].header_valid = true;
+}
+
void Logger::WriteMissingTimestamps() {
if (configuration::MultiNode(configuration_)) {
server_statistics_fetcher_.Fetch();
@@ -394,14 +461,20 @@
node, node_index,
server_statistics_fetcher_.context().monotonic_event_time,
server_statistics_fetcher_.context().realtime_event_time)) {
+ CHECK(node_state_[node_index].header_written);
+ CHECK(node_state_[node_index].header_valid);
log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+ } else {
+ MaybeWriteHeader(node_index, node);
}
}
}
-void Logger::SetStartTime(size_t node_index,
- aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time) {
+void Logger::SetStartTime(
+ size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time,
+ aos::monotonic_clock::time_point logger_monotonic_start_time,
+ aos::realtime_clock::time_point logger_realtime_start_time) {
node_state_[node_index].monotonic_start_time = monotonic_start_time;
node_state_[node_index].realtime_start_time = realtime_start_time;
node_state_[node_index]
@@ -410,6 +483,30 @@
std::chrono::duration_cast<std::chrono::nanoseconds>(
monotonic_start_time.time_since_epoch())
.count());
+
+ // Add logger start times if they are available in the log file header.
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_logger_monotonic_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_logger_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ logger_monotonic_start_time.time_since_epoch())
+ .count());
+ }
+
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_logger_realtime_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_logger_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ logger_realtime_start_time.time_since_epoch())
+ .count());
+ }
+
if (node_state_[node_index]
.log_file_header.mutable_message()
->has_realtime_start_time()) {
@@ -431,47 +528,57 @@
monotonic_clock::min_time) {
return false;
}
- if (configuration::MultiNode(configuration_)) {
- if (event_loop_->node() == node) {
- // There are no offsets to compute for ourself, so always succeed.
- SetStartTime(node_index, monotonic_start_time, realtime_start_time);
- return true;
- } else if (server_statistics_fetcher_.get() != nullptr) {
- // We must be a remote node now. Look for the connection and see if it is
- // connected.
-
- for (const message_bridge::ServerConnection *connection :
- *server_statistics_fetcher_->connections()) {
- if (connection->node()->name()->string_view() !=
- node->name()->string_view()) {
- continue;
- }
-
- if (connection->state() != message_bridge::State::CONNECTED) {
- VLOG(1) << node->name()->string_view()
- << " is not connected, can't start it yet.";
- break;
- }
-
- if (!connection->has_monotonic_offset()) {
- VLOG(1) << "Missing monotonic offset for setting start time for node "
- << aos::FlatbufferToJson(node);
- break;
- }
-
- VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);
-
- // Found it and it is connected. Compensate and go.
- monotonic_start_time +=
- std::chrono::nanoseconds(connection->monotonic_offset());
-
- SetStartTime(node_index, monotonic_start_time, realtime_start_time);
- return true;
- }
- }
- } else {
- SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ if (event_loop_->node() == node ||
+ !configuration::MultiNode(configuration_)) {
+ // There are no offsets to compute for ourself, so always succeed.
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time,
+ monotonic_start_time, realtime_start_time);
+ node_state_[node_index].SetBootUUID(event_loop_->boot_uuid().string_view());
return true;
+ } else if (server_statistics_fetcher_.get() != nullptr) {
+ // We must be a remote node now. Look for the connection and see if it is
+ // connected.
+
+ for (const message_bridge::ServerConnection *connection :
+ *server_statistics_fetcher_->connections()) {
+ if (connection->node()->name()->string_view() !=
+ node->name()->string_view()) {
+ continue;
+ }
+
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ VLOG(1) << node->name()->string_view()
+ << " is not connected, can't start it yet.";
+ break;
+ }
+
+ // Update the boot UUID as soon as we know we are connected.
+ if (!connection->has_boot_uuid()) {
+ VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ if (!node_state_[node_index].has_source_node_boot_uuid ||
+ node_state_[node_index].source_node_boot_uuid !=
+ connection->boot_uuid()->string_view()) {
+ node_state_[node_index].SetBootUUID(
+ connection->boot_uuid()->string_view());
+ }
+
+ if (!connection->has_monotonic_offset()) {
+ VLOG(1) << "Missing monotonic offset for setting start time for node "
+ << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ // Found it and it is connected. Compensate and go.
+ SetStartTime(node_index,
+ monotonic_start_time +
+ std::chrono::nanoseconds(connection->monotonic_offset()),
+ realtime_start_time, monotonic_start_time,
+ realtime_start_time);
+ return true;
+ }
}
return false;
}
@@ -502,8 +609,11 @@
log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
}
- const flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
- fbb.CreateString(boot_uuid_);
+ const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
+
+ const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
fbb.CreateString("00000000-0000-4000-8000-000000000000");
@@ -545,6 +655,15 @@
std::chrono::duration_cast<std::chrono::nanoseconds>(
realtime_clock::min_time.time_since_epoch())
.count());
+ } else {
+ log_file_header_builder.add_logger_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_clock::min_time.time_since_epoch())
+ .count());
+ log_file_header_builder.add_logger_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_clock::min_time.time_since_epoch())
+ .count());
}
log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
@@ -552,7 +671,10 @@
if (!log_start_uuid_offset.IsNull()) {
log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
}
- log_file_header_builder.add_boot_uuid(boot_uuid_offset);
+ log_file_header_builder.add_logger_node_boot_uuid(
+ logger_node_boot_uuid_offset);
+ log_file_header_builder.add_source_node_boot_uuid(
+ source_node_boot_uuid_offset);
log_file_header_builder.add_parts_uuid(parts_uuid_offset);
log_file_header_builder.add_parts_index(0);
@@ -591,6 +713,9 @@
}
void Logger::LogUntil(monotonic_clock::time_point t) {
+ // Grab the latest ServerStatistics message. This will always have the
+ // opportunity to be >= to the current time, so it will always represent any
+ // reboots which may have happened.
WriteMissingTimestamps();
// Write each channel to disk, one at a time.
@@ -636,6 +761,9 @@
max_header_size_ = std::max(max_header_size_,
fbb.GetSize() - f.fetcher->context().size);
+ CHECK(node_state_[f.data_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
f.writer->QueueSizedFlatbuffer(&fbb);
}
@@ -659,6 +787,9 @@
flatbuffers::GetSizePrefixedRoot<MessageHeader>(
fbb.GetBufferPointer()));
+ CHECK(node_state_[f.timestamp_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
}
@@ -674,6 +805,16 @@
const RemoteMessage *msg =
flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
+ CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
+ if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
+ node_state_[f.contents_node_index].source_node_boot_uuid !=
+ msg->boot_uuid()->string_view()) {
+ node_state_[f.contents_node_index].SetBootUUID(
+ msg->boot_uuid()->string_view());
+
+ MaybeWriteHeader(f.contents_node_index);
+ }
+
logger::MessageHeader::Builder message_header_builder(fbb);
// TODO(austin): This needs to check the channel_index and confirm
@@ -706,6 +847,9 @@
const auto end = event_loop_->monotonic_now();
RecordCreateMessageTime(start, end, &f);
+ CHECK(node_state_[f.contents_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
f.contents_writer->QueueSizedFlatbuffer(&fbb);
}
@@ -917,6 +1061,18 @@
configuration::GetNodeIndex(configuration(), node);
std::vector<LogParts> filtered_parts = FilterPartsForNode(
log_files_, node != nullptr ? node->name()->string_view() : "");
+
+ // Confirm that all the parts are from the same boot if there are enough
+ // parts to not be from the same boot.
+ if (filtered_parts.size() > 1u) {
+ for (size_t i = 1; i < filtered_parts.size(); ++i) {
+ CHECK_EQ(filtered_parts[i].source_boot_uuid,
+ filtered_parts[0].source_boot_uuid)
+ << ": Found parts from different boots "
+ << LogFileVectorToString(log_files_);
+ }
+ }
+
states_[node_index] = std::make_unique<State>(
filtered_parts.size() == 0u
? nullptr
@@ -1418,6 +1574,10 @@
<< state->remote_node(timestamped_message.channel_index)
->name()
->string_view()
+ << " while trying to send a message on "
+ << configuration::CleanedChannelToString(
+ logged_configuration()->channels()->Get(
+ timestamped_message.channel_index))
<< " " << state->DebugString();
} else if (timestamped_message.monotonic_remote_time >=
state->monotonic_remote_now(
@@ -2017,6 +2177,9 @@
remote_timestamp_senders_[timestamped_message.channel_index]
->MakeBuilder();
+ flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
+ builder.fbb()->CreateString(event_loop_->boot_uuid().string_view());
+
RemoteMessage::Builder message_header_builder =
builder.MakeBuilder<RemoteMessage>();
@@ -2037,6 +2200,7 @@
timestamped_message.realtime_remote_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(remote_queue_index);
+ message_header_builder.add_boot_uuid(boot_uuid_offset);
builder.Send(message_header_builder.Finish());
}
@@ -2101,7 +2265,8 @@
(sorted_messages_.size() > 0
? std::get<0>(sorted_messages_.front()).monotonic_event_time
: timestamp_mapper_->monotonic_start_time()) +
- std::chrono::seconds(2);
+ chrono::duration_cast<chrono::seconds>(
+ chrono::duration<double>(FLAGS_time_estimation_buffer_seconds));
while (true) {
TimestampedMessage *m = timestamp_mapper_->Front();
@@ -2109,8 +2274,9 @@
return;
}
if (sorted_messages_.size() > 0) {
- // Stop placing sorted messages on the list once we have 2 seconds
- // queued up (but queue at least until the log starts.
+ // Stop placing sorted messages on the list once we have
+ // --time_estimation_buffer_seconds seconds queued up (but queue at least
+ // until the log starts).
if (end_queue_time <
std::get<0>(sorted_messages_.back()).monotonic_event_time) {
return;
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index fecfd6e..fef68f4 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -56,7 +56,15 @@
// All log parts generated on a single node while it is booted will have the
// same value here. It also matches with the one used by systemd.
- boot_uuid: string (id: 11);
+ logger_node_boot_uuid: string (id: 11);
+
+ // Empty if we didn't have one at the time.
+ source_node_boot_uuid: string (id: 13);
+
+ // Timestamps that this logfile started at on the logger's clocks. This is
+ // mostly useful when trying to deduce the order of node reboots.
+ logger_monotonic_start_time:int64 = -9223372036854775808 (id: 14);
+ logger_realtime_start_time:int64 = -9223372036854775808 (id: 15);
// All log events across all nodes produced by a single high-level start event
// will have the same value here.
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index a248f80..d042f90 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -180,7 +180,12 @@
bool wants_contents_writer = false;
DetachedBufferWriter *contents_writer = nullptr;
- int node_index = 0;
+ // Node which this data is from, or -1 if it is unknown.
+ int data_node_index = -1;
+ // Node that this timestamp is for, or -1 if it is unknown.
+ int timestamp_node_index = -1;
+ // Node that the contents this contents_writer will log are from.
+ int contents_node_index = -1;
};
// Vector mapping from the channel index from the event loop to the logged
@@ -193,14 +198,46 @@
aos::realtime_clock::time_point realtime_start_time =
aos::realtime_clock::min_time;
+ bool has_source_node_boot_uuid = false;
+
+ // This is an initial UUID that is a valid UUID4 and is pretty obvious that
+ // it isn't valid.
+ std::string source_node_boot_uuid = "00000000-0000-4000-8000-000000000000";
+
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
+
+ // True if a header has been written to the start of a log file.
+ bool header_written = false;
+ // True if the current written header represents the contents which will
+ // follow. This is cleared when boot_uuid is known to not match anymore.
+ bool header_valid = false;
+
+ // Sets the source_node_boot_uuid, properly updating everything.
+ void SetBootUUID(std::string_view new_source_node_boot_uuid) {
+ source_node_boot_uuid = new_source_node_boot_uuid;
+ header_valid = false;
+ has_source_node_boot_uuid = true;
+
+ flatbuffers::String *source_node_boot_uuid_string =
+ log_file_header.mutable_message()->mutable_source_node_boot_uuid();
+ CHECK_EQ(source_node_boot_uuid.size(),
+ source_node_boot_uuid_string->size());
+ memcpy(source_node_boot_uuid_string->data(), source_node_boot_uuid.data(),
+ source_node_boot_uuid.size());
+ }
};
void WriteHeader();
+
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
const Node *node);
+ // Writes the header for the provided node if enough information is valid.
+ void MaybeWriteHeader(int node_index);
+ // Overload for when we already know node as well.
+ void MaybeWriteHeader(int node_index, const Node *node);
+
bool MaybeUpdateTimestamp(
const Node *node, int node_index,
aos::monotonic_clock::time_point monotonic_start_time,
@@ -222,9 +259,11 @@
FetcherStruct *fetcher);
// Sets the start time for a specific node.
- void SetStartTime(size_t node_index,
- aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time);
+ void SetStartTime(
+ size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time,
+ aos::monotonic_clock::time_point logger_monotonic_start_time,
+ aos::realtime_clock::time_point logger_realtime_start_time);
EventLoop *const event_loop_;
// The configuration to place at the top of the log file.
@@ -235,7 +274,6 @@
std::unique_ptr<LogNamer> log_namer_;
// Empty indicates there isn't one.
std::string log_start_uuid_;
- const std::string boot_uuid_;
// Name to save in the log file. Defaults to hostname.
std::string name_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 8115400..3f4893e 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,5 +1,6 @@
#include "aos/events/logging/logger.h"
+#include "absl/strings/str_format.h"
#include "aos/events/event_loop.h"
#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
@@ -360,6 +361,30 @@
}
}
+std::vector<std::string> MakeLogFiles(std::string logfile_base) {
+ return std::vector<std::string>(
+ {logfile_base + "_pi1_data.part0.bfbs",
+ logfile_base + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
+ logfile_base + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
+ logfile_base + "_pi2_data.part0.bfbs",
+ logfile_base + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.message_bridge.RemoteMessage.part0.bfbs",
+ logfile_base + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.message_bridge.RemoteMessage.part1.bfbs",
+ logfile_base + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.message_bridge.RemoteMessage.part0.bfbs",
+ logfile_base + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.message_bridge.RemoteMessage.part1.bfbs",
+ logfile_base +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1.bfbs",
+ logfile_base +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs"});
+}
+
class MultinodeLoggerTest : public ::testing::Test {
public:
MultinodeLoggerTest()
@@ -372,27 +397,24 @@
configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
tmp_dir_(aos::testing::TestTmpDir()),
logfile_base_(tmp_dir_ + "/multi_logfile"),
- logfiles_(
+ pi1_reboot_logfiles_(
{logfile_base_ + "_pi1_data.part0.bfbs",
logfile_base_ + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
logfile_base_ + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
- logfile_base_ + "_pi2_data.part0.bfbs",
+ logfile_base_ + "_pi2_data/test/aos.examples.Pong.part2.bfbs",
logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
"aos.message_bridge.RemoteMessage.part0.bfbs",
logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
"aos.message_bridge.RemoteMessage.part1.bfbs",
- logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.message_bridge.RemoteMessage.part0.bfbs",
- logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.message_bridge.RemoteMessage.part1.bfbs",
- logfile_base_ +
- "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
- logfile_base_ +
- "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1.bfbs",
+ logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.message_bridge.RemoteMessage.part2.bfbs",
logfile_base_ +
"_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs",
logfile_base_ +
- "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs"}),
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs",
+ logfile_base_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part2.bfbs"}),
+ logfiles_(MakeLogFiles(logfile_base_)),
structured_logfiles_{
std::vector<std::string>{logfiles_[0]},
std::vector<std::string>{logfiles_[1], logfiles_[2]},
@@ -411,6 +433,14 @@
unlink((file + ".xz").c_str());
}
+ for (const auto file : MakeLogFiles("relogged")) {
+ unlink(file.c_str());
+ }
+
+ for (const auto file : pi1_reboot_logfiles_) {
+ unlink(file.c_str());
+ }
+
LOG(INFO) << "Logging data to " << logfiles_[0] << ", " << logfiles_[1]
<< " and " << logfiles_[2];
}
@@ -545,6 +575,7 @@
std::string tmp_dir_;
std::string logfile_base_;
+ std::vector<std::string> pi1_reboot_logfiles_;
std::vector<std::string> logfiles_;
std::vector<std::vector<std::string>> structured_logfiles_;
@@ -1478,9 +1509,9 @@
pi1_event_loop->MakeWatcher(
"/aos/remote_timestamps/pi2",
- [&pi1_event_loop, pi1_timestamp_channel, ping_timestamp_channel,
- &pi1_timestamp_on_pi1_fetcher, &pi1_timestamp_on_pi2_fetcher,
- &ping_on_pi1_fetcher,
+ [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
+ ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
+ &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
&ping_on_pi2_fetcher](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
@@ -1511,6 +1542,10 @@
header.channel_index()));
}
+ ASSERT_TRUE(header.has_boot_uuid());
+ EXPECT_EQ(header.boot_uuid()->string_view(),
+ pi2_event_loop->boot_uuid().string_view());
+
EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
EXPECT_EQ(pi2_context->queue_index, header.queue_index());
@@ -1530,9 +1565,9 @@
});
pi2_event_loop->MakeWatcher(
"/aos/remote_timestamps/pi1",
- [&pi2_event_loop, pi2_timestamp_channel, pong_timestamp_channel,
- &pi2_timestamp_on_pi2_fetcher, &pi2_timestamp_on_pi1_fetcher,
- &pong_on_pi2_fetcher,
+ [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
+ pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
+ &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
&pong_on_pi1_fetcher](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
@@ -1563,6 +1598,10 @@
header.channel_index()));
}
+ ASSERT_TRUE(header.has_boot_uuid());
+ EXPECT_EQ(header.boot_uuid()->string_view(),
+ pi1_event_loop->boot_uuid().string_view());
+
EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
EXPECT_EQ(pi1_context->remote_queue_index, header.remote_queue_index());
EXPECT_EQ(pi1_context->queue_index, header.queue_index());
@@ -1602,9 +1641,92 @@
reader.Deregister();
}
+// Tests that we properly populate and extract the logger_start time by setting
+// up a clock difference between 2 nodes and looking at the resulting parts.
+TEST_F(MultinodeLoggerTest, LoggerStartTime) {
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ NodeEventLoopFactory *pi2 =
+ event_loop_factory_.GetNodeEventLoopFactory(pi2_);
+
+ pi2->SetDistributedOffset(chrono::seconds(1000), 1.0);
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+ }
+
+ for (const LogFile &log_file : SortParts(logfiles_)) {
+ for (const LogParts &log_part : log_file.parts) {
+ if (log_part.node == log_file.logger_node) {
+ EXPECT_EQ(log_part.logger_monotonic_start_time,
+ aos::monotonic_clock::min_time);
+ EXPECT_EQ(log_part.logger_realtime_start_time,
+ aos::realtime_clock::min_time);
+ } else {
+ const chrono::seconds offset = log_file.logger_node == "pi1"
+ ? -chrono::seconds(1000)
+ : chrono::seconds(1000);
+ EXPECT_EQ(log_part.logger_monotonic_start_time,
+ log_part.monotonic_start_time + offset);
+ EXPECT_EQ(log_part.logger_realtime_start_time,
+ log_file.realtime_start_time +
+ (log_part.logger_monotonic_start_time -
+ log_file.monotonic_start_time));
+ }
+ }
+ }
+}
+
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.
+// Tests that a reboot of a remote node is detected via its boot UUID, and that
+// attempting to replay log parts spanning different boots of that node is
+// refused with a fatal error.
+TEST_F(MultinodeLoggerDeathTest, RemoteReboot) {
+ std::string pi2_boot1;
+ std::string pi2_boot2;
+ {
+ pi2_boot1 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
+ ->boot_uuid()
+ .string_view();
+ LoggerState pi1_logger = MakeLogger(pi1_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+
+ event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Reboot();
+
+ pi2_boot2 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
+ ->boot_uuid()
+ .string_view();
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ // Confirm that we refuse to replay logs whose parts come from different boots.
+ EXPECT_DEATH(
+ {
+ LogReader reader(SortParts(pi1_reboot_logfiles_));
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ // This sends out the fetched messages and advances time to the start of
+ // the log file.
+ reader.Register(&log_reader_factory);
+ },
+ absl::StrFormat("(%s|%s).*(%s|%s).*Found parts from different boots",
+ pi2_boot1, pi2_boot2, pi2_boot2, pi2_boot1));
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/frc971/control_loops/drivetrain/ssdrivetrain.cc b/frc971/control_loops/drivetrain/ssdrivetrain.cc
index cd29e39..12a6bcc 100644
--- a/frc971/control_loops/drivetrain/ssdrivetrain.cc
+++ b/frc971/control_loops/drivetrain/ssdrivetrain.cc
@@ -148,7 +148,7 @@
if (!goal->has_max_ss_voltage()) {
max_voltage_ = kMaxVoltage;
} else {
- max_voltage_ = goal->has_max_ss_voltage();
+ max_voltage_ = goal->max_ss_voltage();
}
use_profile_ = !kf_->controller().Kff().isZero(0) &&