Refactor SimulatedEventLoopFactory to not have lists of functions
We can use a SimulatedEventLoop * instead of a list of std::function.
The factory is already a friend, and this lets us tell which
event loops are for which nodes much easier. This preps us for
rebooting.
Change-Id: I0fb582e76b51e2cc93b17430a41cb6e65bb0a5f1
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 3739f49..668f2b6 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -61,6 +61,13 @@
on_run_.clear();
}
+void EventScheduler::RunOnStartup() {
+ for (size_t i = 0; i < on_startup_.size(); ++i) {
+ on_startup_[i]();
+ }
+ on_startup_.clear();
+}
+
std::ostream &operator<<(std::ostream &stream,
const aos::distributed_clock::time_point &now) {
// Print it the same way we print a monotonic time. Literally.
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index f981ef2..d6ab858 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -88,6 +88,11 @@
on_run_.emplace_back(std::move(callback));
}
+ // Schedules a callback when the event scheduler starts.
+ void ScheduleOnStartup(std::function<void()> callback) {
+ on_startup_.emplace_back(std::move(callback));
+ }
+
Token InvalidToken() { return events_list_.end(); }
// Deschedule an event by its iterator
@@ -96,6 +101,9 @@
// Runs the OnRun callbacks.
void RunOnRun();
+ // Runs the OnStartup callbacks.
+ void RunOnStartup();
+
// Returns true if events are being handled.
inline bool is_running() const;
@@ -129,6 +137,7 @@
// List of functions to run (once) when running.
std::vector<std::function<void()>> on_run_;
+ std::vector<std::function<void()>> on_startup_;
// Multimap holding times to run functions. These are stored in order, and
// the order is the callback tree.
@@ -196,6 +205,13 @@
// Returns the current distributed time.
distributed_clock::time_point distributed_now() const { return now_; }
+ void RunOnStartup() {
+ CHECK(!is_running_);
+ for (EventScheduler *scheduler : schedulers_) {
+ scheduler->RunOnStartup();
+ }
+ }
+
private:
// Handles running the OnRun functions.
void RunOnRun() {
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 4dc37fa..afa1f1a 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -21,6 +21,14 @@
namespace {
+std::string NodeName(const Node *node) {
+ if (node == nullptr) {
+ return "";
+ }
+
+ return absl::StrCat(node->name()->string_view(), " ");
+}
+
class ScopedMarkRealtimeRestorer {
public:
ScopedMarkRealtimeRestorer(bool rt) : rt_(rt), prior_(MarkRealtime(rt)) {}
@@ -461,24 +469,21 @@
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
*channels,
const Configuration *configuration,
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *raw_event_loops,
- const Node *node, pid_t tid)
+ std::vector<SimulatedEventLoop *> *event_loops_, const Node *node,
+ pid_t tid)
: EventLoop(CHECK_NOTNULL(configuration)),
scheduler_(scheduler),
node_event_loop_factory_(node_event_loop_factory),
channels_(channels),
- raw_event_loops_(raw_event_loops),
+ event_loops_(event_loops_),
node_(node),
tid_(tid) {
- raw_event_loops_->push_back(std::make_pair(this, [this](bool value) {
- if (!has_setup_) {
- Setup();
- has_setup_ = true;
- }
- set_is_running(value);
- has_run_ = true;
- }));
+ scheduler_->ScheduleOnStartup([this]() {
+ Setup();
+ has_setup_ = true;
+ });
+
+ event_loops_->push_back(this);
}
~SimulatedEventLoop() override {
// Trigger any remaining senders or fetchers to be cleared before destroying
@@ -490,15 +495,22 @@
phased_loops_.clear();
watchers_.clear();
- for (auto it = raw_event_loops_->begin(); it != raw_event_loops_->end();
+ for (auto it = event_loops_->begin(); it != event_loops_->end();
++it) {
- if (it->first == this) {
- raw_event_loops_->erase(it);
+ if (*it == this) {
+ event_loops_->erase(it);
break;
}
}
}
+ void SetIsRunning(bool running) {
+ CHECK(has_setup_);
+
+ set_is_running(running);
+ has_run_ = true;
+ }
+
bool has_run() const { return has_run_; }
std::chrono::nanoseconds send_delay() const { return send_delay_; }
@@ -602,8 +614,7 @@
EventScheduler *scheduler_;
NodeEventLoopFactory *node_event_loop_factory_;
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *raw_event_loops_;
+ std::vector<SimulatedEventLoop *> *event_loops_;
::std::string name_;
@@ -625,10 +636,12 @@
void SimulatedEventLoopFactory::set_send_delay(
std::chrono::nanoseconds send_delay) {
send_delay_ = send_delay;
- for (std::pair<EventLoop *, std::function<void(bool)>> &loop :
- raw_event_loops_) {
- reinterpret_cast<SimulatedEventLoop *>(loop.first)
- ->set_send_delay(send_delay_);
+ for (std::unique_ptr<NodeEventLoopFactory> & node : node_factories_) {
+ if (node) {
+ for (SimulatedEventLoop *loop : node->event_loops_) {
+ loop->set_send_delay(send_delay_);
+ }
+ }
}
}
@@ -637,11 +650,16 @@
std::function<void(const Context &channel, const void *message)> watcher) {
TakeWatcher(channel);
- std::unique_ptr<SimulatedWatcher> shm_watcher(
- new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
+ std::unique_ptr<SimulatedWatcher> shm_watcher =
+ std::make_unique<SimulatedWatcher>(this, scheduler_, channel,
+ std::move(watcher));
GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
+
NewWatcher(std::move(shm_watcher));
+ VLOG(1) << monotonic_now() << " " << NodeName(node()) << name()
+ << " MakeRawWatcher "
+ << configuration::StrippedChannelToString(channel);
// Order of operations gets kinda wonky if we let people make watchers after
// running once. If someone has a valid use case, we can reconsider.
@@ -652,6 +670,9 @@
const Channel *channel) {
TakeSender(channel);
+ VLOG(1) << monotonic_now() << " " << NodeName(node()) << name()
+ << " MakeRawSender "
+ << configuration::StrippedChannelToString(channel);
return GetSimulatedChannel(channel)->MakeRawSender(this);
}
@@ -666,6 +687,9 @@
"configuration.";
}
+ VLOG(1) << monotonic_now() << " " << NodeName(node()) << name()
+ << " MakeRawFetcher "
+ << configuration::StrippedChannelToString(channel);
return GetSimulatedChannel(channel)->MakeRawFetcher(this);
}
@@ -701,6 +725,10 @@
token_(scheduler_->InvalidToken()) {}
SimulatedWatcher::~SimulatedWatcher() {
+ VLOG(1) << simulated_event_loop_->monotonic_now() << " "
+ << NodeName(simulated_event_loop_->node())
+ << simulated_event_loop_->name() << " Stopped Watching "
+ << configuration::StrippedChannelToString(channel_);
simulated_event_loop_->RemoveEvent(&event_);
if (token_ != scheduler_->InvalidToken()) {
scheduler_->Deschedule(token_);
@@ -729,11 +757,13 @@
}
void SimulatedWatcher::HandleEvent() {
- VLOG(1) << "Watcher " << configuration::CleanedChannelToString(channel_);
- CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
-
const monotonic_clock::time_point monotonic_now =
simulated_event_loop_->monotonic_now();
+ VLOG(1) << monotonic_now << " " << NodeName(simulated_event_loop_->node())
+ << "Watcher " << simulated_event_loop_->name() << ", "
+ << configuration::StrippedChannelToString(channel_);
+ CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
+
logging::ScopedLogRestorer prev_logger;
if (simulated_event_loop_->log_impl_) {
prev_logger.Swap(simulated_event_loop_->log_impl_);
@@ -854,6 +884,9 @@
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) {
+ VLOG(1) << event_loop_->monotonic_now() << " "
+ << NodeName(event_loop_->node()) << event_loop_->name()
+ << " Send " << configuration::StrippedChannelToString(channel());
// The allocations in here are due to infrastructure and don't count in the
// no mallocs in RT code.
ScopedNotRealtime nrt;
@@ -932,9 +965,11 @@
}
void SimulatedTimerHandler::HandleEvent() {
- VLOG(1) << "Timer " << name();
const ::aos::monotonic_clock::time_point monotonic_now =
simulated_event_loop_->monotonic_now();
+ VLOG(1) << monotonic_now << " " << NodeName(simulated_event_loop_->node())
+ << "Timer '" << simulated_event_loop_->name() << "', '" << name()
+ << "'";
logging::ScopedLogRestorer prev_logger;
if (simulated_event_loop_->log_impl_) {
prev_logger.Swap(simulated_event_loop_->log_impl_);
@@ -988,9 +1023,10 @@
}
void SimulatedPhasedLoopHandler::HandleEvent() {
- VLOG(1) << "Phased loop " << name();
monotonic_clock::time_point monotonic_now =
simulated_event_loop_->monotonic_now();
+ VLOG(1) << monotonic_now << " Phased loop " << simulated_event_loop_->name()
+ << ", " << name();
logging::ScopedLogRestorer prev_logger;
if (simulated_event_loop_->log_impl_) {
prev_logger.Swap(simulated_event_loop_->log_impl_);
@@ -1023,15 +1059,6 @@
simulated_event_loop_->AddEvent(&event_);
}
-NodeEventLoopFactory::NodeEventLoopFactory(
- EventSchedulerScheduler *scheduler_scheduler,
- SimulatedEventLoopFactory *factory, const Node *node,
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *raw_event_loops)
- : factory_(factory), node_(node), raw_event_loops_(raw_event_loops) {
- scheduler_scheduler->AddEventScheduler(&scheduler_);
-}
-
SimulatedEventLoopFactory::SimulatedEventLoopFactory(
const Configuration *configuration)
: configuration_(CHECK_NOTNULL(configuration)),
@@ -1039,7 +1066,7 @@
CHECK(IsInitialized()) << ": Need to initialize AOS first.";
for (const Node *node : nodes_) {
node_factories_.emplace_back(new NodeEventLoopFactory(
- &scheduler_scheduler_, this, node, &raw_event_loops_));
+ &scheduler_scheduler_, this, node));
}
if (configuration::MultiNode(configuration)) {
@@ -1050,6 +1077,11 @@
SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
+ std::string_view node) {
+ return GetNodeEventLoopFactory(configuration::GetNode(configuration(), node));
+}
+
+NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
const Node *node) {
auto result = std::find_if(
node_factories_.begin(), node_factories_.end(),
@@ -1082,6 +1114,17 @@
return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
}
+NodeEventLoopFactory::NodeEventLoopFactory(
+ EventSchedulerScheduler *scheduler_scheduler,
+ SimulatedEventLoopFactory *factory, const Node *node)
+ : factory_(factory), node_(node) {
+ scheduler_scheduler->AddEventScheduler(&scheduler_);
+}
+
+NodeEventLoopFactory::~NodeEventLoopFactory() {
+ CHECK_EQ(event_loops_.size(), 0u) << "Event loop didn't exit";
+}
+
::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
std::string_view name) {
CHECK(!scheduler_.is_running())
@@ -1090,8 +1133,8 @@
pid_t tid = tid_;
++tid_;
::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
- &scheduler_, this, &channels_, factory_->configuration(),
- raw_event_loops_, node_, tid));
+ &scheduler_, this, &channels_, factory_->configuration(), &event_loops_,
+ node_, tid));
result->set_name(name);
result->set_send_delay(factory_->send_delay());
return std::move(result);
@@ -1100,31 +1143,46 @@
void NodeEventLoopFactory::Disconnect(const Node *other) {
factory_->bridge_->Disconnect(node_, other);
}
+
void NodeEventLoopFactory::Connect(const Node *other) {
factory_->bridge_->Connect(node_, other);
}
void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(true);
+ scheduler_scheduler_.RunOnStartup();
+ for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
+ if (node) {
+ for (SimulatedEventLoop *loop : node->event_loops_) {
+ loop->SetIsRunning(true);
+ }
+ }
}
scheduler_scheduler_.RunFor(duration);
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(false);
+ for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
+ if (node) {
+ for (SimulatedEventLoop *loop : node->event_loops_) {
+ loop->SetIsRunning(false);
+ }
+ }
}
}
void SimulatedEventLoopFactory::Run() {
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(true);
+ scheduler_scheduler_.RunOnStartup();
+ for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
+ if (node) {
+ for (SimulatedEventLoop *loop : node->event_loops_) {
+ loop->SetIsRunning(true);
+ }
+ }
}
scheduler_scheduler_.Run();
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(false);
+ for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
+ if (node) {
+ for (SimulatedEventLoop *loop : node->event_loops_) {
+ loop->SetIsRunning(false);
+ }
+ }
}
}
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 348e7dd..cc016a8 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -26,6 +26,7 @@
class SimulatedChannel;
class NodeEventLoopFactory;
+class SimulatedEventLoop;
namespace message_bridge {
class SimulatedMessageBridge;
}
@@ -72,6 +73,7 @@
// NodeEventLoopFactory is owned by the SimulatedEventLoopFactory and has a
// lifetime identical to the factory.
NodeEventLoopFactory *GetNodeEventLoopFactory(const Node *node);
+ NodeEventLoopFactory *GetNodeEventLoopFactory(std::string_view node);
// Sets the time converter for all nodes.
void SetTimeConverter(TimeConverter *time_converter);
@@ -122,11 +124,6 @@
const Configuration *const configuration_;
EventSchedulerScheduler scheduler_scheduler_;
- // List of event loops to manage running and not running for.
- // The function is a callback used to set and clear the running bool on each
- // event loop.
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- raw_event_loops_;
std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
std::chrono::nanoseconds network_delay_ = std::chrono::microseconds(100);
@@ -141,7 +138,9 @@
// This class holds all the state required to be a single node.
class NodeEventLoopFactory {
public:
- ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+ ~NodeEventLoopFactory();
+
+ std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
// Returns the node that this factory is running as, or nullptr if this is a
// single node setup.
@@ -212,11 +211,8 @@
private:
friend class SimulatedEventLoopFactory;
- NodeEventLoopFactory(
- EventSchedulerScheduler *scheduler_scheduler,
- SimulatedEventLoopFactory *factory, const Node *node,
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *raw_event_loops);
+ NodeEventLoopFactory(EventSchedulerScheduler *scheduler_scheduler,
+ SimulatedEventLoopFactory *factory, const Node *node);
EventScheduler scheduler_;
SimulatedEventLoopFactory *const factory_;
@@ -225,8 +221,7 @@
const Node *const node_;
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *const raw_event_loops_;
+ std::vector<SimulatedEventLoop *> event_loops_;
std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);