Sort messages between nodes properly
We used to assume the realtime clocks were in sync. This isn't
realistic. Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.
Since time is no longer exactly linear with all the adjustments, we need
to redo how events are scheduled. Event times can no longer be converted
to the distributed_clock just once; they now need to be converted every
time they are compared between nodes.
Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 400a307..e6c732b 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -9,6 +9,7 @@
#include <vector>
#include "aos/events/event_loop.h"
+#include "aos/logging/implementations.h"
#include "aos/time/time.h"
#include "glog/logging.h"
@@ -42,15 +43,17 @@
std::ostream &operator<<(std::ostream &stream,
const aos::distributed_clock::time_point &now);
+class EventSchedulerScheduler;
+
class EventScheduler {
public:
using ChannelType =
- std::multimap<distributed_clock::time_point, std::function<void()>>;
+ std::multimap<monotonic_clock::time_point, std::function<void()>>;
using Token = ChannelType::iterator;
// Schedule an event with a callback function
// Returns an iterator to the event
- Token Schedule(distributed_clock::time_point time,
+ Token Schedule(monotonic_clock::time_point time,
std::function<void()> callback);
// Schedules a callback when the event scheduler starts.
@@ -63,29 +66,132 @@
// Deschedule an event by its iterator
void Deschedule(Token token);
- // Runs until exited.
- void Run();
- // Runs for a duration.
- void RunFor(distributed_clock::duration duration);
+ // Runs the OnRun callbacks.
+ void RunOnRun();
- void Exit() { is_running_ = false; }
+ // Returns true if events are being handled.
+ inline bool is_running() const;
- bool is_running() const { return is_running_; }
+ // Returns the timestamp of the next event to trigger.
+ aos::monotonic_clock::time_point OldestEvent();
+ // Handles the next event.
+ void CallOldestEvent();
- distributed_clock::time_point distributed_now() const { return now_; }
+ // Converts a time on this node's monotonic clock to the distributed clock
+ // (distributed = monotonic + offset) for scheduling and cross-node measurement.
+ distributed_clock::time_point ToDistributedClock(
+ monotonic_clock::time_point time) const {
+ return distributed_clock::epoch() + time.time_since_epoch() +
+ monotonic_offset_;
+ }
+
+ // Converts a distributed clock time to this node's monotonic clock by
+ // subtracting the offset; this is the inverse of ToDistributedClock().
+ monotonic_clock::time_point FromDistributedClock(
+ distributed_clock::time_point time) const {
+ return monotonic_clock::epoch() + time.time_since_epoch() -
+ monotonic_offset_;
+ }
+
+ // Returns the current monotonic time on this node calculated from the
+ // distributed clock.
+ inline monotonic_clock::time_point monotonic_now() const;
+
+ // Sets the offset used by ToDistributedClock() and FromDistributedClock().
+ // distributed = monotonic + offset;
+ void SetDistributedOffset(std::chrono::nanoseconds monotonic_offset) {
+ monotonic_offset_ = monotonic_offset;
+ }
+
+ // Returns the current offset, where distributed = monotonic + offset.
+ std::chrono::nanoseconds monotonic_offset() const {
+ return monotonic_offset_;
+ }
private:
+ friend class EventSchedulerScheduler;
// Current execution time.
- distributed_clock::time_point now_ = distributed_clock::epoch();
+ monotonic_clock::time_point now_ = monotonic_clock::epoch();
+ // Offset to the distributed clock.
+ // distributed = monotonic + offset;
+ std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
+
+ // List of functions to run (once) when running.
std::vector<std::function<void()>> on_run_;
// Multimap holding times to run functions. These are stored in order, and
// the order is the callback tree.
ChannelType events_list_;
- bool is_running_ = false;
+
+ // Pointer to the actual scheduler.
+ EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
};
+// Coordinates the per-node EventSchedulers. We need a heap of heaps...
+//
+// Events in a node have a very well defined progression of time. It is linear
+// and well represented by the monotonic clock.
+//
+// Events across nodes don't follow this well. Time skews between the two nodes
+// all the time. We also don't know the function ahead of time which converts
+// from each node's monotonic clock to the distributed clock (our unified base
+// time which is likely the average time between nodes).
+//
+// This pushes us towards merge sort: sort each node's events with a heap like
+// we used to, then sort the nodes against each other by their oldest event.
+class EventSchedulerScheduler {
+ public:
+ // Adds an event scheduler to the list of schedulers to run in sync.
+ void AddEventScheduler(EventScheduler *scheduler);
+
+ // Runs until there are no more events or Exit is called.
+ void Run();
+
+ // Stops running by clearing the running flag.
+ void Exit() { is_running_ = false; }
+
+ bool is_running() const { return is_running_; }
+
+ // Runs for a duration on the distributed clock. Time on the distributed
+ // clock should be very representative of time on each node, but won't be
+ // exactly the same.
+ void RunFor(distributed_clock::duration duration);
+
+ // Returns the current time on the distributed clock.
+ distributed_clock::time_point distributed_now() const { return now_; }
+
+ private:
+ // Marks us running (CHECKs we weren't) and calls each scheduler's RunOnRun().
+ void RunOnRun() {
+ CHECK(!is_running_);
+ is_running_ = true;
+ for (EventScheduler *scheduler : schedulers_) {
+ scheduler->RunOnRun();
+ }
+ }
+
+ // Returns the next event time and scheduler on which to run it.
+ std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
+
+ // True if we are running.
+ bool is_running_ = false;
+ // The current time on the distributed clock.
+ distributed_clock::time_point now_ = distributed_clock::epoch();
+ // List of schedulers to run in sync (raw, non-null pointers are assumed).
+ std::vector<EventScheduler *> schedulers_;
+};
+
+inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
+ // Make sure now_ agrees with the distributed time (needs scheduler_scheduler_ set).
+ CHECK_EQ(now_, FromDistributedClock(scheduler_scheduler_->distributed_now()));
+ return now_;
+}
+
+inline bool EventScheduler::is_running() const {
+ return scheduler_scheduler_->is_running();  // Flag lives in the EventSchedulerScheduler.
+}
+
} // namespace aos
#endif // AOS_EVENTS_EVENT_SCHEDULER_H_