blob: 48191412bc9b2fcca5009c41430f237513d43cfb [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
3#include <algorithm>
4#include <deque>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08005#include <string_view>
Alex Perrycb7da4b2019-08-28 19:35:56 -07006
7#include "absl/container/btree_map.h"
8#include "absl/container/btree_set.h"
9#include "aos/json_to_flatbuffer.h"
10#include "aos/util/phased_loop.h"
11
12namespace aos {
13
14// Container for both a message, and the context for it for simulation. This
15// makes tracking the timestamps associated with the data easy.
16struct SimulatedMessage {
17 // Struct to let us force data to be well aligned.
18 struct OveralignedChar {
19 char data alignas(32);
20 };
21
22 // Context for the data.
23 Context context;
24
25 // The data.
26 char *data() { return reinterpret_cast<char *>(&actual_data[0]); }
27
28 // Then the data.
29 OveralignedChar actual_data[];
30};
31
32class SimulatedFetcher;
33
34class SimulatedChannel {
35 public:
36 explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler)
37 : channel_(CopyFlatBuffer(channel)),
38 scheduler_(scheduler),
39 next_queue_index_(ipc_lib::QueueIndex::Zero(channel->max_size())) {}
40
41 ~SimulatedChannel() { CHECK_EQ(0u, fetchers_.size()); }
42
43 // Makes a connected raw sender which calls Send below.
44 ::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
45
46 // Makes a connected raw fetcher.
47 ::std::unique_ptr<RawFetcher> MakeRawFetcher();
48
49 // Registers a watcher for the queue.
50 void MakeRawWatcher(
51 ::std::function<void(const Context &context, const void *message)>
52 watcher);
53
54 // Sends the message to all the connected receivers and fetchers.
55 void Send(std::shared_ptr<SimulatedMessage> message);
56
57 // Unregisters a fetcher.
58 void UnregisterFetcher(SimulatedFetcher *fetcher);
59
60 std::shared_ptr<SimulatedMessage> latest_message() { return latest_message_; }
61
62 size_t max_size() const { return channel_.message().max_size(); }
63
Austin Schuh5f1cc5c2019-12-01 18:01:11 -080064 const std::string_view name() const {
Alex Perrycb7da4b2019-08-28 19:35:56 -070065 return channel_.message().name()->string_view();
66 }
67
68 const Channel *channel() const { return &channel_.message(); }
69
70 private:
71 const FlatbufferDetachedBuffer<Channel> channel_;
72
73 // List of all watchers.
74 ::std::vector<
75 std::function<void(const Context &context, const void *message)>>
76 watchers_;
77
78 // List of all fetchers.
79 ::std::vector<SimulatedFetcher *> fetchers_;
80 std::shared_ptr<SimulatedMessage> latest_message_;
81 EventScheduler *scheduler_;
82
83 ipc_lib::QueueIndex next_queue_index_;
84};
85
86namespace {
87
88// Creates a SimulatedMessage with size bytes of storage.
89// This is a shared_ptr so we don't have to implement refcounting or copying.
90std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
91 SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
92 malloc(sizeof(SimulatedMessage) + size));
93 message->context.size = size;
94 message->context.data = message->data();
95
96 return std::shared_ptr<SimulatedMessage>(message, free);
97}
98
99class SimulatedSender : public RawSender {
100 public:
101 SimulatedSender(SimulatedChannel *simulated_channel, EventLoop *event_loop)
Austin Schuh54cf95f2019-11-29 13:14:18 -0800102 : RawSender(simulated_channel->channel()),
103 simulated_channel_(simulated_channel),
104 event_loop_(event_loop) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700105 ~SimulatedSender() {}
106
107 void *data() override {
108 if (!message_) {
109 message_ = MakeSimulatedMessage(simulated_channel_->max_size());
110 }
111 return message_->data();
112 }
113
114 size_t size() override { return simulated_channel_->max_size(); }
115
116 bool Send(size_t length) override {
117 CHECK_LE(length, size()) << ": Attempting to send too big a message.";
118 message_->context.monotonic_sent_time = event_loop_->monotonic_now();
119 message_->context.realtime_sent_time = event_loop_->realtime_now();
120 CHECK_LE(length, message_->context.size);
121 message_->context.size = length;
122
123 // TODO(austin): Track sending too fast.
124 simulated_channel_->Send(message_);
125
126 // Drop the reference to the message so that we allocate a new message for
127 // next time. Otherwise we will continue to reuse the same memory for all
128 // messages and corrupt it.
129 message_.reset();
130 return true;
131 }
132
Austin Schuh4726ce92019-11-29 13:23:18 -0800133 bool Send(const void *msg, size_t size) override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700134 CHECK_LE(size, this->size()) << ": Attempting to send too big a message.";
135
136 // This is wasteful, but since flatbuffers fill from the back end of the
137 // queue, we need it to be full sized.
138 message_ = MakeSimulatedMessage(simulated_channel_->max_size());
139
140 // Now fill in the message. size is already populated above, and
141 // queue_index will be populated in queue_. Put this at the back of the
142 // data segment.
143 memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
144
145 return Send(size);
146 }
147
James Kuszmaul3ae42262019-11-08 12:33:41 -0800148 const std::string_view name() const override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700149 return simulated_channel_->name();
150 }
151
152 private:
153 SimulatedChannel *simulated_channel_;
154 EventLoop *event_loop_;
155
156 std::shared_ptr<SimulatedMessage> message_;
157};
158} // namespace
159
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800160class SimulatedEventLoop;
161
Alex Perrycb7da4b2019-08-28 19:35:56 -0700162class SimulatedFetcher : public RawFetcher {
163 public:
Austin Schuh54cf95f2019-11-29 13:14:18 -0800164 explicit SimulatedFetcher(SimulatedChannel *queue)
165 : RawFetcher(queue->channel()), queue_(queue) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700166 ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
167
168 bool FetchNext() override {
169 if (msgs_.size() == 0) return false;
170
171 SetMsg(msgs_.front());
172 msgs_.pop_front();
173 return true;
174 }
175
176 bool Fetch() override {
177 if (msgs_.size() == 0) {
178 if (!msg_ && queue_->latest_message()) {
179 SetMsg(queue_->latest_message());
180 return true;
181 } else {
182 return false;
183 }
184 }
185
186 // We've had a message enqueued, so we don't need to go looking for the
187 // latest message from before we started.
188 SetMsg(msgs_.back());
189 msgs_.clear();
190 return true;
191 }
192
193 private:
194 friend class SimulatedChannel;
195
196 // Updates the state inside RawFetcher to point to the data in msg_.
197 void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
198 msg_ = msg;
199 data_ = msg_->context.data;
200 context_ = msg_->context;
201 }
202
203 // Internal method for Simulation to add a message to the buffer.
204 void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
205 msgs_.emplace_back(buffer);
206 }
207
208 SimulatedChannel *queue_;
209 std::shared_ptr<SimulatedMessage> msg_;
210
211 // Messages queued up but not in use.
212 ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
213};
214
215class SimulatedTimerHandler : public TimerHandler {
216 public:
217 explicit SimulatedTimerHandler(EventScheduler *scheduler,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800218 SimulatedEventLoop *simulated_event_loop,
Alex Perrycb7da4b2019-08-28 19:35:56 -0700219 ::std::function<void()> fn)
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800220 : scheduler_(scheduler),
221 token_(scheduler_->InvalidToken()),
222 simulated_event_loop_(simulated_event_loop),
223 fn_(fn) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700224 ~SimulatedTimerHandler() {}
225
226 void Setup(monotonic_clock::time_point base,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800227 monotonic_clock::duration repeat_offset) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700228
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800229 void HandleEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700230
231 void Disable() override {
232 if (token_ != scheduler_->InvalidToken()) {
233 scheduler_->Deschedule(token_);
234 token_ = scheduler_->InvalidToken();
235 }
236 }
237
238 ::aos::monotonic_clock::time_point monotonic_now() const {
239 return scheduler_->monotonic_now();
240 }
241
242 private:
243 EventScheduler *scheduler_;
244 EventScheduler::Token token_;
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800245
246 SimulatedEventLoop *simulated_event_loop_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700247 // Function to be run on the thread
248 ::std::function<void()> fn_;
249 monotonic_clock::time_point base_;
250 monotonic_clock::duration repeat_offset_;
251};
252
253class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
254 public:
255 SimulatedPhasedLoopHandler(EventScheduler *scheduler,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800256 SimulatedEventLoop *simulated_event_loop,
Alex Perrycb7da4b2019-08-28 19:35:56 -0700257 ::std::function<void(int)> fn,
258 const monotonic_clock::duration interval,
259 const monotonic_clock::duration offset)
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800260 : simulated_timer_handler_(scheduler, simulated_event_loop,
261 [this]() { HandleTimerWakeup(); }),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700262 phased_loop_(interval, simulated_timer_handler_.monotonic_now(),
263 offset),
264 fn_(fn) {
265 // TODO(austin): This assumes time doesn't change between when the
266 // constructor is called and when we start running. It's probably a safe
267 // assumption.
268 Reschedule();
269 }
270
271 void HandleTimerWakeup() {
272 fn_(cycles_elapsed_);
273 Reschedule();
274 }
275
276 void set_interval_and_offset(
277 const monotonic_clock::duration interval,
278 const monotonic_clock::duration offset) override {
279 phased_loop_.set_interval_and_offset(interval, offset);
280 }
281
282 void Reschedule() {
283 cycles_elapsed_ =
284 phased_loop_.Iterate(simulated_timer_handler_.monotonic_now());
285 simulated_timer_handler_.Setup(phased_loop_.sleep_time(),
286 ::aos::monotonic_clock::zero());
287 }
288
289 private:
290 SimulatedTimerHandler simulated_timer_handler_;
291
292 time::PhasedLoop phased_loop_;
293
294 int cycles_elapsed_ = 1;
295
296 ::std::function<void(int)> fn_;
297};
298
299class SimulatedEventLoop : public EventLoop {
300 public:
301 explicit SimulatedEventLoop(
302 EventScheduler *scheduler,
303 absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
304 *channels,
305 const Configuration *configuration,
306 std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
307 *raw_event_loops)
308 : EventLoop(configuration),
309 scheduler_(scheduler),
310 channels_(channels),
311 raw_event_loops_(raw_event_loops) {
312 raw_event_loops_->push_back(
313 std::make_pair(this, [this](bool value) { set_is_running(value); }));
314 }
315 ~SimulatedEventLoop() override {
316 for (auto it = raw_event_loops_->begin(); it != raw_event_loops_->end();
317 ++it) {
318 if (it->first == this) {
319 raw_event_loops_->erase(it);
320 break;
321 }
322 }
323 }
324
325 ::aos::monotonic_clock::time_point monotonic_now() override {
326 return scheduler_->monotonic_now();
327 }
328
329 ::aos::realtime_clock::time_point realtime_now() override {
330 return scheduler_->realtime_now();
331 }
332
333 ::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
334
335 ::std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
336
337 void MakeRawWatcher(
338 const Channel *channel,
339 ::std::function<void(const Context &context, const void *message)>
340 watcher) override;
341
342 TimerHandler *AddTimer(::std::function<void()> callback) override {
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800343 timers_.emplace_back(new SimulatedTimerHandler(scheduler_, this, callback));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700344 return timers_.back().get();
345 }
346
347 PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
348 const monotonic_clock::duration interval,
349 const monotonic_clock::duration offset =
350 ::std::chrono::seconds(0)) override {
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800351 phased_loops_.emplace_back(new SimulatedPhasedLoopHandler(
352 scheduler_, this, callback, interval, offset));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700353 return phased_loops_.back().get();
354 }
355
356 void OnRun(::std::function<void()> on_run) override {
357 scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
358 }
359
James Kuszmaul3ae42262019-11-08 12:33:41 -0800360 void set_name(const std::string_view name) override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700361 name_ = std::string(name);
362 }
James Kuszmaul3ae42262019-11-08 12:33:41 -0800363 const std::string_view name() const override { return name_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700364
365 SimulatedChannel *GetSimulatedChannel(const Channel *channel);
366
367 void Take(const Channel *channel);
368
369 void SetRuntimeRealtimePriority(int /*priority*/) override {
370 CHECK(!is_running()) << ": Cannot set realtime priority while running.";
371 }
372
373 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800374 friend class SimulatedTimerHandler;
375
Alex Perrycb7da4b2019-08-28 19:35:56 -0700376 EventScheduler *scheduler_;
377 absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
378 std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
379 *raw_event_loops_;
380 absl::btree_set<SimpleChannel> taken_;
381 ::std::vector<std::unique_ptr<TimerHandler>> timers_;
382 ::std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
383
384 ::std::string name_;
385};
386
387void SimulatedEventLoop::MakeRawWatcher(
388 const Channel *channel,
389 std::function<void(const Context &channel, const void *message)> watcher) {
Austin Schuh54cf95f2019-11-29 13:14:18 -0800390 ValidateChannel(channel);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700391 Take(channel);
392 GetSimulatedChannel(channel)->MakeRawWatcher(watcher);
393}
394
395std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
396 const Channel *channel) {
Austin Schuh54cf95f2019-11-29 13:14:18 -0800397 ValidateChannel(channel);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700398 Take(channel);
399 return GetSimulatedChannel(channel)->MakeRawSender(this);
400}
401
402std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
403 const Channel *channel) {
Austin Schuh54cf95f2019-11-29 13:14:18 -0800404 ValidateChannel(channel);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700405 return GetSimulatedChannel(channel)->MakeRawFetcher();
406}
407
408SimulatedChannel *SimulatedEventLoop::GetSimulatedChannel(
409 const Channel *channel) {
410 auto it = channels_->find(SimpleChannel(channel));
411 if (it == channels_->end()) {
412 it = channels_
413 ->emplace(SimpleChannel(channel),
414 std::unique_ptr<SimulatedChannel>(
415 new SimulatedChannel(channel, scheduler_)))
416 .first;
417 }
418 return it->second.get();
419}
420
421void SimulatedChannel::MakeRawWatcher(
422 ::std::function<void(const Context &context, const void *message)>
423 watcher) {
424 watchers_.push_back(watcher);
425}
426
427::std::unique_ptr<RawSender> SimulatedChannel::MakeRawSender(
428 EventLoop *event_loop) {
429 return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
430}
431
432::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher() {
433 ::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
434 fetchers_.push_back(fetcher.get());
435 return ::std::move(fetcher);
436}
437
438void SimulatedChannel::Send(std::shared_ptr<SimulatedMessage> message) {
439 message->context.queue_index = next_queue_index_.index();
440 message->context.data =
441 message->data() + channel()->max_size() - message->context.size;
442 next_queue_index_ = next_queue_index_.Increment();
443
444 latest_message_ = message;
445 if (scheduler_->is_running()) {
446 for (auto &watcher : watchers_) {
447 scheduler_->Schedule(scheduler_->monotonic_now(), [watcher, message]() {
448 watcher(message->context, message->context.data);
449 });
450 }
451 }
452 for (auto &fetcher : fetchers_) {
453 fetcher->Enqueue(message);
454 }
455}
456
457void SimulatedChannel::UnregisterFetcher(SimulatedFetcher *fetcher) {
458 fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
459}
460
461SimpleChannel::SimpleChannel(const Channel *channel)
462 : name(CHECK_NOTNULL(CHECK_NOTNULL(channel)->name())->str()),
463 type(CHECK_NOTNULL(CHECK_NOTNULL(channel)->type())->str()) {}
464
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800465void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
466 monotonic_clock::duration repeat_offset) {
467 Disable();
468 const ::aos::monotonic_clock::time_point monotonic_now =
469 scheduler_->monotonic_now();
470 base_ = base;
471 repeat_offset_ = repeat_offset;
472 if (base < monotonic_now) {
473 token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
474 } else {
475 token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
476 }
477}
478
479void SimulatedTimerHandler::HandleEvent() {
480 const ::aos::monotonic_clock::time_point monotonic_now =
481 scheduler_->monotonic_now();
482 if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
483 // Reschedule.
484 while (base_ <= monotonic_now) base_ += repeat_offset_;
485 token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
486 } else {
487 token_ = scheduler_->InvalidToken();
488 }
489 // The scheduler is perfect, so we will always wake up on time.
490 simulated_event_loop_->context_.monotonic_sent_time =
491 scheduler_->monotonic_now();
492 simulated_event_loop_->context_.realtime_sent_time = realtime_clock::min_time;
493 simulated_event_loop_->context_.queue_index = 0;
494 simulated_event_loop_->context_.size = 0;
495 simulated_event_loop_->context_.data = nullptr;
496
497 fn_();
498}
499
500
Alex Perrycb7da4b2019-08-28 19:35:56 -0700501void SimulatedEventLoop::Take(const Channel *channel) {
502 CHECK(!is_running()) << ": Cannot add new objects while running.";
503
504 auto result = taken_.insert(SimpleChannel(channel));
505 CHECK(result.second) << ": " << FlatbufferToJson(channel)
506 << " is already being used.";
507}
508
509SimulatedEventLoopFactory::SimulatedEventLoopFactory(
510 const Configuration *configuration)
511 : configuration_(configuration) {}
512SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
513
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800514::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
515 std::string_view name) {
516 ::std::unique_ptr<EventLoop> result(new SimulatedEventLoop(
Alex Perrycb7da4b2019-08-28 19:35:56 -0700517 &scheduler_, &channels_, configuration_, &raw_event_loops_));
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800518 result->set_name(name);
519 return result;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700520}
521
522void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
523 for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
524 raw_event_loops_) {
525 event_loop.second(true);
526 }
527 scheduler_.RunFor(duration);
528 if (!scheduler_.is_running()) {
529 for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
530 raw_event_loops_) {
531 event_loop.second(false);
532 }
533 }
534}
535
536void SimulatedEventLoopFactory::Run() {
537 for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
538 raw_event_loops_) {
539 event_loop.second(true);
540 }
541 scheduler_.Run();
542 if (!scheduler_.is_running()) {
543 for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
544 raw_event_loops_) {
545 event_loop.second(false);
546 }
547 }
548}
549
550} // namespace aos