Add NodeEventLoopFactory
This lets us create event loops on separate nodes which can't
communicate with each other. Next step is to add a message proxy
between them, then teach the logger to replay onto multiple nodes.
Change-Id: I06b2836365aea13d696535c52a78ca0c862a7b1e
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index de37c03..019172b 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -23,66 +23,162 @@
// Class for simulated fetchers.
class SimulatedChannel;
+class NodeEventLoopFactory;
+
+// There are 2 concepts needed to support multi-node simulations.
+// 1) The node. This is implemented with NodeEventLoopFactory.
+// 2) The "robot" which runs multiple nodes. This is implemented with
+// SimulatedEventLoopFactory.
+//
+// To make things easier, SimulatedEventLoopFactory takes an optional Node
+// argument if you want to make event loops without interacting with the
+// NodeEventLoopFactory object.
+//
+// The basic flow goes something like as follows:
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// std::unique_ptr<EventLoop> event_loop = factory.MakeEventLoop("ping", pi1);
+//
+// Or
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// NodeEventLoopFactory *pi1_factory = factory.GetNodeEventLoopFactory(pi1);
+// std::unique_ptr<EventLoop> event_loop = pi1_factory.MakeEventLoop("ping");
+//
+// The distributed_clock is used to be the base time. NodeEventLoopFactory has
+// all the information needed to adjust both the realtime and monotonic clocks
+// relative to the distributed_clock.
class SimulatedEventLoopFactory {
public:
// Constructs a SimulatedEventLoopFactory with the provided configuration.
// This configuration must remain in scope for the lifetime of the factory and
// all sub-objects.
SimulatedEventLoopFactory(const Configuration *configuration);
- SimulatedEventLoopFactory(const Configuration *configuration,
- std::string_view node_name);
- SimulatedEventLoopFactory(const Configuration *configuration,
- const Node *node);
~SimulatedEventLoopFactory();
- ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+ // Creates an event loop. If running in a multi-node environment, node needs
+ // to point to the node to create this event loop on.
+ ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name,
+ const Node *node = nullptr);
+
+ // Returns the NodeEventLoopFactory for the provided node. The returned
+ // NodeEventLoopFactory is owned by the SimulatedEventLoopFactory and has a
+ // lifetime identical to the factory.
+ NodeEventLoopFactory *GetNodeEventLoopFactory(const Node *node);
// Starts executing the event loops unconditionally.
void Run();
// Executes the event loops for a duration.
- void RunFor(monotonic_clock::duration duration);
+ void RunFor(distributed_clock::duration duration);
// Stops executing all event loops. Meant to be called from within an event
// loop handler.
void Exit() { scheduler_.Exit(); }
- // Sets the simulated send delay for the factory.
+ const std::vector<const Node *> &nodes() const { return nodes_; }
+
+ // Sets the simulated send delay for all messages sent within a single node.
void set_send_delay(std::chrono::nanoseconds send_delay);
- std::chrono::nanoseconds send_delay() const;
+ std::chrono::nanoseconds send_delay() const { return send_delay_; }
+
+ // Sets the simulated network delay for messages forwarded between nodes.
+ void set_network_delay(std::chrono::nanoseconds network_delay);
+ std::chrono::nanoseconds network_delay() const { return network_delay_; }
+
+ // Returns the clock used to synchronize the nodes.
+ distributed_clock::time_point distributed_now() const {
+ return scheduler_.distributed_now();
+ }
+
+ // Returns the configuration used for everything.
+ const Configuration *configuration() const { return configuration_; }
+
+ private:
+ const Configuration *const configuration_;
+ EventScheduler scheduler_;
+ // List of event loops to manage running and not running for.
+ // The function is a callback used to set and clear the running bool on each
+ // event loop.
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ raw_event_loops_;
+
+ std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+ std::chrono::nanoseconds network_delay_ = std::chrono::microseconds(100);
+
+ std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
+
+ std::vector<const Node *> nodes_;
+};
+
+// This class holds all the state required to be a single node.
+class NodeEventLoopFactory {
+ public:
+ ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
// Returns the node that this factory is running as, or nullptr if this is a
// single node setup.
const Node *node() const { return node_; }
- monotonic_clock::time_point monotonic_now() const {
- return scheduler_.monotonic_now();
- }
- realtime_clock::time_point realtime_now() const {
- return scheduler_.realtime_now();
- }
-
// Sets realtime clock to realtime_now for a given monotonic clock.
void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
realtime_clock::time_point realtime_now) {
- scheduler_.SetRealtimeOffset(monotonic_now, realtime_now);
+ realtime_offset_ =
+ realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
}
- private:
- const Configuration *const configuration_;
- EventScheduler scheduler_;
- // Map from name, type to queue.
- absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
- // List of event loops to manage running and not running for.
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- raw_event_loops_;
+ // Returns the current time on both clocks.
+ inline monotonic_clock::time_point monotonic_now() const;
+ inline realtime_clock::time_point realtime_now() const;
- std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+ // Converts a time to the distributed clock for scheduling and cross-node time
+ // measurement.
+ inline distributed_clock::time_point ToDistributedClock(
+ monotonic_clock::time_point time) const;
+
+ private:
+ friend class SimulatedEventLoopFactory;
+ NodeEventLoopFactory(
+ EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+ const Node *node,
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *raw_event_loops);
+
+ EventScheduler *const scheduler_;
+ SimulatedEventLoopFactory *const factory_;
const Node *const node_;
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *const raw_event_loops_;
+
+ std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
+ std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+
+ // Map from name, type to queue.
+ absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
+
+ // pid so we get unique timing reports.
pid_t tid_ = 0;
};
+inline monotonic_clock::time_point NodeEventLoopFactory::monotonic_now() const {
+ return monotonic_clock::time_point(
+ factory_->distributed_now().time_since_epoch() + monotonic_offset_);
+}
+
+inline realtime_clock::time_point NodeEventLoopFactory::realtime_now() const {
+ return realtime_clock::time_point(monotonic_now().time_since_epoch() +
+ realtime_offset_);
+}
+
+inline distributed_clock::time_point NodeEventLoopFactory::ToDistributedClock(
+ monotonic_clock::time_point time) const {
+ return distributed_clock::time_point(time.time_since_epoch() -
+ monotonic_offset_);
+}
+
} // namespace aos
#endif // AOS_EVENTS_SIMULATED_EVENT_LOOP_H_