Merge "Adding option to solve pose (solvePnP) without RANSAC by default"
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 24e15d9..2b9ac6d 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -1122,6 +1122,13 @@
LOG(FATAL) << "Unknown logger config " << static_cast<int>(channel->logger());
}
+// Returns how many destination nodes `channel` is forwarded to; a channel
+// without a destination_nodes field has none.
+size_t ConnectionCount(const Channel *channel) {
+  return channel->has_destination_nodes()
+             ? channel->destination_nodes()->size()
+             : 0u;
+}
+
const Connection *ConnectionToNode(const Channel *channel, const Node *node) {
if (!channel->has_destination_nodes()) {
return nullptr;
diff --git a/aos/configuration.h b/aos/configuration.h
index 8dd4851..ac5c90c 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -134,6 +134,9 @@
bool ChannelMessageIsLoggedOnNode(const Channel *channel,
std::string_view node_name);
+// Returns the number of connections (entries in destination_nodes) for the
+// channel, or 0 if the channel has no destination_nodes field.
+size_t ConnectionCount(const Channel *channel);
+
const Connection *ConnectionToNode(const Channel *channel, const Node *node);
// Returns true if the delivery timestamps are supposed to be logged on this
// node.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 79d09e4..e27e775 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -375,6 +375,7 @@
":simple_channel",
"//aos:init",
"//aos:realtime",
+ "//aos/events/logging:boot_timestamp",
"//aos/events/logging:logger_fbs",
"//aos/ipc_lib:index",
"//aos/network:message_bridge_client_status",
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index b0487d5..8728111 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -84,6 +84,13 @@
configuration_(configuration) {}
EventLoop::~EventLoop() {
+  // Name every sender that is still open so the CHECK below is actionable.
+  // Iterating an empty senders_ is a no-op, so no emptiness guard is needed.
+  for (const RawSender *sender : senders_) {
+    LOG(ERROR) << " Sender "
+               << configuration::StrippedChannelToString(sender->channel())
+               << " still open";
+  }
CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
}
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 3739f49..fdeaf1e 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -41,11 +41,23 @@
return events_list_.begin()->first;
}
+// Runs the callback installed with set_on_shutdown().
+void EventScheduler::Shutdown() { on_shutdown_(); }
+
+// Advances this node to its next boot, then runs (and clears) the callbacks
+// queued with ScheduleOnStartup().
+void EventScheduler::Startup() {
+  boot_count_ += 1;
+  RunOnStartup();
+}
+
void EventScheduler::CallOldestEvent() {
CHECK_GT(events_list_.size(), 0u);
auto iter = events_list_.begin();
- CHECK_EQ(monotonic_now(), iter->first)
- << ": Time is wrong on node " << node_index_;
+ const logger::BootTimestamp t =
+ FromDistributedClock(scheduler_scheduler_->distributed_now());
+ VLOG(1) << "Got time back " << t;
+ CHECK_EQ(t.boot, boot_count_);
+ CHECK_EQ(t.time, iter->first) << ": Time is wrong on node " << node_index_;
::std::function<void()> callback = ::std::move(iter->second);
events_list_.erase(iter);
@@ -61,6 +73,15 @@
on_run_.clear();
}
+// Runs every callback queued with ScheduleOnStartup(), then clears the queue
+// so each queued callback runs at most once.
+void EventScheduler::RunOnStartup() {
+ // Indexed loop: size() is re-read on every iteration, so entries appended
+ // while iterating are also visited.
+ // NOTE(review): appending from inside a callback may reallocate on_startup_
+ // while that callback is executing; confirm callbacks never do this.
+ for (size_t i = 0; i < on_startup_.size(); ++i) {
+ on_startup_[i]();
+ }
+ on_startup_.clear();
+}
+
+// Invokes the callback installed with set_started(). started_ is a
+// std::function, so calling this before set_started() invokes an empty
+// function (std::bad_function_call).
+void EventScheduler::RunStarted() { started_(); }
+
std::ostream &operator<<(std::ostream &stream,
const aos::distributed_clock::time_point &now) {
// Print it the same way we print a monotonic time. Literally.
@@ -72,11 +93,55 @@
CHECK(std::find(schedulers_.begin(), schedulers_.end(), scheduler) ==
schedulers_.end());
CHECK(scheduler->scheduler_scheduler_ == nullptr);
+ CHECK_EQ(scheduler->node_index(), schedulers_.size());
schedulers_.emplace_back(scheduler);
scheduler->scheduler_scheduler_ = this;
}
+// Applies the reboot at the front of reboots_. Every node whose boot count
+// differs from the target boot is shut down, moved onto its next boot, and
+// started again; nodes already on the target boot are left alone.
+void EventSchedulerScheduler::Reboot() {
+  // Copy the target boots rather than holding a reference into reboots_: if a
+  // converter reports another reboot while the callbacks below run, the
+  // reboot_found_ handler installed by SetTimeConverter() appends to
+  // reboots_, which may reallocate it and would leave a reference dangling.
+  const std::vector<logger::BootTimestamp> times =
+      std::get<1>(reboots_.front());
+  CHECK_EQ(times.size(), schedulers_.size());
+
+  VLOG(1) << "Rebooting at " << now_;
+  for (const auto &time : times) {
+    VLOG(1) << " " << time;
+  }
+
+  is_running_ = false;
+
+  // Shut down every node that changes boots. A node may only advance by
+  // exactly one boot per reboot event.
+  std::vector<size_t> rebooted;
+  for (size_t node_index = 0; node_index < schedulers_.size(); ++node_index) {
+    EventScheduler *const scheduler = schedulers_[node_index];
+    if (scheduler->boot_count() == times[node_index].boot) {
+      continue;
+    }
+    rebooted.emplace_back(node_index);
+    CHECK_EQ(scheduler->boot_count() + 1, times[node_index].boot);
+    scheduler->Shutdown();
+  }
+
+  // And start it back up again to reboot. When something starts back up
+  // (especially message_bridge), it could try to send stuff out. We want
+  // to move everything over to the new boot before doing that.
+  for (const size_t node_index : rebooted) {
+    // Shutdown callbacks must not have changed the boot count.
+    CHECK_EQ(schedulers_[node_index]->boot_count() + 1, times[node_index].boot);
+    schedulers_[node_index]->Startup();
+  }
+
+  for (const size_t node_index : rebooted) {
+    schedulers_[node_index]->RunStarted();
+  }
+
+  for (const size_t node_index : rebooted) {
+    schedulers_[node_index]->RunOnRun();
+  }
+  is_running_ = true;
+}
+
void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
distributed_clock::time_point end_time = now_ + duration;
logging::ScopedLogRestorer prev_logger;
@@ -86,6 +151,25 @@
while (is_running_) {
std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
OldestEvent();
+ if (!reboots_.empty() &&
+ std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
+ // Reboot is next.
+ if (std::get<0>(reboots_.front()) > end_time) {
+ // Reboot is after our end time, give up.
+ is_running_ = false;
+ break;
+ }
+
+ CHECK_LE(now_,
+ std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
+ << ": Simulated time went backwards by too much. Please "
+ "investigate.";
+ now_ = std::get<0>(reboots_.front());
+ Reboot();
+ reboots_.erase(reboots_.begin());
+ continue;
+ }
+
// No events left, bail.
if (std::get<0>(oldest_event) == distributed_clock::max_time ||
std::get<0>(oldest_event) > end_time) {
@@ -117,6 +201,18 @@
while (is_running_) {
std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
OldestEvent();
+ if (!reboots_.empty() &&
+ std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
+ // Reboot is next.
+ CHECK_LE(now_,
+ std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
+ << ": Simulated time went backwards by too much. Please "
+ "investigate.";
+ now_ = std::get<0>(reboots_.front());
+ Reboot();
+ reboots_.erase(reboots_.begin());
+ continue;
+ }
// No events left, bail.
if (std::get<0>(oldest_event) == distributed_clock::max_time) {
break;
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index f981ef2..cc70757 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -9,6 +9,7 @@
#include <vector>
#include "aos/events/event_loop.h"
+#include "aos/events/logging/boot_timestamp.h"
#include "aos/logging/implementations.h"
#include "aos/time/time.h"
#include "glog/logging.h"
@@ -49,18 +50,35 @@
public:
virtual ~TimeConverter() {}
+  // Returns the boot UUID for a node and boot. Implementations must return
+  // the same UUID every time they are called with the same node_index and
+  // boot_count.
+  virtual UUID boot_uuid(size_t node_index, size_t boot_count) = 0;
+
+  // Installs the callback used to report a discovered reboot. It is passed
+  // the distributed time of the reboot and a per-node vector of boot
+  // timestamps. The callback is taken by value and moved into place.
+  void set_reboot_found(
+      std::function<void(distributed_clock::time_point,
+                         const std::vector<logger::BootTimestamp> &)>
+          fn) {
+    reboot_found_ = std::move(fn);
+  }
+
// Converts a time to the distributed clock for scheduling and cross-node
// time measurement.
virtual distributed_clock::time_point ToDistributedClock(
- size_t node_index, monotonic_clock::time_point time) = 0;
+ size_t node_index, logger::BootTimestamp time) = 0;
// Takes the distributed time and converts it to the monotonic clock for this
// node.
- virtual monotonic_clock::time_point FromDistributedClock(
- size_t node_index, distributed_clock::time_point time) = 0;
+ virtual logger::BootTimestamp FromDistributedClock(
+ size_t node_index, distributed_clock::time_point time,
+ size_t boot_count) = 0;
// Called whenever time passes this point and we can forget about it.
virtual void ObserveTimePassed(distributed_clock::time_point time) = 0;
+
+ protected:
+ std::function<void(distributed_clock::time_point,
+ const std::vector<logger::BootTimestamp> &)>
+ reboot_found_;
};
class EventSchedulerScheduler;
@@ -70,14 +88,19 @@
using ChannelType =
std::multimap<monotonic_clock::time_point, std::function<void()>>;
using Token = ChannelType::iterator;
+ EventScheduler(size_t node_index) : node_index_(node_index) {}
// Sets the time converter in use for this scheduler (and the corresponding
// node index)
void SetTimeConverter(size_t node_index, TimeConverter *converter) {
- node_index_ = node_index;
+ CHECK_EQ(node_index_, node_index);
converter_ = converter;
}
+  // Returns the UUID of the boot this node is currently on, as reported by
+  // the time converter.
+  UUID boot_uuid() {
+    const size_t current_boot = boot_count_;
+    return converter_->boot_uuid(node_index_, current_boot);
+  }
+
// Schedule an event with a callback function
// Returns an iterator to the event
Token Schedule(monotonic_clock::time_point time,
@@ -88,6 +111,22 @@
on_run_.emplace_back(std::move(callback));
}
+ // Queues a callback to run the next time RunOnStartup() runs (initial
+ // startup, or Startup() after a reboot). The queue is cleared after it runs.
+ void ScheduleOnStartup(std::function<void()> callback) {
+ on_startup_.emplace_back(std::move(callback));
+ }
+
+ // Sets the callback invoked by Shutdown(), which runs when this node is
+ // rebooted.
+ void set_on_shutdown(std::function<void()> callback) {
+ on_shutdown_ = std::move(callback);
+ }
+
+ // Sets the callback invoked by RunStarted(), which runs after the
+ // on-startup callbacks.
+ void set_started(std::function<void()> callback) {
+ started_ = std::move(callback);
+ }
+
+ // NOTE(review): these are public data members, yet they are only written
+ // through set_started()/set_on_shutdown() in the visible code; consider
+ // making them private.
+ std::function<void()> started_;
+ std::function<void()> on_shutdown_;
+
Token InvalidToken() { return events_list_.end(); }
// Deschedule an event by its iterator
@@ -96,11 +135,17 @@
// Runs the OnRun callbacks.
void RunOnRun();
+ // Runs the OnStartup callbacks.
+ void RunOnStartup();
+
+ // Runs the Started callback.
+ void RunStarted();
+
// Returns true if events are being handled.
inline bool is_running() const;
// Returns the timestamp of the next event to trigger.
- aos::monotonic_clock::time_point OldestEvent();
+ monotonic_clock::time_point OldestEvent();
// Handles the next event.
void CallOldestEvent();
@@ -108,27 +153,44 @@
// measurement.
distributed_clock::time_point ToDistributedClock(
monotonic_clock::time_point time) const {
- return converter_->ToDistributedClock(node_index_, time);
+ return converter_->ToDistributedClock(node_index_,
+ {.boot = boot_count_, .time = time});
}
// Takes the distributed time and converts it to the monotonic clock for this
// node.
- monotonic_clock::time_point FromDistributedClock(
+ logger::BootTimestamp FromDistributedClock(
distributed_clock::time_point time) const {
- return converter_->FromDistributedClock(node_index_, time);
+ return converter_->FromDistributedClock(node_index_, time, boot_count_);
}
// Returns the current monotonic time on this node calculated from the
// distributed clock.
inline monotonic_clock::time_point monotonic_now() const;
+ // Returns the current distributed time, as tracked by the
+ // EventSchedulerScheduler this scheduler is registered with.
+ inline distributed_clock::time_point distributed_now() const;
+
+ // Returns the boot this node is currently on; Startup() increments it.
+ size_t boot_count() const { return boot_count_; }
+
+ // Returns the index of the node this scheduler belongs to.
+ size_t node_index() const { return node_index_; }
+
+ // For implementing reboots. Shutdown() runs the on-shutdown callback;
+ // Startup() advances boot_count() and runs the on-startup callbacks.
+ void Shutdown();
+ void Startup();
+
private:
friend class EventSchedulerScheduler;
+
// Current execution time.
monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
+ size_t boot_count_ = 0;
+
// List of functions to run (once) when running.
std::vector<std::function<void()>> on_run_;
+ std::vector<std::function<void()>> on_startup_;
// Multimap holding times to run functions. These are stored in order, and
// the order is the callback tree.
@@ -146,16 +208,29 @@
class UnityConverter final : public TimeConverter {
public:
distributed_clock::time_point ToDistributedClock(
- size_t /*node_index*/, monotonic_clock::time_point time) override {
- return distributed_clock::epoch() + time.time_since_epoch();
+ size_t /*node_index*/, logger::BootTimestamp time) override {
+ CHECK_EQ(time.boot, 0u) << ": Reboots unsupported by default.";
+ return distributed_clock::epoch() + time.time.time_since_epoch();
}
- monotonic_clock::time_point FromDistributedClock(
- size_t /*node_index*/, distributed_clock::time_point time) override {
- return monotonic_clock::epoch() + time.time_since_epoch();
+ logger::BootTimestamp FromDistributedClock(
+ size_t /*node_index*/, distributed_clock::time_point time,
+ size_t boot_count) override {
+ CHECK_EQ(boot_count, 0u);
+ return logger::BootTimestamp{
+ .boot = boot_count,
+ .time = monotonic_clock::epoch() + time.time_since_epoch()};
}
void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
+
+ UUID boot_uuid(size_t /*node_index*/, size_t boot_count) override {
+ CHECK_EQ(boot_count, 0u);
+ return uuid_;
+ }
+
+ private:
+ const UUID uuid_ = UUID::Random();
};
UnityConverter unity_converter_;
@@ -196,6 +271,27 @@
// Returns the current distributed time.
distributed_clock::time_point distributed_now() const { return now_; }
+  // Runs every scheduler's on-startup callbacks, then every scheduler's
+  // started callback. Must be called while nothing is running.
+  void RunOnStartup() {
+    CHECK(!is_running_);
+    for (size_t i = 0; i < schedulers_.size(); ++i) {
+      schedulers_[i]->RunOnStartup();
+    }
+    for (size_t i = 0; i < schedulers_.size(); ++i) {
+      schedulers_[i]->RunStarted();
+    }
+  }
+
+  // Subscribes to the converter's reboot notifications. Each reported reboot
+  // is queued in reboots_; reboots must arrive in strictly increasing
+  // distributed time.
+  void SetTimeConverter(TimeConverter *time_converter) {
+    auto on_reboot_found =
+        [this](distributed_clock::time_point reboot_time,
+               const std::vector<logger::BootTimestamp> &node_times) {
+          if (!reboots_.empty()) {
+            CHECK_GT(reboot_time, std::get<0>(reboots_.back()));
+          }
+          reboots_.emplace_back(reboot_time, node_times);
+        };
+    time_converter->set_reboot_found(on_reboot_found);
+  }
+
private:
// Handles running the OnRun functions.
void RunOnRun() {
@@ -206,6 +302,8 @@
}
}
+ void Reboot();
+
// Returns the next event time and scheduler on which to run it.
std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
@@ -215,10 +313,22 @@
distributed_clock::time_point now_ = distributed_clock::epoch();
// List of schedulers to run in sync.
std::vector<EventScheduler *> schedulers_;
+
+ // List of when to reboot each node.
+ std::vector<std::tuple<distributed_clock::time_point,
+ std::vector<logger::BootTimestamp>>>
+ reboots_;
};
+// Returns the distributed time of the owning EventSchedulerScheduler.
+inline distributed_clock::time_point EventScheduler::distributed_now() const {
+  return scheduler_scheduler_->distributed_now();
+}
inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
- return FromDistributedClock(scheduler_scheduler_->distributed_now());
+  // Convert the current distributed time onto this node's clock. The result
+  // must land on the boot this node is currently on.
+  const distributed_clock::time_point now = distributed_now();
+  const logger::BootTimestamp t = FromDistributedClock(now);
+  CHECK_EQ(t.boot, boot_count_)
+      << ": Boot mismatch on node " << node_index_ << ": got " << t
+      << " at distributed time " << now;
+  return t.time;
}
inline bool EventScheduler::is_running() const {
diff --git a/aos/events/event_scheduler_test.cc b/aos/events/event_scheduler_test.cc
index 3bcf6ca..54fb91a 100644
--- a/aos/events/event_scheduler_test.cc
+++ b/aos/events/event_scheduler_test.cc
@@ -7,6 +7,7 @@
namespace aos {
namespace chrono = std::chrono;
+using aos::logger::BootTimestamp;
// Legacy time converter for keeping old tests working. Has numerical precision
// problems.
@@ -14,7 +15,12 @@
public:
SlopeOffsetTimeConverter(size_t nodes_count)
: distributed_offset_(nodes_count, std::chrono::seconds(0)),
- distributed_slope_(nodes_count, 1.0) {}
+ distributed_slope_(nodes_count, 1.0) {
+ uuids_.reserve(nodes_count);
+ while (uuids_.size() < nodes_count) {
+ uuids_.emplace_back(UUID::Random());
+ }
+ }
// Sets the offset between the distributed and monotonic clock.
// monotonic = distributed * slope + offset;
@@ -26,19 +32,29 @@
}
distributed_clock::time_point ToDistributedClock(
- size_t node_index, monotonic_clock::time_point time) override {
+ size_t node_index, BootTimestamp time) override {
+ CHECK_EQ(time.boot, 0u);
return distributed_clock::epoch() +
std::chrono::duration_cast<std::chrono::nanoseconds>(
(time.time_since_epoch() - distributed_offset_[node_index]) /
distributed_slope_[node_index]);
}
- monotonic_clock::time_point FromDistributedClock(
- size_t node_index, distributed_clock::time_point time) override {
- return monotonic_clock::epoch() +
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- time.time_since_epoch() * distributed_slope_[node_index]) +
- distributed_offset_[node_index];
+ BootTimestamp FromDistributedClock(size_t node_index,
+ distributed_clock::time_point time,
+ size_t boot_index) override {
+ CHECK_EQ(boot_index, 0u);
+ return {
+ .boot = 0u,
+ .time = monotonic_clock::epoch() +
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ time.time_since_epoch() * distributed_slope_[node_index]) +
+ distributed_offset_[node_index]};
+ }
+
+ UUID boot_uuid(size_t node_index, size_t boot_count) override {
+ CHECK_EQ(boot_count, 0u);
+ return uuids_[node_index];
}
void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
@@ -48,20 +64,21 @@
// distributed = monotonic + offset;
std::vector<std::chrono::nanoseconds> distributed_offset_;
std::vector<double> distributed_slope_;
+ std::vector<UUID> uuids_;
};
// Tests that the default parameters (slope of 1, offest of 0) behave as
// an identity.
TEST(EventSchedulerTest, IdentityTimeConversion) {
SlopeOffsetTimeConverter time(1);
- EventScheduler s;
+ EventScheduler s(0);
s.SetTimeConverter(0u, &time);
EXPECT_EQ(s.FromDistributedClock(distributed_clock::epoch()),
- monotonic_clock::epoch());
+ BootTimestamp::epoch());
EXPECT_EQ(
s.FromDistributedClock(distributed_clock::epoch() + chrono::seconds(1)),
- monotonic_clock::epoch() + chrono::seconds(1));
+ BootTimestamp::epoch() + chrono::seconds(1));
EXPECT_EQ(s.ToDistributedClock(monotonic_clock::epoch()),
distributed_clock::epoch());
@@ -73,16 +90,16 @@
// Tests that a non-unity slope is computed correctly.
TEST(EventSchedulerTest, DoubleTimeConversion) {
SlopeOffsetTimeConverter time(1);
- EventScheduler s;
+ EventScheduler s(0);
s.SetTimeConverter(0u, &time);
time.SetDistributedOffset(0u, std::chrono::seconds(7), 2.0);
EXPECT_EQ(s.FromDistributedClock(distributed_clock::epoch()),
- monotonic_clock::epoch() + chrono::seconds(7));
+ BootTimestamp::epoch() + chrono::seconds(7));
EXPECT_EQ(
s.FromDistributedClock(distributed_clock::epoch() + chrono::seconds(1)),
- monotonic_clock::epoch() + chrono::seconds(9));
+ BootTimestamp::epoch() + chrono::seconds(9));
EXPECT_EQ(s.ToDistributedClock(monotonic_clock::epoch() + chrono::seconds(7)),
distributed_clock::epoch());
diff --git a/aos/events/logging/boot_timestamp.cc b/aos/events/logging/boot_timestamp.cc
index cdd33b6..b3511f4 100644
--- a/aos/events/logging/boot_timestamp.cc
+++ b/aos/events/logging/boot_timestamp.cc
@@ -17,4 +17,10 @@
<< ", .duration=" << duration.duration.count() << "ns}";
}
+// Prints a BootQueueIndex as {.boot=B, .index=I}.
+std::ostream &operator<<(std::ostream &os,
+                         const struct BootQueueIndex &queue_index) {
+  os << "{.boot=" << queue_index.boot;
+  os << ", .index=" << queue_index.index;
+  os << "}";
+  return os;
+}
+
} // namespace aos::logger
diff --git a/aos/events/logging/boot_timestamp.h b/aos/events/logging/boot_timestamp.h
index 64c824a..dac6533 100644
--- a/aos/events/logging/boot_timestamp.h
+++ b/aos/events/logging/boot_timestamp.h
@@ -70,9 +70,47 @@
}
};
+// Structure to hold both a boot and queue index. Queue indices reset after
+// reboot, so we need to track them.
+struct BootQueueIndex {
+  // Boot number for this queue index.
+  size_t boot = std::numeric_limits<size_t>::max();
+  // Queue index.
+  uint32_t index = std::numeric_limits<uint32_t>::max();
+
+  // Returns a BootQueueIndex representing an invalid index. Since
+  // std::numeric_limits<uint32_t>::max() is never used in the QueueIndex code
+  // and is reserved as an Invalid value, this will never collide. A
+  // default-constructed BootQueueIndex compares equal to Invalid().
+  static BootQueueIndex Invalid() {
+    return {.boot = std::numeric_limits<size_t>::max(),
+            .index = std::numeric_limits<uint32_t>::max()};
+  }
+
+  bool operator==(const BootQueueIndex &b2) const {
+    return index == b2.index && boot == b2.boot;
+  }
+  bool operator!=(const BootQueueIndex &b2) const { return !(*this == b2); }
+
+  // Orders by boot first, then by queue index within the same boot. The
+  // remaining relational operators are derived from this one so the ordering
+  // stays consistent.
+  bool operator<(const BootQueueIndex &b2) const {
+    if (boot == b2.boot) {
+      return index < b2.index;
+    }
+    return boot < b2.boot;
+  }
+  bool operator>(const BootQueueIndex &b2) const { return b2 < *this; }
+  bool operator<=(const BootQueueIndex &b2) const { return !(b2 < *this); }
+  bool operator>=(const BootQueueIndex &b2) const { return !(*this < b2); }
+};
+
std::ostream &operator<<(std::ostream &os,
const struct BootTimestamp &timestamp);
std::ostream &operator<<(std::ostream &os, const struct BootDuration &duration);
+// Prints a BootQueueIndex as {.boot=B, .index=I}.
+std::ostream &operator<<(std::ostream &os,
+                         const struct BootQueueIndex &queue_index);
inline bool BootTimestamp::operator<(const BootTimestamp &m2) const {
if (boot != m2.boot) {
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 5079606..5a9040e 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -79,6 +79,223 @@
}
}
+// Prints out raw log parts to stdout.
+//
+// Expects exactly one log part file in argv[1]. Every verifiable
+// LogFileHeader at the start of the file is printed, with a warning for each
+// duplicate. If --raw_header is given, the final header's
+// configuration_sha256 must match that file, whose header is then used to
+// decode messages. With --print, every message is verified and printed one
+// per line. Returns 0 on success (including an empty file) and 1 if the first
+// header fails verification.
+int PrintRaw(int argc, char **argv) {
+  if (argc != 2) {
+    LOG(FATAL) << "Expected 1 logfile as an argument.";
+  }
+  aos::logger::SpanReader reader(argv[1]);
+  absl::Span<const uint8_t> raw_log_file_header_span = reader.ReadMessage();
+
+  if (raw_log_file_header_span == absl::Span<const uint8_t>()) {
+    LOG(WARNING) << "Empty log file on " << reader.filename();
+    return 0;
+  }
+
+  // Now, reproduce the log file header deduplication logic inline so we can
+  // print out all the headers we find.
+  aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader> log_file_header(
+      raw_log_file_header_span);
+  if (!log_file_header.Verify()) {
+    LOG(ERROR) << "Header corrupted on " << reader.filename();
+    return 1;
+  }
+  while (true) {
+    absl::Span<const uint8_t> maybe_header_data = reader.PeekMessage();
+    if (maybe_header_data == absl::Span<const uint8_t>()) {
+      break;
+    }
+
+    aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
+        maybe_header_data);
+    if (maybe_header.Verify()) {
+      std::cout << aos::FlatbufferToJson(
+                       log_file_header,
+                       {.multi_line = FLAGS_pretty,
+                        .max_vector_size =
+                            static_cast<size_t>(FLAGS_max_vector_size)})
+                << std::endl;
+      LOG(WARNING) << "Found duplicate LogFileHeader in " << reader.filename();
+      log_file_header =
+          aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>(
+              maybe_header_data);
+
+      reader.ConsumeMessage();
+    } else {
+      break;
+    }
+  }
+
+  // And now use the final sha256 to match the raw_header.
+  std::optional<aos::logger::MessageReader> raw_header_reader;
+  const aos::logger::LogFileHeader *full_header = &log_file_header.message();
+  if (!FLAGS_raw_header.empty()) {
+    raw_header_reader.emplace(FLAGS_raw_header);
+    std::cout << aos::FlatbufferToJson(full_header,
+                                       {.multi_line = FLAGS_pretty,
+                                        .max_vector_size = static_cast<size_t>(
+                                            FLAGS_max_vector_size)})
+              << std::endl;
+    CHECK_EQ(
+        full_header->configuration_sha256()->string_view(),
+        aos::logger::Sha256(raw_header_reader->raw_log_file_header().span()));
+    full_header = raw_header_reader->log_file_header();
+  }
+
+  if (!FLAGS_print) {
+    return 0;
+  }
+
+  std::cout << aos::FlatbufferToJson(full_header,
+                                     {.multi_line = FLAGS_pretty,
+                                      .max_vector_size = static_cast<size_t>(
+                                          FLAGS_max_vector_size)})
+            << std::endl;
+  CHECK(full_header->has_configuration())
+      << ": Missing configuration! You may want to provide the path to the "
+         "logged configuration file using the --raw_header flag.";
+
+  // The configuration is fixed for the whole file, so look it up once.
+  const auto *const channels = full_header->configuration()->channels();
+
+  while (true) {
+    const aos::SizePrefixedFlatbufferSpan<aos::logger::MessageHeader> message(
+        reader.ReadMessage());
+    if (message.span() == absl::Span<const uint8_t>()) {
+      break;
+    }
+
+    // Verify exactly once, before reading any field, and attach a hex dump of
+    // the raw bytes so a corrupted message can actually be diagnosed.
+    CHECK(message.Verify()) << absl::BytesToHexString(
+        std::string_view(reinterpret_cast<const char *>(message.span().data()),
+                         message.span().size()));
+
+    const size_t channel_index = message.message().channel_index();
+    CHECK_LT(channel_index, channels->size());
+    const aos::Channel *const channel = channels->Get(channel_index);
+
+    if (message.message().data() != nullptr) {
+      CHECK(channel->has_schema());
+
+      CHECK(flatbuffers::Verify(
+          *channel->schema(), *channel->schema()->root_table(),
+          message.message().data()->data(), message.message().data()->size()))
+          << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
+          << channel->type()->c_str();
+    }
+
+    if (FLAGS_format_raw && message.message().data() != nullptr) {
+      std::cout << aos::configuration::StrippedChannelToString(channel) << " "
+                << aos::FlatbufferToJson(message, {.multi_line = FLAGS_pretty,
+                                                   .max_vector_size = 4})
+                << ": "
+                << aos::FlatbufferToJson(
+                       channel->schema(), message.message().data()->data(),
+                       {.multi_line = FLAGS_pretty,
+                        .max_vector_size =
+                            static_cast<size_t>(FLAGS_max_vector_size)})
+                << std::endl;
+    } else {
+      std::cout << aos::configuration::StrippedChannelToString(channel) << " "
+                << aos::FlatbufferToJson(
+                       message, {.multi_line = FLAGS_pretty,
+                                 .max_vector_size = static_cast<size_t>(
+                                     FLAGS_max_vector_size)})
+                << std::endl;
+    }
+  }
+  return 0;
+}
+
+// This class prints out all data from a node on a boot.
+//
+// On construction it registers a raw watcher on every channel readable on the
+// event loop's node whose name contains --name and whose type contains
+// --type. Watchers print only while --print is set and, unless --fetch is
+// set, only after SetStarted(true). When --count is positive and the shared
+// print counter reaches it, the simulation is exited.
+class NodePrinter {
+ public:
+  // None of the pointers are owned. They are stored and dereferenced from the
+  // watchers, so they must stay valid for as long as this printer is in use.
+  NodePrinter(aos::EventLoop *event_loop, uint64_t *message_print_counter,
+              aos::SimulatedEventLoopFactory *factory,
+              aos::FastStringBuilder *builder)
+      : factory_(factory),
+        event_loop_(event_loop),
+        message_print_counter_(message_print_counter),
+        node_name_(
+            event_loop_->node() == nullptr
+                ? ""
+                : std::string(event_loop->node()->name()->string_view()) + " "),
+        builder_(builder) {
+    event_loop_->SkipTimingReport();
+    event_loop_->SkipAosLog();
+
+    const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
+        event_loop_->configuration()->channels();
+
+    for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
+      const aos::Channel *channel = channels->Get(i);
+      const flatbuffers::string_view name = channel->name()->string_view();
+      const flatbuffers::string_view type = channel->type()->string_view();
+      if (name.find(FLAGS_name) != std::string::npos &&
+          type.find(FLAGS_type) != std::string::npos) {
+        if (!aos::configuration::ChannelIsReadableOnNode(channel,
+                                                         event_loop_->node())) {
+          continue;
+        }
+        VLOG(1) << "Listening on " << name << " " << type;
+
+        CHECK_NOTNULL(channel->schema());
+        event_loop_->MakeRawWatcher(
+            channel, [this, channel](const aos::Context &context,
+                                     const void * /*message*/) {
+              if (!FLAGS_print) {
+                return;
+              }
+
+              if (!FLAGS_fetch && !started_) {
+                return;
+              }
+
+              PrintMessage(node_name_, channel, context, builder_);
+              ++(*message_print_counter_);
+              if (FLAGS_count > 0 && *message_print_counter_ >= FLAGS_count) {
+                factory_->Exit();
+              }
+            });
+      }
+    }
+  }
+
+  // Records whether the log has started on this node and prints a
+  // "Log starting at" (started) or "Log shutting down at" (stopped) banner.
+  void SetStarted(bool started, aos::monotonic_clock::time_point monotonic_now,
+                  aos::realtime_clock::time_point realtime_now) {
+    started_ = started;
+    // node_name_ already holds "<node name> " (or "" without a node), which is
+    // exactly the banner prefix.
+    std::cout << std::endl;
+    std::cout << node_name_
+              << (started_ ? "Log starting at " : "Log shutting down at ")
+              << realtime_now << " (" << monotonic_now << ")";
+    std::cout << std::endl << std::endl;
+  }
+
+ private:
+  aos::SimulatedEventLoopFactory *factory_;
+  aos::EventLoop *event_loop_;
+
+  uint64_t *message_print_counter_ = nullptr;
+
+  // "<node name> " prefix for printed lines, or "" when there is no node.
+  std::string node_name_;
+
+  bool started_ = false;
+
+  aos::FastStringBuilder *builder_;
+};
+
int main(int argc, char **argv) {
gflags::SetUsageMessage(
"Usage:\n"
@@ -95,138 +312,15 @@
aos::InitGoogle(&argc, &argv);
if (FLAGS_raw) {
- if (argc != 2) {
- LOG(FATAL) << "Expected 1 logfile as an argument.";
- }
- aos::logger::SpanReader reader(argv[1]);
- absl::Span<const uint8_t> raw_log_file_header_span = reader.ReadMessage();
-
- if (raw_log_file_header_span == absl::Span<const uint8_t>()) {
- LOG(WARNING) << "Empty log file on " << reader.filename();
- return 0;
- }
-
- // Now, reproduce the log file header deduplication logic inline so we can
- // print out all the headers we find.
- aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>
- log_file_header(raw_log_file_header_span);
- if (!log_file_header.Verify()) {
- LOG(ERROR) << "Header corrupted on " << reader.filename();
- return 1;
- }
- while (true) {
- absl::Span<const uint8_t> maybe_header_data = reader.PeekMessage();
- if (maybe_header_data == absl::Span<const uint8_t>()) {
- break;
- }
-
- aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
- maybe_header_data);
- if (maybe_header.Verify()) {
- std::cout << aos::FlatbufferToJson(
- log_file_header,
- {.multi_line = FLAGS_pretty,
- .max_vector_size =
- static_cast<size_t>(FLAGS_max_vector_size)})
- << std::endl;
- LOG(WARNING) << "Found duplicate LogFileHeader in "
- << reader.filename();
- log_file_header =
- aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>(
- maybe_header_data);
-
- reader.ConsumeMessage();
- } else {
- break;
- }
- }
-
- // And now use the final sha256 to match the raw_header.
- std::optional<aos::logger::MessageReader> raw_header_reader;
- const aos::logger::LogFileHeader *full_header = &log_file_header.message();
- if (!FLAGS_raw_header.empty()) {
- raw_header_reader.emplace(FLAGS_raw_header);
- std::cout << aos::FlatbufferToJson(
- full_header, {.multi_line = FLAGS_pretty,
- .max_vector_size = static_cast<size_t>(
- FLAGS_max_vector_size)})
- << std::endl;
- CHECK_EQ(
- full_header->configuration_sha256()->string_view(),
- aos::logger::Sha256(raw_header_reader->raw_log_file_header().span()));
- full_header = raw_header_reader->log_file_header();
- }
-
- if (!FLAGS_print) {
- return 0;
- }
-
- std::cout << aos::FlatbufferToJson(full_header,
- {.multi_line = FLAGS_pretty,
- .max_vector_size = static_cast<size_t>(
- FLAGS_max_vector_size)})
- << std::endl;
- CHECK(full_header->has_configuration())
- << ": Missing configuration! You may want to provide the path to the "
- "logged configuration file using the --raw_header flag.";
-
- while (true) {
- const aos::SizePrefixedFlatbufferSpan<aos::logger::MessageHeader> message(
- reader.ReadMessage());
- if (message.span() == absl::Span<const uint8_t>()) {
- break;
- }
- CHECK(message.Verify());
-
- const auto *const channels = full_header->configuration()->channels();
- const size_t channel_index = message.message().channel_index();
- CHECK_LT(channel_index, channels->size());
- const aos::Channel *const channel = channels->Get(channel_index);
-
- CHECK(message.Verify()) << absl::BytesToHexString(std::string_view(
- reinterpret_cast<const char *>(message.span().data()),
- message.span().size()));
-
- if (message.message().data() != nullptr) {
- CHECK(channel->has_schema());
-
- CHECK(flatbuffers::Verify(
- *channel->schema(), *channel->schema()->root_table(),
- message.message().data()->data(), message.message().data()->size()))
- << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
- << channel->type()->c_str();
- }
-
- if (FLAGS_format_raw && message.message().data() != nullptr) {
- std::cout << aos::configuration::StrippedChannelToString(channel) << " "
- << aos::FlatbufferToJson(message, {.multi_line = FLAGS_pretty,
- .max_vector_size = 4})
- << ": "
- << aos::FlatbufferToJson(
- channel->schema(), message.message().data()->data(),
- {FLAGS_pretty,
- static_cast<size_t>(FLAGS_max_vector_size)})
- << std::endl;
- } else {
- std::cout << aos::configuration::StrippedChannelToString(channel) << " "
- << aos::FlatbufferToJson(
- message, {FLAGS_pretty,
- static_cast<size_t>(FLAGS_max_vector_size)})
- << std::endl;
- }
- }
- return 0;
+ return PrintRaw(argc, argv);
}
if (argc < 2) {
LOG(FATAL) << "Expected at least 1 logfile as an argument.";
}
- const std::vector<std::string> unsorted_logfiles =
- aos::logger::FindLogs(argc, argv);
-
const std::vector<aos::logger::LogFile> logfiles =
- aos::logger::SortParts(unsorted_logfiles);
+ aos::logger::SortParts(aos::logger::FindLogs(argc, argv));
for (auto &it : logfiles) {
VLOG(1) << it;
@@ -249,32 +343,10 @@
return 0;
}
- aos::FastStringBuilder builder;
-
- aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
- reader.Register(&event_loop_factory);
-
- std::vector<std::unique_ptr<aos::EventLoop>> printer_event_loops;
-
- bool found_channel = false;
-
- uint64_t message_print_counter = 0;
-
- for (const aos::Node *node :
- aos::configuration::GetNodes(event_loop_factory.configuration())) {
- std::unique_ptr<aos::EventLoop> printer_event_loop =
- event_loop_factory.MakeEventLoop("printer", node);
- printer_event_loop->SkipTimingReport();
- printer_event_loop->SkipAosLog();
-
- struct MessageInfo {
- std::string node_name;
- std::unique_ptr<aos::RawFetcher> fetcher;
- };
- std::vector<MessageInfo> messages_before_start;
-
+ {
+ bool found_channel = false;
const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
- printer_event_loop->configuration()->channels();
+ reader.configuration()->channels();
for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
const aos::Channel *channel = channels->Get(i);
@@ -282,91 +354,54 @@
const flatbuffers::string_view type = channel->type()->string_view();
if (name.find(FLAGS_name) != std::string::npos &&
type.find(FLAGS_type) != std::string::npos) {
- if (!aos::configuration::ChannelIsReadableOnNode(
- channel, printer_event_loop->node())) {
- continue;
- }
- VLOG(1) << "Listening on " << name << " " << type;
-
- std::string node_name =
- node == nullptr ? ""
- : std::string(node->name()->string_view()) + " ";
-
- CHECK_NOTNULL(channel->schema());
-
- // Fetch the last message on this channel from before the log start
- // time.
- if (FLAGS_fetch) {
- std::unique_ptr<aos::RawFetcher> fetcher =
- printer_event_loop->MakeRawFetcher(channel);
- if (fetcher->Fetch()) {
- MessageInfo message{.node_name = node_name,
- .fetcher = std::move(fetcher)};
- // Insert it sorted into the vector so we can print in time order
- // instead of channel order at the start.
- auto it = std::lower_bound(
- messages_before_start.begin(), messages_before_start.end(),
- message, [](const MessageInfo &lhs, const MessageInfo &rhs) {
- if (lhs.fetcher->context().monotonic_event_time <
- rhs.fetcher->context().monotonic_event_time) {
- return true;
- }
- if (lhs.fetcher->context().monotonic_event_time >
- rhs.fetcher->context().monotonic_event_time) {
- return false;
- }
- return lhs.fetcher->channel() < rhs.fetcher->channel();
- });
- messages_before_start.insert(it, std::move(message));
- }
- }
-
- printer_event_loop->MakeRawWatcher(
- channel, [channel, node_name, &builder, &event_loop_factory,
- &message_print_counter](const aos::Context &context,
- const void * /*message*/) {
- if (FLAGS_print) {
- PrintMessage(node_name, channel, context, &builder);
- ++message_print_counter;
- if (FLAGS_count > 0 && message_print_counter >= FLAGS_count) {
- event_loop_factory.Exit();
- }
- }
- });
found_channel = true;
}
}
-
- // Print the messages from before the log start time.
- // TODO(austin): Sort between nodes too when it becomes annoying enough.
- for (const MessageInfo &message : messages_before_start) {
- if (FLAGS_print) {
- PrintMessage(message.node_name, message.fetcher->channel(),
- message.fetcher->context(), &builder);
- ++message_print_counter;
- if (FLAGS_count > 0 && message_print_counter >= FLAGS_count) {
- // We are done. Clean up and exit.
- reader.Deregister();
- return 0;
- }
- }
+ if (!found_channel) {
+ LOG(FATAL) << "Could not find any channels";
}
- printer_event_loops.emplace_back(std::move(printer_event_loop));
-
- std::cout << std::endl;
- std::cout << (node != nullptr ? (node->name()->str() + " ") : "")
- << "Log starting at " << reader.realtime_start_time(node) << " ("
- << reader.monotonic_start_time(node) << ")";
- std::cout << std::endl << std::endl;
}
- if (!found_channel) {
- LOG(FATAL) << "Could not find any channels";
- }
+ aos::FastStringBuilder builder;
- if (FLAGS_fetch) {
- // New line to separate fetched messages from non-fetched messages.
- std::cout << std::endl;
+ aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
+
+ reader.RegisterWithoutStarting(&event_loop_factory);
+
+ uint64_t message_print_counter = 0;
+
+ std::vector<NodePrinter *> printers;
+ printers.resize(
+ aos::configuration::NodesCount(event_loop_factory.configuration()),
+ nullptr);
+
+ for (const aos::Node *node :
+ aos::configuration::GetNodes(event_loop_factory.configuration())) {
+ size_t node_index = aos::configuration::GetNodeIndex(
+ event_loop_factory.configuration(), node);
+ // Spin up the printer, and hook up the SetStarted method so that it gets
+ // notified when the log starts and stops.
+ aos::NodeEventLoopFactory *node_factory =
+ event_loop_factory.GetNodeEventLoopFactory(node);
+ node_factory->OnStartup([&event_loop_factory, node_factory,
+ &message_print_counter, &builder, &printers,
+ node_index]() {
+ printers[node_index] = node_factory->AlwaysStart<NodePrinter>(
+ "printer", &message_print_counter, &event_loop_factory, &builder);
+ });
+ node_factory->OnShutdown(
+ [&printers, node_index]() { printers[node_index] = nullptr; });
+
+ reader.OnStart(node, [&printers, node_index, node_factory]() {
+ CHECK(printers[node_index]);
+ printers[node_index]->SetStarted(true, node_factory->monotonic_now(),
+ node_factory->realtime_now());
+ });
+ reader.OnEnd(node, [&printers, node_index, node_factory]() {
+ CHECK(printers[node_index]);
+ printers[node_index]->SetStarted(false, node_factory->monotonic_now(),
+ node_factory->realtime_now());
+ });
}
event_loop_factory.Run();
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a880f35..dc71348 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -38,6 +38,7 @@
void NewDataWriter::Rotate() {
// No need to rotate if nothing has been written.
if (header_written_) {
+ VLOG(1) << "Rotated " << filename();
++parts_index_;
reopen_(this);
header_written_ = false;
@@ -57,9 +58,12 @@
const monotonic_clock::time_point monotonic_remote_time,
const monotonic_clock::time_point monotonic_event_time,
const bool reliable) {
+ // Trigger rotation if anything in the header changes.
bool rotate = false;
CHECK_LT(remote_node_index, state_.size());
State &state = state_[remote_node_index];
+
+ // Did the remote boot UUID change?
if (state.boot_uuid != remote_node_boot_uuid) {
VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
<< remote_node_boot_uuid << " from " << state.boot_uuid;
@@ -73,9 +77,15 @@
rotate = true;
}
+
+ // Did the unreliable timestamps change?
if (!reliable) {
if (state.oldest_remote_unreliable_monotonic_timestamp >
monotonic_remote_time) {
+ VLOG(1) << filename() << " Remote " << remote_node_index
+ << " oldest_remote_unreliable_monotonic_timestamp updated from "
+ << state.oldest_remote_unreliable_monotonic_timestamp << " to "
+ << monotonic_remote_time;
state.oldest_remote_unreliable_monotonic_timestamp =
monotonic_remote_time;
state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
@@ -83,7 +93,12 @@
}
}
+ // Did any of the timestamps change?
if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
+ VLOG(1) << filename() << " Remote " << remote_node_index
+ << " oldest_remote_monotonic_timestamp updated from "
+ << state.oldest_remote_monotonic_timestamp << " to "
+ << monotonic_remote_time;
state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
state.oldest_local_monotonic_timestamp = monotonic_event_time;
rotate = true;
@@ -97,7 +112,7 @@
void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
const UUID &source_node_boot_uuid,
aos::monotonic_clock::time_point now) {
- // TODO(austin): Handle remote nodes changing too, not just the source node.
+ // Trigger a reboot if we detect the boot UUID change.
if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
state_[node_index_].boot_uuid = source_node_boot_uuid;
if (header_written_) {
@@ -106,10 +121,20 @@
QueueHeader(MakeHeader());
}
+
+ // If the start time has changed for this node, trigger a rotation.
+ if (log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid) !=
+ monotonic_start_time_) {
+ CHECK(header_written_);
+ Rotate();
+ }
+
+ CHECK_EQ(log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid),
+ monotonic_start_time_);
CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
+ CHECK(writer);
CHECK(header_written_) << ": Attempting to write message before header to "
<< writer->filename();
- CHECK(writer);
writer->QueueSizedFlatbuffer(fbb, now);
}
@@ -139,11 +164,17 @@
reopen_(this);
}
+ VLOG(1) << "Writing to " << filename() << " "
+ << aos::FlatbufferToJson(
+ header, {.multi_line = false, .max_vector_size = 100});
+
// TODO(austin): This triggers a dummy allocation that we don't need as part
// of releasing. Can we skip it?
CHECK(writer);
writer->QueueSizedFlatbuffer(header.Release());
header_written_ = true;
+ monotonic_start_time_ = log_namer_->monotonic_start_time(
+ node_index_, state_[node_index_].boot_uuid);
}
void NewDataWriter::Close() {
@@ -153,9 +184,20 @@
header_written_ = false;
}
+LogNamer::NodeState *LogNamer::GetNodeState(size_t node_index,
+ const UUID &boot_uuid) {
+ auto it = node_states_.find(std::make_pair(node_index, boot_uuid));
+ if (it == node_states_.end()) {
+ it =
+ node_states_.emplace(std::make_pair(node_index, boot_uuid), NodeState())
+ .first;
+ }
+ return &it->second;
+}
+
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
size_t node_index, const std::vector<NewDataWriter::State> &state,
- const UUID &parts_uuid, int parts_index) const {
+ const UUID &parts_uuid, int parts_index) {
const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
const Node *const source_node =
configuration::GetNode(configuration_, node_index);
@@ -281,27 +323,26 @@
log_file_header_builder.add_max_out_of_order_duration(
header_.message().max_out_of_order_duration());
+ NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
log_file_header_builder.add_monotonic_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
- node_states_[node_index].monotonic_start_time.time_since_epoch())
+ node_state->monotonic_start_time.time_since_epoch())
.count());
if (source_node == node_) {
log_file_header_builder.add_realtime_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
- node_states_[node_index].realtime_start_time.time_since_epoch())
+ node_state->realtime_start_time.time_since_epoch())
.count());
} else {
// Fill out the legacy start times. Since these were implemented to never
// change on reboot, they aren't very helpful in tracking what happened.
log_file_header_builder.add_logger_monotonic_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
- node_states_[node_index]
- .logger_monotonic_start_time.time_since_epoch())
+ node_state->logger_monotonic_start_time.time_since_epoch())
.count());
log_file_header_builder.add_logger_realtime_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
- node_states_[node_index]
- .logger_realtime_start_time.time_since_epoch())
+ node_state->logger_realtime_start_time.time_since_epoch())
.count());
}
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index c2c8dd1..f1f1829 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -7,6 +7,7 @@
#include <string_view>
#include <vector>
+#include "absl/container/btree_map.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/uuid.h"
@@ -22,6 +23,11 @@
//
// Class to manage writing data to log files. This lets us track which boot the
// written header has in it, and if the header has been written or not.
+//
+// The design of this class is that instead of being notified when any of the
+// header data changes, it polls and owns that decision. This makes it much
+// harder to write corrupted data. If that becomes a performance problem, we
+// can DCHECK and take it out of production binaries.
class NewDataWriter {
public:
// Constructs a NewDataWriter.
@@ -54,7 +60,9 @@
aos::monotonic_clock::time_point now);
// Returns the filename of the writer.
- std::string_view filename() const { return writer->filename(); }
+ std::string_view filename() const {
+ return writer ? writer->filename() : "(closed)";
+ }
void Close();
@@ -96,6 +104,8 @@
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader();
+ monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::min_time;
+
const Node *const node_ = nullptr;
const size_t node_index_ = 0;
LogNamer *log_namer_;
@@ -121,7 +131,6 @@
node_(node),
logger_node_index_(configuration::GetNodeIndex(configuration_, node_)) {
nodes_.emplace_back(node_);
- node_states_.resize(configuration::NodesCount(configuration_));
}
virtual ~LogNamer() {}
@@ -180,45 +189,35 @@
UUID::FromString(header_.message().logger_node_boot_uuid());
}
- void SetStartTimes(size_t node_index,
+ void ClearStartTimes() {
+ node_states_.clear();
+ }
+
+ void SetStartTimes(size_t node_index, const UUID &boot_uuid,
monotonic_clock::time_point monotonic_start_time,
realtime_clock::time_point realtime_start_time,
monotonic_clock::time_point logger_monotonic_start_time,
realtime_clock::time_point logger_realtime_start_time) {
- node_states_[node_index].monotonic_start_time = monotonic_start_time;
- node_states_[node_index].realtime_start_time = realtime_start_time;
- node_states_[node_index].logger_monotonic_start_time =
- logger_monotonic_start_time;
- node_states_[node_index].logger_realtime_start_time =
- logger_realtime_start_time;
-
- // TODO(austin): Track that the header has changed and needs to be
- // rewritten down here rather than up in log_writer.
+ VLOG(1) << "Setting node " << node_index << " to start time "
+ << monotonic_start_time << " rt " << realtime_start_time << " UUID "
+ << boot_uuid;
+ NodeState *node_state = GetNodeState(node_index, boot_uuid);
+ node_state->monotonic_start_time = monotonic_start_time;
+ node_state->realtime_start_time = realtime_start_time;
+ node_state->logger_monotonic_start_time = logger_monotonic_start_time;
+ node_state->logger_realtime_start_time = logger_realtime_start_time;
}
- monotonic_clock::time_point monotonic_start_time(size_t node_index) const {
- return node_states_[node_index].monotonic_start_time;
+ monotonic_clock::time_point monotonic_start_time(size_t node_index,
+ const UUID &boot_uuid) {
+ DCHECK_NE(boot_uuid, UUID::Zero());
+
+ NodeState *node_state = GetNodeState(node_index, boot_uuid);
+ return node_state->monotonic_start_time;
}
protected:
- // Creates a new header by copying fields out of the template and combining
- // them with the arguments provided.
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
- size_t node_index, const std::vector<NewDataWriter::State> &state,
- const UUID &parts_uuid, int parts_index) const;
-
- EventLoop *event_loop_;
- const Configuration *const configuration_;
- const Node *const node_;
- const size_t logger_node_index_;
- UUID logger_node_boot_uuid_;
- std::vector<const Node *> nodes_;
-
- friend NewDataWriter;
-
// Structure with state per node about times and such.
- // TODO(austin): some of this lives better in NewDataWriter once we move
- // ownership of deciding when to write headers into LogNamer.
struct NodeState {
// Time when this node started logging.
monotonic_clock::time_point monotonic_start_time =
@@ -231,7 +230,28 @@
realtime_clock::time_point logger_realtime_start_time =
realtime_clock::min_time;
};
- std::vector<NodeState> node_states_;
+
+ // Creates a new header by copying fields out of the template and combining
+ // them with the arguments provided.
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+ size_t node_index, const std::vector<NewDataWriter::State> &state,
+ const UUID &parts_uuid, int parts_index);
+
+ EventLoop *event_loop_;
+ const Configuration *const configuration_;
+ const Node *const node_;
+ const size_t logger_node_index_;
+ UUID logger_node_boot_uuid_;
+ std::vector<const Node *> nodes_;
+
+ friend NewDataWriter;
+
+ // Returns the start/stop time state structure for a node and boot. We can
+ // have data from multiple boots, and it makes sense to reuse the start/stop
+ // times if we get data from the same boot again.
+ NodeState *GetNodeState(size_t node_index, const UUID &boot_uuid);
+
+ absl::btree_map<std::pair<size_t, UUID>, NodeState> node_states_;
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> header_ =
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index b320474..5fe61cd 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -49,14 +49,6 @@
namespace logger {
namespace {
-std::string LogFileVectorToString(std::vector<LogFile> log_files) {
- std::stringstream ss;
- for (const auto &f : log_files) {
- ss << f << "\n";
- }
- return ss.str();
-}
-
// Copies the channel, removing the schema as we go. If new_name is provided,
// it is used instead of the name inside the channel. If new_type is provided,
// it is used instead of the type in the channel.
@@ -230,7 +222,8 @@
if (!configuration::MultiNode(configuration())) {
states_.emplace_back(std::make_unique<State>(
- std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, ""))));
+ std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, "")),
+ nullptr));
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -295,19 +288,82 @@
return state->realtime_start_time(0);
}
+void LogReader::OnStart(std::function<void()> fn) {
+ CHECK(!configuration::MultiNode(configuration()));
+ OnStart(nullptr, std::move(fn));
+}
+
+void LogReader::OnStart(const Node *node, std::function<void()> fn) {
+ const int node_index = configuration::GetNodeIndex(configuration(), node);
+ CHECK_GE(node_index, 0);
+ CHECK_LT(node_index, static_cast<int>(states_.size()));
+ State *state = states_[node_index].get();
+ CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+ state->OnStart(std::move(fn));
+}
+
+void LogReader::State::OnStart(std::function<void()> fn) {
+ on_starts_.emplace_back(std::move(fn));
+}
+
+void LogReader::State::RunOnStart() {
+ SetRealtimeOffset(monotonic_start_time(boot_count()),
+ realtime_start_time(boot_count()));
+
+ VLOG(1) << "Starting " << MaybeNodeName(node()) << "at time "
+ << monotonic_start_time(boot_count());
+ for (size_t i = 0; i < on_starts_.size(); ++i) {
+ on_starts_[i]();
+ }
+ stopped_ = false;
+ started_ = true;
+}
+
+void LogReader::OnEnd(std::function<void()> fn) {
+ CHECK(!configuration::MultiNode(configuration()));
+ OnEnd(nullptr, std::move(fn));
+}
+
+void LogReader::OnEnd(const Node *node, std::function<void()> fn) {
+ const int node_index = configuration::GetNodeIndex(configuration(), node);
+ CHECK_GE(node_index, 0);
+ CHECK_LT(node_index, static_cast<int>(states_.size()));
+ State *state = states_[node_index].get();
+ CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+ state->OnEnd(std::move(fn));
+}
+
+void LogReader::State::OnEnd(std::function<void()> fn) {
+ on_ends_.emplace_back(std::move(fn));
+}
+
+void LogReader::State::RunOnEnd() {
+ VLOG(1) << "Ending " << MaybeNodeName(node()) << "at time "
+ << monotonic_start_time(boot_count());
+ for (size_t i = 0; i < on_ends_.size(); ++i) {
+ on_ends_[i]();
+ }
+
+ stopped_ = true;
+ started_ = false;
+}
+
void LogReader::Register() {
event_loop_factory_unique_ptr_ =
std::make_unique<SimulatedEventLoopFactory>(configuration());
Register(event_loop_factory_unique_ptr_.get());
}
-void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
+void LogReader::RegisterWithoutStarting(
+ SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
remapped_configuration_ = event_loop_factory_->configuration();
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop_factory_->configuration(), logged_configuration(),
- FLAGS_skip_order_validation,
+ log_files_[0].boots, FLAGS_skip_order_validation,
chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
@@ -318,29 +374,14 @@
std::vector<LogParts> filtered_parts = FilterPartsForNode(
log_files_, node != nullptr ? node->name()->string_view() : "");
- // Confirm that all the parts are from the same boot if there are enough
- // parts to not be from the same boot.
- if (filtered_parts.size() > 1u) {
- for (size_t i = 1; i < filtered_parts.size(); ++i) {
- CHECK_EQ(filtered_parts[i].source_boot_uuid,
- filtered_parts[0].source_boot_uuid)
- << ": Found parts from different boots for node "
- << node->name()->string_view() << " "
- << LogFileVectorToString(log_files_);
- }
- if (!filtered_parts[0].source_boot_uuid.empty()) {
- event_loop_factory_->GetNodeEventLoopFactory(node)->set_boot_uuid(
- filtered_parts[0].source_boot_uuid);
- }
- }
-
states_[node_index] = std::make_unique<State>(
filtered_parts.size() == 0u
? nullptr
- : std::make_unique<TimestampMapper>(std::move(filtered_parts)));
+ : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+ node);
State *state = states_[node_index].get();
- state->set_event_loop(state->SetNodeEventLoopFactory(
- event_loop_factory_->GetNodeEventLoopFactory(node)));
+ state->SetNodeEventLoopFactory(
+ event_loop_factory_->GetNodeEventLoopFactory(node));
state->SetChannelCount(logged_configuration()->channels()->size());
timestamp_mappers.emplace_back(state->timestamp_mapper());
@@ -372,7 +413,22 @@
configuration::GetNodeIndex(configuration(), node);
State *state = states_[node_index].get();
- Register(state->event_loop());
+ // If we didn't find any log files with data in them, we won't ever get a
+ // callback or be live. So skip the rest of the setup.
+ if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ continue;
+ }
+ ++live_nodes_;
+
+ NodeEventLoopFactory *node_factory =
+ event_loop_factory_->GetNodeEventLoopFactory(node);
+ node_factory->OnStartup([this, state, node]() {
+ RegisterDuringStartup(state->MakeEventLoop(), node);
+ });
+ node_factory->OnShutdown([this, state, node]() {
+ RegisterDuringStartup(nullptr, node);
+ state->DestroyEventLoop();
+ });
}
if (live_nodes_ == 0) {
@@ -387,34 +443,6 @@
state->SeedSortedMessages();
}
- // We want to start the log file at the last start time of the log files
- // from all the nodes. Compute how long each node's simulation needs to run
- // to move time to this point.
- distributed_clock::time_point start_time = distributed_clock::min_time;
-
- // TODO(austin): We want an "OnStart" callback for each node rather than
- // running until the last node.
-
- for (std::unique_ptr<State> &state : states_) {
- VLOG(1) << "Start time is " << state->monotonic_start_time(0)
- << " for node " << MaybeNodeName(state->event_loop()->node())
- << "now " << state->monotonic_now();
- if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
- continue;
- }
- // And start computing the start time on the distributed clock now that
- // that works.
- start_time = std::max(
- start_time, state->ToDistributedClock(state->monotonic_start_time(0)));
- }
-
- // TODO(austin): If a node doesn't have a start time, we might not queue
- // enough. If this happens, we'll explode with a frozen error eventually.
-
- CHECK_GE(start_time, distributed_clock::epoch())
- << ": Hmm, we have a node starting before the start of time. Offset "
- "everything.";
-
// Forwarding is tracked per channel. If it is enabled, we want to turn it
// off. Otherwise messages replayed will get forwarded across to the other
// nodes, and also replayed on the other nodes. This may not satisfy all
@@ -429,7 +457,7 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
const Channel *remapped_channel =
- RemapChannel(state->event_loop(), channel);
+ RemapChannel(state->event_loop(), node, channel);
event_loop_factory_->DisableForwarding(remapped_channel);
}
@@ -438,6 +466,37 @@
// from both the real message bridge and simulated message bridge.
event_loop_factory_->DisableStatistics();
}
+}
+
+void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
+ RegisterWithoutStarting(event_loop_factory);
+ // We want to start the log file at the last start time of the log files
+ // from all the nodes. Compute how long each node's simulation needs to run
+ // to move time to this point.
+ distributed_clock::time_point start_time = distributed_clock::min_time;
+
+ // TODO(austin): We want an "OnStart" callback for each node rather than
+ // running until the last node.
+
+ for (std::unique_ptr<State> &state : states_) {
+ VLOG(1) << "Start time is " << state->monotonic_start_time(0)
+ << " for node " << MaybeNodeName(state->node()) << "now "
+ << state->monotonic_now();
+ if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
+ continue;
+ }
+ // And start computing the start time on the distributed clock now that
+ // that works.
+ start_time = std::max(
+ start_time, state->ToDistributedClock(state->monotonic_start_time(0)));
+ }
+
+ // TODO(austin): If a node doesn't have a start time, we might not queue
+ // enough. If this happens, we'll explode with a frozen error eventually.
+
+ CHECK_GE(start_time, distributed_clock::epoch())
+ << ": Hmm, we have a node starting before the start of time. Offset "
+ "everything.";
// While we are starting the system up, we might be relying on matching data
// to timestamps on log files where the timestamp log file starts before the
@@ -478,23 +537,53 @@
}
void LogReader::Register(EventLoop *event_loop) {
+ Register(event_loop, event_loop->node());
+}
+
+void LogReader::Register(EventLoop *event_loop, const Node *node) {
State *state =
- states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
- .get();
+ states_[configuration::GetNodeIndex(configuration(), node)].get();
+
+ // If we didn't find any log files with data in them, we won't ever get a
+ // callback or be live. So skip the rest of the setup.
+ if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ return;
+ }
+ ++live_nodes_;
+
+ if (event_loop_factory_ != nullptr) {
+ event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
+ [this, event_loop, node]() {
+ RegisterDuringStartup(event_loop, node);
+ });
+ } else {
+ RegisterDuringStartup(event_loop, node);
+ }
+}
+
+void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
+ if (event_loop) {
+ CHECK(event_loop->configuration() == configuration());
+ }
+
+ State *state =
+ states_[configuration::GetNodeIndex(configuration(), node)].get();
state->set_event_loop(event_loop);
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
- event_loop->SkipTimingReport();
- event_loop->SkipAosLog();
+ if (event_loop) {
+ event_loop->SkipTimingReport();
+ event_loop->SkipAosLog();
+ }
for (size_t logged_channel_index = 0;
logged_channel_index < logged_configuration()->channels()->size();
++logged_channel_index) {
const Channel *channel = RemapChannel(
- event_loop,
+ event_loop, node,
logged_configuration()->channels()->Get(logged_channel_index));
if (channel->logger() == LoggerConfig::NOT_LOGGED) {
@@ -502,45 +591,58 @@
}
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- RemoteMessageSender *remote_timestamp_sender = nullptr;
State *source_state = nullptr;
-
- if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
- configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
- // We've got a message which is being forwarded to this node.
+ if (!configuration::ChannelIsSendableOnNode(channel, node) &&
+ configuration::ChannelIsReadableOnNode(channel, node)) {
const Node *source_node = configuration::GetNode(
- event_loop->configuration(), channel->source_node()->string_view());
- filter = GetFilter(event_loop->node(), source_node);
+ configuration(), channel->source_node()->string_view());
- // Delivery timestamps are supposed to be logged back on the source node.
- // Configure remote timestamps to be sent.
- const Connection *connection =
- configuration::ConnectionToNode(channel, event_loop->node());
- const bool delivery_time_is_logged =
- configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
- source_node);
+ // We've got a message which is being forwarded to this node.
+ filter = GetFilter(node, source_node);
source_state =
states_[configuration::GetNodeIndex(configuration(), source_node)]
.get();
-
- if (delivery_time_is_logged) {
- remote_timestamp_sender =
- source_state->RemoteTimestampSender(channel, connection);
- }
}
- state->SetChannel(
- logged_channel_index,
- configuration::ChannelIndex(event_loop->configuration(), channel),
- event_loop->MakeRawSender(channel), filter, remote_timestamp_sender,
- source_state);
+ // We are the source, and it is forwarded.
+ const bool is_forwarded =
+ configuration::ChannelIsSendableOnNode(channel, node) &&
+ configuration::ConnectionCount(channel);
+
+ state->SetChannel(logged_channel_index,
+ configuration::ChannelIndex(configuration(), channel),
+ event_loop ? event_loop->MakeRawSender(channel) : nullptr,
+ filter, is_forwarded, source_state);
+
+ if (is_forwarded) {
+ const Node *source_node = configuration::GetNode(
+ configuration(), channel->source_node()->string_view());
+
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const bool delivery_time_is_logged =
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+ source_node);
+
+ if (delivery_time_is_logged) {
+ State *destination_state =
+ states_[configuration::GetNodeIndex(
+ configuration(), connection->name()->string_view())]
+ .get();
+ destination_state->SetRemoteTimestampSender(
+ logged_channel_index,
+ event_loop ? state->RemoteTimestampSender(channel, connection)
+ : nullptr);
+ }
+ }
+ }
}
- // If we didn't find any log files with data in them, we won't ever get a
- // callback or be live. So skip the rest of the setup.
- if (state->OldestMessageTime() == monotonic_clock::max_time) {
+ if (!event_loop) {
+ state->ClearRemoteTimestampSenders();
+ state->set_timer_handler(nullptr);
+ state->set_startup_timer(nullptr);
return;
}
@@ -548,7 +650,7 @@
VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
<< "at " << state->event_loop()->context().monotonic_event_time
<< " now " << state->monotonic_now();
- if (state->OldestMessageTime() == monotonic_clock::max_time) {
+ if (state->OldestMessageTime() == BootTimestamp::max_time()) {
--live_nodes_;
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
if (exit_on_finish_ && live_nodes_ == 0) {
@@ -558,9 +660,9 @@
}
TimestampedMessage timestamped_message = state->PopOldest();
- CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
- CHECK_EQ(timestamped_message.monotonic_timestamp_time.boot, 0u);
+
+ CHECK_EQ(timestamped_message.monotonic_event_time.boot,
+ state->boot_count());
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
@@ -570,7 +672,8 @@
<< monotonic_now << " trying to send "
<< timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
- } else if (BootTimestamp{.boot = 0u, .time = monotonic_now} !=
+ } else if (BootTimestamp{.boot = state->boot_count(),
+ .time = monotonic_now} !=
timestamped_message.monotonic_event_time) {
LOG(WARNING) << "Check failed: monotonic_now == "
"timestamped_message.monotonic_event_time) ("
@@ -597,10 +700,13 @@
// happen after the effect even though we know they are at the same
// time. I doubt anyone will notice for a bit, but we should really
// fix that.
+ BootTimestamp monotonic_remote_now =
+ state->monotonic_remote_now(timestamped_message.channel_index);
if (!FLAGS_skip_order_validation) {
- CHECK_LE(
- timestamped_message.monotonic_remote_time.time,
- state->monotonic_remote_now(timestamped_message.channel_index))
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ monotonic_remote_now.boot);
+ CHECK_LE(timestamped_message.monotonic_remote_time,
+ monotonic_remote_now)
<< state->event_loop()->node()->name()->string_view() << " to "
<< state->remote_node(timestamped_message.channel_index)
->name()
@@ -610,9 +716,13 @@
logged_configuration()->channels()->Get(
timestamped_message.channel_index))
<< " " << state->DebugString();
- } else if (timestamped_message.monotonic_remote_time.time >
- state->monotonic_remote_now(
- timestamped_message.channel_index)) {
+ } else if (monotonic_remote_now.boot !=
+ timestamped_message.monotonic_remote_time.boot) {
+ LOG(WARNING) << "Missmatched boots, " << monotonic_remote_now.boot
+ << " vs "
+ << timestamped_message.monotonic_remote_time.boot;
+ } else if (timestamped_message.monotonic_remote_time >
+ monotonic_remote_now) {
LOG(WARNING)
<< "Check failed: timestamped_message.monotonic_remote_time < "
"state->monotonic_remote_now(timestamped_message.channel_"
@@ -665,7 +775,8 @@
<< timestamped_message.channel_index << ", "
<< configuration::CleanedChannelToString(
logged_configuration()->channels()->Get(
- timestamped_message.channel_index));
+ timestamped_message.channel_index))
+ << " " << timestamped_message;
// The user might be working with log files from 1 node but forgot to
// configure the infrastructure to log data for a remote channel on that
@@ -713,7 +824,10 @@
// rest. It is confusing when part of your data gets replayed but not
// all. Read the rest of the messages and drop them on the floor while
// doing some basic validation.
- while (state->OldestMessageTime() != monotonic_clock::max_time) {
+ while (state->OldestMessageTime() != BootTimestamp::max_time()) {
+ // TODO(austin): This force queues up the rest of the log file for all
+ // the other nodes. We should do this through the timer instead to
+ // keep memory usage down.
TimestampedMessage next = state->PopOldest();
// Make sure that once we have seen the last message on a channel,
// data doesn't start back up again. If the user wants to play
@@ -727,8 +841,9 @@
LOG(FATAL)
<< "Found missing data in the middle of the log file on "
"channel "
- << next.channel_index << " Last "
- << last_message[next.channel_index] << state->DebugString();
+ << next.channel_index << " " << next << " Last "
+ << last_message[next.channel_index] << " "
+ << state->DebugString();
}
}
}
@@ -745,16 +860,26 @@
{.multi_line = false, .max_vector_size = 100});
}
- const monotonic_clock::time_point next_time = state->OldestMessageTime();
- if (next_time != monotonic_clock::max_time) {
+ const BootTimestamp next_time = state->OldestMessageTime();
+ if (next_time != BootTimestamp::max_time()) {
+ if (next_time.boot != state->boot_count()) {
+ VLOG(1) << "Next message for "
+ << MaybeNodeName(state->event_loop()->node())
+ << "is on the next boot, " << next_time << " now is "
+ << state->monotonic_now();
+ CHECK(event_loop_factory_);
+ state->RunOnEnd();
+ return;
+ }
VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
- << "wakeup for " << next_time << "("
- << state->ToDistributedClock(next_time)
+ << "wakeup for " << next_time.time << "("
+ << state->ToDistributedClock(next_time.time)
<< " distributed), now is " << state->monotonic_now();
- state->Setup(next_time);
+ state->Setup(next_time.time);
} else {
VLOG(1) << MaybeNodeName(state->event_loop()->node())
<< "No next message, scheduling shutdown";
+ state->RunOnEnd();
// Set a timer up immediately after now to die. If we don't do this,
// then the senders waiting on the message we just read will never get
// called.
@@ -769,10 +894,15 @@
<< state->monotonic_now();
}));
- ++live_nodes_;
-
- if (state->OldestMessageTime() != monotonic_clock::max_time) {
- event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
+ if (state->OldestMessageTime() != BootTimestamp::max_time()) {
+ state->set_startup_timer(
+ event_loop->AddTimer([state]() { state->RunOnStart(); }));
+ event_loop->OnRun([state]() {
+ BootTimestamp next_time = state->OldestMessageTime();
+ CHECK_EQ(next_time.boot, state->boot_count());
+ state->Setup(next_time.time);
+ state->SetupStartupTimer();
+ });
}
}
@@ -1101,6 +1231,7 @@
}
const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
+ const Node *node,
const Channel *channel) {
std::string_view channel_name = channel->name()->string_view();
std::string_view channel_type = channel->type()->string_view();
@@ -1115,8 +1246,8 @@
VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
const Channel *remapped_channel = configuration::GetChannel(
- event_loop->configuration(), channel_name, channel_type,
- event_loop->name(), event_loop->node());
+ configuration(), channel_name, channel_type,
+ event_loop ? event_loop->name() : "log_reader", node);
CHECK(remapped_channel != nullptr)
<< ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
@@ -1125,8 +1256,9 @@
return remapped_channel;
}
-LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper)
- : timestamp_mapper_(std::move(timestamp_mapper)) {}
+LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+ const Node *node)
+ : timestamp_mapper_(std::move(timestamp_mapper)), node_(node) {}
void LogReader::State::AddPeer(State *peer) {
if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1134,12 +1266,9 @@
}
}
-EventLoop *LogReader::State::SetNodeEventLoopFactory(
+void LogReader::State::SetNodeEventLoopFactory(
NodeEventLoopFactory *node_event_loop_factory) {
node_event_loop_factory_ = node_event_loop_factory;
- event_loop_unique_ptr_ =
- node_event_loop_factory_->MakeEventLoop("log_reader");
- return event_loop_unique_ptr_.get();
}
void LogReader::State::SetChannelCount(size_t count) {
@@ -1151,22 +1280,23 @@
queue_index_map_.resize(count);
}
+void LogReader::State::SetRemoteTimestampSender(
+ size_t logged_channel_index, RemoteMessageSender *remote_timestamp_sender) {
+ remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
+}
+
void LogReader::State::SetChannel(
size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
- message_bridge::NoncausalOffsetEstimator *filter,
- RemoteMessageSender *remote_timestamp_sender, State *source_state) {
+ message_bridge::NoncausalOffsetEstimator *filter, bool is_forwarded,
+ State *source_state) {
channels_[logged_channel_index] = std::move(sender);
filters_[logged_channel_index] = filter;
- remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
+ channel_source_state_[logged_channel_index] = source_state;
- if (source_state) {
- channel_source_state_[logged_channel_index] = source_state;
-
- if (remote_timestamp_sender != nullptr) {
- source_state->queue_index_map_[logged_channel_index] =
- std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
- }
+ if (is_forwarded) {
+ queue_index_map_[logged_channel_index] =
+ std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
}
factory_channel_index_[logged_channel_index] = factory_channel_index;
@@ -1174,12 +1304,14 @@
bool LogReader::State::Send(const TimestampedMessage ×tamped_message) {
aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
+ CHECK(sender);
uint32_t remote_queue_index = 0xffffffff;
if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
+ State *source_state =
+ CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
std::vector<ContiguousSentTimestamp> *queue_index_map = CHECK_NOTNULL(
- CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
- ->queue_index_map_[timestamped_message.channel_index]
+ source_state->queue_index_map_[timestamped_message.channel_index]
.get());
struct SentTimestamp {
@@ -1187,10 +1319,11 @@
uint32_t queue_index;
} search;
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ source_state->boot_count());
search.monotonic_event_time =
timestamped_message.monotonic_remote_time.time;
- search.queue_index = timestamped_message.remote_queue_index;
+ search.queue_index = timestamped_message.remote_queue_index.index;
// Find the sent time if available.
auto element = std::lower_bound(
@@ -1220,28 +1353,30 @@
// other node isn't done yet. So there is no send time, but there is a
// receive time.
if (element != queue_index_map->end()) {
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ source_state->boot_count());
CHECK_GE(timestamped_message.monotonic_remote_time.time,
element->starting_monotonic_event_time);
CHECK_LE(timestamped_message.monotonic_remote_time.time,
element->ending_monotonic_event_time);
- CHECK_GE(timestamped_message.remote_queue_index,
+ CHECK_GE(timestamped_message.remote_queue_index.index,
element->starting_queue_index);
- CHECK_LE(timestamped_message.remote_queue_index,
+ CHECK_LE(timestamped_message.remote_queue_index.index,
element->ending_queue_index);
- remote_queue_index = timestamped_message.remote_queue_index +
+ remote_queue_index = timestamped_message.remote_queue_index.index +
element->actual_queue_index -
element->starting_queue_index;
} else {
VLOG(1) << "No timestamp match in the map.";
}
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ source_state->boot_count());
}
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
const bool sent = sender->Send(
timestamped_message.data.message().data()->Data(),
timestamped_message.data.message().data()->size(),
@@ -1255,15 +1390,15 @@
if (!sent) return false;
if (queue_index_map_[timestamped_message.channel_index]) {
+ CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
if (queue_index_map_[timestamped_message.channel_index]->empty()) {
// Nothing here, start a range with 0 length.
ContiguousSentTimestamp timestamp;
- CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
timestamp.starting_monotonic_event_time =
timestamp.ending_monotonic_event_time =
timestamped_message.monotonic_event_time.time;
timestamp.starting_queue_index = timestamp.ending_queue_index =
- timestamped_message.queue_index;
+ timestamped_message.queue_index.index;
timestamp.actual_queue_index = sender->sent_queue_index();
queue_index_map_[timestamped_message.channel_index]->emplace_back(
timestamp);
@@ -1273,20 +1408,18 @@
ContiguousSentTimestamp *back =
&queue_index_map_[timestamped_message.channel_index]->back();
if ((back->starting_queue_index - back->actual_queue_index) ==
- (timestamped_message.queue_index - sender->sent_queue_index())) {
- back->ending_queue_index = timestamped_message.queue_index;
- CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
+ (timestamped_message.queue_index.index - sender->sent_queue_index())) {
+ back->ending_queue_index = timestamped_message.queue_index.index;
back->ending_monotonic_event_time =
timestamped_message.monotonic_event_time.time;
} else {
// Otherwise, make a new one.
- CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
ContiguousSentTimestamp timestamp;
timestamp.starting_monotonic_event_time =
timestamp.ending_monotonic_event_time =
timestamped_message.monotonic_event_time.time;
timestamp.starting_queue_index = timestamp.ending_queue_index =
- timestamped_message.queue_index;
+ timestamped_message.queue_index.index;
timestamp.actual_queue_index = sender->sent_queue_index();
queue_index_map_[timestamped_message.channel_index]->emplace_back(
timestamp);
@@ -1298,6 +1431,9 @@
// map.
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
+ State *source_state =
+ CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
+
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
@@ -1316,7 +1452,8 @@
sender->realtime_sent_time().time_since_epoch().count());
message_header_builder.add_queue_index(sender->sent_queue_index());
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ source_state->boot_count());
message_header_builder.add_monotonic_remote_time(
timestamped_message.monotonic_remote_time.time.time_since_epoch()
.count());
@@ -1328,10 +1465,10 @@
fbb.Finish(message_header_builder.Finish());
- CHECK_EQ(timestamped_message.monotonic_timestamp_time.boot, 0u);
remote_timestamp_senders_[timestamped_message.channel_index]->Send(
FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
- timestamped_message.monotonic_timestamp_time.time);
+ timestamped_message.monotonic_timestamp_time,
+ source_state->boot_count());
}
return true;
@@ -1362,7 +1499,7 @@
void LogReader::RemoteMessageSender::Send(
FlatbufferDetachedBuffer<RemoteMessage> remote_message,
- monotonic_clock::time_point monotonic_timestamp_time) {
+ BootTimestamp monotonic_timestamp_time, size_t source_boot_count) {
// There are 2 variants of logs.
// 1) Logs without monotonic_timestamp_time
// 2) Logs with monotonic_timestamp_time
@@ -1386,20 +1523,22 @@
// timestamp to distinguish 2 and 3, and ignore 1. If we don't have a
// monotonic_timestamp_time, this means the message was logged locally and
// remote timestamps can be ignored.
- if (monotonic_timestamp_time == monotonic_clock::min_time) {
+ if (monotonic_timestamp_time == BootTimestamp::min_time()) {
return;
}
+ CHECK_EQ(monotonic_timestamp_time.boot, source_boot_count);
+
remote_timestamps_.emplace(
std::upper_bound(
remote_timestamps_.begin(), remote_timestamps_.end(),
- monotonic_timestamp_time,
+ monotonic_timestamp_time.time,
[](const aos::monotonic_clock::time_point monotonic_timestamp_time,
const Timestamp ×tamp) {
return monotonic_timestamp_time <
timestamp.monotonic_timestamp_time;
}),
- std::move(remote_message), monotonic_timestamp_time);
+ std::move(remote_message), monotonic_timestamp_time.time);
ScheduleTimestamp();
}
@@ -1475,18 +1614,8 @@
timestamp_mapper_->PopFront();
SeedSortedMessages();
- CHECK_EQ(result.monotonic_remote_time.boot, 0u);
+ CHECK_EQ(result.monotonic_event_time.boot, boot_count());
- if (result.monotonic_remote_time.time != monotonic_clock::min_time) {
- message_bridge::NoncausalOffsetEstimator *filter =
- filters_[result.channel_index];
- CHECK(filter != nullptr);
-
- // TODO(austin): We probably want to push this down into the timestamp
- // mapper directly.
- // TODO(austin): This hard-codes the boot to 0. We need to fix that.
- filter->Pop(event_loop_->node(), {0, event_loop_->monotonic_now()});
- }
VLOG(1) << "Popped " << result
<< configuration::CleanedChannelToString(
event_loop_->configuration()->channels()->Get(
@@ -1494,18 +1623,17 @@
return result;
}
-monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
+BootTimestamp LogReader::State::OldestMessageTime() const {
if (timestamp_mapper_ == nullptr) {
- return monotonic_clock::max_time;
+ return BootTimestamp::max_time();
}
TimestampedMessage *result_ptr = timestamp_mapper_->Front();
if (result_ptr == nullptr) {
- return monotonic_clock::max_time;
+ return BootTimestamp::max_time();
}
- CHECK_EQ(result_ptr->monotonic_event_time.boot, 0u);
VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
<< result_ptr->monotonic_event_time.time;
- return result_ptr->monotonic_event_time.time;
+ return result_ptr->monotonic_event_time;
}
void LogReader::State::SeedSortedMessages() {
@@ -1516,6 +1644,9 @@
}
void LogReader::State::Deregister() {
+ if (started_ && !stopped_) {
+ RunOnEnd();
+ }
for (size_t i = 0; i < channels_.size(); ++i) {
channels_[i].reset();
}
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 5cfb5df..8293956 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -73,6 +73,10 @@
// below, but can be anything as long as the locations needed to send
// everything are available.
void Register(SimulatedEventLoopFactory *event_loop_factory);
+ // Registers all the callbacks to send the log file data out to an event loop
+ // factory. This does not start replaying or change the current distributed
+ // time of the factory. It does change the monotonic clocks to be right.
+ void RegisterWithoutStarting(SimulatedEventLoopFactory *event_loop_factory);
// Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
// and then calls Register.
void Register();
@@ -80,6 +84,13 @@
// only useful when replaying live.
void Register(EventLoop *event_loop);
+ // Called whenever a log file starts for a node.
+ void OnStart(std::function<void()> fn);
+ void OnStart(const Node *node, std::function<void()> fn);
+ // Called whenever a log file ends for a node.
+ void OnEnd(std::function<void()> fn);
+ void OnEnd(const Node *node, std::function<void()> fn);
+
// Unregisters the senders. You only need to call this if you separately
// supplied an event loop or event loop factory and the lifetimes are such
// that they need to be explicitly destroyed before the LogReader destructor
@@ -175,7 +186,11 @@
}
private:
- const Channel *RemapChannel(const EventLoop *event_loop,
+ void Register(EventLoop *event_loop, const Node *node);
+
+ void RegisterDuringStartup(EventLoop *event_loop, const Node *node);
+
+ const Channel *RemapChannel(const EventLoop *event_loop, const Node *node,
const Channel *channel);
// Queues at least max_out_of_order_duration_ messages into channels_.
@@ -206,7 +221,7 @@
// send it immediately.
void Send(
FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
- monotonic_clock::time_point monotonic_timestamp_time);
+ BootTimestamp monotonic_timestamp_time, size_t source_boot_count);
private:
// Handles actually sending the timestamp if we were delayed.
@@ -239,7 +254,7 @@
// State per node.
class State {
public:
- State(std::unique_ptr<TimestampMapper> timestamp_mapper);
+ State(std::unique_ptr<TimestampMapper> timestamp_mapper, const Node *node);
// Connects up the timestamp mappers.
void AddPeer(State *peer);
@@ -251,12 +266,44 @@
TimestampedMessage PopOldest();
// Returns the monotonic time of the oldest message.
- monotonic_clock::time_point OldestMessageTime() const;
+ BootTimestamp OldestMessageTime() const;
+
+ size_t boot_count() const {
+ // If we are replaying directly into an event loop, we can't reboot. So
+ // we will stay stuck on the 0th boot.
+ if (!node_event_loop_factory_) return 0u;
+ return node_event_loop_factory_->boot_count();
+ }
// Primes the queues inside State. Should be called before calling
// OldestMessageTime.
void SeedSortedMessages();
+ void SetupStartupTimer() {
+ const monotonic_clock::time_point start_time =
+ monotonic_start_time(boot_count());
+ if (start_time == monotonic_clock::min_time) {
+ LOG(ERROR)
+ << "No start time, skipping, please figure out when this happens";
+ RunOnStart();
+ return;
+ }
+ CHECK_GT(start_time, event_loop_->monotonic_now());
+ startup_timer_->Setup(start_time);
+ }
+
+ void set_startup_timer(TimerHandler *timer_handler) {
+ startup_timer_ = timer_handler;
+ if (startup_timer_) {
+ if (event_loop_->node() != nullptr) {
+ startup_timer_->set_name(absl::StrCat(
+ event_loop_->node()->name()->string_view(), "_startup"));
+ } else {
+ startup_timer_->set_name("startup");
+ }
+ }
+ }
+
// Returns the starting time for this node.
monotonic_clock::time_point monotonic_start_time(size_t boot_count) const {
return timestamp_mapper_
@@ -271,13 +318,19 @@
// Sets the node event loop factory for replaying into a
// SimulatedEventLoopFactory. Returns the EventLoop to use.
- EventLoop *SetNodeEventLoopFactory(
- NodeEventLoopFactory *node_event_loop_factory);
+ void SetNodeEventLoopFactory(NodeEventLoopFactory *node_event_loop_factory);
// Sets and gets the event loop to use.
void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
EventLoop *event_loop() { return event_loop_; }
+ const Node *node() const { return node_; }
+
+ void Register(EventLoop *event_loop);
+
+ void OnStart(std::function<void()> fn);
+ void OnEnd(std::function<void()> fn);
+
// Sets the current realtime offset from the monotonic clock for this node
// (if we are on a simulated event loop).
void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
@@ -302,19 +355,29 @@
// Returns the current time on the remote node which sends messages on
// channel_index.
- monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
- return channel_source_state_[channel_index]
- ->node_event_loop_factory_->monotonic_now();
+ BootTimestamp monotonic_remote_now(size_t channel_index) {
+ State *s = channel_source_state_[channel_index];
+ return BootTimestamp{
+ .boot = s->boot_count(),
+ .time = s->node_event_loop_factory_->monotonic_now()};
}
// Returns the start time of the remote for the provided channel.
monotonic_clock::time_point monotonic_remote_start_time(
- size_t boot_count,
- size_t channel_index) {
+ size_t boot_count, size_t channel_index) {
return channel_source_state_[channel_index]->monotonic_start_time(
boot_count);
}
+ void DestroyEventLoop() { event_loop_unique_ptr_.reset(); }
+
+ EventLoop *MakeEventLoop() {
+ CHECK(!event_loop_unique_ptr_);
+ event_loop_unique_ptr_ =
+ node_event_loop_factory_->MakeEventLoop("log_reader");
+ return event_loop_unique_ptr_.get();
+ }
+
distributed_clock::time_point RemoteToDistributedClock(
size_t channel_index, monotonic_clock::time_point time) {
return channel_source_state_[channel_index]
@@ -337,15 +400,30 @@
void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- RemoteMessageSender *remote_timestamp_sender,
- State *source_state);
+ bool is_forwarded, State *source_state);
+
+ void SetRemoteTimestampSender(size_t logged_channel_index,
+ RemoteMessageSender *remote_timestamp_sender);
+
+ void RunOnStart();
+ void RunOnEnd();
// Unregisters everything so we can destory the event loop.
+ // TODO(austin): Is this needed? OnShutdown should be able to serve this
+ // need.
void Deregister();
// Sets the current TimerHandle for the replay callback.
void set_timer_handler(TimerHandler *timer_handler) {
timer_handler_ = timer_handler;
+ if (timer_handler_) {
+ if (event_loop_->node() != nullptr) {
+ timer_handler_->set_name(absl::StrCat(
+ event_loop_->node()->name()->string_view(), "_main"));
+ } else {
+ timer_handler_->set_name("main");
+ }
+ }
}
// Sets the next wakeup time on the replay callback.
@@ -364,6 +442,11 @@
return timestamp_mapper_->DebugString();
}
+ void ClearRemoteTimestampSenders() {
+ channel_timestamp_loggers_.clear();
+ timestamp_loggers_.clear();
+ }
+
private:
// Log file.
std::unique_ptr<TimestampMapper> timestamp_mapper_;
@@ -408,9 +491,11 @@
NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
std::unique_ptr<EventLoop> event_loop_unique_ptr_;
// Event loop.
+ const Node *node_ = nullptr;
EventLoop *event_loop_ = nullptr;
// And timer used to send messages.
- TimerHandler *timer_handler_;
+ TimerHandler *timer_handler_ = nullptr;
+ TimerHandler *startup_timer_ = nullptr;
// Filters (or nullptr if it isn't a forwarded channel) for each channel.
// This corresponds to the object which is shared among all the channels
@@ -432,6 +517,12 @@
// is the channel that timestamps are published to.
absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
timestamp_loggers_;
+
+ std::vector<std::function<void()>> on_starts_;
+ std::vector<std::function<void()>> on_ends_;
+
+ bool stopped_ = false;
+ bool started_ = false;
};
// Node index -> State.
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index fffc886..4e34b61 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -33,6 +33,7 @@
? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
"/aos")
: aos::Fetcher<message_bridge::ServerStatistics>()) {
+ timer_handler_->set_name("channel_poll");
VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);
std::map<const Channel *, const Node *> timestamp_logger_channels;
@@ -289,9 +290,7 @@
// Clear out any old timestamps in case we are re-starting logging.
for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
- log_namer_->SetStartTimes(
- i, monotonic_clock::min_time, realtime_clock::min_time,
- monotonic_clock::min_time, realtime_clock::min_time);
+ log_namer_->ClearStartTimes();
}
const aos::monotonic_clock::time_point fetch_time =
@@ -387,8 +386,7 @@
node, node_index,
server_statistics_fetcher_.context().monotonic_event_time,
server_statistics_fetcher_.context().realtime_event_time)) {
- VLOG(1) << "Rotating because timestamps changed";
- log_namer_->Rotate(node);
+ VLOG(1) << "Timestamps changed on " << aos::FlatbufferToJson(node);
}
}
}
@@ -398,16 +396,16 @@
aos::monotonic_clock::time_point monotonic_start_time,
aos::realtime_clock::time_point realtime_start_time) {
// Bail early if the start times are already set.
- if (log_namer_->monotonic_start_time(node_index) !=
- monotonic_clock::min_time) {
- return false;
- }
- if (node_ == node ||
- !configuration::MultiNode(configuration_)) {
+ if (node_ == node || !configuration::MultiNode(configuration_)) {
+ if (log_namer_->monotonic_start_time(node_index,
+ event_loop_->boot_uuid()) !=
+ monotonic_clock::min_time) {
+ return false;
+ }
// There are no offsets to compute for ourself, so always succeed.
- log_namer_->SetStartTimes(node_index, monotonic_start_time,
- realtime_start_time, monotonic_start_time,
- realtime_start_time);
+ log_namer_->SetStartTimes(node_index, event_loop_->boot_uuid(),
+ monotonic_start_time, realtime_start_time,
+ monotonic_start_time, realtime_start_time);
return true;
} else if (server_statistics_fetcher_.get() != nullptr) {
// We must be a remote node now. Look for the connection and see if it is
@@ -432,9 +430,20 @@
break;
}
+ const UUID boot_uuid =
+ UUID::FromString(connection->boot_uuid()->string_view());
+
+ if (log_namer_->monotonic_start_time(node_index, boot_uuid) !=
+ monotonic_clock::min_time) {
+ break;
+ }
+
+ VLOG(1) << "Updating start time for "
+ << aos::FlatbufferToJson(connection);
+
// Found it and it is connected. Compensate and go.
log_namer_->SetStartTimes(
- node_index,
+ node_index, boot_uuid,
monotonic_start_time +
std::chrono::nanoseconds(connection->monotonic_offset()),
realtime_start_time, monotonic_start_time, realtime_start_time);
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index e27cecb..8e536c8 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -785,6 +785,7 @@
new_file.realtime_start_time = logs.second.realtime_start_time;
new_file.name = logs.second.name;
new_file.corrupted = corrupted;
+ new_file.boots = boot_counts;
bool seen_part = false;
std::string config_sha256;
for (std::pair<const std::pair<std::string, std::string>, UnsortedLogParts>
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 1d05dc7..857f351 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -102,6 +102,7 @@
// object for log files with the same config.
std::string config_sha256;
std::shared_ptr<const aos::Configuration> config;
+ std::shared_ptr<const Boots> boots;
};
std::ostream &operator<<(std::ostream &stream, const LogFile &file);
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 20e38d2..6d73d67 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -57,8 +57,9 @@
if (fd_ == -1 && errno == ENOSPC) {
ran_out_of_space_ = true;
} else {
- PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
- VLOG(1) << "Opened " << filename << " for writing";
+ PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
+ << " for writing";
+ VLOG(1) << "Opened " << this->filename() << " for writing";
}
}
}
@@ -147,7 +148,7 @@
}
}
fd_ = -1;
- VLOG(1) << "Closed " << filename_;
+ VLOG(1) << "Closed " << filename();
}
void DetachedBufferWriter::Flush() {
@@ -651,7 +652,7 @@
<< ", .queue_index=" << m.queue_index
<< ", .monotonic_event_time=" << m.monotonic_event_time
<< ", .realtime_event_time=" << m.realtime_event_time;
- if (m.remote_queue_index != 0xffffffff) {
+ if (m.remote_queue_index != BootQueueIndex::Invalid()) {
os << ", .remote_queue_index=" << m.remote_queue_index;
}
if (m.monotonic_remote_time != BootTimestamp::min_time()) {
@@ -704,16 +705,22 @@
size_t monotonic_remote_boot = 0xffffff;
if (m.value().message().has_monotonic_remote_time()) {
+ const Node *node = parts().config->nodes()->Get(
+ source_node_index_[m->message().channel_index()]);
+
std::optional<size_t> boot = parts_message_reader_.boot_count(
source_node_index_[m->message().channel_index()]);
- CHECK(boot) << ": Failed to find boot for node "
+ CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
+ << ", with index "
<< source_node_index_[m->message().channel_index()];
monotonic_remote_boot = *boot;
}
messages_.insert(Message{
.channel_index = m.value().message().channel_index(),
- .queue_index = m.value().message().queue_index(),
+ .queue_index =
+ BootQueueIndex{.boot = parts().boot_count,
+ .index = m.value().message().queue_index()},
.timestamp =
BootTimestamp{
.boot = parts().boot_count,
@@ -986,7 +993,7 @@
.monotonic_event_time = m->timestamp,
.realtime_event_time = aos::realtime_clock::time_point(
std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
- .remote_queue_index = 0xffffffff,
+ .remote_queue_index = BootQueueIndex::Invalid(),
.monotonic_remote_time = BootTimestamp::min_time(),
.realtime_remote_time = realtime_clock::min_time,
.monotonic_timestamp_time = BootTimestamp::min_time(),
@@ -1070,7 +1077,9 @@
.monotonic_event_time = m->timestamp,
.realtime_event_time = aos::realtime_clock::time_point(
std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
- .remote_queue_index = m->data.message().remote_queue_index(),
+ .remote_queue_index =
+ BootQueueIndex{.boot = m->monotonic_remote_boot,
+ .index = m->data.message().remote_queue_index()},
.monotonic_remote_time =
{m->monotonic_remote_boot,
monotonic_clock::time_point(std::chrono::nanoseconds(
@@ -1139,8 +1148,9 @@
Message TimestampMapper::MatchingMessageFor(const Message &message) {
// Figure out what queue index we are looking for.
CHECK(message.data.message().has_remote_queue_index());
- const uint32_t remote_queue_index =
- message.data.message().remote_queue_index();
+ const BootQueueIndex remote_queue_index =
+ BootQueueIndex{.boot = message.monotonic_remote_boot,
+ .index = message.data.message().remote_queue_index()};
CHECK(message.data.message().has_monotonic_remote_time());
CHECK(message.data.message().has_realtime_remote_time());
@@ -1199,11 +1209,17 @@
// The algorithm below is constant time with some assumptions. We need there
// to be no missing messages in the data stream. This also assumes a queue
// hasn't wrapped. That is conservative, but should let us get started.
- if (data_queue->back().queue_index - data_queue->front().queue_index + 1u ==
- data_queue->size()) {
+ if (data_queue->back().queue_index.boot ==
+ data_queue->front().queue_index.boot &&
+ (data_queue->back().queue_index.index -
+ data_queue->front().queue_index.index + 1u ==
+ data_queue->size())) {
+ CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
// Pull the data out and confirm that the timestamps match as expected.
- Message result = std::move(
- (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
+ //
+ // TODO(austin): Move if not reliable.
+ Message result = (*data_queue)[remote_queue_index.index -
+ data_queue->front().queue_index.index];
CHECK_EQ(result.timestamp, monotonic_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
@@ -1213,15 +1229,20 @@
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
// Now drop the data off the front. We have deduplicated timestamps, so we
// are done. And all the data is in order.
- data_queue->erase(data_queue->begin(),
- data_queue->begin() + (1 + remote_queue_index -
- data_queue->front().queue_index));
+ data_queue->erase(
+ data_queue->begin(),
+ data_queue->begin() +
+ (remote_queue_index.index - data_queue->front().queue_index.index));
return result;
} else {
- auto it = std::find_if(data_queue->begin(), data_queue->end(),
- [remote_queue_index](const Message &m) {
- return m.queue_index == remote_queue_index;
- });
+ // TODO(austin): Binary search.
+ auto it = std::find_if(
+ data_queue->begin(), data_queue->end(),
+ [remote_queue_index,
+ remote_boot = monotonic_remote_time.boot](const Message &m) {
+ return m.queue_index == remote_queue_index &&
+ m.timestamp.boot == remote_boot;
+ });
if (it == data_queue->end()) {
return Message{
.channel_index = message.channel_index,
@@ -1241,6 +1262,8 @@
realtime_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
+ // TODO(austin): We still go in order, so we can erase from the beginning to
+ // our iterator minus 1. That'll keep 1 in the queue.
data_queue->erase(it);
return result;
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 56b582f..26fc74d 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -373,7 +373,10 @@
// The channel.
uint32_t channel_index = 0xffffffff;
// The local queue index.
- uint32_t queue_index = 0xffffffff;
+ // TODO(austin): Technically the boot inside queue_index is redundant with
+ // timestamp. In practice, it is less error-prone to duplicate it. Maybe a
+ // function to return the combined struct?
+ BootQueueIndex queue_index;
// The local timestamp.
BootTimestamp timestamp;
@@ -398,11 +401,11 @@
struct TimestampedMessage {
uint32_t channel_index = 0xffffffff;
- uint32_t queue_index = 0xffffffff;
+ BootQueueIndex queue_index;
BootTimestamp monotonic_event_time;
realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
- uint32_t remote_queue_index = 0xffffffff;
+ BootQueueIndex remote_queue_index;
BootTimestamp monotonic_remote_time;
realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
@@ -598,8 +601,6 @@
size_t node() const { return boot_merger_.node(); }
// The start time of this log.
- // TODO(austin): This concept is probably wrong... We have start times per
- // boot, and an order of them.
monotonic_clock::time_point monotonic_start_time(size_t boot) const {
return boot_merger_.monotonic_start_time(boot);
}
@@ -695,6 +696,15 @@
// Queues m into matched_messages_.
void QueueMessage(Message *m);
+ // Returns the name of the node this class is sorting for.
+ std::string_view node_name() const {
+ return configuration_->has_nodes() ? configuration_->nodes()
+ ->Get(boot_merger_.node())
+ ->name()
+ ->string_view()
+ : "(single node)";
+ }
+
// The node merger to source messages from.
BootMerger boot_merger_;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 763bb7f..1c67312 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -225,14 +225,14 @@
const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
Message m1{.channel_index = 0,
- .queue_index = 0,
+ .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
.timestamp =
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
Message m2{.channel_index = 0,
- .queue_index = 0,
+ .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
.timestamp =
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
.monotonic_remote_boot = 0xffffff,
@@ -253,8 +253,8 @@
m1.channel_index = 0;
m2.channel_index = 0;
- m1.queue_index = 0;
- m2.queue_index = 1;
+ m1.queue_index.index = 0u;
+ m2.queue_index.index = 1u;
EXPECT_LT(m1, m2);
EXPECT_GE(m2, m1);
@@ -695,7 +695,8 @@
ASSERT_TRUE(parts_sorter.Front() != nullptr);
parts_sorter.PopFront();
- EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order of 100000000ns exceeded.");
+ EXPECT_DEATH({ parts_sorter.Front(); },
+ "Max out of order of 100000000ns exceeded.");
}
// Tests that we can merge data from 2 separate files, including duplicate data.
@@ -2093,7 +2094,6 @@
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
};
-
// Tests that we can match timestamps on delivered messages in the presence of
// reboots on the node receiving timestamps.
TEST_F(RebootTimestampMapperTest, ReadNode0First) {
@@ -2114,6 +2114,10 @@
e + chrono::milliseconds(1000), 0, chrono::seconds(100),
e + chrono::milliseconds(1001)));
+ writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(21),
+ e + chrono::milliseconds(2001)));
+
writer0b.QueueSizedFlatbuffer(
MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
@@ -2127,7 +2131,8 @@
e + chrono::milliseconds(3001)));
}
- const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+ const std::vector<LogFile> parts =
+ SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
for (const auto &x : parts) {
LOG(INFO) << x;
@@ -2185,6 +2190,8 @@
EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[0].monotonic_event_time.time,
e + chrono::milliseconds(1000));
+ EXPECT_EQ(output0[0].queue_index,
+ (BootQueueIndex{.boot = 0u, .index = 0u}));
EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
EXPECT_TRUE(output0[0].data.Verify());
@@ -2192,6 +2199,8 @@
EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[1].monotonic_event_time.time,
e + chrono::milliseconds(2000));
+ EXPECT_EQ(output0[1].queue_index,
+ (BootQueueIndex{.boot = 0u, .index = 1u}));
EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
EXPECT_TRUE(output0[1].data.Verify());
@@ -2199,6 +2208,8 @@
EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[2].monotonic_event_time.time,
e + chrono::milliseconds(3000));
+ EXPECT_EQ(output0[2].queue_index,
+ (BootQueueIndex{.boot = 0u, .index = 2u}));
EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
EXPECT_TRUE(output0[2].data.Verify());
@@ -2232,13 +2243,18 @@
mapper1.PopFront();
EXPECT_TRUE(mapper1.started());
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
EXPECT_EQ(mapper0_count, 3u);
- EXPECT_EQ(mapper1_count, 3u);
+ EXPECT_EQ(mapper1_count, 4u);
ASSERT_TRUE(mapper1.Front() == nullptr);
EXPECT_EQ(mapper0_count, 3u);
- EXPECT_EQ(mapper1_count, 3u);
+ EXPECT_EQ(mapper1_count, 4u);
EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_event_time.time,
@@ -2246,6 +2262,8 @@
EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_remote_time.time,
e + chrono::milliseconds(1000));
+ EXPECT_EQ(output1[0].remote_queue_index,
+ (BootQueueIndex{.boot = 0u, .index = 0u}));
EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
e + chrono::milliseconds(1001));
@@ -2254,9 +2272,11 @@
EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(20) + chrono::milliseconds(2000));
+ EXPECT_EQ(output1[1].remote_queue_index,
+ (BootQueueIndex{.boot = 0u, .index = 0u}));
EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_remote_time.time,
- e + chrono::milliseconds(2000));
+ e + chrono::milliseconds(1000));
EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
e + chrono::milliseconds(2001));
@@ -2264,18 +2284,34 @@
EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
- e + chrono::seconds(20) + chrono::milliseconds(3000));
+ e + chrono::seconds(20) + chrono::milliseconds(2000));
EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_remote_time.time,
- e + chrono::milliseconds(3000));
+ e + chrono::milliseconds(2000));
+ EXPECT_EQ(output1[2].remote_queue_index,
+ (BootQueueIndex{.boot = 0u, .index = 1u}));
EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
- e + chrono::milliseconds(3001));
+ e + chrono::milliseconds(2001));
EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
+ EXPECT_EQ(output1[3].monotonic_event_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(3000));
+ EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
+ EXPECT_EQ(output1[3].monotonic_remote_time.time,
+ e + chrono::milliseconds(3000));
+ EXPECT_EQ(output1[3].remote_queue_index,
+ (BootQueueIndex{.boot = 0u, .index = 2u}));
+ EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
+ e + chrono::milliseconds(3001));
+ EXPECT_TRUE(output1[3].data.Verify());
+
LOG(INFO) << output1[0];
LOG(INFO) << output1[1];
LOG(INFO) << output1[2];
+ LOG(INFO) << output1[3];
}
}
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 195d442..226c084 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -435,25 +435,19 @@
absl::StrCat("aos/events/logging/", GetParam().config)))),
time_converter_(configuration::NodesCount(&config_.message())),
event_loop_factory_(&config_.message()),
- pi1_(
- configuration::GetNode(event_loop_factory_.configuration(), "pi1")),
+ pi1_(event_loop_factory_.GetNodeEventLoopFactory("pi1")),
pi1_index_(configuration::GetNodeIndex(
- event_loop_factory_.configuration(), pi1_)),
- pi2_(
- configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
+ event_loop_factory_.configuration(), pi1_->node())),
+ pi2_(event_loop_factory_.GetNodeEventLoopFactory("pi2")),
pi2_index_(configuration::GetNodeIndex(
- event_loop_factory_.configuration(), pi2_)),
+ event_loop_factory_.configuration(), pi2_->node())),
tmp_dir_(aos::testing::TestTmpDir()),
logfile_base1_(tmp_dir_ + "/multi_logfile1"),
logfile_base2_(tmp_dir_ + "/multi_logfile2"),
pi1_reboot_logfiles_(MakePi1RebootLogfiles()),
logfiles_(MakeLogFiles(logfile_base1_, logfile_base2_)),
pi1_single_direction_logfiles_(MakePi1SingleDirectionLogfiles()),
- structured_logfiles_(StructureLogFiles()),
- ping_event_loop_(event_loop_factory_.MakeEventLoop("ping", pi1_)),
- ping_(ping_event_loop_.get()),
- pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
- pong_(pong_event_loop_.get()) {
+ structured_logfiles_(StructureLogFiles()) {
LOG(INFO) << "Config " << GetParam().config;
event_loop_factory_.SetTimeConverter(&time_converter_);
@@ -474,6 +468,9 @@
LOG(INFO) << "Logging data to " << logfiles_[0] << ", " << logfiles_[1]
<< " and " << logfiles_[2];
+
+ pi1_->OnStartup([this]() { pi1_->AlwaysStart<Ping>("ping"); });
+ pi2_->OnStartup([this]() { pi2_->AlwaysStart<Pong>("pong"); });
}
bool shared() const { return GetParam().shared; }
@@ -559,13 +556,14 @@
result.emplace_back(logfile_base1_ + "_pi1_data.part0.bfbs");
result.emplace_back(logfile_base1_ + "_pi1_data.part1.bfbs");
result.emplace_back(logfile_base1_ + "_pi1_data.part2.bfbs");
- result.emplace_back(logfile_base1_ + "_pi1_data.part3.bfbs");
result.emplace_back(logfile_base1_ +
"_pi2_data/test/aos.examples.Pong.part0.bfbs");
result.emplace_back(logfile_base1_ +
"_pi2_data/test/aos.examples.Pong.part1.bfbs");
result.emplace_back(logfile_base1_ +
"_pi2_data/test/aos.examples.Pong.part2.bfbs");
+ result.emplace_back(logfile_base1_ +
+ "_pi2_data/test/aos.examples.Pong.part3.bfbs");
result.emplace_back(
logfile_base1_ +
"_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs");
@@ -576,6 +574,9 @@
logfile_base1_ +
"_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part2.bfbs");
result.emplace_back(
+ logfile_base1_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part3.bfbs");
+ result.emplace_back(
absl::StrCat(logfile_base1_, "_", GetParam().sha256, ".bfbs"));
if (shared()) {
result.emplace_back(logfile_base1_ +
@@ -587,6 +588,9 @@
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/"
"aos.message_bridge.RemoteMessage.part2.bfbs");
+ result.emplace_back(logfile_base1_ +
+ "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.message_bridge.RemoteMessage.part3.bfbs");
} else {
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
@@ -600,6 +604,10 @@
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
"aos-message_bridge-Timestamp/"
"aos.message_bridge.RemoteMessage.part2.bfbs");
+ result.emplace_back(logfile_base1_ +
+ "_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp/"
+ "aos.message_bridge.RemoteMessage.part3.bfbs");
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
@@ -613,6 +621,10 @@
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
"aos-examples-Ping/"
"aos.message_bridge.RemoteMessage.part2.bfbs");
+ result.emplace_back(logfile_base1_ +
+ "_timestamps/pi1/aos/remote_timestamps/pi2/test/"
+ "aos-examples-Ping/"
+ "aos.message_bridge.RemoteMessage.part3.bfbs");
}
return result;
}
@@ -663,7 +675,7 @@
MultiNodeLogNamer *log_namer;
};
- LoggerState MakeLogger(const Node *node,
+ LoggerState MakeLogger(NodeEventLoopFactory *node,
SimulatedEventLoopFactory *factory = nullptr,
const Configuration *configuration = nullptr) {
if (factory == nullptr) {
@@ -672,13 +684,11 @@
if (configuration == nullptr) {
configuration = factory->configuration();
}
- return {
- factory->MakeEventLoop(
- "logger", configuration::GetNode(factory->configuration(), node)),
- {},
- configuration,
- node,
- nullptr};
+ return {node->MakeEventLoop("logger"),
+ {},
+ configuration,
+ configuration::GetNode(configuration, node->node()),
+ nullptr};
}
void StartLogger(LoggerState *logger, std::string logfile_base = "",
@@ -820,9 +830,9 @@
message_bridge::TestingTimeConverter time_converter_;
SimulatedEventLoopFactory event_loop_factory_;
- const Node *const pi1_;
+ NodeEventLoopFactory *const pi1_;
const size_t pi1_index_;
- const Node *const pi2_;
+ NodeEventLoopFactory *const pi2_;
const size_t pi2_index_;
std::string tmp_dir_;
@@ -833,11 +843,6 @@
std::vector<std::string> pi1_single_direction_logfiles_;
std::vector<std::vector<std::string>> structured_logfiles_;
-
- std::unique_ptr<EventLoop> ping_event_loop_;
- Ping ping_;
- std::unique_ptr<EventLoop> pong_event_loop_;
- Pong pong_;
};
// Counts the number of messages on a channel. Returns (channel name, channel
@@ -1490,17 +1495,13 @@
{
LoggerState pi2_logger = MakeLogger(pi2_);
- NodeEventLoopFactory *pi1 =
- event_loop_factory_.GetNodeEventLoopFactory(pi1_);
- NodeEventLoopFactory *pi2 =
- event_loop_factory_.GetNodeEventLoopFactory(pi2_);
- LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
- << pi2->realtime_now() << " distributed "
- << pi2->ToDistributedClock(pi2->monotonic_now());
+ LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
+ << pi2_->realtime_now() << " distributed "
+ << pi2_->ToDistributedClock(pi2_->monotonic_now());
- LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
- << pi2->realtime_now() << " distributed "
- << pi2->ToDistributedClock(pi2->monotonic_now());
+ LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
+ << pi2_->realtime_now() << " distributed "
+ << pi2_->ToDistributedClock(pi2_->monotonic_now());
event_loop_factory_.RunFor(startup_sleep1);
@@ -1519,7 +1520,7 @@
// than the network delay. This confirms that if we sort incorrectly, it
// would show in the results.
EXPECT_LT(
- (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+ (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
-event_loop_factory_.send_delay() -
event_loop_factory_.network_delay());
@@ -1528,7 +1529,7 @@
// And now check that we went far enough the other way to make sure we
// cover both problems.
EXPECT_GT(
- (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+ (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
event_loop_factory_.send_delay() +
event_loop_factory_.network_delay());
}
@@ -2140,11 +2141,9 @@
// And confirm we can re-create a log again, while checking the contents.
{
LoggerState pi1_logger = MakeLogger(
- configuration::GetNode(log_reader_factory.configuration(), pi1_),
- &log_reader_factory);
+ log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
LoggerState pi2_logger = MakeLogger(
- configuration::GetNode(log_reader_factory.configuration(), pi2_),
- &log_reader_factory);
+ log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
@@ -2234,8 +2233,7 @@
// Test that renaming the file base dies.
TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
time_converter_.AddMonotonic(
- {BootTimestamp::epoch(),
- BootTimestamp::epoch() + chrono::seconds(1000)});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
util::UnlinkRecursive(tmp_dir_ + "/renamefile");
logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
@@ -2254,29 +2252,50 @@
// Tests that we properly recreate forwarded timestamps when replaying a log.
// This should be enough that we can then re-run the logger and get a valid log
// back.
-TEST_P(MultinodeLoggerDeathTest, RemoteReboot) {
- time_converter_.StartEqual();
- std::string pi2_boot1;
- std::string pi2_boot2;
+TEST_P(MultinodeLoggerTest, RemoteReboot) {
+ const UUID pi1_boot0 = UUID::Random();
+ const UUID pi2_boot0 = UUID::Random();
+ const UUID pi2_boot1 = UUID::Random();
{
- pi2_boot1 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
- ->boot_uuid()
- .ToString();
+ CHECK_EQ(pi1_index_, 0u);
+ CHECK_EQ(pi2_index_, 1u);
+
+ time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
+ time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
+ time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
+
+ time_converter_.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp::epoch(), BootTimestamp::epoch()});
+ const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
+ time_converter_.AddNextTimestamp(
+ distributed_clock::epoch() + reboot_time,
+ {BootTimestamp::epoch() + reboot_time,
+ BootTimestamp{
+ .boot = 1,
+ .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
+ }
+
+ {
LoggerState pi1_logger = MakeLogger(pi1_);
event_loop_factory_.RunFor(chrono::milliseconds(95));
+ EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
+ pi1_boot0);
+ EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
+ pi2_boot0);
StartLogger(&pi1_logger);
event_loop_factory_.RunFor(chrono::milliseconds(10000));
- event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Reboot();
-
- pi2_boot2 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
- ->boot_uuid()
- .ToString();
+ VLOG(1) << "Reboot now!";
event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
+ pi1_boot0);
+ EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
+ pi2_boot1);
}
// Confirm that our new oldest timestamps properly update as we reboot and
@@ -2289,7 +2308,36 @@
continue;
}
+ const monotonic_clock::time_point monotonic_start_time =
+ monotonic_clock::time_point(
+ chrono::nanoseconds(log_header->message().monotonic_start_time()));
+ const UUID source_node_boot_uuid = UUID::FromString(
+ log_header->message().source_node_boot_uuid()->string_view());
+
if (log_header->message().node()->name()->string_view() != "pi1") {
+ switch (log_header->message().parts_index()) {
+ case 0:
+ EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+ EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+ break;
+ case 1:
+ EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+ ASSERT_EQ(monotonic_start_time,
+ monotonic_clock::epoch() + chrono::seconds(1));
+ break;
+ case 2:
+ EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+ EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+ break;
+ case 3:
+ EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+ ASSERT_EQ(monotonic_start_time,
+ monotonic_clock::epoch() + chrono::nanoseconds(2322999462));
+ break;
+ default:
+ FAIL();
+ break;
+ }
continue;
}
SCOPED_TRACE(file);
@@ -2368,21 +2416,13 @@
break;
case 2:
EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100000)));
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100150)));
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::max_time);
- break;
- case 3:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100000)));
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100150)));
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100200)));
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
monotonic_clock::time_point(chrono::microseconds(10100350)));
break;
@@ -2393,25 +2433,26 @@
}
// Confirm that we refuse to replay logs with missing boot uuids.
- EXPECT_DEATH(
- {
- LogReader reader(SortParts(pi1_reboot_logfiles_));
+ {
+ LogReader reader(SortParts(pi1_reboot_logfiles_));
- SimulatedEventLoopFactory log_reader_factory(reader.configuration());
- log_reader_factory.set_send_delay(chrono::microseconds(0));
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
- // This sends out the fetched messages and advances time to the start of
- // the log file.
- reader.Register(&log_reader_factory);
- },
- absl::StrFormat("(%s|%s).*(%s|%s).*Found parts from different boots",
- pi2_boot1, pi2_boot2, pi2_boot2, pi2_boot1));
+ // This sends out the fetched messages and advances time to the start of
+ // the log file.
+ reader.Register(&log_reader_factory);
+
+ log_reader_factory.Run();
+
+ reader.Deregister();
+ }
}
// Tests that we properly handle one direction of message_bridge being
// unavailable.
TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
- event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+ pi1_->Disconnect(pi2_->node());
time_converter_.AddMonotonic(
{BootTimestamp::epoch(),
BootTimestamp::epoch() + chrono::seconds(1000)});
@@ -2437,7 +2478,7 @@
// Tests that we properly handle one direction of message_bridge being
// unavailable.
TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
- event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+ pi1_->Disconnect(pi2_->node());
time_converter_.AddMonotonic(
{BootTimestamp::epoch(),
BootTimestamp::epoch() + chrono::seconds(500)});
@@ -2463,8 +2504,8 @@
// Tests that we properly handle a dead node. Do this by just disconnecting it
// and only using one nodes of logs.
TEST_P(MultinodeLoggerTest, DeadNode) {
- event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
- event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Disconnect(pi1_);
+ pi1_->Disconnect(pi2_->node());
+ pi2_->Disconnect(pi1_->node());
time_converter_.AddMonotonic(
{BootTimestamp::epoch(),
BootTimestamp::epoch() + chrono::seconds(1000)});
@@ -2552,10 +2593,10 @@
std::vector<std::string> log_files;
{
LoggerState pi1_logger =
- MakeLogger(configuration::GetNode(reader.logged_configuration(), pi1_),
+ MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
&log_reader_factory, reader.logged_configuration());
LoggerState pi2_logger =
- MakeLogger(configuration::GetNode(reader.logged_configuration(), pi2_),
+ MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
&log_reader_factory, reader.logged_configuration());
StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index 6a2c95a..0cfc19b 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -63,7 +63,20 @@
// Now, build up the estimator used to solve for time.
message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
- config, config, FLAGS_skip_order_validation, chrono::seconds(0));
+ config, config, log_files[0].boots, FLAGS_skip_order_validation,
+ chrono::seconds(0));
+ multinode_estimator.set_reboot_found(
+ [config](distributed_clock::time_point reboot_time,
+ const std::vector<logger::BootTimestamp> &node_times) {
+ LOG(INFO) << "Rebooted at distributed " << reboot_time;
+ size_t node_index = 0;
+ for (const logger::BootTimestamp &time : node_times) {
+ LOG(INFO) << " "
+ << config->nodes()->Get(node_index)->name()->string_view()
+ << " " << time;
+ ++node_index;
+ }
+ });
{
std::vector<TimestampMapper *> timestamp_mappers;
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 4dc37fa..15375e8 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -21,6 +21,14 @@
namespace {
+std::string NodeName(const Node *node) {
+ if (node == nullptr) {
+ return "";
+ }
+
+ return absl::StrCat(node->name()->string_view(), " ");
+}
+
class ScopedMarkRealtimeRestorer {
public:
ScopedMarkRealtimeRestorer(bool rt) : rt_(rt), prior_(MarkRealtime(rt)) {}
@@ -291,15 +299,14 @@
size_t size() override { return simulated_channel_->max_size(); }
- bool DoSend(size_t length,
- aos::monotonic_clock::time_point monotonic_remote_time,
- aos::realtime_clock::time_point realtime_remote_time,
+ bool DoSend(size_t length, monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) override;
bool DoSend(const void *msg, size_t size,
- aos::monotonic_clock::time_point monotonic_remote_time,
- aos::realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) override;
@@ -311,7 +318,7 @@
private:
SimulatedChannel *simulated_channel_;
- SimulatedEventLoop *event_loop_;
+ SimulatedEventLoop *simulated_event_loop_;
std::shared_ptr<SimulatedMessage> message_;
};
@@ -378,10 +385,10 @@
if (context_.remote_queue_index == 0xffffffffu) {
context_.remote_queue_index = context_.queue_index;
}
- if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
+ if (context_.monotonic_remote_time == monotonic_clock::min_time) {
context_.monotonic_remote_time = context_.monotonic_event_time;
}
- if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
+ if (context_.realtime_remote_time == realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
}
@@ -461,25 +468,27 @@
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
*channels,
const Configuration *configuration,
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *raw_event_loops,
- const Node *node, pid_t tid)
+ std::vector<SimulatedEventLoop *> *event_loops_, const Node *node,
+ pid_t tid)
: EventLoop(CHECK_NOTNULL(configuration)),
scheduler_(scheduler),
node_event_loop_factory_(node_event_loop_factory),
channels_(channels),
- raw_event_loops_(raw_event_loops),
+ event_loops_(event_loops_),
node_(node),
- tid_(tid) {
- raw_event_loops_->push_back(std::make_pair(this, [this](bool value) {
- if (!has_setup_) {
- Setup();
- has_setup_ = true;
+ tid_(tid),
+ startup_tracker_(std::make_shared<StartupTracker>()) {
+ startup_tracker_->loop = this;
+ scheduler_->ScheduleOnStartup([startup_tracker = startup_tracker_]() {
+ if (startup_tracker->loop) {
+ startup_tracker->loop->Setup();
+ startup_tracker->has_setup = true;
}
- set_is_running(value);
- has_run_ = true;
- }));
+ });
+
+ event_loops_->push_back(this);
}
+
~SimulatedEventLoop() override {
// Trigger any remaining senders or fetchers to be cleared before destroying
// the event loop so the book keeping matches.
@@ -490,13 +499,27 @@
phased_loops_.clear();
watchers_.clear();
- for (auto it = raw_event_loops_->begin(); it != raw_event_loops_->end();
- ++it) {
- if (it->first == this) {
- raw_event_loops_->erase(it);
+ for (auto it = event_loops_->begin(); it != event_loops_->end(); ++it) {
+ if (*it == this) {
+ event_loops_->erase(it);
break;
}
}
+ VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
+ << monotonic_now() << " ~SimulatedEventLoop(\"" << name_ << "\")";
+ startup_tracker_->loop = nullptr;
+ }
+
+ void SetIsRunning(bool running) {
+ VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
+ << monotonic_now() << " " << name_ << " set_is_running(" << running
+ << ")";
+ CHECK(startup_tracker_->has_setup);
+
+ set_is_running(running);
+ if (running) {
+ has_run_ = true;
+ }
}
bool has_run() const { return has_run_; }
@@ -506,17 +529,21 @@
send_delay_ = send_delay;
}
- ::aos::monotonic_clock::time_point monotonic_now() override {
+ monotonic_clock::time_point monotonic_now() override {
return node_event_loop_factory_->monotonic_now();
}
- ::aos::realtime_clock::time_point realtime_now() override {
+ realtime_clock::time_point realtime_now() override {
return node_event_loop_factory_->realtime_now();
}
- ::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
+ distributed_clock::time_point distributed_now() {
+ return scheduler_->distributed_now();
+ }
- ::std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
+ std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
+
+ std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
void MakeRawWatcher(
const Channel *channel,
@@ -586,6 +613,16 @@
friend class SimulatedPhasedLoopHandler;
friend class SimulatedWatcher;
+ // We have a condition where we register a startup handler, but then get shut
+ // down before it runs. This results in a segfault if we are lucky, and
+ // corruption otherwise. To handle that, allocate a small object which points
+ // back to us and can be freed when the function is freed. That object can
+ // then be updated when we get destroyed so setup is not called.
+ struct StartupTracker {
+ SimulatedEventLoop *loop = nullptr;
+ bool has_setup = false;
+ };
+
void HandleEvent() {
while (true) {
if (EventCount() == 0 || PeekEvent()->event_time() > monotonic_now()) {
@@ -602,15 +639,12 @@
EventScheduler *scheduler_;
NodeEventLoopFactory *node_event_loop_factory_;
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *raw_event_loops_;
+ std::vector<SimulatedEventLoop *> *event_loops_;
::std::string name_;
int priority_ = 0;
- bool has_setup_ = false;
-
std::chrono::nanoseconds send_delay_;
const Node *const node_;
@@ -620,15 +654,19 @@
std::shared_ptr<logging::LogImplementation> log_impl_ = nullptr;
bool has_run_ = false;
+
+ std::shared_ptr<StartupTracker> startup_tracker_;
};
void SimulatedEventLoopFactory::set_send_delay(
std::chrono::nanoseconds send_delay) {
send_delay_ = send_delay;
- for (std::pair<EventLoop *, std::function<void(bool)>> &loop :
- raw_event_loops_) {
- reinterpret_cast<SimulatedEventLoop *>(loop.first)
- ->set_send_delay(send_delay_);
+ for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
+ if (node) {
+ for (SimulatedEventLoop *loop : node->event_loops_) {
+ loop->set_send_delay(send_delay_);
+ }
+ }
}
}
@@ -637,11 +675,16 @@
std::function<void(const Context &channel, const void *message)> watcher) {
TakeWatcher(channel);
- std::unique_ptr<SimulatedWatcher> shm_watcher(
- new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
+ std::unique_ptr<SimulatedWatcher> shm_watcher =
+ std::make_unique<SimulatedWatcher>(this, scheduler_, channel,
+ std::move(watcher));
GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
+
NewWatcher(std::move(shm_watcher));
+ VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
+ << " " << name() << " MakeRawWatcher(\""
+ << configuration::StrippedChannelToString(channel) << "\")";
// Order of operations gets kinda wonky if we let people make watchers after
// running once. If someone has a valid use case, we can reconsider.
@@ -652,6 +695,9 @@
const Channel *channel) {
TakeSender(channel);
+ VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
+ << " " << name() << " MakeRawSender(\""
+ << configuration::StrippedChannelToString(channel) << "\")";
return GetSimulatedChannel(channel)->MakeRawSender(this);
}
@@ -666,6 +712,9 @@
"configuration.";
}
+ VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
+ << " " << name() << " MakeRawFetcher(\""
+ << configuration::StrippedChannelToString(channel) << "\")";
return GetSimulatedChannel(channel)->MakeRawFetcher(this);
}
@@ -698,9 +747,20 @@
channel_(channel),
scheduler_(scheduler),
event_(this),
- token_(scheduler_->InvalidToken()) {}
+ token_(scheduler_->InvalidToken()) {
+ VLOG(1) << simulated_event_loop_->distributed_now() << " "
+ << NodeName(simulated_event_loop_->node())
+ << simulated_event_loop_->monotonic_now() << " "
+ << simulated_event_loop_->name() << " Watching "
+ << configuration::StrippedChannelToString(channel_);
+}
SimulatedWatcher::~SimulatedWatcher() {
+ VLOG(1) << simulated_event_loop_->distributed_now() << " "
+ << NodeName(simulated_event_loop_->node())
+ << simulated_event_loop_->monotonic_now() << " "
+ << simulated_event_loop_->name() << " ~Watching "
+ << configuration::StrippedChannelToString(channel_);
simulated_event_loop_->RemoveEvent(&event_);
if (token_ != scheduler_->InvalidToken()) {
scheduler_->Deschedule(token_);
@@ -729,11 +789,15 @@
}
void SimulatedWatcher::HandleEvent() {
- VLOG(1) << "Watcher " << configuration::CleanedChannelToString(channel_);
- CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
-
const monotonic_clock::time_point monotonic_now =
simulated_event_loop_->monotonic_now();
+ VLOG(1) << simulated_event_loop_->distributed_now() << " "
+ << NodeName(simulated_event_loop_->node())
+ << simulated_event_loop_->monotonic_now() << " "
+ << simulated_event_loop_->name() << " Watcher "
+ << configuration::StrippedChannelToString(channel_);
+ CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
+
logging::ScopedLogRestorer prev_logger;
if (simulated_event_loop_->log_impl_) {
prev_logger.Swap(simulated_event_loop_->log_impl_);
@@ -746,10 +810,10 @@
if (context.remote_queue_index == 0xffffffffu) {
context.remote_queue_index = context.queue_index;
}
- if (context.monotonic_remote_time == aos::monotonic_clock::min_time) {
+ if (context.monotonic_remote_time == monotonic_clock::min_time) {
context.monotonic_remote_time = context.monotonic_event_time;
}
- if (context.realtime_remote_time == aos::realtime_clock::min_time) {
+ if (context.realtime_remote_time == realtime_clock::min_time) {
context.realtime_remote_time = context.realtime_event_time;
}
@@ -841,7 +905,7 @@
SimulatedEventLoop *event_loop)
: RawSender(event_loop, simulated_channel->channel()),
simulated_channel_(simulated_channel),
- event_loop_(event_loop) {
+ simulated_event_loop_(event_loop) {
simulated_channel_->CountSenderCreated();
}
@@ -854,14 +918,21 @@
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) {
+ VLOG(1) << simulated_event_loop_->distributed_now() << " "
+ << NodeName(simulated_event_loop_->node())
+ << simulated_event_loop_->monotonic_now() << " "
+ << simulated_event_loop_->name() << " Send "
+ << configuration::StrippedChannelToString(channel());
+
// The allocations in here are due to infrastructure and don't count in the
// no mallocs in RT code.
ScopedNotRealtime nrt;
CHECK_LE(length, size()) << ": Attempting to send too big a message.";
- message_->context.monotonic_event_time = event_loop_->monotonic_now();
+ message_->context.monotonic_event_time =
+ simulated_event_loop_->monotonic_now();
message_->context.monotonic_remote_time = monotonic_remote_time;
message_->context.remote_queue_index = remote_queue_index;
- message_->context.realtime_event_time = event_loop_->realtime_now();
+ message_->context.realtime_event_time = simulated_event_loop_->realtime_now();
message_->context.realtime_remote_time = realtime_remote_time;
message_->context.source_boot_uuid = source_boot_uuid;
CHECK_LE(length, message_->context.size);
@@ -869,8 +940,8 @@
// TODO(austin): Track sending too fast.
sent_queue_index_ = simulated_channel_->Send(message_);
- monotonic_sent_time_ = event_loop_->monotonic_now();
- realtime_sent_time_ = event_loop_->realtime_now();
+ monotonic_sent_time_ = simulated_event_loop_->monotonic_now();
+ realtime_sent_time_ = simulated_event_loop_->realtime_now();
// Drop the reference to the message so that we allocate a new message for
// next time. Otherwise we will continue to reuse the same memory for all
@@ -918,7 +989,7 @@
// mallocs in RT code.
ScopedNotRealtime nrt;
Disable();
- const ::aos::monotonic_clock::time_point monotonic_now =
+ const monotonic_clock::time_point monotonic_now =
simulated_event_loop_->monotonic_now();
base_ = base;
repeat_offset_ = repeat_offset;
@@ -932,9 +1003,11 @@
}
void SimulatedTimerHandler::HandleEvent() {
- VLOG(1) << "Timer " << name();
- const ::aos::monotonic_clock::time_point monotonic_now =
+ const monotonic_clock::time_point monotonic_now =
simulated_event_loop_->monotonic_now();
+ VLOG(1) << simulated_event_loop_->distributed_now() << " "
+ << NodeName(simulated_event_loop_->node()) << monotonic_now << " "
+ << simulated_event_loop_->name() << " Timer '" << name() << "'";
logging::ScopedLogRestorer prev_logger;
if (simulated_event_loop_->log_impl_) {
prev_logger.Swap(simulated_event_loop_->log_impl_);
@@ -943,7 +1016,7 @@
scheduler_->Deschedule(token_);
token_ = scheduler_->InvalidToken();
}
- if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
+ if (repeat_offset_ != monotonic_clock::zero()) {
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
token_ = scheduler_->Schedule(base_, [this]() {
@@ -988,9 +1061,10 @@
}
void SimulatedPhasedLoopHandler::HandleEvent() {
- VLOG(1) << "Phased loop " << name();
monotonic_clock::time_point monotonic_now =
simulated_event_loop_->monotonic_now();
+ VLOG(1) << monotonic_now << " Phased loop " << simulated_event_loop_->name()
+ << ", " << name();
logging::ScopedLogRestorer prev_logger;
if (simulated_event_loop_->log_impl_) {
prev_logger.Swap(simulated_event_loop_->log_impl_);
@@ -1023,23 +1097,14 @@
simulated_event_loop_->AddEvent(&event_);
}
-NodeEventLoopFactory::NodeEventLoopFactory(
- EventSchedulerScheduler *scheduler_scheduler,
- SimulatedEventLoopFactory *factory, const Node *node,
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *raw_event_loops)
- : factory_(factory), node_(node), raw_event_loops_(raw_event_loops) {
- scheduler_scheduler->AddEventScheduler(&scheduler_);
-}
-
SimulatedEventLoopFactory::SimulatedEventLoopFactory(
const Configuration *configuration)
: configuration_(CHECK_NOTNULL(configuration)),
nodes_(configuration::GetNodes(configuration_)) {
CHECK(IsInitialized()) << ": Need to initialize AOS first.";
for (const Node *node : nodes_) {
- node_factories_.emplace_back(new NodeEventLoopFactory(
- &scheduler_scheduler_, this, node, &raw_event_loops_));
+ node_factories_.emplace_back(
+ new NodeEventLoopFactory(&scheduler_scheduler_, this, node));
}
if (configuration::MultiNode(configuration)) {
@@ -1050,6 +1115,11 @@
SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
+ std::string_view node) {
+ return GetNodeEventLoopFactory(configuration::GetNode(configuration(), node));
+}
+
+NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
const Node *node) {
auto result = std::find_if(
node_factories_.begin(), node_factories_.end(),
@@ -1068,6 +1138,7 @@
for (std::unique_ptr<NodeEventLoopFactory> &factory : node_factories_) {
factory->SetTimeConverter(time_converter);
}
+ scheduler_scheduler_.SetTimeConverter(time_converter);
}
::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
@@ -1082,49 +1153,136 @@
return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
}
-::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
- std::string_view name) {
+NodeEventLoopFactory::NodeEventLoopFactory(
+ EventSchedulerScheduler *scheduler_scheduler,
+ SimulatedEventLoopFactory *factory, const Node *node)
+ : scheduler_(configuration::GetNodeIndex(factory->configuration(), node)),
+ factory_(factory),
+ node_(node) {
+ scheduler_scheduler->AddEventScheduler(&scheduler_);
+ scheduler_.set_started([this]() {
+ started_ = true;
+ for (SimulatedEventLoop *event_loop : event_loops_) {
+ event_loop->SetIsRunning(true);
+ }
+ });
+ scheduler_.set_on_shutdown([this]() {
+ VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
+ << monotonic_now() << " Shutting down node.";
+ Shutdown();
+ ScheduleStartup();
+ });
+ ScheduleStartup();
+}
+
+NodeEventLoopFactory::~NodeEventLoopFactory() {
+ if (started_) {
+ for (std::function<void()> &fn : on_shutdown_) {
+ fn();
+ }
+
+ VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
+ << monotonic_now() << " Shutting down applications.";
+ applications_.clear();
+ started_ = false;
+ }
+
+ if (event_loops_.size() != 0u) {
+ for (SimulatedEventLoop *event_loop : event_loops_) {
+ LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
+ << monotonic_now() << " Event loop '" << event_loop->name()
+ << "' failed to shut down";
+ }
+ }
+ CHECK_EQ(event_loops_.size(), 0u) << "Event loop didn't exit";
+}
+
+void NodeEventLoopFactory::OnStartup(std::function<void()> &&fn) {
CHECK(!scheduler_.is_running())
- << ": Can't create an event loop while running";
-
- pid_t tid = tid_;
- ++tid_;
- ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
- &scheduler_, this, &channels_, factory_->configuration(),
- raw_event_loops_, node_, tid));
- result->set_name(name);
- result->set_send_delay(factory_->send_delay());
- return std::move(result);
+ << ": Can only register OnStartup handlers when not running.";
+ on_startup_.emplace_back(std::move(fn));
+ if (started_) {
+ size_t on_startup_index = on_startup_.size() - 1;
+ scheduler_.ScheduleOnStartup(
+ [this, on_startup_index]() { on_startup_[on_startup_index](); });
+ }
}
-void NodeEventLoopFactory::Disconnect(const Node *other) {
- factory_->bridge_->Disconnect(node_, other);
+void NodeEventLoopFactory::OnShutdown(std::function<void()> &&fn) {
+ on_shutdown_.emplace_back(std::move(fn));
}
-void NodeEventLoopFactory::Connect(const Node *other) {
- factory_->bridge_->Connect(node_, other);
+
+void NodeEventLoopFactory::ScheduleStartup() {
+ scheduler_.ScheduleOnStartup([this]() {
+ UUID next_uuid = scheduler_.boot_uuid();
+ if (boot_uuid_ != next_uuid) {
+ CHECK_EQ(boot_uuid_, UUID::Zero());
+ boot_uuid_ = next_uuid;
+ }
+ VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
+ << monotonic_now() << " Starting up node on boot " << boot_uuid_;
+ Startup();
+ });
+}
+
+void NodeEventLoopFactory::Startup() {
+ CHECK(!started_);
+ for (size_t i = 0; i < on_startup_.size(); ++i) {
+ on_startup_[i]();
+ }
+}
+
+void NodeEventLoopFactory::Shutdown() {
+ for (SimulatedEventLoop *event_loop : event_loops_) {
+ event_loop->SetIsRunning(false);
+ }
+
+ CHECK(started_);
+ started_ = false;
+ for (std::function<void()> &fn : on_shutdown_) {
+ fn();
+ }
+
+ VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
+ << monotonic_now() << " Shutting down applications.";
+ applications_.clear();
+
+ if (event_loops_.size() != 0u) {
+ for (SimulatedEventLoop *event_loop : event_loops_) {
+ LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
+ << monotonic_now() << " Event loop '" << event_loop->name()
+ << "' failed to shut down";
+ }
+ }
+ CHECK_EQ(event_loops_.size(), 0u) << "Not all event loops shut down";
+ boot_uuid_ = UUID::Zero();
+
+ channels_.clear();
}
void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(true);
- }
+ // This sets running to true too.
+ scheduler_scheduler_.RunOnStartup();
scheduler_scheduler_.RunFor(duration);
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(false);
+ for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
+ if (node) {
+ for (SimulatedEventLoop *loop : node->event_loops_) {
+ loop->SetIsRunning(false);
+ }
+ }
}
}
void SimulatedEventLoopFactory::Run() {
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(true);
- }
+ // This sets running to true too.
+ scheduler_scheduler_.RunOnStartup();
scheduler_scheduler_.Run();
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(false);
+ for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
+ if (node) {
+ for (SimulatedEventLoop *loop : node->event_loops_) {
+ loop->SetIsRunning(false);
+ }
+ }
}
}
@@ -1145,4 +1303,30 @@
bridge_->SkipTimingReport();
}
+::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
+ std::string_view name) {
+ CHECK(!scheduler_.is_running() || !started_)
+ << ": Can't create an event loop while running";
+
+ pid_t tid = tid_;
+ ++tid_;
+ ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
+ &scheduler_, this, &channels_, factory_->configuration(), &event_loops_,
+ node_, tid));
+ result->set_name(name);
+ result->set_send_delay(factory_->send_delay());
+
+ VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
+ << monotonic_now() << " MakeEventLoop(\"" << result->name() << "\")";
+ return std::move(result);
+}
+
+void NodeEventLoopFactory::Disconnect(const Node *other) {
+ factory_->bridge_->Disconnect(node_, other);
+}
+
+void NodeEventLoopFactory::Connect(const Node *other) {
+ factory_->bridge_->Connect(node_, other);
+}
+
} // namespace aos
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 348e7dd..b25f260 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -26,6 +26,7 @@
class SimulatedChannel;
class NodeEventLoopFactory;
+class SimulatedEventLoop;
namespace message_bridge {
class SimulatedMessageBridge;
}
@@ -63,6 +64,12 @@
SimulatedEventLoopFactory(const Configuration *configuration);
~SimulatedEventLoopFactory();
+ SimulatedEventLoopFactory(const SimulatedEventLoopFactory &) = delete;
+ SimulatedEventLoopFactory &operator=(const SimulatedEventLoopFactory &) =
+ delete;
+ SimulatedEventLoopFactory(SimulatedEventLoopFactory &&) = delete;
+ SimulatedEventLoopFactory &operator=(SimulatedEventLoopFactory &&) = delete;
+
// Creates an event loop. If running in a multi-node environment, node needs
// to point to the node to create this event loop on.
::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name,
@@ -72,11 +79,13 @@
// NodeEventLoopFactory is owned by the SimulatedEventLoopFactory and has a
// lifetime identical to the factory.
NodeEventLoopFactory *GetNodeEventLoopFactory(const Node *node);
+ NodeEventLoopFactory *GetNodeEventLoopFactory(std::string_view node);
// Sets the time converter for all nodes.
void SetTimeConverter(TimeConverter *time_converter);
- // Starts executing the event loops unconditionally.
+ // Starts executing the event loops unconditionally until Exit is called or
+ // all the nodes have shut down.
void Run();
// Executes the event loops for a duration.
void RunFor(distributed_clock::duration duration);
@@ -122,26 +131,23 @@
const Configuration *const configuration_;
EventSchedulerScheduler scheduler_scheduler_;
- // List of event loops to manage running and not running for.
- // The function is a callback used to set and clear the running bool on each
- // event loop.
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- raw_event_loops_;
std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
std::chrono::nanoseconds network_delay_ = std::chrono::microseconds(100);
+ std::unique_ptr<message_bridge::SimulatedMessageBridge> bridge_;
+
std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
std::vector<const Node *> nodes_;
-
- std::unique_ptr<message_bridge::SimulatedMessageBridge> bridge_;
};
// This class holds all the state required to be a single node.
class NodeEventLoopFactory {
public:
- ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+ ~NodeEventLoopFactory();
+
+ std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
// Returns the node that this factory is running as, or nullptr if this is a
// single node setup.
@@ -157,11 +163,37 @@
// Returns the current time on both clocks.
inline monotonic_clock::time_point monotonic_now() const;
inline realtime_clock::time_point realtime_now() const;
+ inline distributed_clock::time_point distributed_now() const;
const Configuration *configuration() const {
return factory_->configuration();
}
+ // Starts the node up by calling the OnStartup handlers. These get called
+ // every time a node is started.
+
+ // Called when a node has started. This is typically when a log file starts
+ // for a node.
+ void OnStartup(std::function<void()> &&fn);
+
+ // Called when a node shuts down. These get called every time a node is shut
+ // down. All applications are destroyed right after the last OnShutdown
+ // callback is called.
+ void OnShutdown(std::function<void()> &&fn);
+
+ // Starts an application if the configuration says it should be started on
+ // this node. name is the name of the application. args are the constructor
+ // args for the Main class. Returns a pointer to the class that was started
+ // if it was started, or nullptr.
+ template <class Main, class... Args>
+ Main *MaybeStart(std::string_view name, Args &&... args);
+
+ // Starts an application regardless of if the config says to or not. name is
+ // the name of the application, and args are the constructor args for the
+ // application. Returns a pointer to the class that was started.
+ template <class Main, class... Args>
+ Main *AlwaysStart(std::string_view name, Args &&... args);
+
// Returns the simulated network delay for messages forwarded between nodes.
std::chrono::nanoseconds network_delay() const {
return factory_->network_delay();
@@ -170,6 +202,8 @@
// node.
std::chrono::nanoseconds send_delay() const { return factory_->send_delay(); }
+ size_t boot_count() const { return scheduler_.boot_count(); }
+
// TODO(austin): Private for the following?
// Converts a time to the distributed clock for scheduling and cross-node time
@@ -178,7 +212,7 @@
// replaying logs. Only convert times in the present or near past.
inline distributed_clock::time_point ToDistributedClock(
monotonic_clock::time_point time) const;
- inline monotonic_clock::time_point FromDistributedClock(
+ inline logger::BootTimestamp FromDistributedClock(
distributed_clock::time_point time) const;
// Sets the class used to convert time. This pointer must out-live the
@@ -189,19 +223,13 @@
time_converter);
}
- // Sets the boot UUID for this node. This typically should only be used by
- // the log reader.
- void set_boot_uuid(std::string_view uuid) {
- boot_uuid_ = UUID::FromString(uuid);
- }
// Returns the boot UUID for this node.
- const UUID &boot_uuid() const { return boot_uuid_; }
-
- // Reboots the node. This just resets the boot_uuid_, nothing else.
- // TODO(austin): This is here for a test case or two, not for general
- // consumption. The interactions with the rest of the system need to be
- // worked out better. Don't use this for anything real yet.
- void Reboot() { boot_uuid_ = UUID::Random(); }
+ const UUID &boot_uuid() {
+ if (boot_uuid_ == UUID::Zero()) {
+ boot_uuid_ = scheduler_.boot_uuid();
+ }
+ return boot_uuid_;
+ }
// Stops forwarding messages to the other node, and reports disconnected in
// the ServerStatistics message for this node, and the ClientStatistics for
@@ -212,21 +240,22 @@
private:
friend class SimulatedEventLoopFactory;
- NodeEventLoopFactory(
- EventSchedulerScheduler *scheduler_scheduler,
- SimulatedEventLoopFactory *factory, const Node *node,
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *raw_event_loops);
+ NodeEventLoopFactory(EventSchedulerScheduler *scheduler_scheduler,
+ SimulatedEventLoopFactory *factory, const Node *node);
+
+ // Helpers to restart.
+ void ScheduleStartup();
+ void Startup();
+ void Shutdown();
EventScheduler scheduler_;
SimulatedEventLoopFactory *const factory_;
- UUID boot_uuid_ = UUID::Random();
+ UUID boot_uuid_ = UUID::Zero();
const Node *const node_;
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *const raw_event_loops_;
+ std::vector<SimulatedEventLoop *> event_loops_;
std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
@@ -235,8 +264,72 @@
// pid so we get unique timing reports.
pid_t tid_ = 0;
+
+ // True if we are started.
+ bool started_ = false;
+
+ std::vector<std::function<void()>> pending_on_startup_;
+ std::vector<std::function<void()>> on_startup_;
+ std::vector<std::function<void()>> on_shutdown_;
+
+ // Base class for an application to start. This shouldn't be used directly.
+ struct Application {
+ Application(NodeEventLoopFactory *node_factory, std::string_view name)
+ : event_loop(node_factory->MakeEventLoop(name)) {}
+ virtual ~Application() {}
+
+ std::unique_ptr<EventLoop> event_loop;
+ };
+
+ // Subclass to do type erasure for the base class. Holds an instance of a
+ // specific class. Use SimulationStarter instead.
+ template <typename Main>
+ struct TypedApplication : public Application {
+ // Constructs an Application by delegating the arguments used to construct
+ // the event loop to Application and the rest of the args to the actual
+ // application.
+ template <class... Args>
+ TypedApplication(NodeEventLoopFactory *node_factory, std::string_view name,
+ Args &&... args)
+ : Application(node_factory, name),
+ main(event_loop.get(), std::forward<Args>(args)...) {
+ VLOG(1) << node_factory->scheduler_.distributed_now() << " "
+ << (node_factory->node() == nullptr
+ ? ""
+ : node_factory->node()->name()->str() + " ")
+ << node_factory->monotonic_now() << " Starting Application \""
+ << name << "\"";
+ }
+ ~TypedApplication() override {}
+
+ Main main;
+ };
+
+ std::vector<std::unique_ptr<Application>> applications_;
};
+template <class Main, class... Args>
+Main *NodeEventLoopFactory::MaybeStart(std::string_view name, Args &&... args) {
+ const aos::Application *application =
+ configuration::GetApplication(configuration(), node(), name);
+
+ if (application != nullptr) {
+ return AlwaysStart<Main>(name, std::forward<Args>(args)...);
+ }
+ return nullptr;
+}
+
+template <class Main, class... Args>
+Main *NodeEventLoopFactory::AlwaysStart(std::string_view name,
+ Args &&... args) {
+ std::unique_ptr<TypedApplication<Main>> app =
+ std::make_unique<TypedApplication<Main>>(this, name,
+ std::forward<Args>(args)...);
+ Main *main_ptr = &app->main;
+ applications_.emplace_back(std::move(app));
+ return main_ptr;
+}
+
inline monotonic_clock::time_point NodeEventLoopFactory::monotonic_now() const {
// TODO(austin): Confirm that time never goes backwards?
return scheduler_.monotonic_now();
@@ -247,7 +340,12 @@
realtime_offset_);
}
-inline monotonic_clock::time_point NodeEventLoopFactory::FromDistributedClock(
+inline distributed_clock::time_point NodeEventLoopFactory::distributed_now()
+ const {
+ return scheduler_.distributed_now();
+}
+
+inline logger::BootTimestamp NodeEventLoopFactory::FromDistributedClock(
distributed_clock::time_point time) const {
return scheduler_.FromDistributedClock(time);
}
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 83568da..43c0c97 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -22,6 +22,7 @@
using aos::testing::ArtifactPath;
+using logger::BootTimestamp;
using message_bridge::RemoteMessage;
namespace chrono = ::std::chrono;
@@ -140,7 +141,7 @@
TEST(EventSchedulerTest, ScheduleEvent) {
int counter = 0;
EventSchedulerScheduler scheduler_scheduler;
- EventScheduler scheduler;
+ EventScheduler scheduler(0);
scheduler_scheduler.AddEventScheduler(&scheduler);
scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
@@ -158,7 +159,7 @@
TEST(EventSchedulerTest, DescheduleEvent) {
int counter = 0;
EventSchedulerScheduler scheduler_scheduler;
- EventScheduler scheduler;
+ EventScheduler scheduler(0);
scheduler_scheduler.AddEventScheduler(&scheduler);
auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
@@ -778,15 +779,13 @@
message_bridge::TestingTimeConverter time(
configuration::NodesCount(&config.message()));
SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
- NodeEventLoopFactory *pi2_factory =
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
- pi2_factory->SetTimeConverter(&time);
+ simulated_event_loop_factory.SetTimeConverter(&time);
constexpr chrono::milliseconds kOffset{1501};
time.AddNextTimestamp(
distributed_clock::epoch(),
- {logger::BootTimestamp::epoch(), logger::BootTimestamp::epoch() + kOffset,
- logger::BootTimestamp::epoch()});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
+ BootTimestamp::epoch()});
std::unique_ptr<EventLoop> ping_event_loop =
simulated_event_loop_factory.MakeEventLoop("ping", pi1);
@@ -871,7 +870,7 @@
chrono::milliseconds(5));
EXPECT_EQ(pi1_server_statistics_count, 10);
- EXPECT_EQ(pi2_server_statistics_count, 9);
+ EXPECT_EQ(pi2_server_statistics_count, 10);
EXPECT_EQ(pi3_server_statistics_count, 10);
}
@@ -1034,31 +1033,32 @@
// Test that disconnecting nodes actually disconnects them.
TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
- const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
- const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
- const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
-
SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
- std::unique_ptr<EventLoop> ping_event_loop =
- simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+ NodeEventLoopFactory *pi1 =
+ simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 =
+ simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
+ NodeEventLoopFactory *pi3 =
+ simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
+
+ std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
Ping ping(ping_event_loop.get());
- std::unique_ptr<EventLoop> pong_event_loop =
- simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+ std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
Pong pong(pong_event_loop.get());
std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
- simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+ pi2->MakeEventLoop("pi2_pong_counter");
MessageCounter<examples::Pong> pi2_pong_counter(
pi2_pong_counter_event_loop.get(), "/test");
std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
- simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+ pi3->MakeEventLoop("pi3_pong_counter");
std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
- simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+ pi1->MakeEventLoop("pi1_pong_counter");
MessageCounter<examples::Pong> pi1_pong_counter(
pi1_pong_counter_event_loop.get(), "/test");
@@ -1088,8 +1088,13 @@
MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
MessageCounter<message_bridge::ServerStatistics>
- pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
- "/pi1/aos");
+ *pi1_server_statistics_counter;
+ pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
+ pi1_server_statistics_counter =
+ pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
+ "pi1_server_statistics_counter", "/pi1/aos");
+ });
+
aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
pi1_pong_counter_event_loop
->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
@@ -1098,8 +1103,12 @@
->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
MessageCounter<message_bridge::ServerStatistics>
- pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
- "/pi2/aos");
+ *pi2_server_statistics_counter;
+ pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
+ pi2_server_statistics_counter =
+ pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
+ "pi2_server_statistics_counter", "/pi2/aos");
+ });
aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
pi2_pong_counter_event_loop
->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
@@ -1108,8 +1117,12 @@
->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
MessageCounter<message_bridge::ServerStatistics>
- pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
- "/pi3/aos");
+ *pi3_server_statistics_counter;
+ pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
+ pi3_server_statistics_counter =
+ pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
+ "pi3_server_statistics_counter", "/pi3/aos");
+ });
aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
pi3_pong_counter_event_loop
->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
@@ -1141,9 +1154,9 @@
EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
- EXPECT_EQ(pi1_server_statistics_counter.count(), 2u);
- EXPECT_EQ(pi2_server_statistics_counter.count(), 2u);
- EXPECT_EQ(pi3_server_statistics_counter.count(), 2u);
+ EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
+ EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
+ EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
@@ -1172,7 +1185,7 @@
EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
<< " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Disconnect(pi3);
+ pi1->Disconnect(pi3->node());
simulated_event_loop_factory.RunFor(chrono::seconds(2));
@@ -1187,9 +1200,9 @@
EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
- EXPECT_EQ(pi1_server_statistics_counter.count(), 4u);
- EXPECT_EQ(pi2_server_statistics_counter.count(), 4u);
- EXPECT_EQ(pi3_server_statistics_counter.count(), 4u);
+ EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
+ EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
+ EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
@@ -1218,7 +1231,7 @@
EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
<< " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Connect(pi3);
+ pi1->Connect(pi3->node());
simulated_event_loop_factory.RunFor(chrono::seconds(2));
@@ -1233,9 +1246,9 @@
EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
- EXPECT_EQ(pi1_server_statistics_counter.count(), 6u);
- EXPECT_EQ(pi2_server_statistics_counter.count(), 6u);
- EXPECT_EQ(pi3_server_statistics_counter.count(), 6u);
+ EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
+ EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
+ EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
@@ -1286,20 +1299,17 @@
message_bridge::TestingTimeConverter time(
configuration::NodesCount(&config.message()));
SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
- NodeEventLoopFactory *pi2_factory =
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
- pi2_factory->SetTimeConverter(&time);
+ simulated_event_loop_factory.SetTimeConverter(&time);
constexpr chrono::milliseconds kOffset{150100};
time.AddNextTimestamp(
distributed_clock::epoch(),
- {logger::BootTimestamp::epoch(), logger::BootTimestamp::epoch() + kOffset,
- logger::BootTimestamp::epoch()});
- time.AddNextTimestamp(
- distributed_clock::epoch() + chrono::seconds(10),
- {logger::BootTimestamp::epoch() + chrono::milliseconds(9999),
- logger::BootTimestamp::epoch() + kOffset + chrono::seconds(10),
- logger::BootTimestamp::epoch() + chrono::milliseconds(9999)});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
+ BootTimestamp::epoch()});
+ time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
+ {BootTimestamp::epoch() + chrono::milliseconds(9999),
+ BootTimestamp::epoch() + kOffset + chrono::seconds(10),
+ BootTimestamp::epoch() + chrono::milliseconds(9999)});
std::unique_ptr<EventLoop> ping_event_loop =
simulated_event_loop_factory.MakeEventLoop("ping", pi1);
@@ -1470,23 +1480,51 @@
// Tests that rebooting a node changes the ServerStatistics message and the
// RemoteTimestamp message.
TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
- const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
- const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
- SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+ message_bridge::TestingTimeConverter time(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory factory(&config.message());
+ factory.SetTimeConverter(&time);
- std::unique_ptr<EventLoop> ping_event_loop =
- simulated_event_loop_factory.MakeEventLoop("ping", pi1);
- Ping ping(ping_event_loop.get());
+ const size_t pi1_index =
+ configuration::GetNodeIndex(&config.message(), "pi1");
+ const size_t pi2_index =
+ configuration::GetNodeIndex(&config.message(), "pi2");
+ const size_t pi3_index =
+ configuration::GetNodeIndex(&config.message(), "pi3");
- std::unique_ptr<EventLoop> pong_event_loop =
- simulated_event_loop_factory.MakeEventLoop("pong", pi2);
- Pong pong(pong_event_loop.get());
+ const UUID pi1_boot0 = UUID::Random();
+ const UUID pi2_boot0 = UUID::Random();
+ const UUID pi2_boot1 = UUID::Random();
+ const UUID pi3_boot0 = UUID::Random();
+ {
+ time.AddNextTimestamp(distributed_clock::epoch(),
+ {BootTimestamp::epoch(), BootTimestamp::epoch(),
+ BootTimestamp::epoch()});
+
+ const chrono::nanoseconds dt = chrono::milliseconds(2001);
+
+ time.AddNextTimestamp(
+ distributed_clock::epoch() + dt,
+ {BootTimestamp::epoch() + dt,
+ BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
+ BootTimestamp::epoch() + dt});
+
+ time.set_boot_uuid(pi1_index, 0, pi1_boot0);
+ time.set_boot_uuid(pi2_index, 0, pi2_boot0);
+ time.set_boot_uuid(pi2_index, 1, pi2_boot1);
+ time.set_boot_uuid(pi3_index, 0, pi3_boot0);
+ }
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
+ pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
std::unique_ptr<EventLoop> pi1_remote_timestamp =
- simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
- UUID expected_boot_uuid =
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
+ pi1->MakeEventLoop("pi1_remote_timestamp");
+ UUID expected_boot_uuid = pi2_boot0;
int timestamp_count = 0;
pi1_remote_timestamp->MakeWatcher(
@@ -1512,41 +1550,65 @@
});
int pi1_server_statistics_count = 0;
+ bool first_pi1_server_statistics = true;
pi1_remote_timestamp->MakeWatcher(
- "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid](
+ "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid,
+ &first_pi1_server_statistics](
const message_bridge::ServerStatistics &stats) {
VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
for (const message_bridge::ServerConnection *connection :
*stats.connections()) {
- EXPECT_TRUE(connection->has_boot_uuid());
+ if (connection->state() == message_bridge::State::CONNECTED) {
+ ASSERT_TRUE(connection->has_boot_uuid());
+ }
+ if (!first_pi1_server_statistics) {
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ }
if (connection->node()->name()->string_view() == "pi2") {
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ ASSERT_TRUE(connection->has_boot_uuid());
EXPECT_EQ(expected_boot_uuid,
UUID::FromString(connection->boot_uuid()))
<< " : Got " << aos::FlatbufferToJson(&stats);
++pi1_server_statistics_count;
}
}
+ first_pi1_server_statistics = false;
});
+ int pi1_client_statistics_count = 0;
+ pi1_remote_timestamp->MakeWatcher(
+ "/pi1/aos", [&pi1_client_statistics_count](
+ const message_bridge::ClientStatistics &stats) {
+ VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+ for (const message_bridge::ClientConnection *connection :
+ *stats.connections()) {
+ EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ if (connection->node()->name()->string_view() == "pi2") {
+ ++pi1_client_statistics_count;
+ }
+ }
+ });
+
+ // Confirm that reboot changes the UUID.
+ pi2->OnShutdown([&expected_boot_uuid, pi2, pi2_boot1]() {
+ expected_boot_uuid = pi2_boot1;
+ LOG(INFO) << "OnShutdown triggered for pi2";
+ pi2->OnStartup([&expected_boot_uuid, pi2]() {
+ EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
+ });
+ });
+
// Let a couple of ServerStatistics messages show up before rebooting.
- simulated_event_loop_factory.RunFor(chrono::milliseconds(2001));
+ factory.RunFor(chrono::milliseconds(2002));
EXPECT_GT(timestamp_count, 100);
EXPECT_GE(pi1_server_statistics_count, 1u);
- // Confirm that reboot changes the UUID.
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->Reboot();
-
- EXPECT_NE(
- expected_boot_uuid,
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid());
-
- expected_boot_uuid =
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
timestamp_count = 0;
pi1_server_statistics_count = 0;
- simulated_event_loop_factory.RunFor(chrono::milliseconds(2000));
+ factory.RunFor(chrono::milliseconds(2000));
EXPECT_GT(timestamp_count, 100);
EXPECT_GE(pi1_server_statistics_count, 1u);
}
@@ -1557,5 +1619,321 @@
Param{"multinode_pingpong_test_combined_config.json", true},
Param{"multinode_pingpong_test_split_config.json", false}));
+// Tests that OnStartup and OnShutdown handlers run on boot and again on reboot.
+TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+ message_bridge::TestingTimeConverter time(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory factory(&config.message());
+ factory.SetTimeConverter(&time);
+ time.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
+
+ const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
+
+ time.AddNextTimestamp(
+ distributed_clock::epoch() + dt,
+ {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},  // pi1 reboots.
+ BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},  // pi2 reboots.
+ BootTimestamp::epoch() + dt});  // pi3 stays on its first boot.
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ // Configure startup to start Ping and Pong, and count.
+ size_t pi1_startup_counter = 0;
+ size_t pi2_startup_counter = 0;
+ pi1->OnStartup([pi1]() {
+ LOG(INFO) << "Made ping";
+ pi1->AlwaysStart<Ping>("ping");
+ });
+ pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
+ pi2->OnStartup([pi2]() {
+ LOG(INFO) << "Made pong";
+ pi2->AlwaysStart<Pong>("pong");
+ });
+ pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
+
+ // Shutdown just counts.
+ size_t pi1_shutdown_counter = 0;
+ size_t pi2_shutdown_counter = 0;
+ pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
+ pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
+
+ MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
+ MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
+
+ // Automatically make counters on startup.
+ pi1->OnStartup([&pi1_pong_counter, pi1]() {
+ pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
+ "pi1_pong_counter", "/test");
+ });
+ pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
+ pi2->OnStartup([&pi2_ping_counter, pi2]() {
+ pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
+ "pi2_ping_counter", "/test");
+ });
+ pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
+
+ EXPECT_EQ(pi2_ping_counter, nullptr);
+ EXPECT_EQ(pi1_pong_counter, nullptr);
+
+ EXPECT_EQ(pi1_startup_counter, 0u);
+ EXPECT_EQ(pi2_startup_counter, 0u);
+ EXPECT_EQ(pi1_shutdown_counter, 0u);
+ EXPECT_EQ(pi2_shutdown_counter, 0u);
+
+ factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));  // 1ms before dt.
+ EXPECT_EQ(pi1_startup_counter, 1u);
+ EXPECT_EQ(pi2_startup_counter, 1u);
+ EXPECT_EQ(pi1_shutdown_counter, 0u);
+ EXPECT_EQ(pi2_shutdown_counter, 0u);
+ EXPECT_EQ(pi2_ping_counter->count(), 1001u);
+ EXPECT_EQ(pi1_pong_counter->count(), 1001u);
+
+ LOG(INFO) << pi1->monotonic_now();
+ LOG(INFO) << pi2->monotonic_now();
+
+ factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));  // Past the reboot.
+
+ EXPECT_EQ(pi1_startup_counter, 2u);
+ EXPECT_EQ(pi2_startup_counter, 2u);
+ EXPECT_EQ(pi1_shutdown_counter, 1u);
+ EXPECT_EQ(pi2_shutdown_counter, 1u);
+ EXPECT_EQ(pi2_ping_counter->count(), 501u);
+ EXPECT_EQ(pi1_pong_counter->count(), 501u);
+}
+
+// Tests that OnStartup handlers can be added between runs and get called on the
+// next run, and that registering one while the node is running dies.
+TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+ // Test that we can add startup handlers as long as we aren't running, and
+ // they get run when RunFor gets called again. Handlers which already ran
+ // are not run a second time.
+ // Test that adding a startup handler while running (here, from inside
+ // OnRun) crashes the test process.
+ SimulatedEventLoopFactory factory(&config.message());
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+ int startup_count0 = 0;
+ int startup_count1 = 0;
+
+ pi1->OnStartup([&]() { ++startup_count0; });
+ EXPECT_EQ(startup_count0, 0);
+ EXPECT_EQ(startup_count1, 0);
+
+ factory.RunFor(chrono::nanoseconds(1));
+ EXPECT_EQ(startup_count0, 1);
+ EXPECT_EQ(startup_count1, 0);
+
+ pi1->OnStartup([&]() { ++startup_count1; });
+ EXPECT_EQ(startup_count0, 1);
+ EXPECT_EQ(startup_count1, 0);
+
+ factory.RunFor(chrono::nanoseconds(1));
+ EXPECT_EQ(startup_count0, 1);
+ EXPECT_EQ(startup_count1, 1);
+
+ std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
+ loop->OnRun([&]() { pi1->OnStartup([]() {}); });
+
+ EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
+ "Can only register OnStartup handlers when not running.");
+}
+
+// Tests that OnStartup handlers can be added after running and get called, and
+// that all OnStartup and OnShutdown handlers, including ones added late, run
+// again on every reboot.
+TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+ message_bridge::TestingTimeConverter time(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory factory(&config.message());
+ factory.SetTimeConverter(&time);
+ time.StartEqual();
+
+ const chrono::nanoseconds dt = chrono::seconds(10);
+ time.RebootAt(0, distributed_clock::epoch() + dt);  // Reboots pi1 at 10s...
+ time.RebootAt(0, distributed_clock::epoch() + 2 * dt);  // ...and at 20s.
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+ int startup_count0 = 0;
+ int shutdown_count0 = 0;
+ int startup_count1 = 0;
+ int shutdown_count1 = 0;
+
+ pi1->OnStartup([&]() { ++startup_count0; });
+ pi1->OnShutdown([&]() { ++shutdown_count0; });
+ EXPECT_EQ(startup_count0, 0);
+ EXPECT_EQ(startup_count1, 0);
+ EXPECT_EQ(shutdown_count0, 0);
+ EXPECT_EQ(shutdown_count1, 0);
+
+ factory.RunFor(chrono::nanoseconds(1));
+ EXPECT_EQ(startup_count0, 1);
+ EXPECT_EQ(startup_count1, 0);
+ EXPECT_EQ(shutdown_count0, 0);
+ EXPECT_EQ(shutdown_count1, 0);
+
+ pi1->OnStartup([&]() { ++startup_count1; });
+ EXPECT_EQ(startup_count0, 1);
+ EXPECT_EQ(startup_count1, 0);
+ EXPECT_EQ(shutdown_count0, 0);
+ EXPECT_EQ(shutdown_count1, 0);
+
+ factory.RunFor(chrono::nanoseconds(1));
+ EXPECT_EQ(startup_count0, 1);
+ EXPECT_EQ(startup_count1, 1);
+ EXPECT_EQ(shutdown_count0, 0);
+ EXPECT_EQ(shutdown_count1, 0);
+
+ factory.RunFor(chrono::seconds(15));  // Crosses the 10s reboot.
+
+ EXPECT_EQ(startup_count0, 2);
+ EXPECT_EQ(startup_count1, 2);
+ EXPECT_EQ(shutdown_count0, 1);
+ EXPECT_EQ(shutdown_count1, 0);
+
+ pi1->OnShutdown([&]() { ++shutdown_count1; });
+ factory.RunFor(chrono::seconds(10));  // Crosses the 20s reboot.
+
+ EXPECT_EQ(startup_count0, 3);
+ EXPECT_EQ(startup_count1, 3);
+ EXPECT_EQ(shutdown_count0, 2);
+ EXPECT_EQ(shutdown_count1, 1);
+}
+
+// Tests that an event loop still alive when its node reboots causes a crash.
+TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+ message_bridge::TestingTimeConverter time(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory factory(&config.message());
+ factory.SetTimeConverter(&time);
+ time.StartEqual();
+
+ const chrono::nanoseconds dt = chrono::seconds(10);
+ time.RebootAt(0, distributed_clock::epoch() + dt);  // Reboots pi1 at 10s.
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+ std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");  // Kept alive.
+
+ EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");  // Runs past 10s.
+}
+
+// Tests that messages sent on a node can't be fetched there after it reboots.
+TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+ message_bridge::TestingTimeConverter time(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory factory(&config.message());
+ factory.SetTimeConverter(&time);
+ time.StartEqual();
+
+ const chrono::nanoseconds dt = chrono::seconds(10);
+ time.RebootAt(0, distributed_clock::epoch() + dt);  // Reboots pi1 at 10s.
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+ const UUID boot_uuid = pi1->boot_uuid();
+ EXPECT_NE(boot_uuid, UUID::Zero());
+
+ {
+ ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+ aos::Sender<examples::Ping> test_message_sender =
+ ping_event_loop->MakeSender<examples::Ping>("/reliable");
+ SendPing(&test_message_sender, 1);
+ }
+
+ factory.RunFor(chrono::seconds(5));  // Still before the reboot.
+
+ {
+ ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+ aos::Fetcher<examples::Ping> fetcher =
+ ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+ EXPECT_TRUE(fetcher.Fetch());
+ }
+
+ factory.RunFor(chrono::seconds(10));  // Now past the reboot.
+
+ {
+ ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+ aos::Fetcher<examples::Ping> fetcher =
+ ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+ EXPECT_FALSE(fetcher.Fetch());  // The message did not survive the reboot.
+ }
+ EXPECT_NE(boot_uuid, pi1->boot_uuid());
+}
+
+// Tests that reliable messages get resent to a node after that node reboots.
+TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+ message_bridge::TestingTimeConverter time(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory factory(&config.message());
+ factory.SetTimeConverter(&time);
+ time.StartEqual();
+
+ const chrono::nanoseconds dt = chrono::seconds(1);
+ time.RebootAt(1, distributed_clock::epoch() + dt);  // Reboots pi2 at 1s.
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ const UUID pi1_boot_uuid = pi1->boot_uuid();
+ const UUID pi2_boot_uuid = pi2->boot_uuid();
+ EXPECT_NE(pi1_boot_uuid, UUID::Zero());
+ EXPECT_NE(pi2_boot_uuid, UUID::Zero());
+
+ {
+ ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+ aos::Sender<examples::Ping> test_message_sender =
+ ping_event_loop->MakeSender<examples::Ping>("/reliable");
+ SendPing(&test_message_sender, 1);
+ }
+
+ factory.RunFor(chrono::milliseconds(500));  // Before pi2 reboots.
+
+ {
+ ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
+ aos::Fetcher<examples::Ping> fetcher =
+ ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+ EXPECT_TRUE(fetcher.Fetch());  // Forwarded from pi1.
+ }
+
+ factory.RunFor(chrono::seconds(1));  // Past pi2's reboot.
+
+ {
+ ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
+ aos::Fetcher<examples::Ping> fetcher =
+ ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+ EXPECT_TRUE(fetcher.Fetch());  // Resent to pi2's new boot.
+ }
+ EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 11e8c56..f4a4188 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -16,55 +16,192 @@
// fetcher to manage the queue of data, and a timer to schedule the sends.
class RawMessageDelayer {
public:
- RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
+ RawMessageDelayer(const Channel *channel, const Connection *connection,
+ aos::NodeEventLoopFactory *fetch_node_factory,
aos::NodeEventLoopFactory *send_node_factory,
- aos::EventLoop *fetch_event_loop,
- aos::EventLoop *send_event_loop,
- std::unique_ptr<aos::RawFetcher> fetcher,
- std::unique_ptr<aos::RawSender> sender,
- MessageBridgeServerStatus *server_status,
- size_t destination_node_index,
- ServerConnection *server_connection, int client_index,
- MessageBridgeClientStatus *client_status,
- size_t channel_index,
- aos::Sender<RemoteMessage> *timestamp_logger)
- : fetch_node_factory_(fetch_node_factory),
+ size_t destination_node_index, bool delivery_time_is_logged)
+ : channel_(channel),
+ connection_(connection),
+ fetch_node_factory_(fetch_node_factory),
send_node_factory_(send_node_factory),
- fetch_event_loop_(fetch_event_loop),
- send_event_loop_(send_event_loop),
- fetcher_(std::move(fetcher)),
- sender_(std::move(sender)),
- server_status_(server_status),
destination_node_index_(destination_node_index),
- server_connection_(server_connection),
- client_status_(client_status),
- client_index_(client_index),
- client_connection_(client_status_->GetClientConnection(client_index)),
- channel_index_(channel_index),
- timestamp_logger_(timestamp_logger) {
- timer_ = send_event_loop_->AddTimer([this]() { Send(); });
- std::string timer_name =
- absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
- fetcher_->channel()->name()->string_view(), " ",
- fetcher_->channel()->type()->string_view());
- timer_->set_name(timer_name);
- timestamp_timer_ =
- fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
- timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
+ channel_index_(configuration::ChannelIndex(
+ fetch_node_factory_->configuration(), channel_)),
+ delivery_time_is_logged_(delivery_time_is_logged) {}  // Loops attached via Set*EventLoop().
- Schedule();
+ bool forwarding_disabled() const { return forwarding_disabled_; }  // If true, ScheduleReliable() no-ops and Schedule() CHECKs.
+ void set_forwarding_disabled(bool forwarding_disabled) {
+ forwarding_disabled_ = forwarding_disabled;
}
- const Channel *channel() const { return fetcher_->channel(); }
+ void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,  // Null detaches.
+ MessageBridgeServerStatus *server_status,
+ ChannelTimestampSender *timestamp_loggers) {
+ sent_ = false;  // sent_ referred to the old fetcher's message.
+ fetch_event_loop_ = fetch_event_loop;
+ if (fetch_event_loop_) {
+ fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
+ } else {
+ fetcher_ = nullptr;
+ }
+
+ server_status_ = server_status;
+ if (server_status) {  // NOTE(review): server_connection_ is not reset when null; confirm it is unused then.
+ server_connection_ =
+ server_status_->FindServerConnection(send_node_factory_->node());
+ }
+ if (delivery_time_is_logged_ && timestamp_loggers != nullptr) {
+ timestamp_logger_ =
+ timestamp_loggers->SenderForChannel(channel_, connection_);
+ } else {
+ timestamp_logger_ = nullptr;
+ }
+
+ if (fetch_event_loop_) {
+ timestamp_timer_ =
+ fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
+ if (send_event_loop_) {  // Timer names need the send node's name.
+ std::string timer_name = absl::StrCat(
+ send_event_loop_->node()->name()->string_view(), " ",
+ fetcher_->channel()->name()->string_view(), " ",
+ fetcher_->channel()->type()->string_view());
+ if (timer_) {
+ timer_->set_name(timer_name);
+ }
+ timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
+ }
+ } else {
+ timestamp_timer_ = nullptr;
+ }
+ }
+
+ void SetSendEventLoop(aos::EventLoop *send_event_loop,  // Null detaches.
+ MessageBridgeClientStatus *client_status) {
+ sent_ = false;  // NOTE(review): timer_scheduled_ is not reset here; confirm.
+ send_event_loop_ = send_event_loop;
+ if (send_event_loop_) {
+ sender_ = send_event_loop_->MakeRawSender(channel_);
+ } else {
+ sender_ = nullptr;
+ }
+
+ client_status_ = client_status;
+ if (client_status_) {
+ client_index_ = client_status_->FindClientIndex(
+ channel_->source_node()->string_view());
+ client_connection_ = client_status_->GetClientConnection(client_index_);
+ } else {
+ client_index_ = -1;
+ client_connection_ = nullptr;
+ }
+
+ if (send_event_loop_) {
+ timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+ if (fetcher_) {  // Timer names need the fetcher's channel.
+ std::string timer_name =
+ absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
+ fetcher_->channel()->name()->string_view(), " ",
+ fetcher_->channel()->type()->string_view());
+ timer_->set_name(timer_name);
+ if (timestamp_timer_) {
+ timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
+ }
+ }
+ } else {
+ timer_ = nullptr;
+ }
+ }
+
+ const Channel *channel() const { return channel_; }
uint32_t time_to_live() {
- return configuration::ConnectionToNode(sender_->channel(),
- send_node_factory_->node())
+ return configuration::ConnectionToNode(channel_, send_node_factory_->node())  // This channel's link to the send node.
->time_to_live();
}
+ void ScheduleReliable() {  // Schedules the fetched message for now + network_delay().
+ if (forwarding_disabled()) return;
+
+ if (!fetcher_) {
+ return;
+ }
+ if (fetcher_->context().data == nullptr || sent_) {
+ sent_ = !fetcher_->Fetch();  // A failed Fetch() leaves nothing to send.
+ }
+
+ FetchNext();
+ if (fetcher_->context().data == nullptr || sent_) {
+ return;
+ }
+ CHECK(!timer_scheduled_);
+
+ // Send at startup. It is the best we can do.
+ const monotonic_clock::time_point monotonic_delivered_time =
+ send_node_factory_->monotonic_now() +
+ send_node_factory_->network_delay();
+
+ CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+ << ": Trying to deliver message in the past on channel "
+ << configuration::StrippedChannelToString(fetcher_->channel())
+ << " to node " << send_event_loop_->node()->name()->string_view()
+ << " sent from " << fetcher_->channel()->source_node()->string_view()
+ << " at " << fetch_node_factory_->monotonic_now();
+
+ if (timer_) {
+ server_connection_->mutate_sent_packets(
+ server_connection_->sent_packets() + 1);
+ timer_->Setup(monotonic_delivered_time);
+ timer_scheduled_ = true;
+ } else {  // No send event loop, so timer_ is null: count as dropped.
+ server_connection_->mutate_dropped_packets(
+ server_connection_->dropped_packets() + 1);
+ sent_ = true;
+ }
+ }
+
+ bool timer_scheduled_ = false;  // True while timer_ has a Send() pending.
+
// Kicks us to re-fetch and schedule the timer.
void Schedule() {
+ CHECK(!forwarding_disabled());  // Must not be called while forwarding is disabled.
+ if (!fetcher_) {
+ return;
+ }
+ if (timer_scheduled_) {  // Already pending; Send() clears the flag.
+ return;
+ }
+ FetchNext();
+ if (fetcher_->context().data == nullptr || sent_) {
+ return;
+ }
+
+ // Compute the time to publish this message.
+ const monotonic_clock::time_point monotonic_delivered_time =
+ DeliveredTime(fetcher_->context());
+
+ CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+ << ": Trying to deliver message in the past on channel "
+ << configuration::StrippedChannelToString(fetcher_->channel())
+ << " to node " << send_event_loop_->node()->name()->string_view()
+ << " sent from " << fetcher_->channel()->source_node()->string_view()
+ << " at " << fetch_node_factory_->monotonic_now();
+
+ if (timer_) {
+ server_connection_->mutate_sent_packets(
+ server_connection_->sent_packets() + 1);
+ timer_->Setup(monotonic_delivered_time);
+ timer_scheduled_ = true;
+ } else {  // No send event loop, so timer_ is null: drop it.
+ server_connection_->mutate_dropped_packets(
+ server_connection_->dropped_packets() + 1);
+ sent_ = true;
+ Schedule();  // Move on to the next queued message.
+ }
+ }
+
+ private:
+ void FetchNext() {
+ CHECK(server_connection_);
// Keep pulling messages out of the fetcher until we find one in the future.
while (true) {
if (fetcher_->context().data == nullptr || sent_) {
@@ -82,9 +219,10 @@
}
if (fetcher_->context().monotonic_event_time +
- send_node_factory_->network_delay() +
- send_node_factory_->send_delay() >
- fetch_node_factory_->monotonic_now()) {
+ send_node_factory_->network_delay() +
+ send_node_factory_->send_delay() >
+ fetch_node_factory_->monotonic_now() ||
+ time_to_live() == 0) {
break;
}
@@ -99,34 +237,13 @@
server_connection_->mutate_dropped_packets(
server_connection_->dropped_packets() + 1);
}
-
- if (fetcher_->context().data == nullptr) {
- return;
- }
-
- if (sent_) {
- return;
- }
-
- // Compute the time to publish this message.
- const monotonic_clock::time_point monotonic_delivered_time =
- DeliveredTime(fetcher_->context());
-
- CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
- << ": Trying to deliver message in the past on channel "
- << configuration::StrippedChannelToString(fetcher_->channel())
- << " to node " << send_event_loop_->node()->name()->string_view()
- << " sent from " << fetcher_->channel()->source_node()->string_view()
- << " at " << fetch_node_factory_->monotonic_now();
-
- server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
- 1);
- timer_->Setup(monotonic_delivered_time);
}
- private:
- // Acutally sends the message, and reschedules.
+ // Actually sends the message, and reschedules.
void Send() {
+ timer_scheduled_ = false;
+ CHECK(sender_);
+ CHECK(client_status_);
if (server_connection_->state() != State::CONNECTED) {
sent_ = true;
Schedule();
@@ -238,23 +355,28 @@
const distributed_clock::time_point distributed_sent_time =
fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
- return send_node_factory_->FromDistributedClock(
+ const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
distributed_sent_time + send_node_factory_->network_delay() +
send_node_factory_->send_delay());
+ CHECK_EQ(t.boot, send_node_factory_->boot_count());
+ return t.time;
}
+ const Channel *channel_;
+ const Connection *connection_;
+
// Factories used for time conversion.
aos::NodeEventLoopFactory *fetch_node_factory_;
aos::NodeEventLoopFactory *send_node_factory_;
// Event loop which fetching and sending timestamps are scheduled on.
- aos::EventLoop *fetch_event_loop_;
+ aos::EventLoop *fetch_event_loop_ = nullptr;
// Event loop which sending is scheduled on.
- aos::EventLoop *send_event_loop_;
+ aos::EventLoop *send_event_loop_ = nullptr;
// Timer used to send.
- aos::TimerHandler *timer_;
+ aos::TimerHandler *timer_ = nullptr;
// Timer used to send timestamps out.
- aos::TimerHandler *timestamp_timer_;
+ aos::TimerHandler *timestamp_timer_ = nullptr;
// Time that the timer is scheduled for. Used to track if it needs to be
// rescheduled.
monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
@@ -264,14 +386,14 @@
// Sender to send them back out.
std::unique_ptr<aos::RawSender> sender_;
- MessageBridgeServerStatus *server_status_;
+ MessageBridgeServerStatus *server_status_ = nullptr;
const size_t destination_node_index_;
// True if we have sent the message in the fetcher.
bool sent_ = false;
ServerConnection *server_connection_ = nullptr;
MessageBridgeClientStatus *client_status_ = nullptr;
- int client_index_;
+ int client_index_ = -1;
ClientConnection *client_connection_ = nullptr;
size_t channel_index_;
@@ -287,6 +409,10 @@
};
std::deque<Timestamp> remote_timestamps_;
+
+ bool delivery_time_is_logged_;
+
+ bool forwarding_disabled_ = false;
};
SimulatedMessageBridge::SimulatedMessageBridge(
@@ -296,56 +422,43 @@
// Pre-build up event loops for every node. They are pretty cheap anyways.
for (const Node *node : simulated_event_loop_factory->nodes()) {
- auto it = event_loop_map_.emplace(std::make_pair(
- node,
- simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
-
+ NodeEventLoopFactory *node_factory =
+ simulated_event_loop_factory->GetNodeEventLoopFactory(node);
+ auto it = event_loop_map_.emplace(node, node_factory);
CHECK(it.second);
- it.first->second.event_loop->SkipTimingReport();
- it.first->second.event_loop->SkipAosLog();
+ node_factory->OnStartup(
+ [this, simulated_event_loop_factory, node_state = &it.first->second]() {
+ node_state->MakeEventLoop();
+ const size_t my_node_index = configuration::GetNodeIndex(
+ simulated_event_loop_factory->configuration(),
+ node_state->event_loop->node());
- for (ServerConnection *connection :
- it.first->second.server_status.server_connection()) {
- if (connection == nullptr) continue;
+ size_t node_index = 0;
+ for (ServerConnection *connection :
+ node_state->server_status->server_connection()) {
+ if (connection != nullptr) {
+ node_state->server_status->ResetFilter(node_index);
+ }
+ ++node_index;
+ }
- connection->mutate_state(message_bridge::State::CONNECTED);
- }
+ for (const ClientConnection *client_connections :
+ *node_state->client_status->mutable_client_statistics()
+ ->connections()) {
+ const Node *client_node = configuration::GetNode(
+ simulated_event_loop_factory->configuration(),
+ client_connections->node()->name()->string_view());
- for (size_t i = 0;
- i < it.first->second.client_status.mutable_client_statistics()
- ->mutable_connections()
- ->size();
- ++i) {
- ClientConnection *connection =
- it.first->second.client_status.mutable_client_statistics()
- ->mutable_connections()
- ->GetMutableObject(i);
- if (connection == nullptr) continue;
+ auto client_event_loop = event_loop_map_.find(client_node);
+ client_event_loop->second.SetBootUUID(
+ my_node_index, node_state->event_loop->boot_uuid());
+ }
+ });
- connection->mutate_state(message_bridge::State::CONNECTED);
- }
- }
-
- for (const Node *node : simulated_event_loop_factory->nodes()) {
- auto it = event_loop_map_.find(node);
-
- CHECK(it != event_loop_map_.end());
-
- size_t node_index = 0;
- for (ServerConnection *connection :
- it->second.server_status.server_connection()) {
- if (connection != nullptr) {
- const Node *client_node =
- simulated_event_loop_factory->configuration()->nodes()->Get(
- node_index);
- auto client_event_loop = event_loop_map_.find(client_node);
- it->second.server_status.ResetFilter(node_index);
- it->second.server_status.SetBootUUID(
- node_index, client_event_loop->second.event_loop->boot_uuid());
- }
- ++node_index;
- }
+ node_factory->OnShutdown([node_state = &it.first->second]() {
+ node_state->SetEventLoop(nullptr);
+ });
}
for (const Channel *channel :
@@ -355,10 +468,10 @@
}
// Find the sending node.
- const Node *node =
+ const Node *source_node =
configuration::GetNode(simulated_event_loop_factory->configuration(),
channel->source_node()->string_view());
- auto source_event_loop = event_loop_map_.find(node);
+ auto source_event_loop = event_loop_map_.find(source_node);
CHECK(source_event_loop != event_loop_map_.end());
std::unique_ptr<DelayersVector> delayers =
@@ -372,72 +485,39 @@
auto destination_event_loop = event_loop_map_.find(destination_node);
CHECK(destination_event_loop != event_loop_map_.end());
- ServerConnection *server_connection =
- source_event_loop->second.server_status.FindServerConnection(
- connection->name()->string_view());
-
- int client_index =
- destination_event_loop->second.client_status.FindClientIndex(
- channel->source_node()->string_view());
-
const size_t destination_node_index = configuration::GetNodeIndex(
simulated_event_loop_factory->configuration(), destination_node);
const bool delivery_time_is_logged =
- configuration::ConnectionDeliveryTimeIsLoggedOnNode(
- connection, source_event_loop->second.event_loop->node());
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+ source_node);
- delayers->emplace_back(std::make_unique<RawMessageDelayer>(
- simulated_event_loop_factory->GetNodeEventLoopFactory(node),
+ delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
+ channel, connection,
+ simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
simulated_event_loop_factory->GetNodeEventLoopFactory(
destination_node),
- source_event_loop->second.event_loop.get(),
- destination_event_loop->second.event_loop.get(),
- source_event_loop->second.event_loop->MakeRawFetcher(channel),
- destination_event_loop->second.event_loop->MakeRawSender(channel),
- &source_event_loop->second.server_status, destination_node_index,
- server_connection, client_index,
- &destination_event_loop->second.client_status,
- configuration::ChannelIndex(
- source_event_loop->second.event_loop->configuration(), channel),
- delivery_time_is_logged
- ? source_event_loop->second.timestamp_loggers.SenderForChannel(
- channel, connection)
- : nullptr));
+ destination_node_index, delivery_time_is_logged));
+
+ source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
+ destination_event_loop->second.AddDestinationDelayer(
+ delayers->v.back().get());
}
const Channel *const timestamp_channel = configuration::GetChannel(
simulated_event_loop_factory->configuration(), "/aos",
- Timestamp::GetFullyQualifiedName(),
- source_event_loop->second.event_loop->name(), node);
+ Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
if (channel == timestamp_channel) {
- source_event_loop->second.server_status.set_send_data(
+ source_event_loop->second.SetSendData(
[captured_delayers = delayers.get()](const Context &) {
for (std::unique_ptr<RawMessageDelayer> &delayer :
- *captured_delayers) {
+ captured_delayers->v) {
delayer->Schedule();
}
});
} else {
- // And register every delayer to be poked when a new message shows up.
-
- source_event_loop->second.event_loop->OnRun([captured_delayers =
- delayers.get()]() {
- // Poke all the reliable delayers so they send any queued messages.
- for (std::unique_ptr<RawMessageDelayer> &delayer : *captured_delayers) {
- if (delayer->time_to_live() == 0) {
- delayer->Schedule();
- }
- }
- });
- source_event_loop->second.event_loop->MakeRawNoArgWatcher(
- channel, [captured_delayers = delayers.get()](const Context &) {
- for (std::unique_ptr<RawMessageDelayer> &delayer :
- *captured_delayers) {
- delayer->Schedule();
- }
- });
+ source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
}
delayers_list_.emplace_back(std::move(delayers));
}
@@ -446,17 +526,13 @@
SimulatedMessageBridge::~SimulatedMessageBridge() {}
void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
- for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
- &delayers : delayers_list_) {
- if (delayers->size() > 0) {
- if ((*delayers)[0]->channel() == channel) {
- for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
- CHECK(delayer->channel() == channel);
+ for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
+ if (delayers->v.size() > 0) {
+ if (delayers->v[0]->channel() == channel) {
+ delayers->disable_forwarding = true;
+ for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
+ delayer->set_forwarding_disabled(true);
}
-
- // If we clear the delayers list, nothing will be scheduled. Which is a
- // success!
- delayers->clear();
}
}
}
@@ -476,45 +552,115 @@
message_bridge::State state) {
auto source_state = event_loop_map_.find(source);
CHECK(source_state != event_loop_map_.end());
-
- ServerConnection *server_connection =
- source_state->second.server_status.FindServerConnection(destination);
- if (!server_connection) {
- return;
- }
- server_connection->mutate_state(state);
+ source_state->second.SetServerState(destination, state);
auto destination_state = event_loop_map_.find(destination);
CHECK(destination_state != event_loop_map_.end());
- ClientConnection *client_connection =
- destination_state->second.client_status.GetClientConnection(source);
- if (!client_connection) {
- return;
- }
- client_connection->mutate_state(state);
+ destination_state->second.SetClientState(source, state);
}
void SimulatedMessageBridge::DisableStatistics() {
for (std::pair<const Node *const, State> &state : event_loop_map_) {
- state.second.server_status.DisableStatistics();
- state.second.client_status.DisableStatistics();
+ state.second.DisableStatistics();
}
}
void SimulatedMessageBridge::SkipTimingReport() {
+ // TODO(austin): I think this can be deleted...
for (std::pair<const Node *const, State> &state : event_loop_map_) {
state.second.event_loop->SkipTimingReport();
}
}
-SimulatedMessageBridge::State::State(
- std::unique_ptr<aos::EventLoop> &&new_event_loop)
- : event_loop(std::move(new_event_loop)),
- timestamp_loggers(event_loop.get()),
- server_status(event_loop.get()),
- client_status(event_loop.get()) {
+void SimulatedMessageBridge::State::SetEventLoop(
+ std::unique_ptr<aos::EventLoop> loop) {
+ if (!loop) {
+ timestamp_loggers = ChannelTimestampSender(nullptr);
+ server_status.reset();
+ client_status.reset();
+ for (RawMessageDelayer *source_delayer : source_delayers_) {
+ source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
+ }
+ for (RawMessageDelayer *destination_delayer : destination_delayers_) {
+ destination_delayer->SetSendEventLoop(nullptr, nullptr);
+ }
+ event_loop = std::move(loop);
+ return;
+ } else {
+ CHECK(!event_loop);
+ }
+ event_loop = std::move(loop);
- // Find all nodes which log timestamps back to us (from us).
+ event_loop->SkipTimingReport();
+ event_loop->SkipAosLog();
+
+ for (std::pair<const Channel *, DelayersVector *> &watcher :
+ delayer_watchers_) {
+ // Don't register watchers if we know we aren't forwarding.
+ if (watcher.second->disable_forwarding) continue;
+ event_loop->MakeRawNoArgWatcher(
+ watcher.first, [captured_delayers = watcher.second](const Context &) {
+ // We might get told after registering, so don't forward at that point
+ // too.
+ for (std::unique_ptr<RawMessageDelayer> &delayer :
+ captured_delayers->v) {
+ delayer->Schedule();
+ }
+ });
+ }
+
+ timestamp_loggers = ChannelTimestampSender(event_loop.get());
+ server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
+
+ {
+ size_t node_index = 0;
+ for (ServerConnection *connection : server_status->server_connection()) {
+ if (connection) {
+ if (boot_uuids_[node_index] != UUID::Zero()) {
+ connection->mutate_state(server_state_[node_index]);
+ } else {
+ connection->mutate_state(message_bridge::State::DISCONNECTED);
+ }
+ }
+ ++node_index;
+ }
+ }
+
+ for (size_t i = 0; i < boot_uuids_.size(); ++i) {
+ if (boot_uuids_[i] != UUID::Zero()) {
+ server_status->SetBootUUID(i, boot_uuids_[i]);
+ }
+ }
+ if (disable_statistics_) {
+ server_status->DisableStatistics();
+ }
+ if (fn_) {
+ server_status->set_send_data(fn_);
+ }
+ client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
+ if (disable_statistics_) {
+ client_status->DisableStatistics();
+ }
+
+ for (size_t i = 0;
+ i < client_status->mutable_client_statistics()->connections()->size();
+ ++i) {
+ ClientConnection *client_connection =
+ client_status->mutable_client_statistics()
+ ->mutable_connections()
+ ->GetMutableObject(i);
+ const Node *client_node = configuration::GetNode(
+ node_factory_->configuration(),
+ client_connection->node()->name()->string_view());
+ const size_t client_node_index = configuration::GetNodeIndex(
+ node_factory_->configuration(), client_node);
+ if (boot_uuids_[client_node_index] != UUID::Zero()) {
+ client_connection->mutate_state(client_state_[client_node_index]);
+ } else {
+ client_connection->mutate_state(message_bridge::State::DISCONNECTED);
+ }
+ }
+
for (const Channel *channel : *event_loop->configuration()->channels()) {
CHECK(channel->has_source_node());
@@ -535,6 +681,22 @@
}
}
}
+
+ for (RawMessageDelayer *source_delayer : source_delayers_) {
+ source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
+ &timestamp_loggers);
+ }
+ for (RawMessageDelayer *destination_delayer : destination_delayers_) {
+ destination_delayer->SetSendEventLoop(event_loop.get(),
+ client_status.get());
+ }
+ event_loop->OnRun([this]() {
+ for (RawMessageDelayer *destination_delayer : destination_delayers_) {
+ if (destination_delayer->time_to_live() == 0) {
+ destination_delayer->ScheduleReliable();
+ }
+ }
+ });
}
} // namespace message_bridge
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index c1ff698..d245fb3 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -42,22 +42,135 @@
void SkipTimingReport();
private:
- struct State {
- State(std::unique_ptr<aos::EventLoop> &&new_event_loop);
+ struct DelayersVector {
+ std::vector<std::unique_ptr<RawMessageDelayer>> v;
+ bool disable_forwarding = false;
+ };
+ struct State {
+ State(NodeEventLoopFactory *node_factory) : node_factory_(node_factory) {
+ const size_t num_nodes = node_factory->configuration()->nodes()->size();
+ boot_uuids_.resize(num_nodes, UUID::Zero());
+ client_state_.resize(num_nodes, message_bridge::State::CONNECTED);
+ server_state_.resize(num_nodes, message_bridge::State::CONNECTED);
+ }
State(const State &state) = delete;
+ void DisableStatistics() {
+ disable_statistics_ = true;
+ if (server_status) {
+ server_status->DisableStatistics();
+ }
+ if (client_status) {
+ client_status->DisableStatistics();
+ }
+ }
+
+ void AddSourceDelayer(RawMessageDelayer *delayer) {
+ source_delayers_.emplace_back(delayer);
+ }
+ void AddDestinationDelayer(RawMessageDelayer *delayer) {
+ destination_delayers_.emplace_back(delayer);
+ }
+
+ void MakeEventLoop() {
+ SetEventLoop(node_factory_->MakeEventLoop("message_bridge"));
+ }
+
+ void ClearEventLoop() { SetEventLoop(nullptr); }
+ void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
+
+ void SetSendData(std::function<void(const Context &)> fn) {
+ CHECK(!fn_);
+ fn_ = std::move(fn);
+ if (server_status) {
+ server_status->set_send_data(fn_);
+ }
+ }
+
+ void AddDelayerWatcher(const Channel *channel, DelayersVector *v) {
+ delayer_watchers_.emplace_back(channel, v);
+ }
+
+ void SetBootUUID(size_t node_index, const UUID &boot_uuid) {
+ boot_uuids_[node_index] = boot_uuid;
+ const Node *node =
+ node_factory_->configuration()->nodes()->Get(node_index);
+ if (server_status) {
+ ServerConnection *connection =
+ server_status->FindServerConnection(node);
+ if (connection) {
+ connection->mutate_state(server_state_[node_index]);
+ server_status->ResetFilter(node_index);
+ server_status->SetBootUUID(node_index, boot_uuid);
+ }
+ }
+ if (client_status) {
+ const int client_index =
+ client_status->FindClientIndex(node->name()->string_view());
+ ClientConnection *client_connection =
+ client_status->GetClientConnection(client_index);
+ if (client_connection) {
+ client_status->SampleReset(client_index);
+ client_connection->mutate_state(client_state_[node_index]);
+ }
+ }
+ }
+
+ void SetServerState(const Node *destination, message_bridge::State state) {
+ const size_t node_index = configuration::GetNodeIndex(
+ node_factory_->configuration(), destination);
+ server_state_[node_index] = state;
+ if (server_status) {
+ ServerConnection *connection =
+ server_status->FindServerConnection(destination);
+ if (connection == nullptr) return;
+
+ connection->mutate_state(state);
+ }
+ }
+
+ void SetClientState(const Node *source, message_bridge::State state) {
+ const size_t node_index =
+ configuration::GetNodeIndex(node_factory_->configuration(), source);
+ client_state_[node_index] = state;
+ if (client_status) {
+ ClientConnection *connection =
+ client_status->GetClientConnection(source);
+
+ if (connection == nullptr) return;
+
+ connection->mutate_state(state);
+ }
+ }
+
+ std::vector<UUID> boot_uuids_;
+ std::vector<message_bridge::State> client_state_;
+ std::vector<message_bridge::State> server_state_;
+
+ std::vector<std::pair<const Channel *, DelayersVector *>> delayer_watchers_;
+
+ std::function<void(const Context &)> fn_;
+
+ NodeEventLoopFactory *node_factory_;
std::unique_ptr<aos::EventLoop> event_loop;
ChannelTimestampSender timestamp_loggers;
- MessageBridgeServerStatus server_status;
- MessageBridgeClientStatus client_status;
+ std::unique_ptr<MessageBridgeServerStatus> server_status;
+ std::unique_ptr<MessageBridgeClientStatus> client_status;
+
+ // List of delayers to update whenever this node starts or stops.
+ // Source delayers (which are the ones fetching).
+ std::vector<RawMessageDelayer *> source_delayers_;
+ // Destination delayers (which are the ones sending on the receiving nodes).
+ std::vector<RawMessageDelayer *> destination_delayers_;
+
+ bool disable_statistics_ = false;
};
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.
std::map<const Node *, State> event_loop_map_;
// List of delayers used to resend the messages.
- using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
};
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 48a71c6..0ed756c 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -520,6 +520,7 @@
":timestamp_filter",
"//aos:configuration",
"//aos/events:simulated_event_loop",
+ "//aos/events/logging:boot_timestamp",
"//aos/events/logging:logfile_utils",
"//aos/time",
"@org_tuxfamily_eigen//:eigen",
@@ -547,6 +548,7 @@
deps = [
":multinode_timestamp_filter",
"//aos/events:simulated_event_loop",
+ "//aos/events/logging:boot_timestamp",
"//aos/time",
],
)
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index 69f474a..5236f82 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -127,6 +127,7 @@
}
int MessageBridgeClientStatus::FindClientIndex(std::string_view node_name) {
+ CHECK(statistics_.message().has_connections());
for (size_t i = 0; i < statistics_.message().connections()->size(); ++i) {
const ClientConnection *client_connection =
statistics_.message().connections()->Get(i);
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 3b5b997..65691cb 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -347,8 +347,26 @@
VLOG(1) << " " << t;
}
- // TODO(austin): Figure out how to communicate the reboot up to the factory.
CHECK_EQ(node_count_, std::get<1>(*next_time).size());
+
+ if (times_.empty()) {
+ for (BootTimestamp t : std::get<1>(*next_time)) {
+ CHECK_EQ(t.boot, 0u);
+ }
+ } else {
+ bool rebooted = false;
+ for (size_t i = 0; i < node_count_; ++i) {
+ if (std::get<1>(times_.back())[i].boot !=
+ std::get<1>(*next_time)[i].boot) {
+ rebooted = true;
+ break;
+ }
+ }
+ if (rebooted) {
+ CHECK(reboot_found_);
+ reboot_found_(std::get<0>(*next_time), std::get<1>(*next_time));
+ }
+ }
times_.emplace_back(std::move(*next_time));
return &times_.back();
}
@@ -413,7 +431,7 @@
}
distributed_clock::time_point InterpolatedTimeConverter::ToDistributedClock(
- size_t node_index, monotonic_clock::time_point time) {
+ size_t node_index, BootTimestamp time) {
CHECK_LT(node_index, node_count_);
// If there is only one node, time estimation makes no sense. Just return
// unity time.
@@ -425,19 +443,21 @@
QueueUntil(
[time, node_index](const std::tuple<distributed_clock::time_point,
std::vector<BootTimestamp>> &t) {
- return std::get<1>(t)[node_index].time < time;
+ return std::get<1>(t)[node_index] < time;
});
// Before the beginning needs to have 0 slope otherwise time jumps when
// timestamp 2 happens.
- if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index].time) {
- if (time < std::get<1>(times_[0])[node_index].time) {
+ if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index]) {
+ if (time < std::get<1>(times_[0])[node_index]) {
CHECK(!have_popped_)
<< ": Trying to interpolate time " << time
<< " but we have forgotten the relevant points already.";
}
+ CHECK_EQ(time.boot, std::get<1>(times_[0])[node_index].boot);
const distributed_clock::time_point result =
- time - std::get<1>(times_[0])[node_index].time + std::get<0>(times_[0]);
+ time.time - std::get<1>(times_[0])[node_index].time +
+ std::get<0>(times_[0]);
VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
<< result;
return result;
@@ -448,7 +468,9 @@
size_t index = times_.size() - 2u;
while (index > 0u) {
// TODO(austin): Binary search.
- if (std::get<1>(times_[index])[node_index].time <= time) {
+ //LOG(INFO) << std::get<1>(times_[index])[node_index] << " <= " << time
+ //<< "?";
+ if (std::get<1>(times_[index])[node_index] <= time) {
break;
}
--index;
@@ -457,38 +479,54 @@
// Interpolate with the two of these.
const distributed_clock::time_point d0 = std::get<0>(times_[index]);
const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+ const BootTimestamp t0 = std::get<1>(times_[index])[node_index];
+ const BootTimestamp t1 = std::get<1>(times_[index + 1])[node_index];
- // TODO(austin): We should extrapolate if the boot changes.
- CHECK_EQ(std::get<1>(times_[index])[node_index].boot,
- std::get<1>(times_[index + 1])[node_index].boot);
- const monotonic_clock::time_point t0 =
- std::get<1>(times_[index])[node_index].time;
- const monotonic_clock::time_point t1 =
- std::get<1>(times_[index + 1])[node_index].time;
+ if (time > t1) {
+ const distributed_clock::time_point result = (time.time - t1.time) + d1;
+ VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+ << result;
+ return result;
+ }
+
+ if (t0.boot != t1.boot) {
+ if (t0.boot == time.boot) {
+ const distributed_clock::time_point result = (time.time - t0.time) + d0;
+ VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+ << result;
+ return result;
+ } else if (t1.boot == time.boot) {
+ const distributed_clock::time_point result = (time.time - t1.time) + d1;
+ VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+ << result;
+ return result;
+ } else {
+ LOG(FATAL) << t0 << " <= " << time << " <= " << t1;
+ }
+ }
const distributed_clock::time_point result =
- message_bridge::ToDistributedClock(d0, d1, t0, t1, time);
+ message_bridge::ToDistributedClock(d0, d1, t0.time, t1.time, time.time);
VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
<< result;
return result;
}
-monotonic_clock::time_point InterpolatedTimeConverter::FromDistributedClock(
- size_t node_index, distributed_clock::time_point time) {
+BootTimestamp InterpolatedTimeConverter::FromDistributedClock(
+ size_t node_index, distributed_clock::time_point time, size_t boot_count) {
CHECK_LT(node_index, node_count_);
// If there is only one node, time estimation makes no sense. Just return
// unity time.
if (node_count_ == 1u) {
- return monotonic_clock::epoch() + time.time_since_epoch();
+ return BootTimestamp::epoch() + time.time_since_epoch();
}
// Make sure there are enough timestamps in the queue.
- QueueUntil(
- [time](const std::tuple<distributed_clock::time_point,
- std::vector<BootTimestamp>> &t) {
- return std::get<0>(t) < time;
- });
+ QueueUntil([time](const std::tuple<distributed_clock::time_point,
+ std::vector<BootTimestamp>> &t) {
+ return std::get<0>(t) < time;
+ });
if (times_.size() == 1u || time < std::get<0>(times_[0])) {
if (time < std::get<0>(times_[0])) {
@@ -498,16 +536,21 @@
}
monotonic_clock::time_point result =
time - std::get<0>(times_[0]) + std::get<1>(times_[0])[node_index].time;
- VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
- << result;
- return result;
+ VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+ << boot_count << ") -> " << result;
+ return {.boot = std::get<1>(times_[0])[node_index].boot, .time = result};
}
// Now, find the corresponding timestamps. Search from the back since that's
// where most of the times we care about will be.
size_t index = times_.size() - 2u;
while (index > 0u) {
- if (std::get<0>(times_[index]) <= time) {
+ //LOG(INFO) << "Considering " << std::get<0>(times_[index + 1]) << " index "
+ //<< index << " vs " << time;
+ // If we are searching across a reboot, we want both the before and after
+ // time. We will be asked to solve for the after, so make sure when a time
+ // matches exactly, we pick the time before, not the time after.
+ if (std::get<0>(times_[index]) < time) {
break;
}
--index;
@@ -516,13 +559,39 @@
// Interpolate with the two of these.
const distributed_clock::time_point d0 = std::get<0>(times_[index]);
const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+ const BootTimestamp t0 = std::get<1>(times_[index])[node_index];
+ const BootTimestamp t1 = std::get<1>(times_[index + 1])[node_index];
- CHECK_EQ(std::get<1>(times_[index])[node_index].boot,
- std::get<1>(times_[index + 1])[node_index].boot);
- const monotonic_clock::time_point t0 =
- std::get<1>(times_[index])[node_index].time;
- const monotonic_clock::time_point t1 =
- std::get<1>(times_[index + 1])[node_index].time;
+ if (time == d1) {
+ if (boot_count == t1.boot) {
+ const BootTimestamp result = t1 + (time - d1);
+ VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+ << boot_count << ") -> " << result;
+ return result;
+ } else {
+ CHECK_EQ(boot_count, t0.boot);
+ const BootTimestamp result = t0 + (time - d0);
+ VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+ << boot_count << ") -> " << result;
+ return result;
+ }
+ }
+
+ //LOG(INFO) << "d0 " << d0 << " time " << time << " d1 " << d1 << " t0 " << t0
+ //<< " t1 " << t1;
+ if (time > d1) {
+ const BootTimestamp result = t1 + (time - d1);
+ VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+ << boot_count << ") -> " << result;
+ return result;
+ }
+
+ if (t0.boot != t1.boot) {
+ const BootTimestamp result = t0 + (time - d0);
+ VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+ << boot_count << ") -> " << result;
+ return result;
+ }
const chrono::nanoseconds dd = d1 - d0;
@@ -538,21 +607,23 @@
// and subtract when < 0 so we round correctly. Multiply before dividing so
// we don't round early, and use 128 bit arithmetic to guarantee that 64 bit
// multiplication fits.
- absl::int128 numerator =
- absl::int128((time - d0).count()) * absl::int128((t1 - t0).count());
+ absl::int128 numerator = absl::int128((time - d0).count()) *
+ absl::int128((t1.time - t0.time).count());
numerator += numerator > 0 ? absl::int128(dd.count() / 2)
: -absl::int128(dd.count() / 2);
const monotonic_clock::time_point result =
- t0 + std::chrono::nanoseconds(
- static_cast<int64_t>(numerator / absl::int128(dd.count())));
- VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
- << result;
- return result;
+ t0.time + std::chrono::nanoseconds(
+ static_cast<int64_t>(numerator / absl::int128(dd.count())));
+ VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+ << boot_count << ") -> " << result;
+ return {.boot = t0.boot, .time = result};
}
+
MultiNodeNoncausalOffsetEstimator::MultiNodeNoncausalOffsetEstimator(
const Configuration *configuration,
- const Configuration *logged_configuration, bool skip_order_validation,
+ const Configuration *logged_configuration,
+ std::shared_ptr<const logger::Boots> boots, bool skip_order_validation,
chrono::nanoseconds time_estimation_buffer_seconds)
: InterpolatedTimeConverter(!configuration::MultiNode(logged_configuration)
? 1u
@@ -560,11 +631,26 @@
time_estimation_buffer_seconds),
configuration_(configuration),
logged_configuration_(logged_configuration),
+ boots_(boots),
skip_order_validation_(skip_order_validation) {
+ const bool multi_node = configuration::MultiNode(logged_configuration);
+ if (!boots_ && !multi_node) {
+ // This is a super old log. Fake out boots by making them up.
+ LOG(WARNING) << "Old single node log without boot UUIDs, generating a "
+ "random boot UUID.";
+ std::shared_ptr<logger::Boots> boots = std::make_shared<logger::Boots>();
+ const UUID random_boot_uuid = UUID::Random();
+ boots->boot_count_map.emplace(random_boot_uuid.ToString(), 0);
+ boots->boots =
+ std::vector<std::vector<std::string>>{{random_boot_uuid.ToString()}};
+ boots_ = boots;
+ }
+
+ CHECK(boots_) << ": Missing boots for " << NodesCount();
+ CHECK_EQ(boots_->boots.size(), NodesCount());
filters_per_node_.resize(NodesCount());
last_monotonics_.resize(NodesCount(), BootTimestamp::epoch());
- if (FLAGS_timestamps_to_csv &&
- configuration::MultiNode(logged_configuration)) {
+ if (FLAGS_timestamps_to_csv && multi_node) {
fp_ = fopen("/tmp/timestamp_noncausal_offsets.csv", "w");
fprintf(fp_, "# distributed");
for (const Node *node : configuration::GetNodes(logged_configuration)) {
@@ -637,12 +723,23 @@
if (!node_samples_.empty()) {
for (NodeSamples &node : node_samples_) {
for (SingleNodeSamples &timestamps : node.nodes) {
- CHECK (timestamps.messages.empty());
+ CHECK(timestamps.messages.empty());
}
}
}
}
+UUID MultiNodeNoncausalOffsetEstimator::boot_uuid(size_t node_index,
+ size_t boot_count) {
+ CHECK(boots_);
+ CHECK_LT(node_index, boots_->boots.size());
+ if (boot_count < boots_->boots[node_index].size()) {
+ return UUID::FromString(boots_->boots[node_index][boot_count]);
+ } else {
+ return UUID::Random();
+ }
+}
+
void MultiNodeNoncausalOffsetEstimator::Start(
SimulatedEventLoopFactory *factory) {
std::vector<monotonic_clock::time_point> times;
@@ -1290,7 +1387,12 @@
<< "ns";
}
VLOG(1) << "Ignoring because it is close enough.";
- next_node_filter->Consume();
+ std::optional<
+ std::tuple<logger::BootTimestamp, logger::BootDuration>>
+ result = next_node_filter->Consume();
+ CHECK(result);
+ next_node_filter->Pop(std::get<0>(*result) -
+ time_estimation_buffer_seconds_);
break;
}
// Somehow the new solution is better *and* worse than the old
@@ -1307,7 +1409,12 @@
}
if (skip_order_validation_) {
- next_node_filter->Consume();
+ std::optional<
+ std::tuple<logger::BootTimestamp, logger::BootDuration>>
+ result = next_node_filter->Consume();
+ CHECK(result);
+ next_node_filter->Pop(std::get<0>(*result) -
+ time_estimation_buffer_seconds_);
LOG(ERROR) << "Skipping because --skip_order_validation";
break;
} else {
@@ -1396,8 +1503,10 @@
}
}
sample = *next_filter->Consume();
+ next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
} else {
sample = *next_filter->Consume();
+ next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
// We found a good sample, so consume it. If it is a duplicate, we still
// want to consume it. But, if this is the first time around, we want to
// re-solve by recursing (once) to pickup the better base.
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index a52db7d..8d776b8 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -8,6 +8,7 @@
#include "Eigen/Dense"
#include "absl/container/btree_set.h"
#include "aos/configuration.h"
+#include "aos/events/logging/boot_timestamp.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/timestamp_filter.h"
@@ -166,16 +167,17 @@
// Converts a time to the distributed clock for scheduling and cross-node
// time measurement.
- // TODO(austin): Need to pass in boot.
distributed_clock::time_point ToDistributedClock(
- size_t node_index, monotonic_clock::time_point time) override;
+ size_t node_index, logger::BootTimestamp time) override;
// Takes the distributed time and converts it to the monotonic clock for this
// node.
- monotonic_clock::time_point FromDistributedClock(
- size_t node_index, distributed_clock::time_point time) override;
+ logger::BootTimestamp FromDistributedClock(size_t node_index,
+ distributed_clock::time_point time,
+ size_t boot_count) override;
// Called whenever time passes this point and we can forget about it.
+ // TODO(austin): Pop here instead of in log reader.
void ObserveTimePassed(distributed_clock::time_point time) override;
// Queues 1 more timestammp in the interpolation list. This is public for
@@ -283,7 +285,8 @@
public:
MultiNodeNoncausalOffsetEstimator(
const Configuration *configuration,
- const Configuration *logged_configuration, bool skip_order_validation,
+ const Configuration *logged_configuration,
+ std::shared_ptr<const logger::Boots> boots, bool skip_order_validation,
std::chrono::nanoseconds time_estimation_buffer_seconds);
~MultiNodeNoncausalOffsetEstimator() override;
@@ -297,6 +300,8 @@
std::vector<logger::BootTimestamp>>>
NextTimestamp() override;
+ UUID boot_uuid(size_t node_index, size_t boot_count) override;
+
// Checks that all the nodes in the graph are connected. Needs all filters to
// be constructed first.
void CheckGraph();
@@ -337,6 +342,8 @@
const Configuration *configuration_;
const Configuration *logged_configuration_;
+ std::shared_ptr<const logger::Boots> boots_;
+
// If true, skip any validation which would trigger if we see evidance that
// time estimation between nodes was incorrect.
const bool skip_order_validation_;
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index f67d8de..c1fb456 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -82,20 +82,19 @@
// results. 1 second should be 1 second everywhere.
TEST(InterpolatedTimeConverterTest, OneTime) {
const distributed_clock::time_point de = distributed_clock::epoch();
- const monotonic_clock::time_point me = monotonic_clock::epoch();
+ const BootTimestamp me = BootTimestamp::epoch();
TestingTimeConverter time_converter(3u);
time_converter.AddNextTimestamp(
de + chrono::seconds(0),
- {{.boot = 0, .time = me + chrono::seconds(1)},
- {.boot = 0, .time = me + chrono::seconds(10)},
- {.boot = 0, .time = me + chrono::seconds(1000)}});
+ {me + chrono::seconds(1), me + chrono::seconds(10),
+ me + chrono::seconds(1000)});
- EXPECT_EQ(time_converter.FromDistributedClock(0, de - chrono::seconds(1)),
+ EXPECT_EQ(time_converter.FromDistributedClock(0, de - chrono::seconds(1), 0),
me + chrono::seconds(0));
- EXPECT_EQ(time_converter.FromDistributedClock(1, de - chrono::seconds(1)),
+ EXPECT_EQ(time_converter.FromDistributedClock(1, de - chrono::seconds(1), 0),
me + chrono::seconds(9));
- EXPECT_EQ(time_converter.FromDistributedClock(2, de - chrono::seconds(1)),
+ EXPECT_EQ(time_converter.FromDistributedClock(2, de - chrono::seconds(1), 0),
me + chrono::seconds(999));
EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(0)),
de - chrono::seconds(1));
@@ -104,11 +103,11 @@
EXPECT_EQ(time_converter.ToDistributedClock(2, me + chrono::seconds(999)),
de - chrono::seconds(1));
- EXPECT_EQ(time_converter.FromDistributedClock(0, de),
+ EXPECT_EQ(time_converter.FromDistributedClock(0, de, 0),
me + chrono::seconds(1));
- EXPECT_EQ(time_converter.FromDistributedClock(1, de),
+ EXPECT_EQ(time_converter.FromDistributedClock(1, de, 0),
me + chrono::seconds(10));
- EXPECT_EQ(time_converter.FromDistributedClock(2, de),
+ EXPECT_EQ(time_converter.FromDistributedClock(2, de, 0),
me + chrono::seconds(1000));
EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(1)), de);
EXPECT_EQ(time_converter.ToDistributedClock(1, me + chrono::seconds(10)), de);
@@ -119,29 +118,27 @@
// Tests that actual interpolation works as expected for multiple timestamps.
TEST(InterpolatedTimeConverterTest, Interpolation) {
const distributed_clock::time_point de = distributed_clock::epoch();
- const monotonic_clock::time_point me = monotonic_clock::epoch();
+ const BootTimestamp me = BootTimestamp::epoch();
TestingTimeConverter time_converter(3u);
// Test that 2 timestamps interpolate correctly.
time_converter.AddNextTimestamp(
de + chrono::seconds(0),
- {{.boot = 0, .time = me + chrono::seconds(1)},
- {.boot = 0, .time = me + chrono::seconds(10)},
- {.boot = 0, .time = me + chrono::seconds(1000)}});
+ {me + chrono::seconds(1), me + chrono::seconds(10),
+ me + chrono::seconds(1000)});
time_converter.AddNextTimestamp(
de + chrono::seconds(1),
- {{.boot = 0, .time = me + chrono::seconds(2)},
- {.boot = 0, .time = me + chrono::seconds(11)},
- {.boot = 0, .time = me + chrono::seconds(1001)}});
+ {me + chrono::seconds(2), me + chrono::seconds(11),
+ me + chrono::seconds(1001)});
EXPECT_EQ(
- time_converter.FromDistributedClock(0, de + chrono::milliseconds(500)),
+ time_converter.FromDistributedClock(0, de + chrono::milliseconds(500), 0),
me + chrono::milliseconds(1500));
EXPECT_EQ(
- time_converter.FromDistributedClock(1, de + chrono::milliseconds(500)),
+ time_converter.FromDistributedClock(1, de + chrono::milliseconds(500), 0),
me + chrono::milliseconds(10500));
EXPECT_EQ(
- time_converter.FromDistributedClock(2, de + chrono::milliseconds(500)),
+ time_converter.FromDistributedClock(2, de + chrono::milliseconds(500), 0),
me + chrono::milliseconds(1000500));
EXPECT_EQ(
time_converter.ToDistributedClock(0, me + chrono::milliseconds(1500)),
@@ -156,26 +153,25 @@
// And that we can interpolate between points not at the start.
time_converter.AddNextTimestamp(
de + chrono::seconds(2),
- {{.boot = 0, .time = me + chrono::seconds(3) - chrono::milliseconds(2)},
- {.boot = 0, .time = me + chrono::seconds(12) - chrono::milliseconds(2)},
- {.boot = 0, .time = me + chrono::seconds(1002)}});
+ {me + chrono::seconds(3) - chrono::milliseconds(2),
+ me + chrono::seconds(12) - chrono::milliseconds(2),
+ me + chrono::seconds(1002)});
time_converter.AddNextTimestamp(
de + chrono::seconds(3),
- {{.boot = 0, .time = me + chrono::seconds(4) - chrono::milliseconds(4)},
- {.boot = 0, .time = me + chrono::seconds(13) - chrono::milliseconds(2)},
- {.boot = 0,
- .time = me + chrono::seconds(1003) - chrono::milliseconds(2)}});
+ {me + chrono::seconds(4) - chrono::milliseconds(4),
+ me + chrono::seconds(13) - chrono::milliseconds(2),
+ me + chrono::seconds(1003) - chrono::milliseconds(2)});
- EXPECT_EQ(
- time_converter.FromDistributedClock(0, de + chrono::milliseconds(2500)),
- me + chrono::milliseconds(3497));
- EXPECT_EQ(
- time_converter.FromDistributedClock(1, de + chrono::milliseconds(2500)),
- me + chrono::milliseconds(12498));
- EXPECT_EQ(
- time_converter.FromDistributedClock(2, de + chrono::milliseconds(2500)),
- me + chrono::milliseconds(1002499));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 0, de + chrono::milliseconds(2500), 0),
+ me + chrono::milliseconds(3497));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 1, de + chrono::milliseconds(2500), 0),
+ me + chrono::milliseconds(12498));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 2, de + chrono::milliseconds(2500), 0),
+ me + chrono::milliseconds(1002499));
EXPECT_EQ(
time_converter.ToDistributedClock(0, me + chrono::milliseconds(3497)),
de + chrono::milliseconds(2500));
@@ -187,11 +183,87 @@
de + chrono::milliseconds(2500));
}
+// Tests that interpolation works across a reboot of node 0 (boot 0 -> 1).
+TEST(InterpolatedTimeConverterTest, RebootInterpolation) {
+ const distributed_clock::time_point de = distributed_clock::epoch();
+ const BootTimestamp me = BootTimestamp::epoch();
+ const BootTimestamp me2{.boot = 1u, .time = monotonic_clock::epoch()};
+
+ // TODO(austin): Test ToDistributedClock across the reboot too.
+
+ TestingTimeConverter time_converter(3u);
+ size_t reboot_counter = 0;
+ time_converter.set_reboot_found(
+ [&](distributed_clock::time_point,
+ const std::vector<logger::BootTimestamp> &) { ++reboot_counter; });
+ // Samples 1 second apart with all 3 nodes on boot 0.
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(0),
+ {me + chrono::seconds(1), me + chrono::seconds(10),
+ me + chrono::seconds(1000)});
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(1),
+ {me + chrono::seconds(2), me + chrono::seconds(11),
+ me + chrono::seconds(1001)});
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(2),
+ {me + chrono::seconds(3), me + chrono::seconds(12),
+ me + chrono::seconds(1002)});
+ // Node 0 reboots: from de + 3s on, its times are on boot 1 (me2).
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(3),
+ {me2 + chrono::seconds(4), me + chrono::seconds(13),
+ me + chrono::seconds(1003)});
+
+ time_converter.AddNextTimestamp(
+ de + chrono::seconds(4),
+ {me2 + chrono::seconds(5), me + chrono::seconds(14),
+ me + chrono::seconds(1004)});
+ // Before the reboot, every node interpolates on boot 0.
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 0, de + chrono::milliseconds(2400), 0),
+ me + chrono::milliseconds(3400));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 1, de + chrono::milliseconds(2400), 0),
+ me + chrono::milliseconds(12400));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 2, de + chrono::milliseconds(2400), 0),
+ me + chrono::milliseconds(1002400));
+
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 0, de + chrono::milliseconds(2900), 0),
+ me + chrono::milliseconds(3900));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 1, de + chrono::milliseconds(2900), 0),
+ me + chrono::milliseconds(12900));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 2, de + chrono::milliseconds(2900), 0),
+ me + chrono::milliseconds(1002900));
+ // At the reboot instant, node 0 resolves on either boot.
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 0, de + chrono::milliseconds(3000), 0),
+ me + chrono::seconds(4));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 0, de + chrono::milliseconds(3000), 1),
+ me2 + chrono::seconds(4));
+ // After the reboot, node 0 uses boot 1; nodes 1 and 2 stay on boot 0.
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 0, de + chrono::milliseconds(3900), 1),
+ me2 + chrono::milliseconds(4900));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 1, de + chrono::milliseconds(3900), 0),
+ me + chrono::milliseconds(13900));
+ EXPECT_EQ(time_converter.FromDistributedClock(
+ 2, de + chrono::milliseconds(3900), 0),
+ me + chrono::milliseconds(1003900));
+ EXPECT_EQ(reboot_counter, 1u);  // The reboot_found callback fired once.
+}
+
// Tests that reading times before the start of our interpolation points
// explodes.
TEST(InterpolatedTimeConverterDeathTest, ReadLostTime) {
const distributed_clock::time_point de = distributed_clock::epoch();
- const monotonic_clock::time_point me = monotonic_clock::epoch();
+ const BootTimestamp me = BootTimestamp::epoch();
constexpr auto kDefaultHistoryDuration =
InterpolatedTimeConverter::kDefaultHistoryDuration;
@@ -211,22 +283,22 @@
EXPECT_EQ(
de + kDefaultHistoryDuration / 2,
time_converter.ToDistributedClock(0, me + kDefaultHistoryDuration / 2));
- EXPECT_EQ(
- me + kDefaultHistoryDuration / 2,
- time_converter.FromDistributedClock(0, de + kDefaultHistoryDuration / 2));
+ EXPECT_EQ(me + kDefaultHistoryDuration / 2,
+ time_converter.FromDistributedClock(
+ 0, de + kDefaultHistoryDuration / 2, 0));
// Double check we can read things from before the start
EXPECT_EQ(de - kDt, time_converter.ToDistributedClock(0, me - kDt));
- EXPECT_EQ(me - kDt, time_converter.FromDistributedClock(0, de - kDt));
+ EXPECT_EQ(me - kDt, time_converter.FromDistributedClock(0, de - kDt, 0));
// And at and after the origin.
EXPECT_EQ(de, time_converter.ToDistributedClock(0, me));
- EXPECT_EQ(me, time_converter.FromDistributedClock(0, de));
+ EXPECT_EQ(me, time_converter.FromDistributedClock(0, de, 0));
EXPECT_EQ(de + chrono::milliseconds(10),
time_converter.ToDistributedClock(0, me + kDt));
EXPECT_EQ(me + chrono::milliseconds(10),
- time_converter.FromDistributedClock(0, de + kDt));
+ time_converter.FromDistributedClock(0, de + kDt, 0));
// Now force ourselves to forget.
time_converter.ObserveTimePassed(de + kDefaultHistoryDuration + kDt * 3 / 2);
@@ -234,26 +306,27 @@
// Yup, can't read the origin anymore.
EXPECT_DEATH({ LOG(INFO) << time_converter.ToDistributedClock(0, me); },
"forgotten");
- EXPECT_DEATH({ LOG(INFO) << time_converter.FromDistributedClock(0, de); },
+ EXPECT_DEATH({ LOG(INFO) << time_converter.FromDistributedClock(0, de, 0); },
"forgotten");
// But can still read the next point.
EXPECT_EQ(de + kDt, time_converter.ToDistributedClock(0, me + kDt));
- EXPECT_EQ(me + kDt, time_converter.FromDistributedClock(0, de + kDt));
+ EXPECT_EQ(me + kDt, time_converter.FromDistributedClock(0, de + kDt, 0));
}
// Tests unity time with 1 node.
TEST(InterpolatedTimeConverterTest, SingleNodeTime) {
const distributed_clock::time_point de = distributed_clock::epoch();
- const monotonic_clock::time_point me = monotonic_clock::epoch();
+ const BootTimestamp me = BootTimestamp::epoch();
TestingTimeConverter time_converter(1u);
- time_converter.AddNextTimestamp(
- de + chrono::seconds(0), {{.boot = 0, .time = me + chrono::seconds(1)}});
+ time_converter.AddNextTimestamp(de + chrono::seconds(0),
+ {me + chrono::seconds(1)});
- EXPECT_EQ(time_converter.FromDistributedClock(0, de), me);
- EXPECT_EQ(time_converter.FromDistributedClock(0, de + chrono::seconds(100)),
- me + chrono::seconds(100));
+ EXPECT_EQ(time_converter.FromDistributedClock(0, de, 0), me);
+ EXPECT_EQ(
+ time_converter.FromDistributedClock(0, de + chrono::seconds(100), 0),
+ me + chrono::seconds(100));
EXPECT_TRUE(time_converter.NextTimestamp());
}
diff --git a/aos/network/testing_time_converter.cc b/aos/network/testing_time_converter.cc
index 9558033..c175fe6 100644
--- a/aos/network/testing_time_converter.cc
+++ b/aos/network/testing_time_converter.cc
@@ -71,6 +71,22 @@
return dt;
}
+void TestingTimeConverter::RebootAt(size_t node_index,
+ distributed_clock::time_point t) {
+ CHECK(!first_);
+ CHECK_LT(node_index, last_monotonic_.size());
+ const chrono::nanoseconds dt = t - last_distributed_;
+ // Every node's clock advances by the distributed time elapsed so far.
+ for (logger::BootTimestamp &m : last_monotonic_) m.time += dt;
+ // Then the rebooted node starts a new boot with its clock at epoch.
+ ++last_monotonic_[node_index].boot;
+ last_monotonic_[node_index].time = monotonic_clock::epoch();
+
+ // Queue the resulting post-reboot timestamps.
+ last_distributed_ = t;
+ ts_.emplace_back(std::make_tuple(last_distributed_, last_monotonic_));
+}
+
void TestingTimeConverter::AddNextTimestamp(
distributed_clock::time_point time,
std::vector<logger::BootTimestamp> times) {
diff --git a/aos/network/testing_time_converter.h b/aos/network/testing_time_converter.h
index 5ffdc01..20b8e16 100644
--- a/aos/network/testing_time_converter.h
+++ b/aos/network/testing_time_converter.h
@@ -6,6 +6,7 @@
#include <tuple>
#include "aos/events/event_scheduler.h"
+#include "aos/events/logging/boot_timestamp.h"
#include "aos/network/multinode_timestamp_filter.h"
#include "aos/time/time.h"
@@ -34,6 +35,8 @@
std::chrono::nanoseconds AddMonotonic(
std::vector<logger::BootTimestamp> times);
+ void RebootAt(size_t node_index, distributed_clock::time_point t);
+
// Adds a distributed to monotonic clock mapping to the queue.
void AddNextTimestamp(distributed_clock::time_point time,
std::vector<logger::BootTimestamp> times);
@@ -42,6 +45,23 @@
std::vector<logger::BootTimestamp>>>
NextTimestamp() override;
+ void set_boot_uuid(size_t node_index, size_t boot_count, UUID uuid) {
+ const auto key = std::make_pair(node_index, boot_count);
+ const bool inserted = boot_uuids_.emplace(key, std::move(uuid)).second;
+ // Each (node, boot) pair may only be assigned a UUID once.
+ CHECK(inserted) << ": Duplicate boot";
+ }
+
+ UUID boot_uuid(size_t node_index, size_t boot_count) override {
+ const std::pair<size_t, size_t> key(node_index, boot_count);
+ auto it = boot_uuids_.lower_bound(key);
+ if (it == boot_uuids_.end() || it->first != key) {
+ // First query for this boot: lazily assign it a random UUID.
+ it = boot_uuids_.emplace_hint(it, key, UUID::Random());
+ }
+ return it->second;
+ }
+
private:
// List of timestamps.
std::deque<std::tuple<distributed_clock::time_point,
@@ -53,6 +73,8 @@
// The last times returned on all clocks.
distributed_clock::time_point last_distributed_ = distributed_clock::epoch();
std::vector<logger::BootTimestamp> last_monotonic_;
+
+ std::map<std::pair<size_t, size_t>, UUID> boot_uuids_;
};
} // namespace message_bridge
diff --git a/aos/network/timestamp_channel.cc b/aos/network/timestamp_channel.cc
index bbd62c4..88b6ed0 100644
--- a/aos/network/timestamp_channel.cc
+++ b/aos/network/timestamp_channel.cc
@@ -65,11 +65,15 @@
ChannelTimestampSender::ChannelTimestampSender(aos::EventLoop *event_loop)
: event_loop_(event_loop) {
- CHECK(configuration::MultiNode(event_loop_->configuration()));
+ // A null event loop is allowed; only a real one must be multi-node.
+ if (event_loop_ == nullptr) return;
+ CHECK(configuration::MultiNode(event_loop_->configuration()));
}
aos::Sender<RemoteMessage> *ChannelTimestampSender::SenderForChannel(
const Channel *channel, const Connection *connection) {
+ CHECK(event_loop_);
+
ChannelTimestampFinder finder(event_loop_);
// Look at any pre-created channel/connection pairs.
{
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index 738ca10..b78c83a 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -43,6 +43,11 @@
class ChannelTimestampSender {
public:
ChannelTimestampSender(aos::EventLoop *event_loop);
+ ChannelTimestampSender() : event_loop_(nullptr) {}
+
+ ChannelTimestampSender(ChannelTimestampSender &&other) noexcept = default;
+ ChannelTimestampSender &operator=(ChannelTimestampSender &&other) noexcept =
+ default;
aos::Sender<RemoteMessage> *SenderForChannel(const Channel *channel,
const Connection *connection);
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 59af3a0..c86dc8e 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -14,6 +14,8 @@
namespace message_bridge {
namespace {
namespace chrono = std::chrono;
+using logger::BootDuration;
+using logger::BootTimestamp;
std::string TimeString(const aos::monotonic_clock::time_point t,
std::chrono::nanoseconds o) {
@@ -55,7 +57,7 @@
CHECK_GE(*ta, 0.0);
CHECK_LT(*ta, 1.0);
}
-void NormalizeTimestamps(logger::BootTimestamp *ta_base, double *ta) {
+void NormalizeTimestamps(BootTimestamp *ta_base, double *ta) {
NormalizeTimestamps(&ta_base->time, ta);
}
@@ -473,10 +475,10 @@
return std::make_tuple(std::get<0>(t), std::get<1>(t));
}
-std::pair<std::tuple<logger::BootTimestamp, logger::BootDuration>,
- std::tuple<logger::BootTimestamp, logger::BootDuration>>
-NoncausalTimestampFilter::FindTimestamps(logger::BootTimestamp ta_base,
- double ta, size_t sample_boot) const {
+std::pair<std::tuple<BootTimestamp, BootDuration>,
+ std::tuple<BootTimestamp, BootDuration>>
+NoncausalTimestampFilter::FindTimestamps(BootTimestamp ta_base, double ta,
+ size_t sample_boot) const {
CHECK_GE(ta, 0.0);
CHECK_LT(ta, 1.0);
@@ -731,9 +733,11 @@
((tb - ta) - offset.second);
}
-std::string NoncausalTimestampFilter::DebugOffsetError(
- logger::BootTimestamp ta_base, double ta, logger::BootTimestamp tb_base,
- double tb, size_t node_a, size_t node_b) const {
+std::string NoncausalTimestampFilter::DebugOffsetError(BootTimestamp ta_base,
+ double ta,
+ BootTimestamp tb_base,
+ double tb, size_t node_a,
+ size_t node_b) const {
NormalizeTimestamps(&ta_base, &ta);
NormalizeTimestamps(&tb_base, &tb);
@@ -830,8 +834,8 @@
return true;
}
-void NoncausalTimestampFilter::Sample(logger::BootTimestamp monotonic_now_all,
- logger::BootDuration sample_ns) {
+void NoncausalTimestampFilter::Sample(BootTimestamp monotonic_now_all,
+ BootDuration sample_ns) {
filter(monotonic_now_all.boot, sample_ns.boot)
->Sample(monotonic_now_all.time, sample_ns.duration);
}
@@ -1114,20 +1118,47 @@
}
}
-bool NoncausalTimestampFilter::Pop(logger::BootTimestamp time) {
- // TODO(austin): Auto compute the second boot.
- CHECK_LE(filters_.size(), 1u);
- SingleFilter *f = filter(time.boot, 0);
+bool NoncausalTimestampFilter::Pop(BootTimestamp time) {
+ CHECK_GE(filters_.size(), 1u);
+
VLOG(1) << NodeNames() << " Pop(" << time << ")";
bool removed = false;
- // When the timestamp which is the end of the line is popped, we want to
- // drop it off the list. Hence the >=
- while (f->timestamps_size() >= 2 &&
- time.time >= std::get<0>(f->timestamp(1))) {
- f->PopFront();
- removed = true;
+ while (true) {
+ DCHECK_LT(pop_filter_, filters_.size());
+ BootFilter *boot_filter = &filters_[pop_filter_];
+ CHECK(boot_filter != nullptr);
+ // Pop points off the front of the current filter, keeping at least 2.
+ while (boot_filter->filter.timestamps_size() > 2) {
+ // When the timestamp which is the end of the line is popped, we want to
+ // drop it off the list. Hence the <
+ if (time < BootTimestamp{
+ .boot = static_cast<size_t>(boot_filter->boot.first),
+ .time = std::get<0>(boot_filter->filter.timestamp(1))}) {
+ return removed;
+ }
+ boot_filter->filter.PopFront();
+ removed = true;
+ }
+
+ // At most 2 points remain, so the last line is kept until we can move on.
+ // Only advance when a next filter exists (avoids indexing past the end).
+ if (pop_filter_ + 1u >= filters_.size()) {
+ return removed;
+ }
+ BootFilter *next_boot_filter = &filters_[pop_filter_ + 1];
+ // There is 1 more filter. Switch once it has a full line and time has
+ // passed its second point, measured on that filter's own boot.
+ if (next_boot_filter->filter.timestamps_size() < 2u) {
+ return removed;
+ }
+ if (time <
+ BootTimestamp{.boot = static_cast<size_t>(next_boot_filter->boot.first),
+ .time = std::get<0>(next_boot_filter->filter.timestamp(1))}) {
+ return removed;
+ }
+
+ ++pop_filter_;
}
- return removed;
}
void NoncausalTimestampFilter::SingleFilter::Debug() const {
@@ -1247,9 +1278,9 @@
}
}
-void NoncausalOffsetEstimator::Sample(
- const Node *node, logger::BootTimestamp node_delivered_time,
- logger::BootTimestamp other_node_sent_time) {
+void NoncausalOffsetEstimator::Sample(const Node *node,
+ BootTimestamp node_delivered_time,
+ BootTimestamp other_node_sent_time) {
VLOG(1) << "Sample delivered " << node_delivered_time << " sent "
<< other_node_sent_time << " " << node->name()->string_view()
<< " -> "
@@ -1268,8 +1299,8 @@
}
void NoncausalOffsetEstimator::ReverseSample(
- const Node *node, logger::BootTimestamp node_sent_time,
- logger::BootTimestamp other_node_delivered_time) {
+ const Node *node, BootTimestamp node_sent_time,
+ BootTimestamp other_node_delivered_time) {
VLOG(1) << "Reverse sample delivered " << other_node_delivered_time
<< " sent " << node_sent_time << " "
<< ((node == node_a_) ? node_b_ : node_a_)->name()->string_view()
@@ -1287,27 +1318,5 @@
}
}
-bool NoncausalOffsetEstimator::Pop(const Node *node,
- logger::BootTimestamp node_monotonic_now) {
- if (node == node_a_) {
- if (a_.Pop(node_monotonic_now)) {
- VLOG(1) << "Popping forward sample to " << node_a_->name()->string_view()
- << " from " << node_b_->name()->string_view() << " at "
- << node_monotonic_now;
- return true;
- }
- } else if (node == node_b_) {
- if (b_.Pop(node_monotonic_now)) {
- VLOG(1) << "Popping reverse sample to " << node_b_->name()->string_view()
- << " from " << node_a_->name()->string_view() << " at "
- << node_monotonic_now;
- return true;
- }
- } else {
- LOG(FATAL) << "Unknown node " << node->name()->string_view();
- }
- return false;
-}
-
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 166e811..0645139 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -554,6 +554,7 @@
adjusted_initial_time);
}
}
+ CHECK_LT(i, timestamps_.size());
return std::make_tuple(std::get<0>(timestamps_[i]),
std::get<1>(timestamps_[i]));
}
@@ -673,6 +674,9 @@
std::vector<BootFilter> filters_;
size_t current_filter_ = 0;
+
+ // The filter to resume popping from.
+ size_t pop_filter_ = 0;
};
// This class holds 2 NoncausalTimestampFilter's and handles averaging the
@@ -712,10 +716,6 @@
void ReverseSample(const Node *node, logger::BootTimestamp node_sent_time,
logger::BootTimestamp other_node_delivered_time);
- // Removes old data points from a node before the provided time.
- // Returns true if any points were popped.
- bool Pop(const Node *node, logger::BootTimestamp node_monotonic_now);
-
private:
NoncausalTimestampFilter a_;
NoncausalTimestampFilter b_;
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index 5544711..37d02ac 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -1098,16 +1098,16 @@
EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 3u);
EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 3u);
- estimator.Pop(node_a, ta2);
- estimator.Pop(node_b, tb2);
+ estimator.GetFilter(node_a)->Pop(ta2);
+ estimator.GetFilter(node_b)->Pop(tb2);
EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 2u);
EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 2u);
- // And dropping down to 1 point means 0 slope.
- estimator.Pop(node_a, ta3);
- estimator.Pop(node_b, tb3);
- EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 1u);
- EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 1u);
+ // And confirm we won't drop down to 1 point.
+ estimator.GetFilter(node_a)->Pop(ta3);
+ estimator.GetFilter(node_b)->Pop(tb3);
+ EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 2u);
+ EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 2u);
}
} // namespace testing
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index b53feab..972b1d7 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -256,6 +256,12 @@
ChannelInformation info;
info.transfer_method = transfer_method;
+ // If we aren't keeping a buffer and there are no existing listeners, call
+ // Fetch() to avoid falling behind on future calls to FetchNext().
+ if (channels_.empty() && buffer_size_ == 0) {
+ fetcher_->Fetch();
+ }
+
channels_.emplace(data_channel, info);
}
@@ -542,7 +548,14 @@
configuration::GetChannel(event_loop_->configuration(), channel,
event_loop_->name(), event_loop_->node());
if (comparison_channel == nullptr) {
- LOG(ERROR) << "Channel not available: "
+ LOG(ERROR) << "Channel does not exist: "
+ << configuration::StrippedChannelToString(channel);
+ continue;
+ }
+ if (!configuration::ChannelIsReadableOnNode(comparison_channel,
+ event_loop_->node())) {
+ LOG(ERROR) << "Channel not available on node "
+ << event_loop_->node()->name()->string_view() << ": "
<< configuration::StrippedChannelToString(channel);
continue;
}
diff --git a/aos/uuid.h b/aos/uuid.h
index d12bedf..3f7fc50 100644
--- a/aos/uuid.h
+++ b/aos/uuid.h
@@ -58,6 +58,7 @@
}
bool operator==(const UUID &other) const { return other.span() == span(); }
+ bool operator<(const UUID &other) const { return span() < other.span(); }
bool operator!=(const UUID &other) const { return other.span() != span(); }
private:
diff --git a/documentation/README.md b/documentation/README.md
index 93ce8fe..32b8eb1 100644
--- a/documentation/README.md
+++ b/documentation/README.md
@@ -12,3 +12,4 @@
* [Submitting code for review](tutorials/submitting-code-for-a-review.md)
* [Create a new autonomous routine](tutorials/create-a-new-autonomous.md)
* [Tune an autonomous](tutorials/tune-an-autonomous.md)
+* [Set up access to the build server using vscode](tutorials/setup-ssh-vscode.md)
diff --git a/documentation/tutorials/setup-ssh-vscode.md b/documentation/tutorials/setup-ssh-vscode.md
new file mode 100644
index 0000000..175d869
--- /dev/null
+++ b/documentation/tutorials/setup-ssh-vscode.md
@@ -0,0 +1,33 @@
+# Setting up access to the build server using SSH in VS Code
+
+## Prerequisites
+1. Install VS Code; more info at: https://code.visualstudio.com/
+
+2. Have credentials to access the build server. These should include SSH keys and the SSH config file for the build server: [Setting up access to a workspace on the build server](../../README.md#Setting-up-access-to-a-workspace-on-the-build-server)
+
+## Setting up ssh config
+1. Paste this into your `~/.ssh/config` file, replacing `<SVN username>` with your own username. This assumes that your private key is named `id_971_rsa`.
+
+```
+Host frc971
+ HostName build.frc971.org
+ User <SVN username>
+ Port 2222
+ IdentityFile ~/.ssh/id_971_rsa
+ LocalForward 9971 127.0.0.1:3389
+```
+
+## Configuring vscode
+1. Open vscode.
+
+2. Navigate to the extensions menu and search for `Remote Development`. This should be authored by Microsoft.
+
+3. Install `Remote Development`
+
+4. At the bottom-left of your screen, you should see a button with two arrows pointing at each other; click it.
+
+5. Click `Connect to Host...` and then `frc971`
+
+## Useful extensions
+- GitLens by Eric Amodio
+- C/C++ by Microsoft
diff --git a/third_party/rawrtc/rawrtc/src/diffie_hellman_parameters/parameters.c b/third_party/rawrtc/rawrtc/src/diffie_hellman_parameters/parameters.c
index 7783e59..952511f 100644
--- a/third_party/rawrtc/rawrtc/src/diffie_hellman_parameters/parameters.c
+++ b/third_party/rawrtc/rawrtc/src/diffie_hellman_parameters/parameters.c
@@ -20,6 +20,12 @@
struct ssl_ctx_st* const ssl_context, // not checked
DH const* const dh // not checked
) {
+
+ // Running DH_check on the roborio is obnoxiously expensive (~40-50 seconds,
+ // optimized); just YOLO it. Note that this could probably be moved to
+ // somewhere where the cost could be incurred at startup instead of
+ // on connection (or even cached at build-time).
+#ifndef AOS_ARCHITECTURE_arm_frc
int codes;
// Check that the parameters are "likely enough to be valid"
@@ -70,6 +76,9 @@
#endif
return RAWRTC_CODE_INVALID_ARGUMENT;
}
+#else
+ dbg_warning("Skipping DH_check() due to performance concerns.\n");
+#endif
// Apply Diffie-Hellman parameters
if (!SSL_CTX_set_tmp_dh(ssl_context, dh)) {
diff --git a/y2020/BUILD b/y2020/BUILD
index ab03ea3..45164b4 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -7,15 +7,18 @@
robot_downloader(
binaries = [
":setpoint_setter",
+ "//aos/network:web_proxy_main",
],
data = [
":config",
],
dirs = [
"//y2020/actors:splines",
+ "//y2020/www:www_files",
],
start_binaries = [
"//aos/events/logging:logger_main",
+ "//aos/network:web_proxy_main",
":joystick_reader",
":wpilib_interface",
"//aos/network:message_bridge_client",
@@ -251,6 +254,19 @@
)
sh_binary(
+ name = "log_web_proxy",
+ srcs = ["log_web_proxy.sh"],
+ data = [
+ ":config",
+ "//aos/network:log_web_proxy_main",
+ "//y2020/www:camera_main_bundle.min.js",
+ "//y2020/www:field_main_bundle.min.js",
+ "//y2020/www:files",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+)
+
+sh_binary(
name = "web_proxy",
srcs = ["web_proxy.sh"],
data = [
diff --git a/y2020/control_loops/superstructure/BUILD b/y2020/control_loops/superstructure/BUILD
index 73be810..5ea681c 100644
--- a/y2020/control_loops/superstructure/BUILD
+++ b/y2020/control_loops/superstructure/BUILD
@@ -1,6 +1,6 @@
package(default_visibility = ["//visibility:public"])
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
load("@npm_bazel_typescript//:defs.bzl", "ts_library")
flatbuffer_cc_library(
@@ -25,6 +25,18 @@
target_compatible_with = ["@platforms//os:linux"],
)
+flatbuffer_ts_library(
+ name = "superstructure_status_ts_fbs",
+ srcs = [
+ "superstructure_status.fbs",
+ ],
+ includes = [
+ "//frc971/control_loops:control_loops_fbs_includes",
+ "//frc971/control_loops:profiled_subsystem_fbs_includes",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+)
+
flatbuffer_cc_library(
name = "superstructure_status_fbs",
srcs = [
diff --git a/y2020/log_web_proxy.sh b/y2020/log_web_proxy.sh
new file mode 100755
index 0000000..64d86b6
--- /dev/null
+++ b/y2020/log_web_proxy.sh
@@ -0,0 +1 @@
+./aos/network/log_web_proxy_main --data_dir=y2020/www "$@"
diff --git a/y2020/vision/tools/python_code/target_definition.py b/y2020/vision/tools/python_code/target_definition.py
index be7e223..bb19b9e 100644
--- a/y2020/vision/tools/python_code/target_definition.py
+++ b/y2020/vision/tools/python_code/target_definition.py
@@ -243,6 +243,7 @@
power_port_red_main_panel_polygon_points_2d)
ideal_power_port_red.polygon_list_3d.append(
power_port_red_main_panel_polygon_points_3d)
+ # NOTE: We are currently not using the wing, since our actual targets are all planar
# Define the pose of the target
# Location is on the ground, at the center of the target
@@ -422,28 +423,32 @@
training_target_loading_bay_blue.target_radius = target_radius_default
######################################################################
- # Generate lists of ideal and training targets based on all the
- # definitions above
+ # DEFINE the targets here. Generate lists of ideal and training
+ # targets based on all the definitions above
######################################################################
- ### Taped power port
- ideal_target_list.append(ideal_power_port_taped)
- training_target_list.append(training_target_power_port_taped)
+ ### Taped power port (not currently used)
+ #glog.info("Adding hacked/taped up power port to the model list")
+ #ideal_target_list.append(ideal_power_port_taped)
+ #training_target_list.append(training_target_power_port_taped)
### Red Power Port
- ### NOTE: Temporarily taking this out of the list
- #ideal_target_list.append(ideal_power_port_red)
- #training_target_list.append(training_target_power_port_red)
+ glog.info("Adding red power port to the model list")
+ ideal_target_list.append(ideal_power_port_red)
+ training_target_list.append(training_target_power_port_red)
### Red Loading Bay
+ glog.info("Adding red loading bay to the model list")
ideal_target_list.append(ideal_loading_bay_red)
training_target_list.append(training_target_loading_bay_red)
### Blue Power Port
+ #glog.info("Adding blue power port to the model list")
#ideal_target_list.append(ideal_power_port_blue)
#training_target_list.append(training_target_power_port_blue)
### Blue Loading Bay
+ glog.info("Adding blue loading bay to the model list")
ideal_target_list.append(ideal_loading_bay_blue)
training_target_list.append(training_target_loading_bay_blue)
diff --git a/y2020/www/BUILD b/y2020/www/BUILD
index 2f26122..8760fde 100644
--- a/y2020/www/BUILD
+++ b/y2020/www/BUILD
@@ -35,6 +35,7 @@
"//aos/network:web_proxy_ts_fbs",
"//aos/network/www:proxy",
"//frc971/control_loops/drivetrain:drivetrain_status_ts_fbs",
+ "//y2020/control_loops/superstructure:superstructure_status_ts_fbs",
"//y2020/vision/sift:sift_ts_fbs",
"@com_github_google_flatbuffers//ts:flatbuffers_ts",
],
diff --git a/y2020/www/field_handler.ts b/y2020/www/field_handler.ts
index b2daea1..4aaa05f 100644
--- a/y2020/www/field_handler.ts
+++ b/y2020/www/field_handler.ts
@@ -5,8 +5,10 @@
import * as drivetrain from 'org_frc971/frc971/control_loops/drivetrain/drivetrain_status_generated';
import * as sift from 'org_frc971/y2020/vision/sift/sift_generated';
import * as web_proxy from 'org_frc971/aos/network/web_proxy_generated';
+import * as ss from 'org_frc971/y2020/control_loops/superstructure/superstructure_status_generated'
import DrivetrainStatus = drivetrain.frc971.control_loops.drivetrain.Status;
+import SuperstructureStatus = ss.y2020.control_loops.superstructure.Status;
import ImageMatchResult = sift.frc971.vision.sift.ImageMatchResult;
import Channel = configuration.aos.Channel;
import SubscriberRequest = web_proxy.aos.web_proxy.SubscriberRequest;
@@ -59,30 +61,44 @@
const ROBOT_WIDTH = 28 * IN_TO_M;
const ROBOT_LENGTH = 30 * IN_TO_M;
+
export class FieldHandler {
private canvas = document.createElement('canvas');
- private imageMatchResult: ImageMatchResult|null = null;
+ private imageMatchResult = new Map<string, ImageMatchResult>();
private drivetrainStatus: DrivetrainStatus|null = null;
+ private superstructureStatus: SuperstructureStatus|null = null;
constructor(private readonly connection: Connection) {
document.body.appendChild(this.canvas);
this.connection.addConfigHandler(() => {
- this.connection.addHandler(
- '/camera', ImageMatchResult.getFullyQualifiedName(), (res) => {
- this.handleImageMatchResult(res);
- });
+ // Go through and register handlers for both all the individual pis as
+ // well as the local pi. Depending on the node that we are running on,
+ // different subsets of these will be available.
+ for (const prefix of ['', '/pi1', '/pi2', '/pi3', '/pi4']) {
+ this.connection.addHandler(
+ prefix + '/camera', ImageMatchResult.getFullyQualifiedName(), (res) => {
+ this.handleImageMatchResult(prefix, res);
+ });
+ }
this.connection.addHandler(
'/drivetrain', DrivetrainStatus.getFullyQualifiedName(), (data) => {
this.handleDrivetrainStatus(data);
});
+ this.connection.addHandler(
+ '/superstructure', SuperstructureStatus.getFullyQualifiedName(),
+ (data) => {
+ this.handleSuperstructureStatus(data);
+ });
});
}
- private handleImageMatchResult(data: Uint8Array): void {
+ private handleImageMatchResult(prefix: string, data: Uint8Array): void {
const fbBuffer = new ByteBuffer(data);
- this.imageMatchResult = ImageMatchResult.getRootAsImageMatchResult(
- fbBuffer as unknown as flatbuffers.ByteBuffer);
+ this.imageMatchResult.set(
+ prefix,
+ ImageMatchResult.getRootAsImageMatchResult(
+ fbBuffer as unknown as flatbuffers.ByteBuffer));
}
private handleDrivetrainStatus(data: Uint8Array): void {
@@ -91,6 +107,12 @@
fbBuffer as unknown as flatbuffers.ByteBuffer);
}
+ private handleSuperstructureStatus(data: Uint8Array): void {
+ const fbBuffer = new ByteBuffer(data);
+ this.superstructureStatus = SuperstructureStatus.getRootAsStatus(
+ fbBuffer as unknown as flatbuffers.ByteBuffer);
+ }
+
drawField(): void {
const MY_COLOR = 'red';
const OTHER_COLOR = 'blue';
@@ -177,13 +199,29 @@
ctx.restore();
}
- drawRobot(x: number, y: number, theta: number): void {
+ drawRobot(x: number, y: number, theta: number, turret: number|null): void {
const ctx = this.canvas.getContext('2d');
ctx.save();
ctx.translate(x, y);
ctx.rotate(theta);
ctx.rect(-ROBOT_LENGTH / 2, -ROBOT_WIDTH / 2, ROBOT_LENGTH, ROBOT_WIDTH);
ctx.stroke();
+ if (turret) {
+ ctx.save();
+ ctx.rotate(turret + Math.PI);
+ const turretRadius = ROBOT_WIDTH / 4.0;
+ ctx.strokeStyle = "red";
+ // Draw circle for turret.
+ ctx.beginPath();
+ ctx.arc(0, 0, turretRadius, 0, 2.0 * Math.PI);
+ ctx.stroke();
+ // Draw line in circle to show forwards.
+ ctx.beginPath();
+ ctx.moveTo(0, 0);
+ ctx.lineTo(turretRadius, 0);
+ ctx.stroke();
+ ctx.restore();
+ }
ctx.beginPath();
ctx.moveTo(0, 0);
ctx.lineTo(ROBOT_LENGTH / 2, 0);
@@ -195,15 +233,19 @@
this.reset();
this.drawField();
// draw cameras
- if (this.imageMatchResult) {
- for (let i = 0; i < this.imageMatchResult.cameraPosesLength(); i++) {
- const pose = this.imageMatchResult.cameraPoses(i);
+ for (const keyPair of this.imageMatchResult) {
+ const value = keyPair[1];
+ for (let i = 0; i < value.cameraPosesLength(); i++) {
+ const pose = value.cameraPoses(i);
const mat = pose.fieldToCamera();
+ // Matrix layout:
+ // [0, 1, 2, 3]
+ // [4, 5, 6, 7]
+ // [8, 9, 10, 11]
+ // [12, 13, 14, 15]
const x = mat.data(3);
const y = mat.data(7);
- const theta = Math.atan2(
- -mat.data(8),
- Math.sqrt(Math.pow(mat.data(9), 2) + Math.pow(mat.data(10), 2)));
+ const theta = Math.atan2(mat.data(6), mat.data(2));
this.drawCamera(x, y, theta);
}
}
@@ -211,7 +253,10 @@
if (this.drivetrainStatus) {
this.drawRobot(
this.drivetrainStatus.x(), this.drivetrainStatus.y(),
- this.drivetrainStatus.theta());
+ this.drivetrainStatus.theta(),
+ this.superstructureStatus ?
+ this.superstructureStatus.turret().position() :
+ null);
}
window.requestAnimationFrame(() => this.draw());
diff --git a/y2020/y2020_roborio.json b/y2020/y2020_roborio.json
index 105ffa6..5243d5a 100644
--- a/y2020/y2020_roborio.json
+++ b/y2020/y2020_roborio.json
@@ -224,7 +224,7 @@
"frequency": 4,
"num_senders": 2,
"read_method": "PIN",
- "num_readers": 6
+ "num_readers": 10
},
{
"name": "/drivetrain",
@@ -349,6 +349,13 @@
]
},
{
+ "name": "web_proxy",
+ "executable_name": "web_proxy_main.stripped",
+ "nodes": [
+ "roborio"
+ ]
+ },
+ {
"name": "message_bridge_client",
"executable_name": "message_bridge_client.stripped",
"nodes": [